parquet/file/metadata/thrift_gen.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// a collection of generated structs used to parse thrift metadata
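//
// The thrift_struct!/thrift_union! macros below take (a subset of) thrift IDL
// and generate the corresponding Rust type along with its thrift read/write
// plumbing, so these definitions mirror parquet-format's parquet.thrift
// nearly verbatim.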

use std::io::Write;
use std::sync::Arc;

use crate::{
    basic::{
        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Repetition, Type,
    },
    data_type::{ByteArray, FixedLenByteArray, Int96},
    errors::{ParquetError, Result},
    file::{
        metadata::{
            ColumnChunkMetaData, KeyValue, LevelHistogram, PageEncodingStats, ParquetMetaData,
            RowGroupMetaData, SortingColumn,
        },
        statistics::ValueStatistics,
    },
    parquet_thrift::{
        read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
        ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
    },
    schema::types::{
        num_nodes, parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor, TypePtr,
    },
    thrift_struct, thrift_union,
    util::bit_util::FromBytes,
};
#[cfg(feature = "encryption")]
use crate::{
    encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
    file::column_crypto_metadata::ColumnCryptoMetaData,
    parquet_thrift::ThriftSliceInputProtocol,
    schema::types::SchemaDescPtr,
};

// this needs to be visible to the schema conversion code
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  /// Data type for this field. Not set if the current element is a non-leaf node
  1: optional Type r#type;
  /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
  /// Otherwise, if specified, this is the maximum bit length to store any of the values.
  /// (e.g. a low cardinality INT col could have this set to 3).  Note that this is
  /// in the schema, and therefore fixed for the entire file.
  2: optional i32 type_length;
  /// Repetition of the field. The root of the schema does not have a repetition_type.
  /// All other nodes must have one.
  3: optional Repetition repetition_type;
  /// Name of the field in the schema
  4: required string<'a> name;
  /// Nested fields. Since thrift does not support nested fields,
  /// the nesting is flattened to a single list by a depth-first traversal
  /// (see the sketch following this struct).
  /// The children count is used to construct the nested relationship.
  /// This field is not set when the element is a primitive type.
74  /// DEPRECATED: When the schema is the result of a conversion from another model.
75  /// Used to record the original type to help with cross conversion.
76  ///
77  /// This is superseded by logical_type.
78  6: optional ConvertedType converted_type;
79  /// DEPRECATED: Used when this column contains decimal data.
80  /// See the DECIMAL converted type for more details.
81  ///
82  /// This is superseded by using the DecimalType annotation in logical_type.
83  7: optional i32 scale
84  8: optional i32 precision
85  /// When the original schema supports field ids, this will save the
86  /// original field id in the parquet schema
87  9: optional i32 field_id;
88  /// The logical type of this SchemaElement
89  ///
90  /// LogicalType replaces ConvertedType, but ConvertedType is still required
91  /// for some logical types to ensure forward-compatibility in format v1.
92  10: optional LogicalType logical_type
93}
94);
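
// Illustrative sketch (not generated code): a nested schema such as
//
//   message schema {
//     required int32 id;
//     optional group name {
//       required binary first;
//     }
//   }
//
// is flattened depth-first into four SchemaElements:
//
//   [schema (num_children=2), id, name (num_children=1), first]
//
// and readers rebuild the tree by consuming num_children elements per group.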

thrift_struct!(
pub(crate) struct AesGcmV1 {
  /// AAD prefix
  1: optional binary aad_prefix

  /// Unique file identifier part of AAD suffix
  2: optional binary aad_file_unique

  /// In files encrypted with AAD prefix without storing it,
  /// readers must supply the prefix
  3: optional bool supply_aad_prefix
}
);

thrift_struct!(
pub(crate) struct AesGcmCtrV1 {
  /// AAD prefix
  1: optional binary aad_prefix

  /// Unique file identifier part of AAD suffix
  2: optional binary aad_file_unique

  /// In files encrypted with AAD prefix without storing it,
  /// readers must supply the prefix
  3: optional bool supply_aad_prefix
}
);

thrift_union!(
union EncryptionAlgorithm {
  1: (AesGcmV1) AES_GCM_V1
  2: (AesGcmCtrV1) AES_GCM_CTR_V1
}
);
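
// On the wire a thrift union is a struct with exactly one field set; the
// macro above surfaces that as a Rust enum with one variant per field id.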

#[cfg(feature = "encryption")]
thrift_struct!(
/// Crypto metadata for files with encrypted footer
pub(crate) struct FileCryptoMetaData<'a> {
  /// Encryption algorithm. This field is only used for files
  /// with encrypted footer. Files with plaintext footer store algorithm id
  /// inside footer (FileMetaData structure).
  1: required EncryptionAlgorithm encryption_algorithm

  /// Retrieval metadata of key used for encryption of footer,
  /// and (possibly) columns.
  2: optional binary<'a> key_metadata
}
);

// the following are only used internally so are private
thrift_struct!(
struct FileMetaData<'a> {
  1: required i32 version
  2: required list<'a><SchemaElement> schema;
  3: required i64 num_rows
  4: required list<'a><RowGroup> row_groups
  5: optional list<KeyValue> key_value_metadata
  6: optional string<'a> created_by
  7: optional list<ColumnOrder> column_orders;
  8: optional EncryptionAlgorithm encryption_algorithm
  9: optional binary<'a> footer_signing_key_metadata
}
);

thrift_struct!(
struct RowGroup<'a> {
  1: required list<'a><ColumnChunk> columns
  2: required i64 total_byte_size
  3: required i64 num_rows
  4: optional list<SortingColumn> sorting_columns
  5: optional i64 file_offset
  // we don't expose total_compressed_size so skip
  //6: optional i64 total_compressed_size
  7: optional i16 ordinal
}
);

#[cfg(feature = "encryption")]
thrift_struct!(
struct ColumnChunk<'a> {
  1: optional string<'a> file_path
  2: required i64 file_offset = 0
  3: optional ColumnMetaData<'a> meta_data
  4: optional i64 offset_index_offset
  5: optional i32 offset_index_length
  6: optional i64 column_index_offset
  7: optional i32 column_index_length
  8: optional ColumnCryptoMetaData crypto_metadata
  9: optional binary<'a> encrypted_column_metadata
}
);
#[cfg(not(feature = "encryption"))]
thrift_struct!(
struct ColumnChunk<'a> {
  1: optional string<'a> file_path
  2: required i64 file_offset = 0
  3: optional ColumnMetaData<'a> meta_data
  4: optional i64 offset_index_offset
  5: optional i32 offset_index_length
  6: optional i64 column_index_offset
  7: optional i32 column_index_length
}
);

type CompressionCodec = Compression;
thrift_struct!(
struct ColumnMetaData<'a> {
  1: required Type r#type
  2: required list<Encoding> encodings
  // we don't expose path_in_schema so skip
  //3: required list<string> path_in_schema
  4: required CompressionCodec codec
  5: required i64 num_values
  6: required i64 total_uncompressed_size
  7: required i64 total_compressed_size
  // we don't expose key_value_metadata so skip
  //8: optional list<KeyValue> key_value_metadata
  9: required i64 data_page_offset
  10: optional i64 index_page_offset
  11: optional i64 dictionary_page_offset
  12: optional Statistics<'a> statistics
  13: optional list<PageEncodingStats> encoding_stats;
  14: optional i64 bloom_filter_offset;
  15: optional i32 bloom_filter_length;
  16: optional SizeStatistics size_statistics;
  17: optional GeospatialStatistics geospatial_statistics;
}
);

thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);

thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);

thrift_struct!(
struct SizeStatistics {
   1: optional i64 unencoded_byte_array_data_bytes;
   2: optional list<i64> repetition_level_histogram;
   3: optional list<i64> definition_level_histogram;
}
);

thrift_struct!(
pub(crate) struct Statistics<'a> {
   1: optional binary<'a> max;
   2: optional binary<'a> min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary<'a> max_value;
   6: optional binary<'a> min_value;
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);

// convert collection of thrift RowGroups into RowGroupMetaData
fn convert_row_groups(
    mut row_groups: Vec<RowGroup>,
    schema_descr: Arc<SchemaDescriptor>,
) -> Result<Vec<RowGroupMetaData>> {
    let mut res: Vec<RowGroupMetaData> = Vec::with_capacity(row_groups.len());
    for rg in row_groups.drain(0..) {
        res.push(convert_row_group(rg, schema_descr.clone())?);
    }

    Ok(res)
}

fn convert_row_group(
    row_group: RowGroup,
    schema_descr: Arc<SchemaDescriptor>,
) -> Result<RowGroupMetaData> {
    if schema_descr.num_columns() != row_group.columns.len() {
        return Err(general_err!(
            "Column count mismatch. Schema has {} columns while Row Group has {}",
            schema_descr.num_columns(),
            row_group.columns.len()
        ));
    }

    let num_rows = row_group.num_rows;
    let sorting_columns = row_group.sorting_columns;
    let total_byte_size = row_group.total_byte_size;
    let file_offset = row_group.file_offset;
    let ordinal = row_group.ordinal;

    let columns = convert_columns(row_group.columns, schema_descr.clone())?;

    Ok(RowGroupMetaData {
        columns,
        num_rows,
        sorting_columns,
        total_byte_size,
        schema_descr,
        file_offset,
        ordinal,
    })
}

fn convert_columns(
    mut columns: Vec<ColumnChunk>,
    schema_descr: Arc<SchemaDescriptor>,
) -> Result<Vec<ColumnChunkMetaData>> {
    let mut res: Vec<ColumnChunkMetaData> = Vec::with_capacity(columns.len());
    for (c, d) in columns.drain(0..).zip(schema_descr.columns()) {
        res.push(convert_column(c, d.clone())?);
    }

    Ok(res)
}

fn convert_column(
    column: ColumnChunk,
    column_descr: Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
    if column.meta_data.is_none() {
        return Err(general_err!("Expected to have column metadata"));
    }
    let col_metadata = column.meta_data.unwrap();
    let column_type = col_metadata.r#type;
    let encodings = col_metadata.encodings;
    let compression = col_metadata.codec;
    let file_path = column.file_path.map(|v| v.to_owned());
    let file_offset = column.file_offset;
    let num_values = col_metadata.num_values;
    let total_compressed_size = col_metadata.total_compressed_size;
    let total_uncompressed_size = col_metadata.total_uncompressed_size;
    let data_page_offset = col_metadata.data_page_offset;
    let index_page_offset = col_metadata.index_page_offset;
    let dictionary_page_offset = col_metadata.dictionary_page_offset;
    let statistics = convert_stats(column_type, col_metadata.statistics)?;
    let encoding_stats = col_metadata.encoding_stats;
    let bloom_filter_offset = col_metadata.bloom_filter_offset;
    let bloom_filter_length = col_metadata.bloom_filter_length;
    let offset_index_offset = column.offset_index_offset;
    let offset_index_length = column.offset_index_length;
    let column_index_offset = column.column_index_offset;
    let column_index_length = column.column_index_length;
    let (unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram) =
        if let Some(size_stats) = col_metadata.size_statistics {
            (
                size_stats.unencoded_byte_array_data_bytes,
                size_stats.repetition_level_histogram,
                size_stats.definition_level_histogram,
            )
        } else {
            (None, None, None)
        };

    let geo_statistics = convert_geo_stats(col_metadata.geospatial_statistics);

    let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
    let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);

    let result = ColumnChunkMetaData {
        column_descr,
        encodings,
        file_path,
        file_offset,
        num_values,
        compression,
        total_compressed_size,
        total_uncompressed_size,
        data_page_offset,
        index_page_offset,
        dictionary_page_offset,
        statistics,
        geo_statistics,
        encoding_stats,
        bloom_filter_offset,
        bloom_filter_length,
        offset_index_offset,
        offset_index_length,
        column_index_offset,
        column_index_length,
        unencoded_byte_array_data_bytes,
        repetition_level_histogram,
        definition_level_histogram,
        #[cfg(feature = "encryption")]
        column_crypto_metadata: column.crypto_metadata,
        #[cfg(feature = "encryption")]
        encrypted_column_metadata: None,
    };
    Ok(result)
}

fn convert_geo_stats(
    stats: Option<GeospatialStatistics>,
) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
    stats.map(|st| {
        let bbox = convert_bounding_box(st.bbox);
        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
            bbox,
            geospatial_types,
        ))
    })
}

fn convert_bounding_box(
    bbox: Option<BoundingBox>,
) -> Option<crate::geospatial::bounding_box::BoundingBox> {
    bbox.map(|bb| {
        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
            bb.xmin.into(),
            bb.xmax.into(),
            bb.ymin.into(),
            bb.ymax.into(),
        );

        newbb = match (bb.zmin, bb.zmax) {
            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
            // If either bound is missing, leave the range unset rather than erroring
            _ => newbb,
        };

        newbb = match (bb.mmin, bb.mmax) {
            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
            // If either bound is missing, leave the range unset rather than erroring
            _ => newbb,
        };

        newbb
    })
}

pub(crate) fn convert_stats(
    physical_type: Type,
    thrift_stats: Option<Statistics>,
) -> Result<Option<crate::file::statistics::Statistics>> {
    use crate::file::statistics::Statistics as FStatistics;
    Ok(match thrift_stats {
        Some(stats) => {
            // Number of nulls recorded; when it is not available, we just mark it as 0.
            // TODO this should be `None` if there is no information about NULLS.
            // see https://github.com/apache/arrow-rs/pull/6216/files
            let null_count = stats.null_count.unwrap_or(0);

            if null_count < 0 {
                return Err(ParquetError::General(format!(
                    "Statistics null count is negative {null_count}",
                )));
            }

            // Generic null count.
            let null_count = Some(null_count as u64);
            // Generic distinct count (count of distinct values occurring)
            let distinct_count = stats.distinct_count.map(|value| value as u64);
            // Whether or not statistics use deprecated min/max fields.
            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
            // Generic min value as bytes.
            let min = if old_format {
                stats.min
            } else {
                stats.min_value
            };
            // Generic max value as bytes.
            let max = if old_format {
                stats.max
            } else {
                stats.max_value
            };
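            // Note: min/max (fields 1 and 2) are deprecated in parquet-format
            // because their sort order was not well defined for all types;
            // min_value/max_value (fields 5 and 6) take precedence whenever
            // either is present.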

            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
                if let Some(min) = min {
                    if min.len() < len {
                        return Err(ParquetError::General(
                            "Insufficient bytes to parse min statistic".to_string(),
                        ));
                    }
                }
                if let Some(max) = max {
                    if max.len() < len {
                        return Err(ParquetError::General(
                            "Insufficient bytes to parse max statistic".to_string(),
                        ));
                    }
                }
                Ok(())
            }

            match physical_type {
                Type::BOOLEAN => check_len(&min, &max, 1),
                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
                Type::INT96 => check_len(&min, &max, 12),
                _ => Ok(()),
            }?;

            // Values are encoded using PLAIN encoding definition, except that
            // variable-length byte arrays do not include a length prefix.
            //
            // Instead of using actual decoder, we manually convert values.
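            //
            // For example (illustrative): an INT32 min of 3 arrives as the four
            // little-endian bytes [0x03, 0x00, 0x00, 0x00], while a BYTE_ARRAY
            // min of "abc" arrives as the raw bytes [0x61, 0x62, 0x63], without
            // the 4-byte length prefix PLAIN encoding would normally add.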
            let res = match physical_type {
                Type::BOOLEAN => FStatistics::boolean(
                    min.map(|data| data[0] != 0),
                    max.map(|data| data[0] != 0),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT32 => FStatistics::int32(
                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT64 => FStatistics::int64(
                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT96 => {
                    // INT96 statistics may not be correct, because comparison is signed
                    let min = if let Some(data) = min {
                        assert_eq!(data.len(), 12);
                        Some(Int96::try_from_le_slice(data)?)
                    } else {
                        None
                    };
                    let max = if let Some(data) = max {
                        assert_eq!(data.len(), 12);
                        Some(Int96::try_from_le_slice(data)?)
                    } else {
                        None
                    };
                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
                }
                Type::FLOAT => FStatistics::float(
                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::DOUBLE => FStatistics::double(
                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::BYTE_ARRAY => FStatistics::ByteArray(
                    ValueStatistics::new(
                        min.map(ByteArray::from),
                        max.map(ByteArray::from),
                        distinct_count,
                        null_count,
                        old_format,
                    )
                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
                ),
                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
                    ValueStatistics::new(
                        min.map(ByteArray::from).map(FixedLenByteArray::from),
                        max.map(ByteArray::from).map(FixedLenByteArray::from),
                        distinct_count,
                        null_count,
                        old_format,
                    )
                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
                ),
            };

            Some(res)
        }
        None => None,
    })
}

#[cfg(feature = "encryption")]
fn row_group_from_encrypted_thrift(
    mut rg: RowGroup,
    schema_descr: SchemaDescPtr,
    decryptor: Option<&FileDecryptor>,
) -> Result<RowGroupMetaData> {
    if schema_descr.num_columns() != rg.columns.len() {
        return Err(general_err!(
            "Column count mismatch. Schema has {} columns while Row Group has {}",
            schema_descr.num_columns(),
            rg.columns.len()
        ));
    }
    let total_byte_size = rg.total_byte_size;
    let num_rows = rg.num_rows;
    let mut columns = vec![];

    for (i, (mut c, d)) in rg
        .columns
        .drain(0..)
        .zip(schema_descr.columns())
        .enumerate()
    {
        // Read encrypted metadata if it's present and we have a decryptor.
        if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
            let column_decryptor = match c.crypto_metadata.as_ref() {
                None => {
                    return Err(general_err!(
                        "No crypto_metadata is set for column '{}', which has encrypted metadata",
                        d.path().string()
                    ));
                }
                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
                    let column_name = crypto_metadata.path_in_schema.join(".");
                    decryptor.get_column_metadata_decryptor(
                        column_name.as_str(),
                        crypto_metadata.key_metadata.as_deref(),
                    )?
                }
                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                    decryptor.get_footer_decryptor()?
                }
            };

            let column_aad = crate::encryption::modules::create_module_aad(
                decryptor.file_aad(),
                crate::encryption::modules::ModuleType::ColumnMetaData,
                rg.ordinal.unwrap() as usize,
                i,
                None,
            )?;
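            // The module AAD ties the ciphertext to this exact location in the
            // file (file AAD, module type, row group ordinal, column ordinal),
            // so column metadata moved or swapped between files fails
            // authentication when decrypted.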

            let buf = c.encrypted_column_metadata.unwrap();
            let decrypted_cc_buf =
                column_decryptor
                    .decrypt(buf, column_aad.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Unable to decrypt column '{}', perhaps the column key is wrong?",
                            d.path().string()
                        )
                    })?;

            let mut prot = ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
            let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
            c.meta_data = Some(col_meta);
            columns.push(convert_column(c, d.clone())?);
        } else {
            columns.push(convert_column(c, d.clone())?);
        }
    }

    let sorting_columns = rg.sorting_columns;
    let file_offset = rg.file_offset;
    let ordinal = rg.ordinal;

    Ok(RowGroupMetaData {
        columns,
        num_rows,
        sorting_columns,
        total_byte_size,
        schema_descr,
        file_offset,
        ordinal,
    })
}

#[cfg(feature = "encryption")]
/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
///
/// Typically this is used to decode the metadata from the end of a parquet
/// file. The format of `buf` is the Thrift compact binary protocol, as specified
/// by the [Parquet Spec]. The buffer can be encrypted with AES-GCM or AES-CTR
/// ciphers as specified in the [Parquet Encryption Spec].
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
pub(crate) fn parquet_metadata_with_encryption(
    file_decryption_properties: Option<&FileDecryptionProperties>,
    encrypted_footer: bool,
    buf: &[u8],
) -> Result<ParquetMetaData> {
    let mut prot = ThriftSliceInputProtocol::new(buf);
    let mut file_decryptor = None;
    let decrypted_fmd_buf;

    if encrypted_footer {
        if let Some(file_decryption_properties) = file_decryption_properties {
            let t_file_crypto_metadata: FileCryptoMetaData =
                FileCryptoMetaData::read_thrift(&mut prot)
                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
                _ => Some(false),
            }
            .unwrap_or(false);
            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
            }
            let decryptor = get_file_decryptor(
                t_file_crypto_metadata.encryption_algorithm,
                t_file_crypto_metadata.key_metadata,
                file_decryption_properties,
            )?;
            let footer_decryptor = decryptor.get_footer_decryptor();
            let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;

            decrypted_fmd_buf = footer_decryptor?
                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                .map_err(|_| {
                    general_err!(
                        "Provided footer key and AAD were unable to decrypt parquet footer"
                    )
                })?;
            prot = ThriftSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

            file_decryptor = Some(decryptor);
        } else {
            return Err(general_err!(
                "Parquet file has an encrypted footer but decryption properties were not provided"
            ));
        }
    }

    let file_meta = FileMetaData::read_thrift(&mut prot)
        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;

    let version = file_meta.version;
    let num_rows = file_meta.num_rows;
    let created_by = file_meta.created_by.map(|c| c.to_owned());
    let key_value_metadata = file_meta.key_value_metadata;

    let val = parquet_schema_from_array(file_meta.schema)?;
    let schema_descr = Arc::new(SchemaDescriptor::new(val));

    if let (Some(algo), Some(file_decryption_properties)) =
        (file_meta.encryption_algorithm, file_decryption_properties)
    {
        // File has a plaintext footer but encryption algorithm is set
        let file_decryptor_value = get_file_decryptor(
            algo,
            file_meta.footer_signing_key_metadata,
            file_decryption_properties,
        )?;
        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
        }
        file_decryptor = Some(file_decryptor_value);
    }

    // decrypt column chunk info
    let mut row_groups = Vec::with_capacity(file_meta.row_groups.len());
    for rg in file_meta.row_groups {
        let r = row_group_from_encrypted_thrift(rg, schema_descr.clone(), file_decryptor.as_ref())?;
        row_groups.push(r);
    }

    // need to map read column orders to actual values based on the schema
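    // (in the thrift encoding TYPE_DEFINED_ORDER is an empty struct, so the
    // actual sort order must be recomputed from each column's logical,
    // converted, and physical types)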
    if file_meta
        .column_orders
        .as_ref()
        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
    {
        return Err(general_err!("Column order length mismatch"));
    }

    let column_orders = file_meta.column_orders.map(|cos| {
        let mut res = Vec::with_capacity(cos.len());
        for (i, column) in schema_descr.columns().iter().enumerate() {
            match cos[i] {
                ColumnOrder::TYPE_DEFINED_ORDER(_) => {
                    let sort_order = ColumnOrder::get_sort_order(
                        column.logical_type(),
                        column.converted_type(),
                        column.physical_type(),
                    );
                    res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                }
                _ => res.push(cos[i]),
            }
        }
        res
    });

    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    );
    let mut metadata = ParquetMetaData::new(fmd, row_groups);

    metadata.with_file_decryptor(file_decryptor);

    Ok(metadata)
}

#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AES_GCM_V1(algo) => {
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
                aad_prefix.clone()
            } else {
                algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
            };
            let aad_file_unique = aad_file_unique.to_vec();

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}

/// Create ParquetMetaData from thrift input. Note that this only decodes the file metadata in
/// the Parquet footer. Page indexes will need to be added later.
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ParquetMetaData {
    fn read_thrift(prot: &mut R) -> Result<Self> {
        let file_meta = FileMetaData::read_thrift(prot)?;

        let version = file_meta.version;
        let num_rows = file_meta.num_rows;
        let row_groups = file_meta.row_groups;
        let created_by = file_meta.created_by.map(|c| c.to_owned());
        let key_value_metadata = file_meta.key_value_metadata;

        let val = parquet_schema_from_array(file_meta.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(val));

        // need schema_descr to get final RowGroupMetaData
        let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;

        // need to map read column orders to actual values based on the schema
        if file_meta
            .column_orders
            .as_ref()
            .is_some_and(|cos| cos.len() != schema_descr.num_columns())
        {
            return Err(general_err!("Column order length mismatch"));
        }

        let column_orders = file_meta.column_orders.map(|cos| {
            let mut res = Vec::with_capacity(cos.len());
            for (i, column) in schema_descr.columns().iter().enumerate() {
                match cos[i] {
                    ColumnOrder::TYPE_DEFINED_ORDER(_) => {
                        let sort_order = ColumnOrder::get_sort_order(
                            column.logical_type(),
                            column.converted_type(),
                            column.physical_type(),
                        );
                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                    }
                    _ => res.push(cos[i]),
                }
            }
            res
        });

        let fmd = crate::file::metadata::FileMetaData::new(
            version,
            num_rows,
            created_by,
            key_value_metadata,
            schema_descr,
            column_orders,
        );

        Ok(ParquetMetaData::new(fmd, row_groups))
    }
}

thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);

thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  /// Number of values in the dictionary
  1: required i32 num_values;

  /// Encoding using this dictionary page
  2: required Encoding encoding

  /// If true, the entries in the dictionary are sorted in ascending order
  3: optional bool is_sorted;
}
);

thrift_struct!(
/// Statistics for the page header.
///
/// This is a duplicate of the [`Statistics`] struct above. Because the page reader uses
/// the [`Read`] API, we cannot read the min/max values as slices. This should not be
/// a huge problem since this crate no longer reads the page header statistics by default.
///
/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
pub(crate) struct PageStatistics {
   1: optional binary max;
   2: optional binary min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary max_value;
   6: optional binary min_value;
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);

thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);

impl DataPageHeader {
    // reader that skips decoding page statistics
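    //
    // The compact protocol encodes each field id as a delta from the previous
    // one, which is why `last_field_id` is threaded through every
    // `read_field_begin` call below. Statistics (field 5) has no match arm, so
    // it falls through to `prot.skip`.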
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_level_encoding: Option<Encoding> = None;
        let mut repetition_level_encoding: Option<Encoding> = None;
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                3 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    definition_level_encoding = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    repetition_level_encoding = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(num_values) = num_values else {
            return Err(ParquetError::General(
                "Required field num_values is missing".to_owned(),
            ));
        };
        let Some(encoding) = encoding else {
            return Err(ParquetError::General(
                "Required field encoding is missing".to_owned(),
            ));
        };
        let Some(definition_level_encoding) = definition_level_encoding else {
            return Err(ParquetError::General(
                "Required field definition_level_encoding is missing".to_owned(),
            ));
        };
        let Some(repetition_level_encoding) = repetition_level_encoding else {
            return Err(ParquetError::General(
                "Required field repetition_level_encoding is missing".to_owned(),
            ));
        };
        Ok(Self {
            num_values,
            encoding,
            definition_level_encoding,
            repetition_level_encoding,
            statistics,
        })
    }
}

thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
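
// Note: in a V2 data page only the data section is ever compressed; the
// repetition/definition level bytes (whose lengths are given above) are
// always stored uncompressed.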

impl DataPageHeaderV2 {
    // reader that skips decoding page statistics
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut num_nulls: Option<i32> = None;
        let mut num_rows: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_levels_byte_length: Option<i32> = None;
        let mut repetition_levels_byte_length: Option<i32> = None;
        let mut is_compressed: Option<bool> = None;
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_nulls = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_rows = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                5 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    definition_levels_byte_length = Some(val);
                }
                6 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    repetition_levels_byte_length = Some(val);
                }
                7 => {
                    let val = field_ident.bool_val.unwrap();
                    is_compressed = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(num_values) = num_values else {
            return Err(ParquetError::General(
                "Required field num_values is missing".to_owned(),
            ));
        };
        let Some(num_nulls) = num_nulls else {
            return Err(ParquetError::General(
                "Required field num_nulls is missing".to_owned(),
            ));
        };
        let Some(num_rows) = num_rows else {
            return Err(ParquetError::General(
                "Required field num_rows is missing".to_owned(),
            ));
        };
        let Some(encoding) = encoding else {
            return Err(ParquetError::General(
                "Required field encoding is missing".to_owned(),
            ));
        };
        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
            return Err(ParquetError::General(
                "Required field definition_levels_byte_length is missing".to_owned(),
            ));
        };
        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
            return Err(ParquetError::General(
                "Required field repetition_levels_byte_length is missing".to_owned(),
            ));
        };
        Ok(Self {
            num_values,
            num_nulls,
            num_rows,
            encoding,
            definition_levels_byte_length,
            repetition_levels_byte_length,
            is_compressed,
            statistics,
        })
    }
}

thrift_struct!(
pub(crate) struct PageHeader {
  /// the type of the page: indicates which of the *_header fields is set
  1: required PageType r#type

  /// Uncompressed page size in bytes (not including this header)
  2: required i32 uncompressed_page_size

  /// Compressed (and potentially encrypted) page size in bytes, not including this header
  3: required i32 compressed_page_size

  /// The 32-bit CRC checksum for the page, calculated as specified in the Parquet spec
  4: optional i32 crc

  // Headers for page specific data.  One only will be set.
  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);

impl PageHeader {
    // reader that skips reading page statistics. obtained by running
    // `cargo expand -p parquet --all-features --lib file::metadata::thrift_gen`
    // and modifying the impl of `read_thrift`
    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut type_: Option<PageType> = None;
        let mut uncompressed_page_size: Option<i32> = None;
        let mut compressed_page_size: Option<i32> = None;
        let mut crc: Option<i32> = None;
        let mut data_page_header: Option<DataPageHeader> = None;
        let mut index_page_header: Option<IndexPageHeader> = None;
        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = PageType::read_thrift(&mut *prot)?;
                    type_ = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    uncompressed_page_size = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    compressed_page_size = Some(val);
                }
                4 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    crc = Some(val);
                }
                5 => {
                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
                    data_page_header = Some(val);
                }
                6 => {
                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
                    index_page_header = Some(val);
                }
                7 => {
                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
                    dictionary_page_header = Some(val);
                }
                8 => {
                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
                    data_page_header_v2 = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(type_) = type_ else {
            return Err(ParquetError::General(
                "Required field type_ is missing".to_owned(),
            ));
        };
        let Some(uncompressed_page_size) = uncompressed_page_size else {
            return Err(ParquetError::General(
                "Required field uncompressed_page_size is missing".to_owned(),
            ));
        };
        let Some(compressed_page_size) = compressed_page_size else {
            return Err(ParquetError::General(
                "Required field compressed_page_size is missing".to_owned(),
            ));
        };
        Ok(Self {
            r#type: type_,
            uncompressed_page_size,
            compressed_page_size,
            crc,
            data_page_header,
            index_page_header,
            dictionary_page_header,
            data_page_header_v2,
        })
    }
}

/////////////////////////////////////////////////
// helper functions for writing file meta data

// serialize the bits of the column chunk needed for a thrift ColumnMetaData
// struct ColumnMetaData {
//   1: required Type type
//   2: required list<Encoding> encodings
//   3: required list<string> path_in_schema
//   4: required CompressionCodec codec
//   5: required i64 num_values
//   6: required i64 total_uncompressed_size
//   7: required i64 total_compressed_size
//   8: optional list<KeyValue> key_value_metadata
//   9: required i64 data_page_offset
//   10: optional i64 index_page_offset
//   11: optional i64 dictionary_page_offset
//   12: optional Statistics statistics;
//   13: optional list<PageEncodingStats> encoding_stats;
//   14: optional i64 bloom_filter_offset;
//   15: optional i32 bloom_filter_length;
//   16: optional SizeStatistics size_statistics;
//   17: optional GeospatialStatistics geospatial_statistics;
// }
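//
// Each write_thrift_field call takes (writer, field_id, last_field_id) and
// returns the updated last_field_id; the compact protocol delta-encodes each
// field header against the previous field id, which is why the running value
// is threaded through the optional fields below.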
1260pub(crate) fn serialize_column_meta_data<W: Write>(
1261    column_chunk: &ColumnChunkMetaData,
1262    w: &mut ThriftCompactOutputProtocol<W>,
1263) -> Result<()> {
1264    use crate::file::statistics::page_stats_to_thrift;
1265
1266    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1267    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
1268    let path = column_chunk.column_descr.path().parts();
1269    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1270    path.write_thrift_field(w, 3, 2)?;
1271    column_chunk.compression.write_thrift_field(w, 4, 3)?;
1272    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1273    column_chunk
1274        .total_uncompressed_size
1275        .write_thrift_field(w, 6, 5)?;
1276    column_chunk
1277        .total_compressed_size
1278        .write_thrift_field(w, 7, 6)?;
1279    // no key_value_metadata here
1280    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1281    if let Some(index_page_offset) = column_chunk.index_page_offset {
1282        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1283    }
1284    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1285        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1286    }
1287    // PageStatistics is the same as thrift Statistics, but writable
1288    let stats = page_stats_to_thrift(column_chunk.statistics());
1289    if let Some(stats) = stats {
1290        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1291    }
1292    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1293        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1294    }
1295    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1296        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1297    }
1298    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1299        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1300    }
1301
1302    // SizeStatistics
1303    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1304        || column_chunk.repetition_level_histogram.is_some()
1305        || column_chunk.definition_level_histogram.is_some()
1306    {
1307        let repetition_level_histogram = column_chunk
1308            .repetition_level_histogram()
1309            .map(|hist| hist.clone().into_inner());
1310
1311        let definition_level_histogram = column_chunk
1312            .definition_level_histogram()
1313            .map(|hist| hist.clone().into_inner());
1314
1315        Some(SizeStatistics {
1316            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1317            repetition_level_histogram,
1318            definition_level_histogram,
1319        })
1320    } else {
1321        None
1322    };
1323    if let Some(size_stats) = size_stats {
1324        last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1325    }
1326
1327    if let Some(geo_stats) = column_chunk.geo_statistics() {
1328        geo_stats.write_thrift_field(w, 17, last_field_id)?;
1329    }
1330
1331    w.write_struct_end()
1332}
1333
1334// temp struct used for writing
1335pub(crate) struct FileMeta<'a> {
1336    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
1337    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
1338    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
1339    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
1340}
1341
1342impl<'a> WriteThrift for FileMeta<'a> {
1343    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1344
1345    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1346        self.file_metadata
1347            .version
1348            .write_thrift_field(writer, 1, 0)?;
1349
1350        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
1351        // writing along the way.
1352        let root = self.file_metadata.schema_descr().root_schema_ptr();
1353        let schema_len = num_nodes(&root)?;
1354        writer.write_field_begin(FieldType::List, 2, 1)?;
1355        writer.write_list_begin(ElementType::Struct, schema_len)?;
1356        // recursively write Type nodes as SchemaElements
1357        write_schema(&root, writer)?;
1358
1359        self.file_metadata
1360            .num_rows
1361            .write_thrift_field(writer, 3, 2)?;
1362
1363        // this will call RowGroupMetaData::write_thrift
1364        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1365
1366        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1367            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1368        }
1369        if let Some(created_by) = self.file_metadata.created_by() {
1370            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1371        }
1372        if let Some(column_orders) = self.file_metadata.column_orders() {
1373            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1374        }
1375        if let Some(algo) = self.encryption_algorithm.as_ref() {
1376            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1377        }
1378        if let Some(key) = self.footer_signing_key_metadata.as_ref() {
1379            key.as_slice()
1380                .write_thrift_field(writer, 9, last_field_id)?;
1381        }
1382
1383        writer.write_struct_end()
1384    }
1385}
1386
1387fn write_schema<W: Write>(
1388    schema: &TypePtr,
1389    writer: &mut ThriftCompactOutputProtocol<W>,
1390) -> Result<()> {
1391    if !schema.is_group() {
1392        return Err(general_err!("Root schema must be Group type"));
1393    }
1394    write_schema_helper(schema, writer)
1395}
1396
1397fn write_schema_helper<W: Write>(
1398    node: &TypePtr,
1399    writer: &mut ThriftCompactOutputProtocol<W>,
1400) -> Result<()> {
1401    match node.as_ref() {
1402        crate::schema::types::Type::PrimitiveType {
1403            basic_info,
1404            physical_type,
1405            type_length,
1406            scale,
1407            precision,
1408        } => {
            let element = SchemaElement {
                r#type: Some(*physical_type),
                type_length: if *type_length >= 0 {
                    Some(*type_length)
                } else {
                    None
                },
                repetition_type: Some(basic_info.repetition()),
                name: basic_info.name(),
                num_children: None,
                converted_type: match basic_info.converted_type() {
                    ConvertedType::NONE => None,
                    other => Some(other),
                },
                scale: if *scale >= 0 { Some(*scale) } else { None },
                precision: if *precision >= 0 {
                    Some(*precision)
                } else {
                    None
                },
                field_id: if basic_info.has_id() {
                    Some(basic_info.id())
                } else {
                    None
                },
                logical_type: basic_info.logical_type(),
            };
            element.write_thrift(writer)
        }
        crate::schema::types::Type::GroupType { basic_info, fields } => {
            let repetition = if basic_info.has_repetition() {
                Some(basic_info.repetition())
            } else {
                None
            };

            let element = SchemaElement {
                r#type: None,
                type_length: None,
                repetition_type: repetition,
                name: basic_info.name(),
                num_children: Some(fields.len().try_into()?),
                converted_type: match basic_info.converted_type() {
                    ConvertedType::NONE => None,
                    other => Some(other),
                },
                scale: None,
                precision: None,
                field_id: if basic_info.has_id() {
                    Some(basic_info.id())
                } else {
                    None
                },
                logical_type: basic_info.logical_type(),
            };

            element.write_thrift(writer)?;

            // recursively write this group's children
            for field in fields {
                write_schema_helper(field, writer)?;
            }
            Ok(())
        }
    }
}

// struct RowGroup {
//   1: required list<ColumnChunk> columns
//   2: required i64 total_byte_size
//   3: required i64 num_rows
//   4: optional list<SortingColumn> sorting_columns
//   5: optional i64 file_offset
//   6: optional i64 total_compressed_size
//   7: optional i16 ordinal
// }
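//
// A note on the `last_field_id` threading used below: the thrift compact
// protocol encodes field ids as deltas from the previously written field id
// (falling back to the full id when the delta doesn't fit), so each
// `write_thrift_field` call takes the last id written and returns the id it
// just wrote. Skipped optional fields simply never advance it; writing field
// 4 immediately after field 2, for instance, encodes a delta of 2.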
impl WriteThrift for RowGroupMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        // this will call ColumnChunkMetaData::write_thrift
        self.columns.write_thrift_field(writer, 1, 0)?;
        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
        if let Some(sorting_columns) = self.sorting_columns() {
            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(file_offset) = self.file_offset() {
            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
        }
        // total_compressed_size is optional in the thrift definition, but we always write it
        last_field_id = self
            .compressed_size()
            .write_thrift_field(writer, 6, last_field_id)?;
        if let Some(ordinal) = self.ordinal() {
            ordinal.write_thrift_field(writer, 7, last_field_id)?;
        }
        writer.write_struct_end()
    }
}

// struct ColumnChunk {
//   1: optional string file_path
//   2: required i64 file_offset = 0
//   3: optional ColumnMetaData meta_data
//   4: optional i64 offset_index_offset
//   5: optional i32 offset_index_length
//   6: optional i64 column_index_offset
//   7: optional i32 column_index_length
//   8: optional ColumnCryptoMetaData crypto_metadata
//   9: optional binary encrypted_column_metadata
// }
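//
// `ColumnMetaData` (field 3) is not written via `WriteThriftField`: the field
// header is emitted explicitly and `serialize_column_meta_data` then writes
// the struct body directly from this `ColumnChunkMetaData`.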
impl WriteThrift for ColumnChunkMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    // depending on which optional fields are present, the final assignment to
    // `last_field_id` below may go unused, hence the allow
    #[allow(unused_assignments)]
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let mut last_field_id = 0i16;
        if let Some(file_path) = self.file_path() {
            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
        }
        last_field_id = self
            .file_offset()
            .write_thrift_field(writer, 2, last_field_id)?;

        #[cfg(feature = "encryption")]
        {
            // only write the ColumnMetaData if we haven't already encrypted it
            if self.encrypted_column_metadata.is_none() {
                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
                serialize_column_meta_data(self, writer)?;
                last_field_id = 3;
            }
        }
        #[cfg(not(feature = "encryption"))]
        {
            // always write the ColumnMetaData
            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
            serialize_column_meta_data(self, writer)?;
            last_field_id = 3;
        }

        if let Some(offset_idx_off) = self.offset_index_offset() {
            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(offset_idx_len) = self.offset_index_length() {
            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(column_idx_off) = self.column_index_offset() {
            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(column_idx_len) = self.column_index_length() {
            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        {
            if let Some(crypto_metadata) = self.crypto_metadata() {
                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
            }
            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
                encrypted_meta
                    .as_slice()
                    .write_thrift_field(writer, 9, last_field_id)?;
            }
        }

        writer.write_struct_end()
    }
}

// struct GeospatialStatistics {
//   1: optional BoundingBox bbox;
//   2: optional list<i32> geospatial_types;
// }
impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let mut last_field_id = 0i16;
        if let Some(bbox) = self.bounding_box() {
            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
        }
        if let Some(geo_types) = self.geospatial_types() {
            geo_types.write_thrift_field(writer, 2, last_field_id)?;
        }

        writer.write_struct_end()
    }
}

// struct-valued fields need a field header before the struct body, hence this
// thin `WriteThriftField` wrapper around `write_thrift`
impl WriteThriftField for crate::geospatial::statistics::GeospatialStatistics {
    fn write_thrift_field<W: Write>(
        &self,
        writer: &mut ThriftCompactOutputProtocol<W>,
        field_id: i16,
        last_field_id: i16,
    ) -> Result<i16> {
        writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
        self.write_thrift(writer)?;
        Ok(field_id)
    }
}

// struct BoundingBox {
//   1: required double xmin;
//   2: required double xmax;
//   3: required double ymin;
//   4: required double ymax;
//   5: optional double zmin;
//   6: optional double zmax;
//   7: optional double mmin;
//   8: optional double mmax;
// }
impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.get_xmin().write_thrift_field(writer, 1, 0)?;
        self.get_xmax().write_thrift_field(writer, 2, 1)?;
        self.get_ymin().write_thrift_field(writer, 3, 2)?;
        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;

        if let Some(zmin) = self.get_zmin() {
            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(zmax) = self.get_zmax() {
            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(mmin) = self.get_mmin() {
            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
        }
        if let Some(mmax) = self.get_mmax() {
            mmax.write_thrift_field(writer, 8, last_field_id)?;
        }

        writer.write_struct_end()
    }
}

impl WriteThriftField for crate::geospatial::bounding_box::BoundingBox {
    fn write_thrift_field<W: Write>(
        &self,
        writer: &mut ThriftCompactOutputProtocol<W>,
        field_id: i16,
        last_field_id: i16,
    ) -> Result<i16> {
        writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
        self.write_thrift(writer)?;
        Ok(field_id)
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use crate::errors::Result;
    use crate::file::metadata::thrift_gen::{
        convert_column, convert_row_group, write_schema, BoundingBox, ColumnChunk, RowGroup,
        SchemaElement,
    };
    use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        read_thrift_vec, ElementType, ReadThrift, ThriftCompactOutputProtocol,
        ThriftSliceInputProtocol,
    };
    use crate::schema::types::{
        num_nodes, parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor, TypePtr,
    };
    use std::sync::Arc;

    // test helper: decode a thrift-encoded RowGroup into RowGroupMetaData
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        let rg = RowGroup::read_thrift(&mut reader)?;
        convert_row_group(rg, schema_descr)
    }

    // test helper: decode a thrift-encoded ColumnChunk into ColumnChunkMetaData
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        let cc = ColumnChunk::read_thrift(&mut reader)?;
        convert_column(cc, column_descr)
    }

    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let num_nodes = num_nodes(&schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        // begin the outer list that holds the flattened SchemaElements
        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        // write SchemaElements
        write_schema(&schema, &mut writer)?;

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(se)
    }

    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let num_nodes = num_nodes(schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        // begin the outer list that holds the flattened SchemaElements
        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        // write SchemaElements
        write_schema(schema, &mut writer)?;
        Ok(buf)
    }

    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut prot)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });
    }
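
    // A minimal sketch exercising `roundtrip_schema` on a hand-built schema.
    // Hedged: the schema itself is illustrative; the builders used are the
    // crate's public `schema::types::Type` API.
    #[test]
    fn test_schema_roundtrip_sketch() -> Result<()> {
        use crate::basic::{Repetition, Type as PhysicalType};
        use crate::schema::types::Type;

        let leaf = Arc::new(
            Type::primitive_type_builder("a", PhysicalType::INT32)
                .with_repetition(Repetition::REQUIRED)
                .build()?,
        );
        let root: TypePtr = Arc::new(
            Type::group_type_builder("root")
                .with_fields(vec![leaf])
                .build()?,
        );
        let roundtripped = roundtrip_schema(root.clone())?;
        assert_eq!(root.as_ref(), roundtripped.as_ref());
        Ok(())
    }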
}