parquet/file/metadata/thrift/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module is the bridge between a Parquet file's thrift encoded metadata
19//! and this crate's [Parquet metadata API]. It contains objects and functions used
20//! to serialize/deserialize metadata objects into/from the Thrift compact protocol
21//! format as defined by the [Parquet specification].
22//!
23//! [Parquet metadata API]: crate::file::metadata
24//! [Parquet specification]: https://github.com/apache/parquet-format/tree/master
25
26use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37    basic::{
38        ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
39        Repetition, Type,
40    },
41    data_type::{ByteArray, FixedLenByteArray, Int96},
42    errors::{ParquetError, Result},
43    file::{
44        metadata::{
45            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47            RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48        },
49        statistics::ValueStatistics,
50    },
51    parquet_thrift::{
52        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54        read_thrift_vec,
55    },
56    schema::types::{
57        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58    },
59    thrift_struct,
60    util::bit_util::FromBytes,
61    write_thrift_field,
62};
63
// Thrift `SchemaElement` from the Parquet format: one node of the schema tree, which is
// stored in `FileMetaData` as a flat list (see `parquet_schema_from_bytes`, which decodes a
// list of these and hands it to `parquet_schema_from_array`).
// This needs to be visible to the schema conversion code.
// The `'a` lifetime lets `name` (a `string<'a>`) refer to the input buffer — presumably a
// borrowed `&'a str`; confirm against `thrift_struct!`.
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  /// Data type for this field. Not set if the current element is a non-leaf node
  1: optional Type r#type;
  /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
  /// Otherwise, if specified, this is the maximum bit length to store any of the values.
  /// (e.g. a low cardinality INT col could have this set to 3).  Note that this is
  /// in the schema, and therefore fixed for the entire file.
  2: optional i32 type_length;
  /// Repetition of the field. The root of the schema does not have a repetition_type.
  /// All other nodes must have one.
  3: optional Repetition repetition_type;
  /// Name of the field in the schema
  4: required string<'a> name;
  /// Nested fields. Since thrift does not support nested fields,
  /// the nesting is flattened to a single list by a depth-first traversal.
  /// The children count is used to construct the nested relationship.
  /// This field is not set when the element is a primitive type.
  5: optional i32 num_children;
  /// DEPRECATED: When the schema is the result of a conversion from another model.
  /// Used to record the original type to help with cross conversion.
  ///
  /// This is superseded by logical_type.
  6: optional ConvertedType converted_type;
  /// DEPRECATED: Used when this column contains decimal data.
  /// See the DECIMAL converted type for more details.
  ///
  /// This is superseded by using the DecimalType annotation in logical_type.
  7: optional i32 scale
  // DEPRECATED decimal precision; the note on `scale` above applies here as well.
  8: optional i32 precision
  /// When the original schema supports field ids, this will save the
  /// original field id in the parquet schema
  9: optional i32 field_id;
  /// The logical type of this SchemaElement
  ///
  /// LogicalType replaces ConvertedType, but ConvertedType is still required
  /// for some logical types to ensure forward-compatibility in format v1.
  10: optional LogicalType logical_type
}
);
105
// Thrift `Statistics` for a column chunk, exactly as stored in the file. Converted into the
// crate's `crate::file::statistics::Statistics` by `convert_stats`. The binary fields hold
// PLAIN-encoded values (without a length prefix for variable-length byte arrays).
thrift_struct!(
struct Statistics<'a> {
   // DEPRECATED min/max fields. `convert_stats` only uses these ("old format") when both
   // `max_value` and `min_value` are absent.
   1: optional binary<'a> max;
   2: optional binary<'a> min;
   // `convert_stats` treats a missing null count as 0 and rejects negative values.
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   // Current min/max fields, preferred over 1/2 whenever either is present.
   5: optional binary<'a> max_value;
   6: optional binary<'a> min_value;
   // Exactness flags for max_value/min_value. `convert_stats` only applies them to
   // BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns, defaulting to `false` when absent.
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
118
// Thrift `BoundingBox` carried by geospatial statistics. The X and Y ranges are required;
// the Z and M ranges are optional. `convert_bounding_box` keeps a Z or M range only when
// both its min and max are present.
thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);
131
// Thrift `GeospatialStatistics` (ColumnMetaData field 17). `convert_geo_stats` turns this
// into `crate::geospatial::statistics::GeospatialStatistics`, treating an empty
// `geospatial_types` list the same as an absent one.
thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);
138
// Thrift `SizeStatistics` (ColumnMetaData field 16). `read_column_metadata` copies these
// values onto `ColumnChunkMetaData`, wrapping each histogram in a `LevelHistogram`.
thrift_struct!(
struct SizeStatistics {
   1: optional i64 unencoded_byte_array_data_bytes;
   2: optional list<i64> repetition_level_histogram;
   3: optional list<i64> definition_level_histogram;
}
);
146
147fn convert_geo_stats(
148    stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150    stats.map(|st| {
151        let bbox = convert_bounding_box(st.bbox);
152        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154            bbox,
155            geospatial_types,
156        ))
157    })
158}
159
160fn convert_bounding_box(
161    bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163    bbox.map(|bb| {
164        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165            bb.xmin.into(),
166            bb.xmax.into(),
167            bb.ymin.into(),
168            bb.ymax.into(),
169        );
170
171        newbb = match (bb.zmin, bb.zmax) {
172            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173            // If either None or mismatch, leave it as None and don't error
174            _ => newbb,
175        };
176
177        newbb = match (bb.mmin, bb.mmax) {
178            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179            // If either None or mismatch, leave it as None and don't error
180            _ => newbb,
181        };
182
183        newbb
184    })
185}
186
187/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
188fn convert_stats(
189    column_descr: &Arc<ColumnDescriptor>,
190    thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192    use crate::file::statistics::Statistics as FStatistics;
193    Ok(match thrift_stats {
194        Some(stats) => {
195            // Number of nulls recorded, when it is not available, we just mark it as 0.
196            // TODO this should be `None` if there is no information about NULLS.
197            // see https://github.com/apache/arrow-rs/pull/6216/files
198            let null_count = stats.null_count.unwrap_or(0);
199
200            if null_count < 0 {
201                return Err(general_err!(
202                    "Statistics null count is negative {}",
203                    null_count
204                ));
205            }
206
207            // Generic null count.
208            let null_count = Some(null_count as u64);
209            // Generic distinct count (count of distinct values occurring)
210            let distinct_count = stats.distinct_count.map(|value| value as u64);
211            // Whether or not statistics use deprecated min/max fields.
212            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
213            // Generic min value as bytes.
214            let min = if old_format {
215                stats.min
216            } else {
217                stats.min_value
218            };
219            // Generic max value as bytes.
220            let max = if old_format {
221                stats.max
222            } else {
223                stats.max_value
224            };
225
226            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
227                if let Some(min) = min {
228                    if min.len() < len {
229                        return Err(general_err!("Insufficient bytes to parse min statistic",));
230                    }
231                }
232                if let Some(max) = max {
233                    if max.len() < len {
234                        return Err(general_err!("Insufficient bytes to parse max statistic",));
235                    }
236                }
237                Ok(())
238            }
239
240            let physical_type = column_descr.physical_type();
241            match physical_type {
242                Type::BOOLEAN => check_len(&min, &max, 1),
243                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
244                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
245                Type::INT96 => check_len(&min, &max, 12),
246                _ => Ok(()),
247            }?;
248
249            // Values are encoded using PLAIN encoding definition, except that
250            // variable-length byte arrays do not include a length prefix.
251            //
252            // Instead of using actual decoder, we manually convert values.
253            let res = match physical_type {
254                Type::BOOLEAN => FStatistics::boolean(
255                    min.map(|data| data[0] != 0),
256                    max.map(|data| data[0] != 0),
257                    distinct_count,
258                    null_count,
259                    old_format,
260                ),
261                Type::INT32 => FStatistics::int32(
262                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
264                    distinct_count,
265                    null_count,
266                    old_format,
267                ),
268                Type::INT64 => FStatistics::int64(
269                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
271                    distinct_count,
272                    null_count,
273                    old_format,
274                ),
275                Type::INT96 => {
276                    // INT96 statistics may not be correct, because comparison is signed
277                    let min = if let Some(data) = min {
278                        assert_eq!(data.len(), 12);
279                        Some(Int96::try_from_le_slice(data)?)
280                    } else {
281                        None
282                    };
283                    let max = if let Some(data) = max {
284                        assert_eq!(data.len(), 12);
285                        Some(Int96::try_from_le_slice(data)?)
286                    } else {
287                        None
288                    };
289                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
290                }
291                Type::FLOAT => FStatistics::float(
292                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
294                    distinct_count,
295                    null_count,
296                    old_format,
297                ),
298                Type::DOUBLE => FStatistics::double(
299                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
301                    distinct_count,
302                    null_count,
303                    old_format,
304                ),
305                Type::BYTE_ARRAY => FStatistics::ByteArray(
306                    ValueStatistics::new(
307                        min.map(ByteArray::from),
308                        max.map(ByteArray::from),
309                        distinct_count,
310                        null_count,
311                        old_format,
312                    )
313                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
314                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
315                ),
316                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
317                    ValueStatistics::new(
318                        min.map(ByteArray::from).map(FixedLenByteArray::from),
319                        max.map(ByteArray::from).map(FixedLenByteArray::from),
320                        distinct_count,
321                        null_count,
322                        old_format,
323                    )
324                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
325                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
326                ),
327            };
328
329            Some(res)
330        }
331        None => None,
332    })
333}
334
335// bit positions for required fields in the Thrift ColumnMetaData struct
336const COL_META_TYPE: u16 = 1 << 1;
337const COL_META_ENCODINGS: u16 = 1 << 2;
338const COL_META_CODEC: u16 = 1 << 4;
339const COL_META_NUM_VALUES: u16 = 1 << 5;
340const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
341const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
342const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;
343
344// a mask where all required fields' bits are set
345const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
346    | COL_META_ENCODINGS
347    | COL_META_CODEC
348    | COL_META_NUM_VALUES
349    | COL_META_TOTAL_UNCOMP_SZ
350    | COL_META_TOTAL_COMP_SZ
351    | COL_META_DATA_PAGE_OFFSET;
352
353// check mask to see if all required fields are set. return an appropriate error if
354// any are missing.
355fn validate_column_metadata(mask: u16) -> Result<()> {
356    if mask != COL_META_ALL_REQUIRED {
357        if mask & COL_META_ENCODINGS == 0 {
358            return Err(general_err!("Required field encodings is missing"));
359        }
360
361        if mask & COL_META_CODEC == 0 {
362            return Err(general_err!("Required field codec is missing"));
363        }
364        if mask & COL_META_NUM_VALUES == 0 {
365            return Err(general_err!("Required field num_values is missing"));
366        }
367        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
368            return Err(general_err!(
369                "Required field total_uncompressed_size is missing"
370            ));
371        }
372        if mask & COL_META_TOTAL_COMP_SZ == 0 {
373            return Err(general_err!(
374                "Required field total_compressed_size is missing"
375            ));
376        }
377        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
378            return Err(general_err!("Required field data_page_offset is missing"));
379        }
380    }
381
382    Ok(())
383}
384
385fn read_encoding_stats_as_mask<'a>(
386    prot: &mut ThriftSliceInputProtocol<'a>,
387) -> Result<EncodingMask> {
388    // read the vector of stats, setting mask bits for data pages
389    let mut mask = 0i32;
390    let list_ident = prot.read_list_begin()?;
391    for _ in 0..list_ident.size {
392        let pes = PageEncodingStats::read_thrift(prot)?;
393        match pes.page_type {
394            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
395            _ => {}
396        }
397    }
398    EncodingMask::try_new(mask)
399}
400
// Decode a thrift `ColumnMetaData` struct, writing the decoded values directly into `column`.
// Returns a mask of all required fields that were observed (bit `n` is set when thrift field
// id `n` was read; see the `COL_META_*` constants). This mask can be passed to
// `validate_column_metadata`.
//
// `col_index` is used only to query the per-column settings in `options`. Fields that are not
// decoded are consumed with `prot.skip`: unknown ids, ids this crate ignores (3, 8), and the
// statistics fields (12, 13, 16) when `options` asks for them to be skipped.
fn read_column_metadata<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column: &mut ColumnChunkMetaData,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<u16> {
    // mask for seen required fields in ColumnMetaData
    let mut seen_mask = 0u16;

    // Defaults used when no options are supplied: decode every statistics field, and
    // collapse page encoding stats into an `EncodingMask` instead of keeping the full list.
    let mut skip_pes = false;
    let mut pes_mask = true;
    let mut skip_col_stats = false;
    let mut skip_size_stats = false;

    if let Some(opts) = options {
        skip_pes = opts.skip_encoding_stats(col_index);
        pes_mask = opts.encoding_stats_as_mask();
        skip_col_stats = opts.skip_column_stats(col_index);
        skip_size_stats = opts.skip_size_stats(col_index);
    }

    // struct ColumnMetaData {
    //   1: required Type type
    //   2: required list<Encoding> encodings
    //   3: required list<string> path_in_schema
    //   4: required CompressionCodec codec
    //   5: required i64 num_values
    //   6: required i64 total_uncompressed_size
    //   7: required i64 total_compressed_size
    //   8: optional list<KeyValue> key_value_metadata
    //   9: required i64 data_page_offset
    //   10: optional i64 index_page_offset
    //   11: optional i64 dictionary_page_offset
    //   12: optional Statistics statistics;
    //   13: optional list<PageEncodingStats> encoding_stats;
    //   14: optional i64 bloom_filter_offset;
    //   15: optional i32 bloom_filter_length;
    //   16: optional SizeStatistics size_statistics;
    //   17: optional GeospatialStatistics geospatial_statistics;
    // }
    let column_descr = &column.column_descr;

    // Id of the previous field. It is passed to `read_field_begin`, presumably so the compact
    // protocol's delta-encoded field ids can be resolved.
    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        // Arms with an `if` guard fall through to the `_` arm (skip) when the guard is false.
        match field_ident.id {
            // 1: type is never used, we can use the column descriptor
            1 => {
                // read for error handling
                Type::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TYPE;
            }
            2 => {
                // the encodings list is decoded straight into a bit mask
                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_ENCODINGS;
            }
            // 3: path_in_schema is redundant
            4 => {
                column.compression = Compression::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_CODEC;
            }
            5 => {
                column.num_values = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_NUM_VALUES;
            }
            6 => {
                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
            }
            7 => {
                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_COMP_SZ;
            }
            // 8: we don't expose this key value
            9 => {
                column.data_page_offset = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_DATA_PAGE_OFFSET;
            }
            10 => {
                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            11 => {
                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            12 if !skip_col_stats => {
                column.statistics =
                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
            }
            13 if !skip_pes => {
                // either a compact mask of data-page encodings, or the full decoded list
                if pes_mask {
                    let val = read_encoding_stats_as_mask(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
                } else {
                    let val =
                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
                }
            }
            14 => {
                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            15 => {
                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
            }
            16 if !skip_size_stats => {
                let val = SizeStatistics::read_thrift(&mut *prot)?;
                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
                column.repetition_level_histogram =
                    val.repetition_level_histogram.map(LevelHistogram::from);
                column.definition_level_histogram =
                    val.definition_level_histogram.map(LevelHistogram::from);
            }
            17 => {
                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
                column.geo_statistics = convert_geo_stats(Some(val));
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    Ok(seen_mask)
}
531
// Decode a thrift `ColumnChunk` struct for the column described by `column_descr`.
//
// Uses ThriftSliceInputProtocol rather than the ThriftCompactInputProtocol trait because
// these are all internal and operate on slices.
//
// `col_index` and `options` are only forwarded to `read_column_metadata`. Errors if the
// required `file_offset` field is missing. Also errors if any required ColumnMetaData field
// was not seen; this includes the case where `meta_data` (field 3) is absent entirely.
// The exception is a chunk that carries `encrypted_column_metadata` (recognised only with
// the `encryption` feature), which is returned without that validation.
fn read_column_chunk<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column_descr: &Arc<ColumnDescriptor>,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
    // create a default initialized ColumnMetaData
    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;

    // seen flag for file_offset
    let mut has_file_offset = false;

    // mask of seen flags for ColumnMetaData (stays 0 if field 3 never appears)
    let mut col_meta_mask = 0u16;

    // struct ColumnChunk {
    //   1: optional string file_path
    //   2: required i64 file_offset = 0
    //   3: optional ColumnMetaData meta_data
    //   4: optional i64 offset_index_offset
    //   5: optional i32 offset_index_length
    //   6: optional i64 column_index_offset
    //   7: optional i32 column_index_length
    //   8: optional ColumnCryptoMetaData crypto_metadata
    //   9: optional binary encrypted_column_metadata
    // }
    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                col.file_path = Some(String::read_thrift(&mut *prot)?);
            }
            2 => {
                col.file_offset = i64::read_thrift(&mut *prot)?;
                has_file_offset = true;
            }
            3 => {
                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
            }
            4 => {
                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            5 => {
                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            6 => {
                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
                col.column_crypto_metadata = Some(Box::new(val));
            }
            #[cfg(feature = "encryption")]
            9 => {
                // copied out of the input buffer so it can be decrypted later
                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
            }
            // without the `encryption` feature, fields 8 and 9 are skipped here as well
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    // the only required field from ColumnChunk
    if !has_file_offset {
        return Err(general_err!("Required field file_offset is missing"));
    };

    // if encrypted just return. we'll decrypt after finishing the footer and populate the rest.
    #[cfg(feature = "encryption")]
    if col.encrypted_column_metadata.is_some() {
        return Ok(col);
    }

    // not encrypted, so make sure all required fields were read
    validate_column_metadata(col_meta_mask)?;

    Ok(col)
}
621
622fn read_row_group(
623    prot: &mut ThriftSliceInputProtocol,
624    schema_descr: &Arc<SchemaDescriptor>,
625    options: Option<&ParquetMetaDataOptions>,
626) -> Result<RowGroupMetaData> {
627    // create default initialized RowGroupMetaData
628    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
629
630    // mask values for required fields
631    const RG_COLUMNS: u8 = 1 << 1;
632    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
633    const RG_NUM_ROWS: u8 = 1 << 3;
634    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;
635
636    let mut mask = 0u8;
637
638    // struct RowGroup {
639    //   1: required list<ColumnChunk> columns
640    //   2: required i64 total_byte_size
641    //   3: required i64 num_rows
642    //   4: optional list<SortingColumn> sorting_columns
643    //   5: optional i64 file_offset
644    //   6: optional i64 total_compressed_size
645    //   7: optional i16 ordinal
646    // }
647    let mut last_field_id = 0i16;
648    loop {
649        let field_ident = prot.read_field_begin(last_field_id)?;
650        if field_ident.field_type == FieldType::Stop {
651            break;
652        }
653        match field_ident.id {
654            1 => {
655                let list_ident = prot.read_list_begin()?;
656                if schema_descr.num_columns() != list_ident.size as usize {
657                    return Err(general_err!(
658                        "Column count mismatch. Schema has {} columns while Row Group has {}",
659                        schema_descr.num_columns(),
660                        list_ident.size
661                    ));
662                }
663                for i in 0..list_ident.size as usize {
664                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
665                    row_group.columns.push(col);
666                }
667                mask |= RG_COLUMNS;
668            }
669            2 => {
670                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
671                mask |= RG_TOT_BYTE_SIZE;
672            }
673            3 => {
674                row_group.num_rows = i64::read_thrift(&mut *prot)?;
675                mask |= RG_NUM_ROWS;
676            }
677            4 => {
678                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
679                row_group.sorting_columns = Some(val);
680            }
681            5 => {
682                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
683            }
684            // 6: we don't expose total_compressed_size
685            7 => {
686                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
687            }
688            _ => {
689                prot.skip(field_ident.field_type)?;
690            }
691        };
692        last_field_id = field_ident.id;
693    }
694
695    if mask != RG_ALL_REQUIRED {
696        if mask & RG_COLUMNS == 0 {
697            return Err(general_err!("Required field columns is missing"));
698        }
699        if mask & RG_TOT_BYTE_SIZE == 0 {
700            return Err(general_err!("Required field total_byte_size is missing"));
701        }
702        if mask & RG_NUM_ROWS == 0 {
703            return Err(general_err!("Required field num_rows is missing"));
704        }
705    }
706
707    Ok(row_group)
708}
709
710/// Create a [`SchemaDescriptor`] from thrift input. The input buffer must contain a complete
711/// Parquet footer.
712pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
713    let mut prot = ThriftSliceInputProtocol::new(buf);
714
715    let mut last_field_id = 0i16;
716    loop {
717        let field_ident = prot.read_field_begin(last_field_id)?;
718        if field_ident.field_type == FieldType::Stop {
719            break;
720        }
721        match field_ident.id {
722            2 => {
723                // read schema and convert to SchemaDescriptor for use when reading row groups
724                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
725                let val = parquet_schema_from_array(val)?;
726                return Ok(SchemaDescriptor::new(val));
727            }
728            _ => prot.skip(field_ident.field_type)?,
729        }
730        last_field_id = field_ident.id;
731    }
732    Err(general_err!("Input does not contain a schema"))
733}
734
735/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
736/// the Parquet footer. Page indexes will need to be added later.
737pub(crate) fn parquet_metadata_from_bytes(
738    buf: &[u8],
739    options: Option<&ParquetMetaDataOptions>,
740) -> Result<ParquetMetaData> {
741    let mut prot = ThriftSliceInputProtocol::new(buf);
742
743    // begin reading the file metadata
744    let mut version: Option<i32> = None;
745    let mut num_rows: Option<i64> = None;
746    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
747    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
748    let mut created_by: Option<&str> = None;
749    let mut column_orders: Option<Vec<ColumnOrder>> = None;
750    #[cfg(feature = "encryption")]
751    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
752    #[cfg(feature = "encryption")]
753    let mut footer_signing_key_metadata: Option<&[u8]> = None;
754
755    // this will need to be set before parsing row groups
756    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
757
758    // see if we already have a schema.
759    if let Some(options) = options {
760        schema_descr = options.schema().cloned();
761    }
762
763    // struct FileMetaData {
764    //   1: required i32 version
765    //   2: required list<SchemaElement> schema;
766    //   3: required i64 num_rows
767    //   4: required list<RowGroup> row_groups
768    //   5: optional list<KeyValue> key_value_metadata
769    //   6: optional string created_by
770    //   7: optional list<ColumnOrder> column_orders;
771    //   8: optional EncryptionAlgorithm encryption_algorithm
772    //   9: optional binary footer_signing_key_metadata
773    // }
774    let mut last_field_id = 0i16;
775    loop {
776        let field_ident = prot.read_field_begin(last_field_id)?;
777        if field_ident.field_type == FieldType::Stop {
778            break;
779        }
780        match field_ident.id {
781            1 => {
782                version = Some(i32::read_thrift(&mut prot)?);
783            }
784            2 => {
785                // If schema was passed in, skip parsing it
786                if schema_descr.is_some() {
787                    prot.skip(field_ident.field_type)?;
788                } else {
789                    // read schema and convert to SchemaDescriptor for use when reading row groups
790                    let val =
791                        read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
792                    let val = parquet_schema_from_array(val)?;
793                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
794                }
795            }
796            3 => {
797                num_rows = Some(i64::read_thrift(&mut prot)?);
798            }
799            4 => {
800                if schema_descr.is_none() {
801                    return Err(general_err!("Required field schema is missing"));
802                }
803                let schema_descr = schema_descr.as_ref().unwrap();
804                let list_ident = prot.read_list_begin()?;
805                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
806
807                // Read row groups and handle ordinal assignment
808                let mut assigner = OrdinalAssigner::new();
809                for ordinal in 0..list_ident.size {
810                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
811                        ParquetError::General(format!(
812                            "Row group ordinal {ordinal} exceeds i16 max value",
813                        ))
814                    })?;
815                    let rg = read_row_group(&mut prot, schema_descr, options)?;
816                    rg_vec.push(assigner.ensure(ordinal, rg)?);
817                }
818                row_groups = Some(rg_vec);
819            }
820            5 => {
821                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
822                key_value_metadata = Some(val);
823            }
824            6 => {
825                created_by = Some(<&str>::read_thrift(&mut prot)?);
826            }
827            7 => {
828                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
829                column_orders = Some(val);
830            }
831            #[cfg(feature = "encryption")]
832            8 => {
833                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
834                encryption_algorithm = Some(val);
835            }
836            #[cfg(feature = "encryption")]
837            9 => {
838                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
839            }
840            _ => {
841                prot.skip(field_ident.field_type)?;
842            }
843        };
844        last_field_id = field_ident.id;
845    }
846    let Some(version) = version else {
847        return Err(general_err!("Required field version is missing"));
848    };
849    let Some(num_rows) = num_rows else {
850        return Err(general_err!("Required field num_rows is missing"));
851    };
852    let Some(row_groups) = row_groups else {
853        return Err(general_err!("Required field row_groups is missing"));
854    };
855
856    let created_by = created_by.map(|c| c.to_owned());
857
858    // we've tested for `None` by now so this is safe
859    let schema_descr = schema_descr.unwrap();
860
861    // need to map read column orders to actual values based on the schema
862    if column_orders
863        .as_ref()
864        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
865    {
866        return Err(general_err!("Column order length mismatch"));
867    }
868    // replace default type defined column orders with ones having the correct sort order
869    // TODO(ets): this could instead be done above when decoding
870    let column_orders = column_orders.map(|mut cos| {
871        for (i, column) in schema_descr.columns().iter().enumerate() {
872            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
873                let sort_order = ColumnOrder::sort_order_for_type(
874                    column.logical_type_ref(),
875                    column.converted_type(),
876                    column.physical_type(),
877                );
878                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
879            }
880        }
881        cos
882    });
883
884    #[cfg(not(feature = "encryption"))]
885    let fmd = crate::file::metadata::FileMetaData::new(
886        version,
887        num_rows,
888        created_by,
889        key_value_metadata,
890        schema_descr,
891        column_orders,
892    );
893    #[cfg(feature = "encryption")]
894    let fmd = crate::file::metadata::FileMetaData::new(
895        version,
896        num_rows,
897        created_by,
898        key_value_metadata,
899        schema_descr,
900        column_orders,
901    )
902    .with_encryption_algorithm(encryption_algorithm)
903    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
904
905    Ok(ParquetMetaData::new(fmd, row_groups))
906}
907
908/// Assign [`RowGroupMetaData::ordinal`]  if it is missing.
909#[derive(Debug, Default)]
910pub(crate) struct OrdinalAssigner {
911    first_has_ordinal: Option<bool>,
912}
913
914impl OrdinalAssigner {
915    fn new() -> Self {
916        Default::default()
917    }
918
919    /// Sets [`RowGroupMetaData::ordinal`] if it is missing.
920    ///
921    /// # Arguments
922    /// - actual_ordinal: The ordinal (index) of the row group being processed
923    ///   in the file metadata.
924    /// - rg: The [`RowGroupMetaData`] to potentially modify.
925    ///
926    /// Ensures:
927    /// 1. If the first row group has an ordinal, all subsequent row groups must
928    ///    also have ordinals.
929    /// 2. If the first row group does NOT have an ordinal, all subsequent row
930    ///    groups must also not have ordinals.
931    fn ensure(
932        &mut self,
933        actual_ordinal: i16,
934        mut rg: RowGroupMetaData,
935    ) -> Result<RowGroupMetaData> {
936        let rg_has_ordinal = rg.ordinal.is_some();
937
938        // Only set first_has_ordinal if it's None (first row group that arrives)
939        if self.first_has_ordinal.is_none() {
940            self.first_has_ordinal = Some(rg_has_ordinal);
941        }
942
943        // assign ordinal if missing and consistent with first row group
944        let first_has_ordinal = self.first_has_ordinal.unwrap();
945        if !first_has_ordinal && !rg_has_ordinal {
946            rg.ordinal = Some(actual_ordinal);
947        } else if first_has_ordinal != rg_has_ordinal {
948            return Err(general_err!(
949                "Inconsistent ordinal assignment: first_has_ordinal is set to \
950                {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
951                first_has_ordinal,
952                actual_ordinal,
953                rg_has_ordinal
954            ));
955        }
956        Ok(rg)
957    }
958}
959
// Header for an index page. It is decoded as field 6 of `PageHeader`.
// NOTE(review): no fields are declared here. Any fields present in the encoded struct are
// presumably skipped when decoding; confirm against the `thrift_struct!` expansion.
thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);
963
thrift_struct!(
/// Header for a dictionary page (field 7 of [`PageHeader`]).
pub(crate) struct DictionaryPageHeader {
  /// Number of values in the dictionary
  1: required i32 num_values;

  /// Encoding used for this dictionary page
  2: required Encoding encoding

  /// If true, the entries in the dictionary are sorted in ascending order
  3: optional bool is_sorted;
}
);
976
thrift_struct!(
/// Statistics for the page header.
///
/// This is a duplicate of the [`Statistics`] struct above. Because the page reader uses
/// the [`Read`] API, we cannot read the min/max values as slices. This should not be
/// a huge problem since this crate no longer reads the page header statistics by default.
///
/// It is the type of the optional `statistics` field of `DataPageHeader` and
/// `DataPageHeaderV2`, whose `read_thrift_without_stats` readers skip it.
/// `serialize_column_meta_data` also writes it as the column chunk `statistics` field.
///
/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
pub(crate) struct PageStatistics {
   1: optional binary max;
   2: optional binary min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary max_value;
   6: optional binary min_value;
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
996
thrift_struct!(
/// Header for a (non-V2) data page, stored as field 5 of `PageHeader`.
///
/// `DataPageHeader::read_thrift_without_stats` decodes this struct without decoding the
/// optional `statistics` field.
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);
1006
1007impl DataPageHeader {
1008    // reader that skips decoding page statistics
1009    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1010    where
1011        R: ThriftCompactInputProtocol<'a>,
1012    {
1013        let mut num_values: Option<i32> = None;
1014        let mut encoding: Option<Encoding> = None;
1015        let mut definition_level_encoding: Option<Encoding> = None;
1016        let mut repetition_level_encoding: Option<Encoding> = None;
1017        let statistics: Option<PageStatistics> = None;
1018        let mut last_field_id = 0i16;
1019        loop {
1020            let field_ident = prot.read_field_begin(last_field_id)?;
1021            if field_ident.field_type == FieldType::Stop {
1022                break;
1023            }
1024            match field_ident.id {
1025                1 => {
1026                    let val = i32::read_thrift(&mut *prot)?;
1027                    num_values = Some(val);
1028                }
1029                2 => {
1030                    let val = Encoding::read_thrift(&mut *prot)?;
1031                    encoding = Some(val);
1032                }
1033                3 => {
1034                    let val = Encoding::read_thrift(&mut *prot)?;
1035                    definition_level_encoding = Some(val);
1036                }
1037                4 => {
1038                    let val = Encoding::read_thrift(&mut *prot)?;
1039                    repetition_level_encoding = Some(val);
1040                }
1041                _ => {
1042                    prot.skip(field_ident.field_type)?;
1043                }
1044            };
1045            last_field_id = field_ident.id;
1046        }
1047        let Some(num_values) = num_values else {
1048            return Err(general_err!("Required field num_values is missing"));
1049        };
1050        let Some(encoding) = encoding else {
1051            return Err(general_err!("Required field encoding is missing"));
1052        };
1053        let Some(definition_level_encoding) = definition_level_encoding else {
1054            return Err(general_err!(
1055                "Required field definition_level_encoding is missing"
1056            ));
1057        };
1058        let Some(repetition_level_encoding) = repetition_level_encoding else {
1059            return Err(general_err!(
1060                "Required field repetition_level_encoding is missing"
1061            ));
1062        };
1063        Ok(Self {
1064            num_values,
1065            encoding,
1066            definition_level_encoding,
1067            repetition_level_encoding,
1068            statistics,
1069        })
1070    }
1071}
1072
thrift_struct!(
/// Header for a V2 data page, stored as field 8 of `PageHeader`.
///
/// `is_compressed` is declared with a default of `true`.
// NOTE(review): `DataPageHeaderV2::read_thrift_without_stats` leaves `is_compressed` as `None`
// when the field is absent. Readers presumably apply the `true` default where it is used;
// confirm.
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
1085
1086impl DataPageHeaderV2 {
1087    // reader that skips decoding page statistics
1088    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1089    where
1090        R: ThriftCompactInputProtocol<'a>,
1091    {
1092        let mut num_values: Option<i32> = None;
1093        let mut num_nulls: Option<i32> = None;
1094        let mut num_rows: Option<i32> = None;
1095        let mut encoding: Option<Encoding> = None;
1096        let mut definition_levels_byte_length: Option<i32> = None;
1097        let mut repetition_levels_byte_length: Option<i32> = None;
1098        let mut is_compressed: Option<bool> = None;
1099        let statistics: Option<PageStatistics> = None;
1100        let mut last_field_id = 0i16;
1101        loop {
1102            let field_ident = prot.read_field_begin(last_field_id)?;
1103            if field_ident.field_type == FieldType::Stop {
1104                break;
1105            }
1106            match field_ident.id {
1107                1 => {
1108                    let val = i32::read_thrift(&mut *prot)?;
1109                    num_values = Some(val);
1110                }
1111                2 => {
1112                    let val = i32::read_thrift(&mut *prot)?;
1113                    num_nulls = Some(val);
1114                }
1115                3 => {
1116                    let val = i32::read_thrift(&mut *prot)?;
1117                    num_rows = Some(val);
1118                }
1119                4 => {
1120                    let val = Encoding::read_thrift(&mut *prot)?;
1121                    encoding = Some(val);
1122                }
1123                5 => {
1124                    let val = i32::read_thrift(&mut *prot)?;
1125                    definition_levels_byte_length = Some(val);
1126                }
1127                6 => {
1128                    let val = i32::read_thrift(&mut *prot)?;
1129                    repetition_levels_byte_length = Some(val);
1130                }
1131                7 => {
1132                    let val = field_ident.bool_val.unwrap();
1133                    is_compressed = Some(val);
1134                }
1135                _ => {
1136                    prot.skip(field_ident.field_type)?;
1137                }
1138            };
1139            last_field_id = field_ident.id;
1140        }
1141        let Some(num_values) = num_values else {
1142            return Err(general_err!("Required field num_values is missing"));
1143        };
1144        let Some(num_nulls) = num_nulls else {
1145            return Err(general_err!("Required field num_nulls is missing"));
1146        };
1147        let Some(num_rows) = num_rows else {
1148            return Err(general_err!("Required field num_rows is missing"));
1149        };
1150        let Some(encoding) = encoding else {
1151            return Err(general_err!("Required field encoding is missing"));
1152        };
1153        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1154            return Err(general_err!(
1155                "Required field definition_levels_byte_length is missing"
1156            ));
1157        };
1158        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1159            return Err(general_err!(
1160                "Required field repetition_levels_byte_length is missing"
1161            ));
1162        };
1163        Ok(Self {
1164            num_values,
1165            num_nulls,
1166            num_rows,
1167            encoding,
1168            definition_levels_byte_length,
1169            repetition_levels_byte_length,
1170            is_compressed,
1171            statistics,
1172        })
1173    }
1174}
1175
thrift_struct!(
/// Thrift page header. `PageHeader::read_thrift_without_stats` decodes it without decoding
/// page statistics.
pub(crate) struct PageHeader {
  /// the type of the page: indicates which of the *_header fields is set
  1: required PageType r#type

  /// Uncompressed page size in bytes (not including this header)
  2: required i32 uncompressed_page_size

  /// Compressed (and potentially encrypted) page size in bytes, not including this header
  3: required i32 compressed_page_size

  /// The 32-bit CRC checksum for the page. See the Parquet specification for how it is
  /// calculated.
  4: optional i32 crc

  // Headers for page-specific data. Only one will be set.
  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1197
1198impl PageHeader {
1199    // reader that skips reading page statistics. obtained by running
1200    // `cargo expand -p parquet --all-features --lib file::metadata::thrift`
1201    // and modifying the impl of `read_thrift`
1202    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1203    where
1204        R: ThriftCompactInputProtocol<'a>,
1205    {
1206        let mut type_: Option<PageType> = None;
1207        let mut uncompressed_page_size: Option<i32> = None;
1208        let mut compressed_page_size: Option<i32> = None;
1209        let mut crc: Option<i32> = None;
1210        let mut data_page_header: Option<DataPageHeader> = None;
1211        let mut index_page_header: Option<IndexPageHeader> = None;
1212        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1213        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1214        let mut last_field_id = 0i16;
1215        loop {
1216            let field_ident = prot.read_field_begin(last_field_id)?;
1217            if field_ident.field_type == FieldType::Stop {
1218                break;
1219            }
1220            match field_ident.id {
1221                1 => {
1222                    let val = PageType::read_thrift(&mut *prot)?;
1223                    type_ = Some(val);
1224                }
1225                2 => {
1226                    let val = i32::read_thrift(&mut *prot)?;
1227                    uncompressed_page_size = Some(val);
1228                }
1229                3 => {
1230                    let val = i32::read_thrift(&mut *prot)?;
1231                    compressed_page_size = Some(val);
1232                }
1233                4 => {
1234                    let val = i32::read_thrift(&mut *prot)?;
1235                    crc = Some(val);
1236                }
1237                5 => {
1238                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1239                    data_page_header = Some(val);
1240                }
1241                6 => {
1242                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
1243                    index_page_header = Some(val);
1244                }
1245                7 => {
1246                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1247                    dictionary_page_header = Some(val);
1248                }
1249                8 => {
1250                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1251                    data_page_header_v2 = Some(val);
1252                }
1253                _ => {
1254                    prot.skip(field_ident.field_type)?;
1255                }
1256            };
1257            last_field_id = field_ident.id;
1258        }
1259        let Some(type_) = type_ else {
1260            return Err(general_err!("Required field type_ is missing"));
1261        };
1262        let Some(uncompressed_page_size) = uncompressed_page_size else {
1263            return Err(general_err!(
1264                "Required field uncompressed_page_size is missing"
1265            ));
1266        };
1267        let Some(compressed_page_size) = compressed_page_size else {
1268            return Err(general_err!(
1269                "Required field compressed_page_size is missing"
1270            ));
1271        };
1272        Ok(Self {
1273            r#type: type_,
1274            uncompressed_page_size,
1275            compressed_page_size,
1276            crc,
1277            data_page_header,
1278            index_page_header,
1279            dictionary_page_header,
1280            data_page_header_v2,
1281        })
1282    }
1283}
1284
1285/////////////////////////////////////////////////
// Helper functions for writing file metadata
1287
1288// serialize the bits of the column chunk needed for a thrift ColumnMetaData
1289// struct ColumnMetaData {
1290//   1: required Type type
1291//   2: required list<Encoding> encodings
1292//   3: required list<string> path_in_schema
1293//   4: required CompressionCodec codec
1294//   5: required i64 num_values
1295//   6: required i64 total_uncompressed_size
1296//   7: required i64 total_compressed_size
1297//   8: optional list<KeyValue> key_value_metadata
1298//   9: required i64 data_page_offset
1299//   10: optional i64 index_page_offset
1300//   11: optional i64 dictionary_page_offset
1301//   12: optional Statistics statistics;
1302//   13: optional list<PageEncodingStats> encoding_stats;
1303//   14: optional i64 bloom_filter_offset;
1304//   15: optional i32 bloom_filter_length;
1305//   16: optional SizeStatistics size_statistics;
1306//   17: optional GeospatialStatistics geospatial_statistics;
1307// }
1308pub(super) fn serialize_column_meta_data<W: Write>(
1309    column_chunk: &ColumnChunkMetaData,
1310    w: &mut ThriftCompactOutputProtocol<W>,
1311) -> Result<()> {
1312    use crate::file::statistics::page_stats_to_thrift;
1313
1314    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1315    column_chunk
1316        .encodings()
1317        .collect::<Vec<_>>()
1318        .write_thrift_field(w, 2, 1)?;
1319    let path = column_chunk.column_descr.path().parts();
1320    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1321    path.write_thrift_field(w, 3, 2)?;
1322    column_chunk.compression.write_thrift_field(w, 4, 3)?;
1323    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1324    column_chunk
1325        .total_uncompressed_size
1326        .write_thrift_field(w, 6, 5)?;
1327    column_chunk
1328        .total_compressed_size
1329        .write_thrift_field(w, 7, 6)?;
1330    // no key_value_metadata here
1331    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1332    if let Some(index_page_offset) = column_chunk.index_page_offset {
1333        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1334    }
1335    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1336        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1337    }
1338    // PageStatistics is the same as thrift Statistics, but writable
1339    let stats = page_stats_to_thrift(column_chunk.statistics());
1340    if let Some(stats) = stats {
1341        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1342    }
1343    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1344        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1345    }
1346    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1347        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1348    }
1349    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1350        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1351    }
1352
1353    // SizeStatistics
1354    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1355        || column_chunk.repetition_level_histogram.is_some()
1356        || column_chunk.definition_level_histogram.is_some()
1357    {
1358        let repetition_level_histogram = column_chunk
1359            .repetition_level_histogram()
1360            .map(|hist| hist.clone().into_inner());
1361
1362        let definition_level_histogram = column_chunk
1363            .definition_level_histogram()
1364            .map(|hist| hist.clone().into_inner());
1365
1366        Some(SizeStatistics {
1367            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1368            repetition_level_histogram,
1369            definition_level_histogram,
1370        })
1371    } else {
1372        None
1373    };
1374    if let Some(size_stats) = size_stats {
1375        last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1376    }
1377
1378    if let Some(geo_stats) = column_chunk.geo_statistics() {
1379        geo_stats.write_thrift_field(w, 17, last_field_id)?;
1380    }
1381
1382    w.write_struct_end()
1383}
1384
// Temporary struct used only for writing.
/// Borrowed pairing of file-level metadata with its row groups. It lets the two be serialized
/// together as a single thrift `FileMetaData` struct through `WriteThrift`.
pub(super) struct FileMeta<'a> {
    /// Source of fields 1-3 and 5-9 of the thrift `FileMetaData` (version, schema, num_rows,
    /// key/value metadata, created_by, column orders and, with `encryption`, the encryption
    /// fields).
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    /// Row groups, written as field 4 of the thrift `FileMetaData`.
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
}
1390
1391// struct FileMetaData {
1392//   1: required i32 version
1393//   2: required list<SchemaElement> schema;
1394//   3: required i64 num_rows
1395//   4: required list<RowGroup> row_groups
1396//   5: optional list<KeyValue> key_value_metadata
1397//   6: optional string created_by
1398//   7: optional list<ColumnOrder> column_orders;
1399//   8: optional EncryptionAlgorithm encryption_algorithm
1400//   9: optional binary footer_signing_key_metadata
1401// }
1402impl<'a> WriteThrift for FileMeta<'a> {
1403    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1404
1405    // needed for last_field_id w/o encryption
1406    #[allow(unused_assignments)]
1407    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1408        self.file_metadata
1409            .version
1410            .write_thrift_field(writer, 1, 0)?;
1411
1412        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
1413        // writing along the way.
1414        let root = self.file_metadata.schema_descr().root_schema_ptr();
1415        let schema_len = num_nodes(&root)?;
1416        writer.write_field_begin(FieldType::List, 2, 1)?;
1417        writer.write_list_begin(ElementType::Struct, schema_len)?;
1418        // recursively write Type nodes as SchemaElements
1419        write_schema(&root, writer)?;
1420
1421        self.file_metadata
1422            .num_rows
1423            .write_thrift_field(writer, 3, 2)?;
1424
1425        // this will call RowGroupMetaData::write_thrift
1426        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1427
1428        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1429            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1430        }
1431        if let Some(created_by) = self.file_metadata.created_by() {
1432            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1433        }
1434        if let Some(column_orders) = self.file_metadata.column_orders() {
1435            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1436        }
1437        #[cfg(feature = "encryption")]
1438        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1439            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1440        }
1441        #[cfg(feature = "encryption")]
1442        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1443            key.as_slice()
1444                .write_thrift_field(writer, 9, last_field_id)?;
1445        }
1446
1447        writer.write_struct_end()
1448    }
1449}
1450
1451fn write_schema<W: Write>(
1452    schema: &TypePtr,
1453    writer: &mut ThriftCompactOutputProtocol<W>,
1454) -> Result<()> {
1455    if !schema.is_group() {
1456        return Err(general_err!("Root schema must be Group type"));
1457    }
1458    write_schema_helper(schema, writer)
1459}
1460
1461fn write_schema_helper<W: Write>(
1462    node: &TypePtr,
1463    writer: &mut ThriftCompactOutputProtocol<W>,
1464) -> Result<()> {
1465    match node.as_ref() {
1466        crate::schema::types::Type::PrimitiveType {
1467            basic_info,
1468            physical_type,
1469            type_length,
1470            scale,
1471            precision,
1472        } => {
1473            let element = SchemaElement {
1474                r#type: Some(*physical_type),
1475                type_length: if *type_length >= 0 {
1476                    Some(*type_length)
1477                } else {
1478                    None
1479                },
1480                repetition_type: Some(basic_info.repetition()),
1481                name: basic_info.name(),
1482                num_children: None,
1483                converted_type: match basic_info.converted_type() {
1484                    ConvertedType::NONE => None,
1485                    other => Some(other),
1486                },
1487                scale: if *scale >= 0 { Some(*scale) } else { None },
1488                precision: if *precision >= 0 {
1489                    Some(*precision)
1490                } else {
1491                    None
1492                },
1493                field_id: if basic_info.has_id() {
1494                    Some(basic_info.id())
1495                } else {
1496                    None
1497                },
1498                logical_type: basic_info.logical_type_ref().cloned(),
1499            };
1500            element.write_thrift(writer)
1501        }
1502        crate::schema::types::Type::GroupType { basic_info, fields } => {
1503            let repetition = if basic_info.has_repetition() {
1504                Some(basic_info.repetition())
1505            } else {
1506                None
1507            };
1508
1509            let element = SchemaElement {
1510                r#type: None,
1511                type_length: None,
1512                repetition_type: repetition,
1513                name: basic_info.name(),
1514                num_children: Some(fields.len().try_into()?),
1515                converted_type: match basic_info.converted_type() {
1516                    ConvertedType::NONE => None,
1517                    other => Some(other),
1518                },
1519                scale: None,
1520                precision: None,
1521                field_id: if basic_info.has_id() {
1522                    Some(basic_info.id())
1523                } else {
1524                    None
1525                },
1526                logical_type: basic_info.logical_type_ref().cloned(),
1527            };
1528
1529            element.write_thrift(writer)?;
1530
1531            // Add child elements for a group
1532            for field in fields {
1533                write_schema_helper(field, writer)?;
1534            }
1535            Ok(())
1536        }
1537    }
1538}
1539
1540// struct RowGroup {
1541//   1: required list<ColumnChunk> columns
1542//   2: required i64 total_byte_size
1543//   3: required i64 num_rows
1544//   4: optional list<SortingColumn> sorting_columns
1545//   5: optional i64 file_offset
1546//   6: optional i64 total_compressed_size
1547//   7: optional i16 ordinal
1548// }
1549impl WriteThrift for RowGroupMetaData {
1550    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1551
1552    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1553        // this will call ColumnChunkMetaData::write_thrift
1554        self.columns.write_thrift_field(writer, 1, 0)?;
1555        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1556        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1557        if let Some(sorting_columns) = self.sorting_columns() {
1558            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1559        }
1560        if let Some(file_offset) = self.file_offset() {
1561            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1562        }
1563        // this is optional, but we'll always write it
1564        last_field_id = self
1565            .compressed_size()
1566            .write_thrift_field(writer, 6, last_field_id)?;
1567        if let Some(ordinal) = self.ordinal() {
1568            ordinal.write_thrift_field(writer, 7, last_field_id)?;
1569        }
1570        writer.write_struct_end()
1571    }
1572}
1573
1574// struct ColumnChunk {
1575//   1: optional string file_path
1576//   2: required i64 file_offset = 0
1577//   3: optional ColumnMetaData meta_data
1578//   4: optional i64 offset_index_offset
1579//   5: optional i32 offset_index_length
1580//   6: optional i64 column_index_offset
1581//   7: optional i32 column_index_length
1582//   8: optional ColumnCryptoMetaData crypto_metadata
1583//   9: optional binary encrypted_column_metadata
1584// }
1585impl WriteThrift for ColumnChunkMetaData {
1586    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1587
1588    #[allow(unused_assignments)]
1589    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1590        let mut last_field_id = 0i16;
1591        if let Some(file_path) = self.file_path() {
1592            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1593        }
1594        last_field_id = self
1595            .file_offset()
1596            .write_thrift_field(writer, 2, last_field_id)?;
1597
1598        #[cfg(feature = "encryption")]
1599        {
1600            // only write the ColumnMetaData if we haven't already encrypted it
1601            if self.encrypted_column_metadata.is_none() {
1602                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1603                serialize_column_meta_data(self, writer)?;
1604                last_field_id = 3;
1605            }
1606        }
1607        #[cfg(not(feature = "encryption"))]
1608        {
1609            // always write the ColumnMetaData
1610            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1611            serialize_column_meta_data(self, writer)?;
1612            last_field_id = 3;
1613        }
1614
1615        if let Some(offset_idx_off) = self.offset_index_offset() {
1616            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1617        }
1618        if let Some(offset_idx_len) = self.offset_index_length() {
1619            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1620        }
1621        if let Some(column_idx_off) = self.column_index_offset() {
1622            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1623        }
1624        if let Some(column_idx_len) = self.column_index_length() {
1625            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1626        }
1627        #[cfg(feature = "encryption")]
1628        {
1629            if let Some(crypto_metadata) = self.crypto_metadata() {
1630                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1631            }
1632            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1633                encrypted_meta
1634                    .as_slice()
1635                    .write_thrift_field(writer, 9, last_field_id)?;
1636            }
1637        }
1638
1639        writer.write_struct_end()
1640    }
1641}
1642
1643// struct GeospatialStatistics {
1644//   1: optional BoundingBox bbox;
1645//   2: optional list<i32> geospatial_types;
1646// }
1647impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1648    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1649
1650    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1651        let mut last_field_id = 0i16;
1652        if let Some(bbox) = self.bounding_box() {
1653            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1654        }
1655        if let Some(geo_types) = self.geospatial_types() {
1656            geo_types.write_thrift_field(writer, 2, last_field_id)?;
1657        }
1658
1659        writer.write_struct_end()
1660    }
1661}
1662
1663// macro cannot handle qualified names
1664use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
1665write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1666
1667// struct BoundingBox {
1668//   1: required double xmin;
1669//   2: required double xmax;
1670//   3: required double ymin;
1671//   4: required double ymax;
1672//   5: optional double zmin;
1673//   6: optional double zmax;
1674//   7: optional double mmin;
1675//   8: optional double mmax;
1676// }
1677impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1678    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1679
1680    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1681        self.get_xmin().write_thrift_field(writer, 1, 0)?;
1682        self.get_xmax().write_thrift_field(writer, 2, 1)?;
1683        self.get_ymin().write_thrift_field(writer, 3, 2)?;
1684        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1685
1686        if let Some(zmin) = self.get_zmin() {
1687            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1688        }
1689        if let Some(zmax) = self.get_zmax() {
1690            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1691        }
1692        if let Some(mmin) = self.get_mmin() {
1693            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1694        }
1695        if let Some(mmax) = self.get_mmax() {
1696            mmax.write_thrift_field(writer, 8, last_field_id)?;
1697        }
1698
1699        writer.write_struct_end()
1700    }
1701}
1702
1703// macro cannot handle qualified names
1704use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
1705write_thrift_field!(RustBoundingBox, FieldType::Struct);
1706
#[cfg(test)]
pub(crate) mod tests {
    use crate::errors::Result;
    use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
    };
    use std::sync::Arc;

    /// Test helper: decodes a thrift-encoded `RowGroup` from `buf` against `schema_descr`.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
    }

    /// Test helper: decodes a thrift-encoded `ColumnChunk` from `buf` without options.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        read_column_chunk_with_options(buf, column_descr, None)
    }

    /// Test helper: decodes a thrift-encoded `ColumnChunk` from `buf`, forwarding
    /// `options` to the decoder.
    pub(crate) fn read_column_chunk_with_options(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ColumnChunkMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        // NOTE(review): the `0` looks like the column's position in the row group — confirm.
        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, options)
    }

    /// Encodes `schema` as a thrift list of `SchemaElement`s and parses it back into a
    /// schema tree.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        // Reuse the encoder helper so both test paths produce identical bytes.
        let buf = schema_to_buf(&schema)?;

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(se)
    }

    /// Encodes `schema` as a thrift list of `SchemaElement`s (list header followed by
    /// one element per node) and returns the raw bytes.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let num_nodes = num_nodes(schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        // kick off writing list
        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        // write SchemaElements
        write_schema(schema, &mut writer)?;
        Ok(buf)
    }

    /// Decodes a thrift list of `SchemaElement`s that borrow from `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut prot)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        // X/Y only.
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        });

        // X/Y plus Z.
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: None,
            mmax: None,
        });

        // X/Y plus M with Z absent: fields 5 and 6 are skipped before 7 and 8.
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });

        // All extents present.
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });
    }
}