parquet/file/metadata/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
use crate::file::metadata::thrift::FileMeta;
use crate::file::metadata::{
    ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
};
use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
use crate::{
    basic::ColumnOrder,
    file::metadata::{FileMetaData, ParquetMetaDataBuilder},
};
#[cfg(feature = "encryption")]
use crate::{
    encryption::{
        encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
        modules::{ModuleType, create_footer_aad, create_module_aad},
    },
    file::column_crypto_metadata::ColumnCryptoMetaData,
    file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
};
use crate::{
    errors::{ParquetError, Result},
    file::page_index::column_index::ColumnIndexMetaData,
};

use crate::{
    file::writer::{TrackedWrite, get_file_magic},
    parquet_thrift::WriteThrift,
};
use crate::{
    file::{
        metadata::{KeyValue, ParquetMetaData},
        page_index::offset_index::OffsetIndexMetaData,
    },
    parquet_thrift::ThriftCompactOutputProtocol,
};
use std::io::Write;
use std::sync::Arc;
51
52/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
53///
54/// See [`ParquetMetaDataWriter`] for background and example.
55pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
56    buf: &'a mut TrackedWrite<W>,
57    schema_descr: &'a SchemaDescPtr,
58    row_groups: Vec<RowGroupMetaData>,
59    column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
60    offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
61    key_value_metadata: Option<Vec<KeyValue>>,
62    created_by: Option<String>,
63    object_writer: MetadataObjectWriter,
64    writer_version: i32,
65}
66
67impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
68    /// Serialize all the offset indexes to `self.buf`,
69    ///
70    /// Note: also updates the `ColumnChunk::offset_index_offset` and
71    /// `ColumnChunk::offset_index_length` to reflect the position and length
72    /// of the serialized offset indexes.
73    fn write_offset_indexes(
74        &mut self,
75        offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
76    ) -> Result<()> {
77        // iter row group
78        // iter each column
79        // write offset index to the file
80        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
81            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
82                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
83                    let start_offset = self.buf.bytes_written();
84                    self.object_writer.write_offset_index(
85                        offset_index,
86                        column_metadata,
87                        row_group_idx,
88                        column_idx,
89                        &mut self.buf,
90                    )?;
91                    let end_offset = self.buf.bytes_written();
92                    // set offset and index for offset index
93                    column_metadata.offset_index_offset = Some(start_offset as i64);
94                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
95                }
96            }
97        }
98        Ok(())
99    }
100
101    /// Serialize all the column indexes to the `self.buf`
102    ///
103    /// Note: also updates the `ColumnChunk::column_index_offset` and
104    /// `ColumnChunk::column_index_length` to reflect the position and length
105    /// of the serialized column indexes.
106    fn write_column_indexes(
107        &mut self,
108        column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
109    ) -> Result<()> {
110        // iter row group
111        // iter each column
112        // write column index to the file
113        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
114            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
115                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
116                    let start_offset = self.buf.bytes_written();
117                    // only update column_metadata if the write succeeds
118                    if self.object_writer.write_column_index(
119                        column_index,
120                        column_metadata,
121                        row_group_idx,
122                        column_idx,
123                        &mut self.buf,
124                    )? {
125                        let end_offset = self.buf.bytes_written();
126                        // set offset and index for offset index
127                        column_metadata.column_index_offset = Some(start_offset as i64);
128                        column_metadata.column_index_length =
129                            Some((end_offset - start_offset) as i32);
130                    }
131                }
132            }
133        }
134        Ok(())
135    }
136
137    /// Serialize the column indexes and transform to `Option<ParquetColumnIndex>`
138    fn finalize_column_indexes(&mut self) -> Result<Option<ParquetColumnIndex>> {
139        let column_indexes = std::mem::take(&mut self.column_indexes);
140
141        // Write column indexes to file
142        if let Some(column_indexes) = column_indexes.as_ref() {
143            self.write_column_indexes(column_indexes)?;
144        }
145
146        // check to see if the index is `None` for every row group and column chunk
147        let all_none = column_indexes
148            .as_ref()
149            .is_some_and(|ci| ci.iter().all(|cii| cii.iter().all(|idx| idx.is_none())));
150
151        // transform from Option<Vec<Vec<Option<ColumnIndexMetaData>>>> to
152        // Option<Vec<Vec<ColumnIndexMetaData>>>
153        let column_indexes: Option<ParquetColumnIndex> = if all_none {
154            None
155        } else {
156            column_indexes.map(|ovvi| {
157                ovvi.into_iter()
158                    .map(|vi| {
159                        vi.into_iter()
160                            .map(|ci| ci.unwrap_or(ColumnIndexMetaData::NONE))
161                            .collect()
162                    })
163                    .collect()
164            })
165        };
166
167        Ok(column_indexes)
168    }
169
170    /// Serialize the offset indexes and transform to `Option<ParquetOffsetIndex>`
171    fn finalize_offset_indexes(&mut self) -> Result<Option<ParquetOffsetIndex>> {
172        let offset_indexes = std::mem::take(&mut self.offset_indexes);
173
174        // Write offset indexes to file
175        if let Some(offset_indexes) = offset_indexes.as_ref() {
176            self.write_offset_indexes(offset_indexes)?;
177        }
178
179        // check to see if the index is `None` for every row group and column chunk
180        let all_none = offset_indexes
181            .as_ref()
182            .is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx| idx.is_none())));
183
184        let offset_indexes: Option<ParquetOffsetIndex> = if all_none {
185            None
186        } else {
187            // FIXME(ets): this will panic if there's a missing index.
188            offset_indexes.map(|ovvi| {
189                ovvi.into_iter()
190                    .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
191                    .collect()
192            })
193        };
194
195        Ok(offset_indexes)
196    }
197
198    /// Assembles and writes the final metadata to self.buf
199    pub fn finish(mut self) -> Result<ParquetMetaData> {
200        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
201
202        // serialize page indexes and transform to the proper form for use in ParquetMetaData
203        let column_indexes = self.finalize_column_indexes()?;
204        let offset_indexes = self.finalize_offset_indexes()?;
205
206        // We only include ColumnOrder for leaf nodes.
207        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
208        // for all leaf nodes.
209        // Even if the column has an undefined sort order, such as INTERVAL, this
210        // is still technically the defined TYPEORDER so it should still be set.
211        let column_orders = self
212            .schema_descr
213            .columns()
214            .iter()
215            .map(|col| {
216                let sort_order = ColumnOrder::sort_order_for_type(
217                    col.logical_type_ref(),
218                    col.converted_type(),
219                    col.physical_type(),
220                );
221                ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
222            })
223            .collect();
224
225        // This field is optional, perhaps in cases where no min/max fields are set
226        // in any Statistics or ColumnIndex object in the whole file.
227        // But for simplicity we always set this field.
228        let column_orders = Some(column_orders);
229
230        let (row_groups, unencrypted_row_groups) = self
231            .object_writer
232            .apply_row_group_encryption(self.row_groups)?;
233
234        #[cfg(feature = "encryption")]
235        let (encryption_algorithm, footer_signing_key_metadata) =
236            self.object_writer.get_plaintext_footer_crypto_metadata();
237        #[cfg(feature = "encryption")]
238        let file_metadata = FileMetaData::new(
239            self.writer_version,
240            num_rows,
241            self.created_by,
242            self.key_value_metadata,
243            self.schema_descr.clone(),
244            column_orders,
245        )
246        .with_encryption_algorithm(encryption_algorithm)
247        .with_footer_signing_key_metadata(footer_signing_key_metadata);
248
249        #[cfg(not(feature = "encryption"))]
250        let file_metadata = FileMetaData::new(
251            self.writer_version,
252            num_rows,
253            self.created_by,
254            self.key_value_metadata,
255            self.schema_descr.clone(),
256            column_orders,
257        );
258
259        let file_meta = FileMeta {
260            file_metadata: &file_metadata,
261            row_groups: &row_groups,
262        };
263
264        // Write file metadata
265        let start_pos = self.buf.bytes_written();
266        self.object_writer
267            .write_file_metadata(&file_meta, &mut self.buf)?;
268        let end_pos = self.buf.bytes_written();
269
270        // Write footer
271        let metadata_len = (end_pos - start_pos) as u32;
272
273        self.buf.write_all(&metadata_len.to_le_bytes())?;
274        self.buf.write_all(self.object_writer.get_file_magic())?;
275
276        // If row group metadata was encrypted, we replace the encrypted row groups with
277        // unencrypted metadata before it is returned to users. This allows the metadata
278        // to be usable for retrieving the row group statistics for example, without users
279        // needing to decrypt the metadata.
280        let builder = ParquetMetaDataBuilder::new(file_metadata)
281            .set_column_index(column_indexes)
282            .set_offset_index(offset_indexes);
283
284        Ok(match unencrypted_row_groups {
285            Some(rg) => builder.set_row_groups(rg).build(),
286            None => builder.set_row_groups(row_groups).build(),
287        })
288    }
289
290    pub fn new(
291        buf: &'a mut TrackedWrite<W>,
292        schema_descr: &'a SchemaDescPtr,
293        row_groups: Vec<RowGroupMetaData>,
294        created_by: Option<String>,
295        writer_version: i32,
296    ) -> Self {
297        Self {
298            buf,
299            schema_descr,
300            row_groups,
301            column_indexes: None,
302            offset_indexes: None,
303            key_value_metadata: None,
304            created_by,
305            object_writer: Default::default(),
306            writer_version,
307        }
308    }
309
310    pub fn with_column_indexes(
311        mut self,
312        column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
313    ) -> Self {
314        self.column_indexes = Some(column_indexes);
315        self
316    }
317
318    pub fn with_offset_indexes(
319        mut self,
320        offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
321    ) -> Self {
322        self.offset_indexes = Some(offset_indexes);
323        self
324    }
325
326    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
327        self.key_value_metadata = Some(key_value_metadata);
328        self
329    }
330
331    #[cfg(feature = "encryption")]
332    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
333        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
334        self
335    }
336}
337
338/// Writes [`ParquetMetaData`] to a byte stream
339///
340/// This structure handles the details of writing the various parts of Parquet
341/// metadata into a byte stream. It is used to write the metadata into a parquet
342/// file and can also write metadata into other locations (such as a store of
343/// bytes).
344///
345/// # Discussion
346///
347/// The process of writing Parquet metadata is tricky because the
348/// metadata is not stored as a single inline thrift structure. It can have
349/// several "out of band" structures such as the [`OffsetIndex`] and
350/// BloomFilters stored in separate structures whose locations are stored as
351/// offsets from the beginning of the file.
352///
353/// Note: this writer does not directly write BloomFilters. In order to write
354/// BloomFilters, write the bloom filters into the buffer before creating the
355/// metadata writer. Then set the corresponding `bloom_filter_offset` and
356/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
357///
358/// # Output Format
359///
360/// The format of the metadata is as follows:
361///
362/// 1. Optional [`ColumnIndex`] (thrift encoded)
363/// 2. Optional [`OffsetIndex`] (thrift encoded)
364/// 3. [`FileMetaData`] (thrift encoded)
365/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
366/// 5. Parquet Magic Bytes (4 bytes)
367///
368/// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
369/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
370/// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
371/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
372///
373/// ```text
374/// ┌──────────────────────┐
375/// │                      │
376/// │         ...          │
377/// │                      │
378/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
379/// │     ColumnIndex     ◀│─ ─ ─
380/// ││    (Optional)     │ │     │
381/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
382/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
383/// │     OffsetIndex      │       contains embedded
384/// ││    (Optional)     │◀┼ ─   │ offsets to
385/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
386/// │╔═══════════════════╗ │     │ OffsetIndex
387/// │║                   ║ │  │
388/// │║                   ║ ┼ ─   │
389/// │║   FileMetadata    ║ │
390/// │║                   ║ ┼ ─ ─ ┘
391/// │║                   ║ │
392/// │╚═══════════════════╝ │
393/// │┌───────────────────┐ │
394/// ││  metadata length  │ │ length of FileMetadata  (only)
395/// │└───────────────────┘ │
396/// │┌───────────────────┐ │
397/// ││      'PAR1'       │ │ Parquet Magic Bytes
398/// │└───────────────────┘ │
399/// └──────────────────────┘
400///      Output Buffer
401/// ```
402///
403/// # Example
404/// ```no_run
405/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
406/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
407/// // write parquet metadata to an in-memory buffer
408/// let mut buffer = vec![];
409/// let metadata: ParquetMetaData = get_metadata();
410/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
411/// // write the metadata to the buffer
412/// writer.finish().unwrap();
413/// assert!(!buffer.is_empty());
414/// ```
415pub struct ParquetMetaDataWriter<'a, W: Write> {
416    buf: TrackedWrite<W>,
417    metadata: &'a ParquetMetaData,
418}
419
420impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
421    /// Create a new `ParquetMetaDataWriter` to write to `buf`
422    ///
423    /// Note any embedded offsets in the metadata will be written assuming the
424    /// metadata is at the start of the buffer. If the metadata is being written
425    /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
426    ///
427    /// See example on the struct level documentation
428    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
429        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
430    }
431
432    /// Create a new ParquetMetaDataWriter to write to `buf`
433    ///
434    /// This method is used when the metadata is being written to a location other
435    /// than the start of the buffer.
436    ///
437    /// See example on the struct level documentation
438    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
439        Self { buf, metadata }
440    }
441
442    /// Write the metadata to the buffer
443    pub fn finish(mut self) -> Result<()> {
444        let file_metadata = self.metadata.file_metadata();
445
446        let schema = Arc::new(file_metadata.schema().clone());
447        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
448        let created_by = file_metadata.created_by().map(str::to_string);
449
450        let row_groups = self.metadata.row_groups.clone();
451
452        let key_value_metadata = file_metadata.key_value_metadata().cloned();
453
454        let column_indexes = self.convert_column_indexes();
455        let offset_indexes = self.convert_offset_index();
456
457        let mut encoder = ThriftMetadataWriter::new(
458            &mut self.buf,
459            &schema_descr,
460            row_groups,
461            created_by,
462            file_metadata.version(),
463        );
464
465        if let Some(column_indexes) = column_indexes {
466            encoder = encoder.with_column_indexes(column_indexes);
467        }
468
469        if let Some(offset_indexes) = offset_indexes {
470            encoder = encoder.with_offset_indexes(offset_indexes);
471        }
472
473        if let Some(key_value_metadata) = key_value_metadata {
474            encoder = encoder.with_key_value_metadata(key_value_metadata);
475        }
476        encoder.finish()?;
477
478        Ok(())
479    }
480
481    fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
482        // TODO(ets): we're converting from ParquetColumnIndex to vec<vec<option>>,
483        // but then converting back to ParquetColumnIndex in the end. need to unify this.
484        self.metadata
485            .column_index()
486            .map(|row_group_column_indexes| {
487                (0..self.metadata.row_groups().len())
488                    .map(|rg_idx| {
489                        let column_indexes = &row_group_column_indexes[rg_idx];
490                        column_indexes
491                            .iter()
492                            .map(|column_index| Some(column_index.clone()))
493                            .collect()
494                    })
495                    .collect()
496            })
497    }
498
499    fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
500        self.metadata
501            .offset_index()
502            .map(|row_group_offset_indexes| {
503                (0..self.metadata.row_groups().len())
504                    .map(|rg_idx| {
505                        let offset_indexes = &row_group_offset_indexes[rg_idx];
506                        offset_indexes
507                            .iter()
508                            .map(|offset_index| Some(offset_index.clone()))
509                            .collect()
510                    })
511                    .collect()
512            })
513    }
514}
515
/// Serializes individual metadata objects (footer, column and offset indexes).
///
/// With the `encryption` feature enabled, objects may be encrypted using the
/// configured file encryptor.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor for the file; `None` means no encryption is applied.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
521
522impl MetadataObjectWriter {
523    #[inline]
524    fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
525        let mut protocol = ThriftCompactOutputProtocol::new(sink);
526        object.write_thrift(&mut protocol)?;
527        Ok(())
528    }
529}
530
531/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
532#[cfg(not(feature = "encryption"))]
533impl MetadataObjectWriter {
534    /// Write [`FileMetaData`] in Thrift format
535    ///
536    /// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
537    fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
538        Self::write_thrift_object(file_metadata, sink)
539    }
540
541    /// Write a column [`OffsetIndex`] in Thrift format
542    ///
543    /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
544    fn write_offset_index(
545        &self,
546        offset_index: &OffsetIndexMetaData,
547        _column_chunk: &ColumnChunkMetaData,
548        _row_group_idx: usize,
549        _column_idx: usize,
550        sink: impl Write,
551    ) -> Result<()> {
552        Self::write_thrift_object(offset_index, sink)
553    }
554
555    /// Write a column [`ColumnIndex`] in Thrift format
556    ///
557    /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not be written and
558    /// this will return `false`. Returns `true` otherwise.
559    ///
560    /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
561    fn write_column_index(
562        &self,
563        column_index: &ColumnIndexMetaData,
564        _column_chunk: &ColumnChunkMetaData,
565        _row_group_idx: usize,
566        _column_idx: usize,
567        sink: impl Write,
568    ) -> Result<bool> {
569        match column_index {
570            // Missing indexes may also have the placeholder ColumnIndexMetaData::NONE
571            ColumnIndexMetaData::NONE => Ok(false),
572            _ => {
573                Self::write_thrift_object(column_index, sink)?;
574                Ok(true)
575            }
576        }
577    }
578
579    /// No-op implementation of row-group metadata encryption
580    fn apply_row_group_encryption(
581        &self,
582        row_groups: Vec<RowGroupMetaData>,
583    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
584        Ok((row_groups, None))
585    }
586
587    /// Get the "magic" bytes identifying the file type
588    pub fn get_file_magic(&self) -> &[u8; 4] {
589        get_file_magic()
590    }
591}
592
/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Set the file encryptor to use; `None` leaves every object unencrypted.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
    ///
    /// * If the encryptor is configured to encrypt the footer, a plaintext
    ///   `FileCryptoMetaData` is written first, followed by the encrypted footer.
    /// * Otherwise, if an encryptor is set and the footer has an
    ///   `encryption_algorithm`, the footer is written with
    ///   `write_signed_plaintext_thrift_object`.
    /// * In every other case the footer is written as plain Thrift.
    ///
    /// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
                // First write FileCryptoMetadata
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_thrift(&mut protocol)?;

                // Then write encrypted footer
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            Some(file_encryptor) if file_metadata.file_metadata.encryption_algorithm.is_some() => {
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            _ => Self::write_thrift_object(file_metadata, &mut sink),
        }
    }

    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
    ///
    /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_thrift_object(offset_index, sink),
        }
    }

    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
    ///
    /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not be written and
    /// this will return `false`. Returns `true` otherwise.
    ///
    /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    fn write_column_index(
        &self,
        column_index: &ColumnIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<bool> {
        // Missing indexes may also have the placeholder ColumnIndexMetaData::NONE
        if matches!(column_index, ColumnIndexMetaData::NONE) {
            return Ok(false);
        }
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            )?,
            None => Self::write_thrift_object(column_index, sink)?,
        }
        Ok(true)
    }

    /// If encryption is enabled and configured, encrypt row group metadata.
    /// Returns a tuple of the row group metadata to write,
    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroupMetaData>,
    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
        match &self.file_encryptor {
            Some(file_encryptor) => {
                let unencrypted_row_groups = row_groups.clone();
                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
            }
            None => Ok((row_groups, None)),
        }
    }

    /// Get the "magic" bytes identifying the file type
    fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic(
            self.file_encryptor
                .as_ref()
                .map(|encryptor| encryptor.properties()),
        )
    }

    /// Write `object` for the column described by `column_metadata`.
    ///
    /// The object is encrypted with that column's key when the encryptor reports
    /// the column as encrypted; otherwise it is written as plain Thrift.
    fn write_thrift_object_with_encryption(
        object: &impl WriteThrift,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunkMetaData,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let column_path_vec = column_metadata.column_path().as_ref();

        // Only allocate a joined path for nested (multi-element) column paths.
        let joined_column_path;
        let column_path = if column_path_vec.len() == 1 {
            &column_path_vec[0]
        } else {
            joined_column_path = column_path_vec.join(".");
            &joined_column_path
        };

        if file_encryptor.is_column_encrypted(column_path) {
            let aad = create_module_aad(
                file_encryptor.file_aad(),
                module_type,
                row_group_index,
                column_index,
                None,
            )?;
            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
            encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
        } else {
            Self::write_thrift_object(object, sink)
        }
    }

    /// Return the encryption algorithm and footer key metadata to store in a
    /// plaintext footer, or `(None, None)` when there is no encryptor or the
    /// footer itself is encrypted.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        // Only plaintext footers may contain encryption algorithm and footer key metadata.
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            let encryption_properties = file_encryptor.properties();
            if !encryption_properties.encrypt_footer() {
                return (
                    Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
                    encryption_properties.footer_key_metadata().cloned(),
                );
            }
        }
        (None, None)
    }

    /// Build the `AES_GCM_V1` algorithm description for `file_encryptor`.
    ///
    /// The AAD prefix is embedded only when it is configured to be stored.
    /// `supply_aad_prefix` is set only when a prefix exists, and is `true` when
    /// that prefix is not stored.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let properties = file_encryptor.properties();
        let store_aad_prefix = properties.store_aad_prefix();
        let supply_aad_prefix = properties.aad_prefix().map(|_| !store_aad_prefix);
        let aad_prefix = if store_aad_prefix {
            properties.aad_prefix()
        } else {
            None
        };
        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
            aad_prefix: aad_prefix.cloned(),
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Build the `FileCryptoMetaData` written ahead of an encrypted footer.
    fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
        let properties = file_encryptor.properties();
        Ok(FileCryptoMetaData {
            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
            key_metadata: properties.footer_key_metadata().map(|v| v.as_slice()),
        })
    }

    /// Apply [`Self::encrypt_column_chunk`] to every column chunk of every row group.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroupMetaData>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroupMetaData>> {
        row_groups
            .into_iter()
            .enumerate()
            .map(|(rg_idx, mut rg)| {
                let cols: Result<Vec<ColumnChunkMetaData>> = rg
                    .columns
                    .into_iter()
                    .enumerate()
                    .map(|(col_idx, c)| {
                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
                    })
                    .collect();
                rg.columns = cols?;
                Ok(rg)
            })
            .collect()
    }

    /// Apply column encryption to column chunk metadata
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunkMetaData,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunkMetaData> {
        // Column crypto metadata should have already been set when the column was created.
        // Here we apply the encryption by encrypting the column metadata if required.
        match column_chunk.column_crypto_metadata.as_deref() {
            None => {}
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                // When uniform encryption is used the footer is already encrypted,
                // so the column chunk does not need additional encryption.
            }
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
                use crate::file::metadata::thrift::serialize_column_meta_data;

                let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
                let aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;
                // create temp ColumnMetaData that we can encrypt
                let mut buffer: Vec<u8> = vec![];
                {
                    let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
                    serialize_column_meta_data(&column_chunk, &mut prot)?;
                }
                let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
            }
        }

        Ok(column_chunk)
    }
}