Skip to main content

parquet/file/metadata/thrift/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module is the bridge between a Parquet file's thrift encoded metadata
19//! and this crate's [Parquet metadata API]. It contains objects and functions used
20//! to serialize/deserialize metadata objects into/from the Thrift compact protocol
21//! format as defined by the [Parquet specification].
22//!
23//! [Parquet metadata API]: crate::file::metadata
24//! [Parquet specification]: https://github.com/apache/parquet-format/tree/master
25
26use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37    basic::{
38        ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
39        Repetition, Type,
40    },
41    data_type::{ByteArray, FixedLenByteArray, Int96},
42    errors::{ParquetError, Result},
43    file::{
44        metadata::{
45            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47            RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48        },
49        statistics::ValueStatistics,
50    },
51    parquet_thrift::{
52        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54        read_thrift_vec,
55    },
56    schema::types::{
57        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58    },
59    thrift_struct,
60    util::bit_util::FromBytes,
61    write_thrift_field,
62};
63
// Thrift `SchemaElement`: one entry of the flattened schema list stored in the file footer.
// The footer readers in this module decode a `list<SchemaElement>` with `read_thrift_vec`
// and hand it to `parquet_schema_from_array` to rebuild the schema tree.
//
// The `<'a>` lifetime is used by the `name` field (`string<'a>`).
64// this needs to be visible to the schema conversion code
65thrift_struct!(
66pub(crate) struct SchemaElement<'a> {
67  /// Data type for this field. Not set if the current element is a non-leaf node
68  1: optional Type r#type;
69  /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
70  /// Otherwise, if specified, this is the maximum bit length to store any of the values.
71  /// (e.g. a low cardinality INT col could have this set to 3).  Note that this is
72  /// in the schema, and therefore fixed for the entire file.
73  2: optional i32 type_length;
74  /// Repetition of the field. The root of the schema does not have a repetition_type.
75  /// All other nodes must have one.
76  3: optional Repetition repetition_type;
77  /// Name of the field in the schema
78  4: required string<'a> name;
79  /// Nested fields. Since thrift does not support nested fields,
80  /// the nesting is flattened to a single list by a depth-first traversal.
81  /// The children count is used to construct the nested relationship.
82  /// This field is not set when the element is a primitive type.
83  5: optional i32 num_children;
84  /// DEPRECATED: When the schema is the result of a conversion from another model.
85  /// Used to record the original type to help with cross conversion.
86  ///
87  /// This is superseded by logical_type.
88  6: optional ConvertedType converted_type;
89  /// DEPRECATED: Used when this column contains decimal data.
90  /// See the DECIMAL converted type for more details.
91  ///
92  /// This is superseded by using the DecimalType annotation in logical_type.
93  7: optional i32 scale
  // NOTE(review): `precision` carries no doc comment of its own here; presumably the
  // deprecation note on `scale` above applies to both fields -- confirm against parquet.thrift.
94  8: optional i32 precision
95  /// When the original schema supports field ids, this will save the
96  /// original field id in the parquet schema
97  9: optional i32 field_id;
98  /// The logical type of this SchemaElement
99  ///
100  /// LogicalType replaces ConvertedType, but ConvertedType is still required
101  /// for some logical types to ensure forward-compatibility in format v1.
102  10: optional LogicalType logical_type
103}
104);
105
// Thrift `Statistics` attached to a column chunk (field 12 of `ColumnMetaData`).
// `convert_stats` turns this raw form into the typed `crate::file::statistics::Statistics`.
// Binary fields borrow from the input buffer (`binary<'a>`).
106thrift_struct!(
107struct Statistics<'a> {
   // `max` / `min` are the deprecated fields: `convert_stats` only falls back to them when
   // neither `max_value` nor `min_value` is present.
108   1: optional binary<'a> max;
109   2: optional binary<'a> min;
   // Rejected by `convert_stats` when negative.
110   3: optional i64 null_count;
111   4: optional i64 distinct_count;
   // Preferred min/max encoding; used by `convert_stats` whenever either one is set.
112   5: optional binary<'a> max_value;
113   6: optional binary<'a> min_value;
   // Only applied by `convert_stats` to BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY statistics;
   // treated as `false` when absent.
114   7: optional bool is_max_value_exact;
115   8: optional bool is_min_value_exact;
116}
117);
118
// Thrift `BoundingBox` used inside `GeospatialStatistics`.
// `convert_bounding_box` maps it onto `crate::geospatial::bounding_box::BoundingBox`.
119thrift_struct!(
120struct BoundingBox {
  // X and Y bounds are required.
121  1: required double xmin;
122  2: required double xmax;
123  3: required double ymin;
124  4: required double ymax;
  // Z and M bounds are optional; `convert_bounding_box` only applies a range when both
  // the min and the max of that dimension are present.
125  5: optional double zmin;
126  6: optional double zmax;
127  7: optional double mmin;
128  8: optional double mmax;
129}
130);
131
// Thrift `GeospatialStatistics` (field 17 of `ColumnMetaData`).
// `convert_geo_stats` converts it into `crate::geospatial::statistics::GeospatialStatistics`.
132thrift_struct!(
133struct GeospatialStatistics {
  // Optional bounding box of the column chunk's values.
134  1: optional BoundingBox bbox;
  // Optional list of integer type codes; `convert_geo_stats` treats an empty list the same
  // as an absent one.
135  2: optional list<i32> geospatial_types;
136}
137);
138
// Thrift `SizeStatistics` (field 16 of `ColumnMetaData`).
// `read_column_metadata` copies each field onto the `ColumnChunkMetaData` being built,
// wrapping the histograms with `LevelHistogram::from`.
139thrift_struct!(
140struct SizeStatistics {
141   1: optional i64 unencoded_byte_array_data_bytes;
142   2: optional list<i64> repetition_level_histogram;
143   3: optional list<i64> definition_level_histogram;
144}
145);
146
147fn convert_geo_stats(
148    stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150    stats.map(|st| {
151        let bbox = convert_bounding_box(st.bbox);
152        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154            bbox,
155            geospatial_types,
156        ))
157    })
158}
159
160fn convert_bounding_box(
161    bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163    bbox.map(|bb| {
164        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165            bb.xmin.into(),
166            bb.xmax.into(),
167            bb.ymin.into(),
168            bb.ymax.into(),
169        );
170
171        newbb = match (bb.zmin, bb.zmax) {
172            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173            // If either None or mismatch, leave it as None and don't error
174            _ => newbb,
175        };
176
177        newbb = match (bb.mmin, bb.mmax) {
178            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179            // If either None or mismatch, leave it as None and don't error
180            _ => newbb,
181        };
182
183        newbb
184    })
185}
186
187/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
188fn convert_stats(
189    column_descr: &Arc<ColumnDescriptor>,
190    thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192    use crate::file::statistics::Statistics as FStatistics;
193    Ok(match thrift_stats {
194        Some(stats) => {
195            // Generic null count.
196            let null_count = stats
197                .null_count
198                .map(|null_count| {
199                    if null_count < 0 {
200                        return Err(general_err!(
201                            "Statistics null count is negative {}",
202                            null_count
203                        ));
204                    }
205                    Ok(null_count as u64)
206                })
207                .transpose()?;
208            // Generic distinct count (count of distinct values occurring)
209            let distinct_count = stats.distinct_count.map(|value| value as u64);
210            // Whether or not statistics use deprecated min/max fields.
211            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
212            // Generic min value as bytes.
213            let min = if old_format {
214                stats.min
215            } else {
216                stats.min_value
217            };
218            // Generic max value as bytes.
219            let max = if old_format {
220                stats.max
221            } else {
222                stats.max_value
223            };
224
225            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
226                if let Some(min) = min {
227                    if min.len() < len {
228                        return Err(general_err!("Insufficient bytes to parse min statistic",));
229                    }
230                }
231                if let Some(max) = max {
232                    if max.len() < len {
233                        return Err(general_err!("Insufficient bytes to parse max statistic",));
234                    }
235                }
236                Ok(())
237            }
238
239            let physical_type = column_descr.physical_type();
240            match physical_type {
241                Type::BOOLEAN => check_len(&min, &max, 1),
242                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
243                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
244                Type::INT96 => check_len(&min, &max, 12),
245                _ => Ok(()),
246            }?;
247
248            // Values are encoded using PLAIN encoding definition, except that
249            // variable-length byte arrays do not include a length prefix.
250            //
251            // Instead of using actual decoder, we manually convert values.
252            let res = match physical_type {
253                Type::BOOLEAN => FStatistics::boolean(
254                    min.map(|data| data[0] != 0),
255                    max.map(|data| data[0] != 0),
256                    distinct_count,
257                    null_count,
258                    old_format,
259                ),
260                Type::INT32 => FStatistics::int32(
261                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
262                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263                    distinct_count,
264                    null_count,
265                    old_format,
266                ),
267                Type::INT64 => FStatistics::int64(
268                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
269                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270                    distinct_count,
271                    null_count,
272                    old_format,
273                ),
274                Type::INT96 => {
275                    // INT96 statistics may not be correct, because comparison is signed
276                    let min = if let Some(data) = min {
277                        assert_eq!(data.len(), 12);
278                        Some(Int96::try_from_le_slice(data)?)
279                    } else {
280                        None
281                    };
282                    let max = if let Some(data) = max {
283                        assert_eq!(data.len(), 12);
284                        Some(Int96::try_from_le_slice(data)?)
285                    } else {
286                        None
287                    };
288                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
289                }
290                Type::FLOAT => FStatistics::float(
291                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
292                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293                    distinct_count,
294                    null_count,
295                    old_format,
296                ),
297                Type::DOUBLE => FStatistics::double(
298                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
299                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300                    distinct_count,
301                    null_count,
302                    old_format,
303                ),
304                Type::BYTE_ARRAY => FStatistics::ByteArray(
305                    ValueStatistics::new(
306                        min.map(ByteArray::from),
307                        max.map(ByteArray::from),
308                        distinct_count,
309                        null_count,
310                        old_format,
311                    )
312                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
313                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
314                ),
315                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
316                    ValueStatistics::new(
317                        min.map(ByteArray::from).map(FixedLenByteArray::from),
318                        max.map(ByteArray::from).map(FixedLenByteArray::from),
319                        distinct_count,
320                        null_count,
321                        old_format,
322                    )
323                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
324                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
325                ),
326            };
327
328            Some(res)
329        }
330        None => None,
331    })
332}
333
/// Mask bit for the thrift `ColumnMetaData` field with id `field_id`.
///
/// The bit position is the thrift field id itself.
const fn col_meta_bit(field_id: u16) -> u16 {
    1 << field_id
}

// One mask bit per tracked required field of the thrift `ColumnMetaData` struct.
const COL_META_TYPE: u16 = col_meta_bit(1);
const COL_META_ENCODINGS: u16 = col_meta_bit(2);
const COL_META_CODEC: u16 = col_meta_bit(4);
const COL_META_NUM_VALUES: u16 = col_meta_bit(5);
const COL_META_TOTAL_UNCOMP_SZ: u16 = col_meta_bit(6);
const COL_META_TOTAL_COMP_SZ: u16 = col_meta_bit(7);
const COL_META_DATA_PAGE_OFFSET: u16 = col_meta_bit(9);

// The mask value observed when every tracked required field has been seen.
const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;
351
352// check mask to see if all required fields are set. return an appropriate error if
353// any are missing.
354fn validate_column_metadata(mask: u16) -> Result<()> {
355    if mask != COL_META_ALL_REQUIRED {
356        if mask & COL_META_ENCODINGS == 0 {
357            return Err(general_err!("Required field encodings is missing"));
358        }
359
360        if mask & COL_META_CODEC == 0 {
361            return Err(general_err!("Required field codec is missing"));
362        }
363        if mask & COL_META_NUM_VALUES == 0 {
364            return Err(general_err!("Required field num_values is missing"));
365        }
366        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
367            return Err(general_err!(
368                "Required field total_uncompressed_size is missing"
369            ));
370        }
371        if mask & COL_META_TOTAL_COMP_SZ == 0 {
372            return Err(general_err!(
373                "Required field total_compressed_size is missing"
374            ));
375        }
376        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
377            return Err(general_err!("Required field data_page_offset is missing"));
378        }
379    }
380
381    Ok(())
382}
383
384fn read_encoding_stats_as_mask<'a>(
385    prot: &mut ThriftSliceInputProtocol<'a>,
386) -> Result<EncodingMask> {
387    // read the vector of stats, setting mask bits for data pages
388    let mut mask = 0i32;
389    let list_ident = prot.read_list_begin()?;
390    for _ in 0..list_ident.size {
391        let pes = PageEncodingStats::read_thrift(prot)?;
392        match pes.page_type {
393            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
394            _ => {}
395        }
396    }
397    EncodingMask::try_new(mask)
398}
399
400// Decode `ColumnMetaData`. Returns a mask of all required fields that were observed.
401// This mask can be passed to `validate_column_metadata`.
402fn read_column_metadata<'a>(
403    prot: &mut ThriftSliceInputProtocol<'a>,
404    column: &mut ColumnChunkMetaData,
405    col_index: usize,
406    options: Option<&ParquetMetaDataOptions>,
407) -> Result<u16> {
408    // mask for seen required fields in ColumnMetaData
409    let mut seen_mask = 0u16;
410
411    let mut skip_pes = false;
412    let mut pes_mask = true;
413    let mut skip_col_stats = false;
414    let mut skip_size_stats = false;
415
416    if let Some(opts) = options {
417        skip_pes = opts.skip_encoding_stats(col_index);
418        pes_mask = opts.encoding_stats_as_mask();
419        skip_col_stats = opts.skip_column_stats(col_index);
420        skip_size_stats = opts.skip_size_stats(col_index);
421    }
422
423    // struct ColumnMetaData {
424    //   1: required Type type
425    //   2: required list<Encoding> encodings
426    //   3: required list<string> path_in_schema
427    //   4: required CompressionCodec codec
428    //   5: required i64 num_values
429    //   6: required i64 total_uncompressed_size
430    //   7: required i64 total_compressed_size
431    //   8: optional list<KeyValue> key_value_metadata
432    //   9: required i64 data_page_offset
433    //   10: optional i64 index_page_offset
434    //   11: optional i64 dictionary_page_offset
435    //   12: optional Statistics statistics;
436    //   13: optional list<PageEncodingStats> encoding_stats;
437    //   14: optional i64 bloom_filter_offset;
438    //   15: optional i32 bloom_filter_length;
439    //   16: optional SizeStatistics size_statistics;
440    //   17: optional GeospatialStatistics geospatial_statistics;
441    // }
442    let column_descr = &column.column_descr;
443
444    let mut last_field_id = 0i16;
445    loop {
446        let field_ident = prot.read_field_begin(last_field_id)?;
447        if field_ident.field_type == FieldType::Stop {
448            break;
449        }
450        match field_ident.id {
451            // 1: type is never used, we can use the column descriptor
452            1 => {
453                // read for error handling
454                Type::read_thrift(&mut *prot)?;
455                seen_mask |= COL_META_TYPE;
456            }
457            2 => {
458                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
459                seen_mask |= COL_META_ENCODINGS;
460            }
461            // 3: path_in_schema is redundant
462            4 => {
463                column.compression = Compression::read_thrift(&mut *prot)?;
464                seen_mask |= COL_META_CODEC;
465            }
466            5 => {
467                column.num_values = i64::read_thrift(&mut *prot)?;
468                seen_mask |= COL_META_NUM_VALUES;
469            }
470            6 => {
471                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
472                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
473            }
474            7 => {
475                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
476                seen_mask |= COL_META_TOTAL_COMP_SZ;
477            }
478            // 8: we don't expose this key value
479            9 => {
480                column.data_page_offset = i64::read_thrift(&mut *prot)?;
481                seen_mask |= COL_META_DATA_PAGE_OFFSET;
482            }
483            10 => {
484                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
485            }
486            11 => {
487                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
488            }
489            12 if !skip_col_stats => {
490                column.statistics =
491                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
492            }
493            13 if !skip_pes => {
494                if pes_mask {
495                    let val = read_encoding_stats_as_mask(&mut *prot)?;
496                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
497                } else {
498                    let val =
499                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
500                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
501                }
502            }
503            14 => {
504                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
505            }
506            15 => {
507                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
508            }
509            16 if !skip_size_stats => {
510                let val = SizeStatistics::read_thrift(&mut *prot)?;
511                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
512                column.repetition_level_histogram =
513                    val.repetition_level_histogram.map(LevelHistogram::from);
514                column.definition_level_histogram =
515                    val.definition_level_histogram.map(LevelHistogram::from);
516            }
517            17 => {
518                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
519                column.geo_statistics = convert_geo_stats(Some(val));
520            }
521            _ => {
522                prot.skip(field_ident.field_type)?;
523            }
524        };
525        last_field_id = field_ident.id;
526    }
527
528    Ok(seen_mask)
529}
530
531// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait because
532// these are all internal and operate on slices.
533fn read_column_chunk<'a>(
534    prot: &mut ThriftSliceInputProtocol<'a>,
535    column_descr: &Arc<ColumnDescriptor>,
536    col_index: usize,
537    options: Option<&ParquetMetaDataOptions>,
538) -> Result<ColumnChunkMetaData> {
539    // create a default initialized ColumnMetaData
540    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
541
542    // seen flag for file_offset
543    let mut has_file_offset = false;
544
545    // mask of seen flags for ColumnMetaData
546    let mut col_meta_mask = 0u16;
547
548    // struct ColumnChunk {
549    //   1: optional string file_path
550    //   2: required i64 file_offset = 0
551    //   3: optional ColumnMetaData meta_data
552    //   4: optional i64 offset_index_offset
553    //   5: optional i32 offset_index_length
554    //   6: optional i64 column_index_offset
555    //   7: optional i32 column_index_length
556    //   8: optional ColumnCryptoMetaData crypto_metadata
557    //   9: optional binary encrypted_column_metadata
558    // }
559    let mut last_field_id = 0i16;
560    loop {
561        let field_ident = prot.read_field_begin(last_field_id)?;
562        if field_ident.field_type == FieldType::Stop {
563            break;
564        }
565        match field_ident.id {
566            1 => {
567                col.file_path = Some(String::read_thrift(&mut *prot)?);
568            }
569            2 => {
570                col.file_offset = i64::read_thrift(&mut *prot)?;
571                has_file_offset = true;
572            }
573            3 => {
574                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
575            }
576            4 => {
577                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
578            }
579            5 => {
580                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
581            }
582            6 => {
583                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
584            }
585            7 => {
586                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
587            }
588            #[cfg(feature = "encryption")]
589            8 => {
590                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
591                col.column_crypto_metadata = Some(Box::new(val));
592            }
593            #[cfg(feature = "encryption")]
594            9 => {
595                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
596            }
597            _ => {
598                prot.skip(field_ident.field_type)?;
599            }
600        };
601        last_field_id = field_ident.id;
602    }
603
604    // the only required field from ColumnChunk
605    if !has_file_offset {
606        return Err(general_err!("Required field file_offset is missing"));
607    };
608
609    // if encrypted just return. we'll decrypt after finishing the footer and populate the rest.
610    #[cfg(feature = "encryption")]
611    if col.encrypted_column_metadata.is_some() {
612        return Ok(col);
613    }
614
615    // not encrypted, so make sure all required fields were read
616    validate_column_metadata(col_meta_mask)?;
617
618    Ok(col)
619}
620
621fn read_row_group(
622    prot: &mut ThriftSliceInputProtocol,
623    schema_descr: &Arc<SchemaDescriptor>,
624    options: Option<&ParquetMetaDataOptions>,
625) -> Result<RowGroupMetaData> {
626    // create default initialized RowGroupMetaData
627    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
628
629    // mask values for required fields
630    const RG_COLUMNS: u8 = 1 << 1;
631    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
632    const RG_NUM_ROWS: u8 = 1 << 3;
633    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;
634
635    let mut mask = 0u8;
636
637    // struct RowGroup {
638    //   1: required list<ColumnChunk> columns
639    //   2: required i64 total_byte_size
640    //   3: required i64 num_rows
641    //   4: optional list<SortingColumn> sorting_columns
642    //   5: optional i64 file_offset
643    //   6: optional i64 total_compressed_size
644    //   7: optional i16 ordinal
645    // }
646    let mut last_field_id = 0i16;
647    loop {
648        let field_ident = prot.read_field_begin(last_field_id)?;
649        if field_ident.field_type == FieldType::Stop {
650            break;
651        }
652        match field_ident.id {
653            1 => {
654                let list_ident = prot.read_list_begin()?;
655                if schema_descr.num_columns() != list_ident.size as usize {
656                    return Err(general_err!(
657                        "Column count mismatch. Schema has {} columns while Row Group has {}",
658                        schema_descr.num_columns(),
659                        list_ident.size
660                    ));
661                }
662                for i in 0..list_ident.size as usize {
663                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
664                    row_group.columns.push(col);
665                }
666                mask |= RG_COLUMNS;
667            }
668            2 => {
669                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
670                mask |= RG_TOT_BYTE_SIZE;
671            }
672            3 => {
673                row_group.num_rows = i64::read_thrift(&mut *prot)?;
674                mask |= RG_NUM_ROWS;
675            }
676            4 => {
677                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
678                row_group.sorting_columns = Some(val);
679            }
680            5 => {
681                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
682            }
683            // 6: we don't expose total_compressed_size
684            7 => {
685                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
686            }
687            _ => {
688                prot.skip(field_ident.field_type)?;
689            }
690        };
691        last_field_id = field_ident.id;
692    }
693
694    if mask != RG_ALL_REQUIRED {
695        if mask & RG_COLUMNS == 0 {
696            return Err(general_err!("Required field columns is missing"));
697        }
698        if mask & RG_TOT_BYTE_SIZE == 0 {
699            return Err(general_err!("Required field total_byte_size is missing"));
700        }
701        if mask & RG_NUM_ROWS == 0 {
702            return Err(general_err!("Required field num_rows is missing"));
703        }
704    }
705
706    Ok(row_group)
707}
708
709/// Create a [`SchemaDescriptor`] from thrift input. The input buffer must contain a complete
710/// Parquet footer.
711pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
712    let mut prot = ThriftSliceInputProtocol::new(buf);
713
714    let mut last_field_id = 0i16;
715    loop {
716        let field_ident = prot.read_field_begin(last_field_id)?;
717        if field_ident.field_type == FieldType::Stop {
718            break;
719        }
720        match field_ident.id {
721            2 => {
722                // read schema and convert to SchemaDescriptor for use when reading row groups
723                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
724                let val = parquet_schema_from_array(val)?;
725                return Ok(SchemaDescriptor::new(val));
726            }
727            _ => prot.skip(field_ident.field_type)?,
728        }
729        last_field_id = field_ident.id;
730    }
731    Err(general_err!("Input does not contain a schema"))
732}
733
734/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
735/// the Parquet footer. Page indexes will need to be added later.
736pub(crate) fn parquet_metadata_from_bytes(
737    buf: &[u8],
738    options: Option<&ParquetMetaDataOptions>,
739) -> Result<ParquetMetaData> {
740    let mut prot = ThriftSliceInputProtocol::new(buf);
741
742    // begin reading the file metadata
743    let mut version: Option<i32> = None;
744    let mut num_rows: Option<i64> = None;
745    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
746    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
747    let mut created_by: Option<&str> = None;
748    let mut column_orders: Option<Vec<ColumnOrder>> = None;
749    #[cfg(feature = "encryption")]
750    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
751    #[cfg(feature = "encryption")]
752    let mut footer_signing_key_metadata: Option<&[u8]> = None;
753
754    // this will need to be set before parsing row groups
755    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
756
757    // see if we already have a schema.
758    if let Some(options) = options {
759        schema_descr = options.schema().cloned();
760    }
761
762    // struct FileMetaData {
763    //   1: required i32 version
764    //   2: required list<SchemaElement> schema;
765    //   3: required i64 num_rows
766    //   4: required list<RowGroup> row_groups
767    //   5: optional list<KeyValue> key_value_metadata
768    //   6: optional string created_by
769    //   7: optional list<ColumnOrder> column_orders;
770    //   8: optional EncryptionAlgorithm encryption_algorithm
771    //   9: optional binary footer_signing_key_metadata
772    // }
773    let mut last_field_id = 0i16;
774    loop {
775        let field_ident = prot.read_field_begin(last_field_id)?;
776        if field_ident.field_type == FieldType::Stop {
777            break;
778        }
779        match field_ident.id {
780            1 => {
781                version = Some(i32::read_thrift(&mut prot)?);
782            }
783            2 => {
784                // If schema was passed in, skip parsing it
785                if schema_descr.is_some() {
786                    prot.skip(field_ident.field_type)?;
787                } else {
788                    // read schema and convert to SchemaDescriptor for use when reading row groups
789                    let val =
790                        read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
791                    let val = parquet_schema_from_array(val)?;
792                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
793                }
794            }
795            3 => {
796                num_rows = Some(i64::read_thrift(&mut prot)?);
797            }
798            4 => {
799                if schema_descr.is_none() {
800                    return Err(general_err!("Required field schema is missing"));
801                }
802                let schema_descr = schema_descr.as_ref().unwrap();
803                let list_ident = prot.read_list_begin()?;
804                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
805
806                // Read row groups and handle ordinal assignment
807                let mut assigner = OrdinalAssigner::new();
808                for ordinal in 0..list_ident.size {
809                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
810                        ParquetError::General(format!(
811                            "Row group ordinal {ordinal} exceeds i16 max value",
812                        ))
813                    })?;
814                    let rg = read_row_group(&mut prot, schema_descr, options)?;
815                    rg_vec.push(assigner.ensure(ordinal, rg)?);
816                }
817                row_groups = Some(rg_vec);
818            }
819            5 => {
820                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
821                key_value_metadata = Some(val);
822            }
823            6 => {
824                created_by = Some(<&str>::read_thrift(&mut prot)?);
825            }
826            7 => {
827                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
828                column_orders = Some(val);
829            }
830            #[cfg(feature = "encryption")]
831            8 => {
832                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
833                encryption_algorithm = Some(val);
834            }
835            #[cfg(feature = "encryption")]
836            9 => {
837                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
838            }
839            _ => {
840                prot.skip(field_ident.field_type)?;
841            }
842        };
843        last_field_id = field_ident.id;
844    }
845    let Some(version) = version else {
846        return Err(general_err!("Required field version is missing"));
847    };
848    let Some(num_rows) = num_rows else {
849        return Err(general_err!("Required field num_rows is missing"));
850    };
851    let Some(row_groups) = row_groups else {
852        return Err(general_err!("Required field row_groups is missing"));
853    };
854
855    let created_by = created_by.map(|c| c.to_owned());
856
857    // we've tested for `None` by now so this is safe
858    let schema_descr = schema_descr.unwrap();
859
860    // need to map read column orders to actual values based on the schema
861    if column_orders
862        .as_ref()
863        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
864    {
865        return Err(general_err!("Column order length mismatch"));
866    }
867    // replace default type defined column orders with ones having the correct sort order
868    // TODO(ets): this could instead be done above when decoding
869    let column_orders = column_orders.map(|mut cos| {
870        for (i, column) in schema_descr.columns().iter().enumerate() {
871            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
872                let sort_order = ColumnOrder::sort_order_for_type(
873                    column.logical_type_ref(),
874                    column.converted_type(),
875                    column.physical_type(),
876                );
877                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
878            }
879        }
880        cos
881    });
882
883    #[cfg(not(feature = "encryption"))]
884    let fmd = crate::file::metadata::FileMetaData::new(
885        version,
886        num_rows,
887        created_by,
888        key_value_metadata,
889        schema_descr,
890        column_orders,
891    );
892    #[cfg(feature = "encryption")]
893    let fmd = crate::file::metadata::FileMetaData::new(
894        version,
895        num_rows,
896        created_by,
897        key_value_metadata,
898        schema_descr,
899        column_orders,
900    )
901    .with_encryption_algorithm(encryption_algorithm)
902    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
903
904    Ok(ParquetMetaData::new(fmd, row_groups))
905}
906
907/// Assign [`RowGroupMetaData::ordinal`]  if it is missing.
908#[derive(Debug, Default)]
909pub(crate) struct OrdinalAssigner {
910    first_has_ordinal: Option<bool>,
911}
912
913impl OrdinalAssigner {
914    fn new() -> Self {
915        Default::default()
916    }
917
918    /// Sets [`RowGroupMetaData::ordinal`] if it is missing.
919    ///
920    /// # Arguments
921    /// - actual_ordinal: The ordinal (index) of the row group being processed
922    ///   in the file metadata.
923    /// - rg: The [`RowGroupMetaData`] to potentially modify.
924    ///
925    /// Ensures:
926    /// 1. If the first row group has an ordinal, all subsequent row groups must
927    ///    also have ordinals.
928    /// 2. If the first row group does NOT have an ordinal, all subsequent row
929    ///    groups must also not have ordinals.
930    fn ensure(
931        &mut self,
932        actual_ordinal: i16,
933        mut rg: RowGroupMetaData,
934    ) -> Result<RowGroupMetaData> {
935        let rg_has_ordinal = rg.ordinal.is_some();
936
937        // Only set first_has_ordinal if it's None (first row group that arrives)
938        if self.first_has_ordinal.is_none() {
939            self.first_has_ordinal = Some(rg_has_ordinal);
940        }
941
942        // assign ordinal if missing and consistent with first row group
943        let first_has_ordinal = self.first_has_ordinal.unwrap();
944        if !first_has_ordinal && !rg_has_ordinal {
945            rg.ordinal = Some(actual_ordinal);
946        } else if first_has_ordinal != rg_has_ordinal {
947            return Err(general_err!(
948                "Inconsistent ordinal assignment: first_has_ordinal is set to \
949                {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
950                first_has_ordinal,
951                actual_ordinal,
952                rg_has_ordinal
953            ));
954        }
955        Ok(rg)
956    }
957}
958
thrift_struct!(
    /// Header of an index page; carried in field 6 of [`PageHeader`]. It declares no fields.
    pub(crate) struct IndexPageHeader {}
);
962
thrift_struct!(
/// Header of a dictionary page; carried in field 7 of [`PageHeader`].
pub(crate) struct DictionaryPageHeader {
  /// Number of values in the dictionary
  1: required i32 num_values;

  /// Encoding using this dictionary page
  2: required Encoding encoding

  /// If true, the entries in the dictionary are sorted in ascending order
  3: optional bool is_sorted;
}
);
975
thrift_struct!(
/// Statistics for the page header.
///
/// This is a duplicate of the [`Statistics`] struct above. Because the page reader uses
/// the [`Read`] API, we cannot read the min/max values as slices. This should not be
/// a huge problem since this crate no longer reads the page header statistics by default.
///
/// Every field is optional.
///
/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
pub(crate) struct PageStatistics {
   // NOTE(review): `max`/`min` (fields 1 and 2) and `max_value`/`min_value` (fields 5 and 6)
   // are separate fields; presumably the former are the legacy variants defined by the
   // Parquet specification - confirm against the thrift definition.
   1: optional binary max;
   2: optional binary min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary max_value;
   6: optional binary min_value;
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
995
thrift_struct!(
/// Header of a data page; carried in field 5 of [`PageHeader`].
///
/// `DataPageHeader::read_thrift_without_stats` is a hand written decoder for this struct
/// that skips the `statistics` field.
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);
1005
1006impl DataPageHeader {
1007    // reader that skips decoding page statistics
1008    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1009    where
1010        R: ThriftCompactInputProtocol<'a>,
1011    {
1012        let mut num_values: Option<i32> = None;
1013        let mut encoding: Option<Encoding> = None;
1014        let mut definition_level_encoding: Option<Encoding> = None;
1015        let mut repetition_level_encoding: Option<Encoding> = None;
1016        let statistics: Option<PageStatistics> = None;
1017        let mut last_field_id = 0i16;
1018        loop {
1019            let field_ident = prot.read_field_begin(last_field_id)?;
1020            if field_ident.field_type == FieldType::Stop {
1021                break;
1022            }
1023            match field_ident.id {
1024                1 => {
1025                    let val = i32::read_thrift(&mut *prot)?;
1026                    num_values = Some(val);
1027                }
1028                2 => {
1029                    let val = Encoding::read_thrift(&mut *prot)?;
1030                    encoding = Some(val);
1031                }
1032                3 => {
1033                    let val = Encoding::read_thrift(&mut *prot)?;
1034                    definition_level_encoding = Some(val);
1035                }
1036                4 => {
1037                    let val = Encoding::read_thrift(&mut *prot)?;
1038                    repetition_level_encoding = Some(val);
1039                }
1040                _ => {
1041                    prot.skip(field_ident.field_type)?;
1042                }
1043            };
1044            last_field_id = field_ident.id;
1045        }
1046        let Some(num_values) = num_values else {
1047            return Err(general_err!("Required field num_values is missing"));
1048        };
1049        let Some(encoding) = encoding else {
1050            return Err(general_err!("Required field encoding is missing"));
1051        };
1052        let Some(definition_level_encoding) = definition_level_encoding else {
1053            return Err(general_err!(
1054                "Required field definition_level_encoding is missing"
1055            ));
1056        };
1057        let Some(repetition_level_encoding) = repetition_level_encoding else {
1058            return Err(general_err!(
1059                "Required field repetition_level_encoding is missing"
1060            ));
1061        };
1062        Ok(Self {
1063            num_values,
1064            encoding,
1065            definition_level_encoding,
1066            repetition_level_encoding,
1067            statistics,
1068        })
1069    }
1070}
1071
thrift_struct!(
/// Header of a version 2 data page; carried in field 8 of [`PageHeader`].
///
/// `DataPageHeaderV2::read_thrift_without_stats` is a hand written decoder for this struct
/// that skips the `statistics` field.
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  // declared with a default value of `true`
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
1084
1085impl DataPageHeaderV2 {
1086    // reader that skips decoding page statistics
1087    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1088    where
1089        R: ThriftCompactInputProtocol<'a>,
1090    {
1091        let mut num_values: Option<i32> = None;
1092        let mut num_nulls: Option<i32> = None;
1093        let mut num_rows: Option<i32> = None;
1094        let mut encoding: Option<Encoding> = None;
1095        let mut definition_levels_byte_length: Option<i32> = None;
1096        let mut repetition_levels_byte_length: Option<i32> = None;
1097        let mut is_compressed: Option<bool> = None;
1098        let statistics: Option<PageStatistics> = None;
1099        let mut last_field_id = 0i16;
1100        loop {
1101            let field_ident = prot.read_field_begin(last_field_id)?;
1102            if field_ident.field_type == FieldType::Stop {
1103                break;
1104            }
1105            match field_ident.id {
1106                1 => {
1107                    let val = i32::read_thrift(&mut *prot)?;
1108                    num_values = Some(val);
1109                }
1110                2 => {
1111                    let val = i32::read_thrift(&mut *prot)?;
1112                    num_nulls = Some(val);
1113                }
1114                3 => {
1115                    let val = i32::read_thrift(&mut *prot)?;
1116                    num_rows = Some(val);
1117                }
1118                4 => {
1119                    let val = Encoding::read_thrift(&mut *prot)?;
1120                    encoding = Some(val);
1121                }
1122                5 => {
1123                    let val = i32::read_thrift(&mut *prot)?;
1124                    definition_levels_byte_length = Some(val);
1125                }
1126                6 => {
1127                    let val = i32::read_thrift(&mut *prot)?;
1128                    repetition_levels_byte_length = Some(val);
1129                }
1130                7 => {
1131                    let val = field_ident.bool_val.unwrap();
1132                    is_compressed = Some(val);
1133                }
1134                _ => {
1135                    prot.skip(field_ident.field_type)?;
1136                }
1137            };
1138            last_field_id = field_ident.id;
1139        }
1140        let Some(num_values) = num_values else {
1141            return Err(general_err!("Required field num_values is missing"));
1142        };
1143        let Some(num_nulls) = num_nulls else {
1144            return Err(general_err!("Required field num_nulls is missing"));
1145        };
1146        let Some(num_rows) = num_rows else {
1147            return Err(general_err!("Required field num_rows is missing"));
1148        };
1149        let Some(encoding) = encoding else {
1150            return Err(general_err!("Required field encoding is missing"));
1151        };
1152        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1153            return Err(general_err!(
1154                "Required field definition_levels_byte_length is missing"
1155            ));
1156        };
1157        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1158            return Err(general_err!(
1159                "Required field repetition_levels_byte_length is missing"
1160            ));
1161        };
1162        Ok(Self {
1163            num_values,
1164            num_nulls,
1165            num_rows,
1166            encoding,
1167            definition_levels_byte_length,
1168            repetition_levels_byte_length,
1169            is_compressed,
1170            statistics,
1171        })
1172    }
1173}
1174
thrift_struct!(
/// Header that describes a single page. `PageHeader::read_thrift_without_stats` is a hand
/// written decoder for this struct that skips the page statistics of data pages.
pub(crate) struct PageHeader {
  /// the type of the page: indicates which of the *_header fields is set
  1: required PageType r#type

  /// Uncompressed page size in bytes (not including this header)
  2: required i32 uncompressed_page_size

  /// Compressed (and potentially encrypted) page size in bytes, not including this header
  3: required i32 compressed_page_size

  /// The 32-bit CRC checksum for the page. How it is calculated is not described here
  /// (the original comment was truncated); refer to the Parquet specification.
  4: optional i32 crc

  // Headers for page specific data. Only one will be set.
  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1196
1197impl PageHeader {
1198    // reader that skips reading page statistics. obtained by running
1199    // `cargo expand -p parquet --all-features --lib file::metadata::thrift`
1200    // and modifying the impl of `read_thrift`
1201    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1202    where
1203        R: ThriftCompactInputProtocol<'a>,
1204    {
1205        let mut type_: Option<PageType> = None;
1206        let mut uncompressed_page_size: Option<i32> = None;
1207        let mut compressed_page_size: Option<i32> = None;
1208        let mut crc: Option<i32> = None;
1209        let mut data_page_header: Option<DataPageHeader> = None;
1210        let mut index_page_header: Option<IndexPageHeader> = None;
1211        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1212        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1213        let mut last_field_id = 0i16;
1214        loop {
1215            let field_ident = prot.read_field_begin(last_field_id)?;
1216            if field_ident.field_type == FieldType::Stop {
1217                break;
1218            }
1219            match field_ident.id {
1220                1 => {
1221                    let val = PageType::read_thrift(&mut *prot)?;
1222                    type_ = Some(val);
1223                }
1224                2 => {
1225                    let val = i32::read_thrift(&mut *prot)?;
1226                    uncompressed_page_size = Some(val);
1227                }
1228                3 => {
1229                    let val = i32::read_thrift(&mut *prot)?;
1230                    compressed_page_size = Some(val);
1231                }
1232                4 => {
1233                    let val = i32::read_thrift(&mut *prot)?;
1234                    crc = Some(val);
1235                }
1236                5 => {
1237                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1238                    data_page_header = Some(val);
1239                }
1240                6 => {
1241                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
1242                    index_page_header = Some(val);
1243                }
1244                7 => {
1245                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1246                    dictionary_page_header = Some(val);
1247                }
1248                8 => {
1249                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1250                    data_page_header_v2 = Some(val);
1251                }
1252                _ => {
1253                    prot.skip(field_ident.field_type)?;
1254                }
1255            };
1256            last_field_id = field_ident.id;
1257        }
1258        let Some(type_) = type_ else {
1259            return Err(general_err!("Required field type_ is missing"));
1260        };
1261        let Some(uncompressed_page_size) = uncompressed_page_size else {
1262            return Err(general_err!(
1263                "Required field uncompressed_page_size is missing"
1264            ));
1265        };
1266        let Some(compressed_page_size) = compressed_page_size else {
1267            return Err(general_err!(
1268                "Required field compressed_page_size is missing"
1269            ));
1270        };
1271        Ok(Self {
1272            r#type: type_,
1273            uncompressed_page_size,
1274            compressed_page_size,
1275            crc,
1276            data_page_header,
1277            index_page_header,
1278            dictionary_page_header,
1279            data_page_header_v2,
1280        })
1281    }
1282}
1283
1284/////////////////////////////////////////////////
1285// helper functions for writing file meta data
1286
1287#[cfg(feature = "encryption")]
1288fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
1289    // If there is encrypted column metadata present,
1290    // the column is encrypted with a different key to the footer or a plaintext footer is used,
1291    // so the statistics are sensitive and shouldn't be written.
1292    column_chunk.encrypted_column_metadata.is_none()
1293}
1294
1295#[cfg(not(feature = "encryption"))]
1296fn should_write_column_stats(_column_chunk: &ColumnChunkMetaData) -> bool {
1297    true
1298}
1299
1300// serialize the bits of the column chunk needed for a thrift ColumnMetaData
1301// struct ColumnMetaData {
1302//   1: required Type type
1303//   2: required list<Encoding> encodings
1304//   3: required list<string> path_in_schema
1305//   4: required CompressionCodec codec
1306//   5: required i64 num_values
1307//   6: required i64 total_uncompressed_size
1308//   7: required i64 total_compressed_size
1309//   8: optional list<KeyValue> key_value_metadata
1310//   9: required i64 data_page_offset
1311//   10: optional i64 index_page_offset
1312//   11: optional i64 dictionary_page_offset
1313//   12: optional Statistics statistics;
1314//   13: optional list<PageEncodingStats> encoding_stats;
1315//   14: optional i64 bloom_filter_offset;
1316//   15: optional i32 bloom_filter_length;
1317//   16: optional SizeStatistics size_statistics;
1318//   17: optional GeospatialStatistics geospatial_statistics;
1319// }
1320pub(super) fn serialize_column_meta_data<W: Write>(
1321    column_chunk: &ColumnChunkMetaData,
1322    w: &mut ThriftCompactOutputProtocol<W>,
1323) -> Result<()> {
1324    use crate::file::statistics::page_stats_to_thrift;
1325
1326    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1327    column_chunk
1328        .encodings()
1329        .collect::<Vec<_>>()
1330        .write_thrift_field(w, 2, 1)?;
1331    let path = column_chunk.column_descr.path().parts();
1332    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1333    path.write_thrift_field(w, 3, 2)?;
1334    column_chunk.compression.write_thrift_field(w, 4, 3)?;
1335    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1336    column_chunk
1337        .total_uncompressed_size
1338        .write_thrift_field(w, 6, 5)?;
1339    column_chunk
1340        .total_compressed_size
1341        .write_thrift_field(w, 7, 6)?;
1342    // no key_value_metadata here
1343    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1344    if let Some(index_page_offset) = column_chunk.index_page_offset {
1345        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1346    }
1347    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1348        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1349    }
1350
1351    if should_write_column_stats(column_chunk) {
1352        // PageStatistics is the same as thrift Statistics, but writable
1353        let stats = page_stats_to_thrift(column_chunk.statistics());
1354        if let Some(stats) = stats {
1355            last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1356        }
1357        if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1358            last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1359        }
1360        if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1361            last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1362        }
1363        if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1364            last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1365        }
1366
1367        // SizeStatistics
1368        let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1369            || column_chunk.repetition_level_histogram.is_some()
1370            || column_chunk.definition_level_histogram.is_some()
1371        {
1372            let repetition_level_histogram = column_chunk
1373                .repetition_level_histogram()
1374                .map(|hist| hist.clone().into_inner());
1375
1376            let definition_level_histogram = column_chunk
1377                .definition_level_histogram()
1378                .map(|hist| hist.clone().into_inner());
1379
1380            Some(SizeStatistics {
1381                unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1382                repetition_level_histogram,
1383                definition_level_histogram,
1384            })
1385        } else {
1386            None
1387        };
1388        if let Some(size_stats) = size_stats {
1389            last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1390        }
1391
1392        if let Some(geo_stats) = column_chunk.geo_statistics() {
1393            geo_stats.write_thrift_field(w, 17, last_field_id)?;
1394        }
1395    }
1396
1397    w.write_struct_end()
1398}
1399
/// Temporary struct used for writing. It borrows the file level metadata and the row groups
/// so that its `WriteThrift` implementation can write them as one thrift `FileMetaData`
/// struct.
pub(super) struct FileMeta<'a> {
    /// Source of the version, schema, row count, key/value metadata, `created_by` and column
    /// orders (plus the encryption related fields when the `encryption` feature is enabled).
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    /// The row groups, written as field 4.
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
}
1405
1406// struct FileMetaData {
1407//   1: required i32 version
1408//   2: required list<SchemaElement> schema;
1409//   3: required i64 num_rows
1410//   4: required list<RowGroup> row_groups
1411//   5: optional list<KeyValue> key_value_metadata
1412//   6: optional string created_by
1413//   7: optional list<ColumnOrder> column_orders;
1414//   8: optional EncryptionAlgorithm encryption_algorithm
1415//   9: optional binary footer_signing_key_metadata
1416// }
1417impl<'a> WriteThrift for FileMeta<'a> {
1418    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1419
1420    // needed for last_field_id w/o encryption
1421    #[allow(unused_assignments)]
1422    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1423        self.file_metadata
1424            .version
1425            .write_thrift_field(writer, 1, 0)?;
1426
1427        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
1428        // writing along the way.
1429        let root = self.file_metadata.schema_descr().root_schema_ptr();
1430        let schema_len = num_nodes(&root)?;
1431        writer.write_field_begin(FieldType::List, 2, 1)?;
1432        writer.write_list_begin(ElementType::Struct, schema_len)?;
1433        // recursively write Type nodes as SchemaElements
1434        write_schema(&root, writer)?;
1435
1436        self.file_metadata
1437            .num_rows
1438            .write_thrift_field(writer, 3, 2)?;
1439
1440        // this will call RowGroupMetaData::write_thrift
1441        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1442
1443        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1444            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1445        }
1446        if let Some(created_by) = self.file_metadata.created_by() {
1447            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1448        }
1449        if let Some(column_orders) = self.file_metadata.column_orders() {
1450            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1451        }
1452        #[cfg(feature = "encryption")]
1453        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1454            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1455        }
1456        #[cfg(feature = "encryption")]
1457        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1458            key.as_slice()
1459                .write_thrift_field(writer, 9, last_field_id)?;
1460        }
1461
1462        writer.write_struct_end()
1463    }
1464}
1465
1466fn write_schema<W: Write>(
1467    schema: &TypePtr,
1468    writer: &mut ThriftCompactOutputProtocol<W>,
1469) -> Result<()> {
1470    if !schema.is_group() {
1471        return Err(general_err!("Root schema must be Group type"));
1472    }
1473    write_schema_helper(schema, writer)
1474}
1475
1476fn write_schema_helper<W: Write>(
1477    node: &TypePtr,
1478    writer: &mut ThriftCompactOutputProtocol<W>,
1479) -> Result<()> {
1480    match node.as_ref() {
1481        crate::schema::types::Type::PrimitiveType {
1482            basic_info,
1483            physical_type,
1484            type_length,
1485            scale,
1486            precision,
1487        } => {
1488            let element = SchemaElement {
1489                r#type: Some(*physical_type),
1490                type_length: if *type_length >= 0 {
1491                    Some(*type_length)
1492                } else {
1493                    None
1494                },
1495                repetition_type: Some(basic_info.repetition()),
1496                name: basic_info.name(),
1497                num_children: None,
1498                converted_type: match basic_info.converted_type() {
1499                    ConvertedType::NONE => None,
1500                    other => Some(other),
1501                },
1502                scale: if *scale >= 0 { Some(*scale) } else { None },
1503                precision: if *precision >= 0 {
1504                    Some(*precision)
1505                } else {
1506                    None
1507                },
1508                field_id: if basic_info.has_id() {
1509                    Some(basic_info.id())
1510                } else {
1511                    None
1512                },
1513                logical_type: basic_info.logical_type_ref().cloned(),
1514            };
1515            element.write_thrift(writer)
1516        }
1517        crate::schema::types::Type::GroupType { basic_info, fields } => {
1518            let repetition = if basic_info.has_repetition() {
1519                Some(basic_info.repetition())
1520            } else {
1521                None
1522            };
1523
1524            let element = SchemaElement {
1525                r#type: None,
1526                type_length: None,
1527                repetition_type: repetition,
1528                name: basic_info.name(),
1529                num_children: Some(fields.len().try_into()?),
1530                converted_type: match basic_info.converted_type() {
1531                    ConvertedType::NONE => None,
1532                    other => Some(other),
1533                },
1534                scale: None,
1535                precision: None,
1536                field_id: if basic_info.has_id() {
1537                    Some(basic_info.id())
1538                } else {
1539                    None
1540                },
1541                logical_type: basic_info.logical_type_ref().cloned(),
1542            };
1543
1544            element.write_thrift(writer)?;
1545
1546            // Add child elements for a group
1547            for field in fields {
1548                write_schema_helper(field, writer)?;
1549            }
1550            Ok(())
1551        }
1552    }
1553}
1554
1555// struct RowGroup {
1556//   1: required list<ColumnChunk> columns
1557//   2: required i64 total_byte_size
1558//   3: required i64 num_rows
1559//   4: optional list<SortingColumn> sorting_columns
1560//   5: optional i64 file_offset
1561//   6: optional i64 total_compressed_size
1562//   7: optional i16 ordinal
1563// }
1564impl WriteThrift for RowGroupMetaData {
1565    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1566
1567    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1568        // this will call ColumnChunkMetaData::write_thrift
1569        self.columns.write_thrift_field(writer, 1, 0)?;
1570        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1571        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1572        if let Some(sorting_columns) = self.sorting_columns() {
1573            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1574        }
1575        if let Some(file_offset) = self.file_offset() {
1576            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1577        }
1578        // this is optional, but we'll always write it
1579        last_field_id = self
1580            .compressed_size()
1581            .write_thrift_field(writer, 6, last_field_id)?;
1582        if let Some(ordinal) = self.ordinal() {
1583            ordinal.write_thrift_field(writer, 7, last_field_id)?;
1584        }
1585        writer.write_struct_end()
1586    }
1587}
1588
1589// struct ColumnChunk {
1590//   1: optional string file_path
1591//   2: required i64 file_offset = 0
1592//   3: optional ColumnMetaData meta_data
1593//   4: optional i64 offset_index_offset
1594//   5: optional i32 offset_index_length
1595//   6: optional i64 column_index_offset
1596//   7: optional i32 column_index_length
1597//   8: optional ColumnCryptoMetaData crypto_metadata
1598//   9: optional binary encrypted_column_metadata
1599// }
1600impl WriteThrift for ColumnChunkMetaData {
1601    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1602
1603    #[allow(unused_assignments)]
1604    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1605        let mut last_field_id = 0i16;
1606        if let Some(file_path) = self.file_path() {
1607            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1608        }
1609        last_field_id = self
1610            .file_offset()
1611            .write_thrift_field(writer, 2, last_field_id)?;
1612
1613        #[cfg(feature = "encryption")]
1614        let write_meta_data =
1615            self.encrypted_column_metadata.is_none() || self.plaintext_footer_mode;
1616        #[cfg(not(feature = "encryption"))]
1617        let write_meta_data = true;
1618
1619        // When the footer is encrypted and encrypted_column_metadata is present,
1620        // skip writing the plaintext meta_data field to reduce footer size.
1621        // When the footer is plaintext (plaintext_footer_mode=true), we still write
1622        // meta_data for backward compatibility with readers that expect it, but with
1623        // sensitive fields (statistics, bloom filter info, etc.) stripped out.
1624        if write_meta_data {
1625            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1626            serialize_column_meta_data(self, writer)?;
1627            last_field_id = 3;
1628        }
1629
1630        if let Some(offset_idx_off) = self.offset_index_offset() {
1631            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1632        }
1633        if let Some(offset_idx_len) = self.offset_index_length() {
1634            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1635        }
1636        if let Some(column_idx_off) = self.column_index_offset() {
1637            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1638        }
1639        if let Some(column_idx_len) = self.column_index_length() {
1640            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1641        }
1642        #[cfg(feature = "encryption")]
1643        {
1644            if let Some(crypto_metadata) = self.crypto_metadata() {
1645                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1646            }
1647            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1648                encrypted_meta
1649                    .as_slice()
1650                    .write_thrift_field(writer, 9, last_field_id)?;
1651            }
1652        }
1653
1654        writer.write_struct_end()
1655    }
1656}
1657
1658// struct GeospatialStatistics {
1659//   1: optional BoundingBox bbox;
1660//   2: optional list<i32> geospatial_types;
1661// }
1662impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1663    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1664
1665    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1666        let mut last_field_id = 0i16;
1667        if let Some(bbox) = self.bounding_box() {
1668            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1669        }
1670        if let Some(geo_types) = self.geospatial_types() {
1671            geo_types.write_thrift_field(writer, 2, last_field_id)?;
1672        }
1673
1674        writer.write_struct_end()
1675    }
1676}
1677
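// `write_thrift_field!` cannot handle a path-qualified type name, so import the type under a
// local alias and pass the alias to the macro instead.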
1679use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
1680write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1681
1682// struct BoundingBox {
1683//   1: required double xmin;
1684//   2: required double xmax;
1685//   3: required double ymin;
1686//   4: required double ymax;
1687//   5: optional double zmin;
1688//   6: optional double zmax;
1689//   7: optional double mmin;
1690//   8: optional double mmax;
1691// }
1692impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1693    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1694
1695    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1696        self.get_xmin().write_thrift_field(writer, 1, 0)?;
1697        self.get_xmax().write_thrift_field(writer, 2, 1)?;
1698        self.get_ymin().write_thrift_field(writer, 3, 2)?;
1699        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1700
1701        if let Some(zmin) = self.get_zmin() {
1702            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1703        }
1704        if let Some(zmax) = self.get_zmax() {
1705            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1706        }
1707        if let Some(mmin) = self.get_mmin() {
1708            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1709        }
1710        if let Some(mmax) = self.get_mmax() {
1711            mmax.write_thrift_field(writer, 8, last_field_id)?;
1712        }
1713
1714        writer.write_struct_end()
1715    }
1716}
1717
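// `write_thrift_field!` cannot handle a path-qualified type name, so import the type under a
// local alias and pass the alias to the macro instead.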
1719use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
1720write_thrift_field!(RustBoundingBox, FieldType::Struct);
1721
#[cfg(test)]
pub(crate) mod tests {
    use crate::basic::Type as PhysicalType;
    use crate::errors::Result;
    use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, ColumnPath, SchemaDescriptor, TypePtr, num_nodes,
        parquet_schema_from_array,
    };
    use std::sync::Arc;

    /// Test helper: decodes a Thrift-encoded `RowGroup` held in `buf`, resolving its columns
    /// against `schema_descr`. No metadata options are applied.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut input = ThriftSliceInputProtocol::new(buf);
        super::read_row_group(&mut input, &schema_descr, None)
    }

    /// Test helper: decodes a Thrift-encoded `ColumnChunk` held in `buf` without any
    /// metadata options.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        let no_options = None;
        read_column_chunk_with_options(buf, column_descr, no_options)
    }

    /// Test helper: decodes a Thrift-encoded `ColumnChunk` held in `buf`, forwarding the
    /// supplied `options` to the decoder. The third decoder argument is fixed at 0.
    pub(crate) fn read_column_chunk_with_options(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ColumnChunkMetaData> {
        let mut input = ThriftSliceInputProtocol::new(buf);
        super::read_column_chunk(&mut input, &column_descr, 0, options)
    }

    /// Test helper: serializes `schema` to a Thrift list of `SchemaElement`s, reads the list
    /// back, and rebuilds a schema tree from it.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let encoded = schema_to_buf(&schema)?;

        let mut input = ThriftSliceInputProtocol::new(&encoded);
        let elements: Vec<SchemaElement> = read_thrift_vec(&mut input)?;
        parquet_schema_from_array(elements)
    }

    /// Test helper: serializes `schema` as a Thrift list of `SchemaElement`s and returns the
    /// encoded bytes.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let element_count = num_nodes(schema)?;
        let mut encoded = Vec::new();
        {
            let mut out = ThriftCompactOutputProtocol::new(&mut encoded);

            // The list header comes first, announcing how many structs follow ...
            out.write_list_begin(ElementType::Struct, element_count)?;

            // ... and then the SchemaElements themselves.
            write_schema(schema, &mut out)?;
        }
        Ok(encoded)
    }

    /// Test helper: decodes a Thrift list of `SchemaElement`s from `buf`. The returned
    /// elements borrow from `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut input = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut input)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        // Builds a box with the shared x/y bounds and no z/m bounds; each case below
        // overrides only the optional bounds it cares about.
        let xy_only = || BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        };

        // x/y only
        test_roundtrip(xy_only());

        // x/y plus z
        test_roundtrip(BoundingBox {
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            ..xy_only()
        });

        // x/y plus z and m
        test_roundtrip(BoundingBox {
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
            ..xy_only()
        });
    }

    #[test]
    fn test_convert_stats_preserves_missing_null_count() {
        let leaf = crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT32)
            .build()
            .unwrap();
        let column_descr = Arc::new(ColumnDescriptor::new(
            Arc::new(leaf),
            0,
            0,
            ColumnPath::new(vec![]),
        ));

        // Thrift-level statistics that are empty apart from the given null count.
        let stats_with_null_count = |null_count| super::Statistics {
            max: None,
            min: None,
            null_count,
            distinct_count: None,
            max_value: None,
            min_value: None,
            is_max_value_exact: None,
            is_min_value_exact: None,
        };

        // A missing null count must stay missing after conversion.
        let decoded_none = super::convert_stats(&column_descr, Some(stats_with_null_count(None)))
            .unwrap()
            .unwrap();
        assert_eq!(decoded_none.null_count_opt(), None);

        // An explicit zero must survive as zero rather than being treated as missing.
        let decoded_zero =
            super::convert_stats(&column_descr, Some(stats_with_null_count(Some(0))))
                .unwrap()
                .unwrap();
        assert_eq!(decoded_zero.null_count_opt(), Some(0));
    }
}