parquet/file/metadata/parser.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Internal metadata parsing routines
//!
//! These functions parse thrift-encoded metadata from a byte slice
//! into the corresponding Rust structures.
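//!
//! A minimal sketch of the intended call pattern (the `footer_bytes`,
//! `page_index_bytes`, and `start_offset` names below are illustrative
//! placeholders, not part of this module):
//!
//! ```ignore
//! // Decode the thrift-encoded footer payload into ParquetMetaData.
//! let mut metadata = decode_metadata(&footer_bytes)?;
//! // Optionally attach the page indexes read from elsewhere in the file.
//! parse_column_index(&mut metadata, PageIndexPolicy::Optional, &page_index_bytes, start_offset)?;
//! parse_offset_index(&mut metadata, PageIndexPolicy::Optional, &page_index_bytes, start_offset)?;
//! ```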

use crate::basic::ColumnOrder;
use crate::errors::ParquetError;
use crate::file::metadata::{
    ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData, RowGroupMetaData,
};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::schema::types;
use crate::schema::types::SchemaDescriptor;
use crate::thrift::TCompactSliceInputProtocol;
use crate::thrift::TSerializable;
use bytes::Bytes;
use std::sync::Arc;

#[cfg(feature = "encryption")]
use crate::encryption::{
    decrypt::{FileDecryptionProperties, FileDecryptor},
    modules::create_footer_aad,
};
#[cfg(feature = "encryption")]
use crate::format::EncryptionAlgorithm;

/// Helper struct for metadata parsing
///
/// This structure parses thrift-encoded bytes into the correct Rust structs,
/// such as [`ParquetMetaData`], handling decryption if necessary.
//
// Note: this structure is used to minimize the number of places that need
// `#[cfg(feature = "encryption")]` checks.
pub(crate) use inner::MetadataParser;
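
// A hedged usage sketch of `MetadataParser` (the `props` and `footer` bindings
// below are illustrative placeholders; `with_file_decryption_properties` is only
// available when the `encryption` feature is enabled):
//
//     let parser = MetadataParser::new()
//         .with_file_decryption_properties(props); // Option<Arc<FileDecryptionProperties>>
//     let metadata = parser.decode_metadata(&footer, /* encrypted_footer */ true)?;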

#[cfg(feature = "encryption")]
mod inner {
    use super::*;
    use crate::encryption::decrypt::FileDecryptionProperties;
    use crate::errors::Result;

    /// API for decoding metadata that may be encrypted
    #[derive(Debug, Default)]
    pub(crate) struct MetadataParser {
        // The credentials and keys needed to decrypt metadata
        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
    }

    impl MetadataParser {
        pub(crate) fn new() -> Self {
            MetadataParser::default()
        }

        pub(crate) fn with_file_decryption_properties(
            mut self,
            file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        ) -> Self {
            self.file_decryption_properties = file_decryption_properties;
            self
        }

        pub(crate) fn decode_metadata(
            &self,
            buf: &[u8],
            encrypted_footer: bool,
        ) -> Result<ParquetMetaData> {
            decode_metadata_with_encryption(
                buf,
                encrypted_footer,
                self.file_decryption_properties.as_deref(),
            )
        }
    }
}

#[cfg(not(feature = "encryption"))]
mod inner {
    use super::*;
    use crate::errors::Result;

    /// Parallel implementation used when the `encryption` feature is not enabled.
    ///
    /// This has the same decoding API as the encryption-enabled version.
    #[derive(Debug, Default)]
    pub(crate) struct MetadataParser;

    impl MetadataParser {
        pub(crate) fn new() -> Self {
            MetadataParser
        }

        pub(crate) fn decode_metadata(
            &self,
            buf: &[u8],
            encrypted_footer: bool,
        ) -> Result<ParquetMetaData> {
            if encrypted_footer {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                decode_metadata(buf)
            }
        }
    }
}

/// Decodes [`ParquetMetaData`] from the provided bytes.
///
/// Typically this is used to decode the metadata from the end of a parquet
/// file. The format of `buf` is the Thrift compact binary protocol, as specified
/// by the [Parquet Spec].
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
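///
/// # Example (illustrative sketch)
///
/// Per the Parquet footer layout, the last 8 bytes of a file are a 4-byte
/// little-endian metadata length followed by the `PAR1` magic, so callers are
/// expected to pass only the thrift-encoded payload. Assuming `file_bytes`
/// holds the entire file:
///
/// ```ignore
/// let len = file_bytes.len();
/// let metadata_len =
///     u32::from_le_bytes(file_bytes[len - 8..len - 4].try_into().unwrap()) as usize;
/// let metadata = decode_metadata(&file_bytes[len - 8 - metadata_len..len - 8])?;
/// ```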
pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
    let mut prot = TCompactSliceInputProtocol::new(buf);

    let t_file_metadata: crate::format::FileMetaData =
        crate::format::FileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
    let schema = types::from_thrift(&t_file_metadata.schema)?;
    let schema_descr = Arc::new(SchemaDescriptor::new(schema));

    let mut row_groups = Vec::new();
    for rg in t_file_metadata.row_groups {
        row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
    }
    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

    let file_metadata = FileMetaData::new(
        t_file_metadata.version,
        t_file_metadata.num_rows,
        t_file_metadata.created_by,
        t_file_metadata.key_value_metadata,
        schema_descr,
        column_orders,
    );

    Ok(ParquetMetaData::new(file_metadata, row_groups))
}

/// Parses column orders from the Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
    t_column_orders: Option<Vec<crate::format::ColumnOrder>>,
    schema_descr: &SchemaDescriptor,
) -> crate::errors::Result<Option<Vec<ColumnOrder>>> {
    match t_column_orders {
        Some(orders) => {
            // Should always be the case
            if orders.len() != schema_descr.num_columns() {
                return Err(general_err!("Column order length mismatch"));
            };
            let mut res = Vec::new();
            for (i, column) in schema_descr.columns().iter().enumerate() {
                match orders[i] {
                    crate::format::ColumnOrder::TYPEORDER(_) => {
                        let sort_order = ColumnOrder::get_sort_order(
                            column.logical_type(),
                            column.converted_type(),
                            column.physical_type(),
                        );
                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                    }
                }
            }
            Ok(Some(res))
        }
        None => Ok(None),
    }
}

/// Parses the column index from the provided bytes and adds it to the metadata.
///
/// # Arguments
/// * `metadata` - The [`ParquetMetaData`] to which the parsed column index will be added.
/// * `column_index_policy` - The policy for handling column index parsing (e.g.,
///   Required, Optional, Skip).
/// * `bytes` - The byte slice containing the column index data.
/// * `start_offset` - The file offset at which `bytes` begins.
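///
/// For example (illustrative numbers): if a column chunk's `column_index_range()`
/// covers file bytes `1000..1100` and `bytes` was read starting at file offset
/// `start_offset = 900`, then that chunk's index is decoded from `bytes[100..200]`.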
pub(crate) fn parse_column_index(
    metadata: &mut ParquetMetaData,
    column_index_policy: PageIndexPolicy,
    bytes: &Bytes,
    start_offset: u64,
) -> crate::errors::Result<()> {
    if column_index_policy == PageIndexPolicy::Skip {
        return Ok(());
    }
    let index = metadata
        .row_groups()
        .iter()
        .enumerate()
        .map(|(rg_idx, x)| {
            x.columns()
                .iter()
                .enumerate()
                .map(|(col_idx, c)| match c.column_index_range() {
                    Some(r) => {
                        let r_start = usize::try_from(r.start - start_offset)?;
                        let r_end = usize::try_from(r.end - start_offset)?;
                        parse_single_column_index(
                            &bytes[r_start..r_end],
                            metadata,
                            c,
                            rg_idx,
                            col_idx,
                        )
                    }
                    None => Ok(Index::NONE),
                })
                .collect::<crate::errors::Result<Vec<_>>>()
        })
        .collect::<crate::errors::Result<Vec<_>>>()?;

    metadata.set_column_index(Some(index));
    Ok(())
}

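/// Decodes a single column index from `bytes`, decrypting the data first when
/// the column chunk carries crypto metadata (this requires a file decryptor to
/// be set on `metadata`).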
#[cfg(feature = "encryption")]
fn parse_single_column_index(
    bytes: &[u8],
    metadata: &ParquetMetaData,
    column: &ColumnChunkMetaData,
    row_group_index: usize,
    col_index: usize,
) -> crate::errors::Result<Index> {
    use crate::encryption::decrypt::CryptoContext;
    match &column.column_crypto_metadata {
        Some(crypto_metadata) => {
            let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                general_err!("Cannot decrypt column index, no file decryptor set")
            })?;
            let crypto_context = CryptoContext::for_column(
                file_decryptor,
                crypto_metadata,
                row_group_index,
                col_index,
            )?;
            let column_decryptor = crypto_context.metadata_decryptor();
            let aad = crypto_context.create_column_index_aad()?;
            let plaintext = column_decryptor.decrypt(bytes, &aad)?;
            decode_column_index(&plaintext, column.column_type())
        }
        None => decode_column_index(bytes, column.column_type()),
    }
}

#[cfg(not(feature = "encryption"))]
fn parse_single_column_index(
    bytes: &[u8],
    _metadata: &ParquetMetaData,
    column: &ColumnChunkMetaData,
    _row_group_index: usize,
    _col_index: usize,
) -> crate::errors::Result<Index> {
    decode_column_index(bytes, column.column_type())
}

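/// Parses offset indexes from the provided bytes and adds them to the metadata.
///
/// `bytes` and `start_offset` follow the same conventions as in
/// [`parse_column_index`]. If an offset index is missing or fails to decode and
/// the policy is not `Required`, any partially collected page indexes are
/// cleared and `Ok(())` is returned.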
pub(crate) fn parse_offset_index(
    metadata: &mut ParquetMetaData,
    offset_index_policy: PageIndexPolicy,
    bytes: &Bytes,
    start_offset: u64,
) -> crate::errors::Result<()> {
    if offset_index_policy == PageIndexPolicy::Skip {
        return Ok(());
    }
    let row_groups = metadata.row_groups();
    let mut all_indexes = Vec::with_capacity(row_groups.len());
    for (rg_idx, x) in row_groups.iter().enumerate() {
        let mut row_group_indexes = Vec::with_capacity(x.columns().len());
        for (col_idx, c) in x.columns().iter().enumerate() {
            let result = match c.offset_index_range() {
                Some(r) => {
                    let r_start = usize::try_from(r.start - start_offset)?;
                    let r_end = usize::try_from(r.end - start_offset)?;
                    parse_single_offset_index(&bytes[r_start..r_end], metadata, c, rg_idx, col_idx)
                }
                None => Err(general_err!("missing offset index")),
            };

            match result {
                Ok(index) => row_group_indexes.push(index),
                Err(e) => {
                    if offset_index_policy == PageIndexPolicy::Required {
                        return Err(e);
                    } else {
                        // Invalidate and return
                        metadata.set_column_index(None);
                        metadata.set_offset_index(None);
                        return Ok(());
                    }
                }
            }
        }
        all_indexes.push(row_group_indexes);
    }
    metadata.set_offset_index(Some(all_indexes));
    Ok(())
}

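/// Decodes a single offset index from `bytes`, decrypting the data first when
/// the column chunk carries crypto metadata.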
#[cfg(feature = "encryption")]
fn parse_single_offset_index(
    bytes: &[u8],
    metadata: &ParquetMetaData,
    column: &ColumnChunkMetaData,
    row_group_index: usize,
    col_index: usize,
) -> crate::errors::Result<OffsetIndexMetaData> {
    use crate::encryption::decrypt::CryptoContext;
    match &column.column_crypto_metadata {
        Some(crypto_metadata) => {
            let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                general_err!("Cannot decrypt offset index, no file decryptor set")
            })?;
            let crypto_context = CryptoContext::for_column(
                file_decryptor,
                crypto_metadata,
                row_group_index,
                col_index,
            )?;
            let column_decryptor = crypto_context.metadata_decryptor();
            let aad = crypto_context.create_offset_index_aad()?;
            let plaintext = column_decryptor.decrypt(bytes, &aad)?;
            decode_offset_index(&plaintext)
        }
        None => decode_offset_index(bytes),
    }
}

#[cfg(not(feature = "encryption"))]
fn parse_single_offset_index(
    bytes: &[u8],
    _metadata: &ParquetMetaData,
    _column: &ColumnChunkMetaData,
    _row_group_index: usize,
    _col_index: usize,
) -> crate::errors::Result<OffsetIndexMetaData> {
    decode_offset_index(bytes)
}

/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
///
/// Typically this is used to decode the metadata from the end of a parquet
/// file. The format of `buf` is the Thrift compact binary protocol, as specified
/// by the [Parquet Spec]. The buffer may be encrypted with the AES-GCM or AES-CTR
/// ciphers as specified in the [Parquet Encryption Spec].
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
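///
/// A hedged sketch of the call for an encrypted footer (how the
/// [`FileDecryptionProperties`] value `props` is built is outside the scope of
/// this function and assumed here):
///
/// ```ignore
/// // `props: FileDecryptionProperties` is constructed by the caller (omitted).
/// let metadata =
///     decode_metadata_with_encryption(&footer_bytes, /* encrypted_footer= */ true, Some(&props))?;
/// ```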
#[cfg(feature = "encryption")]
fn decode_metadata_with_encryption(
    buf: &[u8],
    encrypted_footer: bool,
    file_decryption_properties: Option<&FileDecryptionProperties>,
) -> crate::errors::Result<ParquetMetaData> {
    let mut prot = TCompactSliceInputProtocol::new(buf);
    let mut file_decryptor = None;
    let decrypted_fmd_buf;

    if encrypted_footer {
        if let Some(file_decryption_properties) = file_decryption_properties {
            let t_file_crypto_metadata: crate::format::FileCryptoMetaData =
                crate::format::FileCryptoMetaData::read_from_in_protocol(&mut prot)
                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                _ => Some(false),
            }
            .unwrap_or(false);
            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
            }
            let decryptor = get_file_decryptor(
                t_file_crypto_metadata.encryption_algorithm,
                t_file_crypto_metadata.key_metadata.as_deref(),
                file_decryption_properties,
            )?;
            let footer_decryptor = decryptor.get_footer_decryptor();
            let aad_footer = create_footer_aad(decryptor.file_aad())?;

            decrypted_fmd_buf = footer_decryptor?
                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                .map_err(|_| {
                    general_err!(
                        "Provided footer key and AAD were unable to decrypt parquet footer"
                    )
                })?;
            prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

            file_decryptor = Some(decryptor);
        } else {
            return Err(general_err!(
                "Parquet file has an encrypted footer but decryption properties were not provided"
            ));
        }
    }

    use crate::format::FileMetaData as TFileMetaData;
    let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
    let schema = types::from_thrift(&t_file_metadata.schema)?;
    let schema_descr = Arc::new(SchemaDescriptor::new(schema));

    if let (Some(algo), Some(file_decryption_properties)) = (
        t_file_metadata.encryption_algorithm,
        file_decryption_properties,
    ) {
        // File has a plaintext footer but encryption algorithm is set
        let file_decryptor_value = get_file_decryptor(
            algo,
            t_file_metadata.footer_signing_key_metadata.as_deref(),
            file_decryption_properties,
        )?;
        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
        }
        file_decryptor = Some(file_decryptor_value);
    }

    let mut row_groups = Vec::new();
    for rg in t_file_metadata.row_groups {
        let r = RowGroupMetaData::from_encrypted_thrift(
            schema_descr.clone(),
            rg,
            file_decryptor.as_ref(),
        )?;
        row_groups.push(r);
    }
    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

    let file_metadata = FileMetaData::new(
        t_file_metadata.version,
        t_file_metadata.num_rows,
        t_file_metadata.created_by,
        t_file_metadata.key_value_metadata,
        schema_descr,
        column_orders,
    );
    let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

    metadata.with_file_decryptor(file_decryptor);

    Ok(metadata)
}

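/// Builds a [`FileDecryptor`] from the footer's encryption algorithm and key metadata.
///
/// Only the `AES_GCM_V1` algorithm is currently supported; an AAD prefix supplied in
/// the decryption properties takes precedence over any prefix stored in the file.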
#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> crate::errors::Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
                aad_prefix.clone()
            } else {
                algo.aad_prefix.unwrap_or_default()
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::basic::{SortOrder, Type};
    use crate::format::ColumnOrder as TColumnOrder;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;

    #[test]
    fn test_metadata_column_orders_parse() {
        // Define a simple schema; we do not need to provide logical types.
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test when no column orders are defined.
        assert_eq!(parse_column_orders(None, &schema_descr).unwrap(), None);
    }

    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }
}
555}