Skip to main content

parquet/file/metadata/thrift/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module is the bridge between a Parquet file's thrift encoded metadata
19//! and this crate's [Parquet metadata API]. It contains objects and functions used
20//! to serialize/deserialize metadata objects into/from the Thrift compact protocol
21//! format as defined by the [Parquet specification].
22//!
23//! [Parquet metadata API]: crate::file::metadata
24//! [Parquet specification]: https://github.com/apache/parquet-format/tree/master
25
26use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37    basic::{
38        ColumnOrder, CompressionCodec, ConvertedType, Encoding, EncodingMask, LogicalType,
39        PageType, Repetition, Type,
40    },
41    data_type::{ByteArray, FixedLenByteArray, Int96},
42    errors::{ParquetError, Result},
43    file::{
44        metadata::{
45            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47            RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48        },
49        statistics::ValueStatistics,
50    },
51    parquet_thrift::{
52        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54        read_thrift_vec, validate_list_type,
55    },
56    schema::types::{
57        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58    },
59    thrift_struct,
60    util::bit_util::FromBytes,
61    write_thrift_field,
62};
63
// Thrift representation of one node of the flattened Parquet schema. The `thrift_struct!`
// invocation below declares the `SchemaElement` type from the listed fields; each entry
// gives the Thrift field id, whether the field is required or optional, its type and its
// name. Only `name` (field 4) is required.
//
// this needs to be visible to the schema conversion code
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  /// Data type for this field. Not set if the current element is a non-leaf node
  1: optional Type r#type;
  /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
  /// Otherwise, if specified, this is the maximum bit length to store any of the values.
  /// (e.g. a low cardinality INT col could have this set to 3).  Note that this is
  /// in the schema, and therefore fixed for the entire file.
  2: optional i32 type_length;
  /// Repetition of the field. The root of the schema does not have a repetition_type.
  /// All other nodes must have one.
  3: optional Repetition repetition_type;
  /// Name of the field in the schema
  4: required string<'a> name;
  /// Nested fields. Since thrift does not support nested fields,
  /// the nesting is flattened to a single list by a depth-first traversal.
  /// The children count is used to construct the nested relationship.
  /// This field is not set when the element is a primitive type.
  5: optional i32 num_children;
  /// DEPRECATED: When the schema is the result of a conversion from another model.
  /// Used to record the original type to help with cross conversion.
  ///
  /// This is superseded by logical_type.
  6: optional ConvertedType converted_type;
  /// DEPRECATED: Used when this column contains decimal data.
  /// See the DECIMAL converted type for more details.
  ///
  /// This is superseded by using the DecimalType annotation in logical_type.
  7: optional i32 scale
  // NOTE(review): the doc comment above is attached to `scale` only; `precision` is
  // presumably its companion for decimal data -- confirm against the Parquet format.
  8: optional i32 precision
  /// When the original schema supports field ids, this will save the
  /// original field id in the parquet schema
  9: optional i32 field_id;
  /// The logical type of this SchemaElement
  ///
  /// LogicalType replaces ConvertedType, but ConvertedType is still required
  /// for some logical types to ensure forward-compatibility in format v1.
  10: optional LogicalType logical_type
}
);
105
// Thrift representation of column chunk statistics. Every field is optional.
//
// `min`/`max` (fields 1 and 2) are the deprecated pair: `convert_stats` only falls back to
// them when both `min_value` and `max_value` are absent. The `is_*_value_exact` flags are
// only consulted by `convert_stats` for BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns.
thrift_struct!(
struct Statistics<'a> {
   1: optional binary<'a> max;
   2: optional binary<'a> min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary<'a> max_value;
   6: optional binary<'a> min_value;
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
118
// Thrift representation of a geospatial bounding box. The x and y ranges are required;
// the z and m ranges are optional. `convert_bounding_box` only applies a z (or m) range
// when both of its ends are present.
thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);
131
// Thrift representation of per-column geospatial statistics: an optional bounding box and
// an optional list of geospatial type codes. `convert_geo_stats` turns this into the
// crate's public geospatial statistics type and treats an empty type list as absent.
thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);
138
// Thrift representation of size statistics for a column chunk. All fields are optional.
// `read_column_metadata` copies these into `ColumnChunkMetaData`, converting both
// histograms with `LevelHistogram::from`.
thrift_struct!(
struct SizeStatistics {
   1: optional i64 unencoded_byte_array_data_bytes;
   2: optional list<i64> repetition_level_histogram;
   3: optional list<i64> definition_level_histogram;
}
);
146
147fn convert_geo_stats(
148    stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150    stats.map(|st| {
151        let bbox = convert_bounding_box(st.bbox);
152        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154            bbox,
155            geospatial_types,
156        ))
157    })
158}
159
160fn convert_bounding_box(
161    bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163    bbox.map(|bb| {
164        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165            bb.xmin.into(),
166            bb.xmax.into(),
167            bb.ymin.into(),
168            bb.ymax.into(),
169        );
170
171        newbb = match (bb.zmin, bb.zmax) {
172            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173            // If either None or mismatch, leave it as None and don't error
174            _ => newbb,
175        };
176
177        newbb = match (bb.mmin, bb.mmax) {
178            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179            // If either None or mismatch, leave it as None and don't error
180            _ => newbb,
181        };
182
183        newbb
184    })
185}
186
187/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
188fn convert_stats(
189    column_descr: &Arc<ColumnDescriptor>,
190    thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192    use crate::file::statistics::Statistics as FStatistics;
193    Ok(match thrift_stats {
194        Some(stats) => {
195            // Generic null count.
196            let null_count = stats
197                .null_count
198                .map(|null_count| {
199                    if null_count < 0 {
200                        return Err(general_err!(
201                            "Statistics null count is negative {}",
202                            null_count
203                        ));
204                    }
205                    Ok(null_count as u64)
206                })
207                .transpose()?;
208            // Generic distinct count (count of distinct values occurring)
209            let distinct_count = stats.distinct_count.map(|value| value as u64);
210            // Whether or not statistics use deprecated min/max fields.
211            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
212            // Generic min value as bytes.
213            let min = if old_format {
214                stats.min
215            } else {
216                stats.min_value
217            };
218            // Generic max value as bytes.
219            let max = if old_format {
220                stats.max
221            } else {
222                stats.max_value
223            };
224
225            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
226                if let Some(min) = min {
227                    if min.len() < len {
228                        return Err(general_err!("Insufficient bytes to parse min statistic",));
229                    }
230                }
231                if let Some(max) = max {
232                    if max.len() < len {
233                        return Err(general_err!("Insufficient bytes to parse max statistic",));
234                    }
235                }
236                Ok(())
237            }
238
239            let physical_type = column_descr.physical_type();
240            match physical_type {
241                Type::BOOLEAN => check_len(&min, &max, 1),
242                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
243                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
244                Type::INT96 => check_len(&min, &max, 12),
245                _ => Ok(()),
246            }?;
247
248            // Values are encoded using PLAIN encoding definition, except that
249            // variable-length byte arrays do not include a length prefix.
250            //
251            // Instead of using actual decoder, we manually convert values.
252            let res = match physical_type {
253                Type::BOOLEAN => FStatistics::boolean(
254                    min.map(|data| data[0] != 0),
255                    max.map(|data| data[0] != 0),
256                    distinct_count,
257                    null_count,
258                    old_format,
259                ),
260                Type::INT32 => FStatistics::int32(
261                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
262                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263                    distinct_count,
264                    null_count,
265                    old_format,
266                ),
267                Type::INT64 => FStatistics::int64(
268                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
269                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270                    distinct_count,
271                    null_count,
272                    old_format,
273                ),
274                Type::INT96 => {
275                    // INT96 statistics may not be correct, because comparison is signed
276                    let min = if let Some(data) = min {
277                        assert_eq!(data.len(), 12);
278                        Some(Int96::try_from_le_slice(data)?)
279                    } else {
280                        None
281                    };
282                    let max = if let Some(data) = max {
283                        assert_eq!(data.len(), 12);
284                        Some(Int96::try_from_le_slice(data)?)
285                    } else {
286                        None
287                    };
288                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
289                }
290                Type::FLOAT => FStatistics::float(
291                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
292                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293                    distinct_count,
294                    null_count,
295                    old_format,
296                ),
297                Type::DOUBLE => FStatistics::double(
298                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
299                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300                    distinct_count,
301                    null_count,
302                    old_format,
303                ),
304                Type::BYTE_ARRAY => FStatistics::ByteArray(
305                    ValueStatistics::new(
306                        min.map(ByteArray::from),
307                        max.map(ByteArray::from),
308                        distinct_count,
309                        null_count,
310                        old_format,
311                    )
312                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
313                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
314                ),
315                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
316                    ValueStatistics::new(
317                        min.map(ByteArray::from).map(FixedLenByteArray::from),
318                        max.map(ByteArray::from).map(FixedLenByteArray::from),
319                        distinct_count,
320                        null_count,
321                        old_format,
322                    )
323                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
324                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
325                ),
326            };
327
328            Some(res)
329        }
330        None => None,
331    })
332}
333
// Flags used to record which required fields of the Thrift `ColumnMetaData` struct have
// been seen while decoding. The bit index of each flag is the Thrift field id of the
// field it stands for.
const fn col_meta_bit(field_id: u32) -> u16 {
    1 << field_id
}

const COL_META_TYPE: u16 = col_meta_bit(1);
const COL_META_ENCODINGS: u16 = col_meta_bit(2);
const COL_META_CODEC: u16 = col_meta_bit(4);
const COL_META_NUM_VALUES: u16 = col_meta_bit(5);
const COL_META_TOTAL_UNCOMP_SZ: u16 = col_meta_bit(6);
const COL_META_TOTAL_COMP_SZ: u16 = col_meta_bit(7);
const COL_META_DATA_PAGE_OFFSET: u16 = col_meta_bit(9);

// the union of every required field's flag
const COL_META_ALL_REQUIRED: u16 = COL_META_DATA_PAGE_OFFSET
    | COL_META_TOTAL_COMP_SZ
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_NUM_VALUES
    | COL_META_CODEC
    | COL_META_ENCODINGS
    | COL_META_TYPE;
351
352// check mask to see if all required fields are set. return an appropriate error if
353// any are missing.
354fn validate_column_metadata(mask: u16) -> Result<()> {
355    if mask != COL_META_ALL_REQUIRED {
356        if mask & COL_META_ENCODINGS == 0 {
357            return Err(general_err!("Required field encodings is missing"));
358        }
359
360        if mask & COL_META_CODEC == 0 {
361            return Err(general_err!("Required field codec is missing"));
362        }
363        if mask & COL_META_NUM_VALUES == 0 {
364            return Err(general_err!("Required field num_values is missing"));
365        }
366        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
367            return Err(general_err!(
368                "Required field total_uncompressed_size is missing"
369            ));
370        }
371        if mask & COL_META_TOTAL_COMP_SZ == 0 {
372            return Err(general_err!(
373                "Required field total_compressed_size is missing"
374            ));
375        }
376        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
377            return Err(general_err!("Required field data_page_offset is missing"));
378        }
379    }
380
381    Ok(())
382}
383
384fn read_encoding_stats_as_mask<'a>(
385    prot: &mut ThriftSliceInputProtocol<'a>,
386) -> Result<EncodingMask> {
387    // read the vector of stats, setting mask bits for data pages
388    let mut mask = 0i32;
389    let list_ident = prot.read_list_begin()?;
390    // check for PageEncodingStats struct
391    validate_list_type(ElementType::Struct, &list_ident)?;
392
393    for _ in 0..list_ident.size {
394        let pes = PageEncodingStats::read_thrift(prot)?;
395        match pes.page_type {
396            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
397            _ => {}
398        }
399    }
400    EncodingMask::try_new(mask)
401}
402
403// Decode `ColumnMetaData`. Returns a mask of all required fields that were observed.
404// This mask can be passed to `validate_column_metadata`.
405fn read_column_metadata<'a>(
406    prot: &mut ThriftSliceInputProtocol<'a>,
407    column: &mut ColumnChunkMetaData,
408    col_index: usize,
409    options: Option<&ParquetMetaDataOptions>,
410) -> Result<u16> {
411    // mask for seen required fields in ColumnMetaData
412    let mut seen_mask = 0u16;
413
414    let mut skip_pes = false;
415    let mut pes_mask = true;
416    let mut skip_col_stats = false;
417    let mut skip_size_stats = false;
418
419    if let Some(opts) = options {
420        skip_pes = opts.skip_encoding_stats(col_index);
421        pes_mask = opts.encoding_stats_as_mask();
422        skip_col_stats = opts.skip_column_stats(col_index);
423        skip_size_stats = opts.skip_size_stats(col_index);
424    }
425
426    // struct ColumnMetaData {
427    //   1: required Type type
428    //   2: required list<Encoding> encodings
429    //   3: required list<string> path_in_schema
430    //   4: required CompressionCodec codec
431    //   5: required i64 num_values
432    //   6: required i64 total_uncompressed_size
433    //   7: required i64 total_compressed_size
434    //   8: optional list<KeyValue> key_value_metadata
435    //   9: required i64 data_page_offset
436    //   10: optional i64 index_page_offset
437    //   11: optional i64 dictionary_page_offset
438    //   12: optional Statistics statistics;
439    //   13: optional list<PageEncodingStats> encoding_stats;
440    //   14: optional i64 bloom_filter_offset;
441    //   15: optional i32 bloom_filter_length;
442    //   16: optional SizeStatistics size_statistics;
443    //   17: optional GeospatialStatistics geospatial_statistics;
444    // }
445    let column_descr = &column.column_descr;
446
447    let mut last_field_id = 0i16;
448    loop {
449        let field_ident = prot.read_field_begin(last_field_id)?;
450        if field_ident.field_type == FieldType::Stop {
451            break;
452        }
453        match field_ident.id {
454            // 1: type is never used, we can use the column descriptor
455            1 => {
456                // read for error handling
457                Type::read_thrift(&mut *prot)?;
458                seen_mask |= COL_META_TYPE;
459            }
460            2 => {
461                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
462                seen_mask |= COL_META_ENCODINGS;
463            }
464            // 3: path_in_schema is redundant
465            4 => {
466                column.compression = CompressionCodec::read_thrift(&mut *prot)?;
467                seen_mask |= COL_META_CODEC;
468            }
469            5 => {
470                column.num_values = i64::read_thrift(&mut *prot)?;
471                seen_mask |= COL_META_NUM_VALUES;
472            }
473            6 => {
474                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
475                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
476            }
477            7 => {
478                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
479                seen_mask |= COL_META_TOTAL_COMP_SZ;
480            }
481            // 8: we don't expose this key value
482            9 => {
483                column.data_page_offset = i64::read_thrift(&mut *prot)?;
484                seen_mask |= COL_META_DATA_PAGE_OFFSET;
485            }
486            10 => {
487                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
488            }
489            11 => {
490                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
491            }
492            12 if !skip_col_stats => {
493                column.statistics =
494                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
495            }
496            13 if !skip_pes => {
497                if pes_mask {
498                    let val = read_encoding_stats_as_mask(&mut *prot)?;
499                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
500                } else {
501                    let val =
502                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
503                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
504                }
505            }
506            14 => {
507                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
508            }
509            15 => {
510                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
511            }
512            16 if !skip_size_stats => {
513                let val = SizeStatistics::read_thrift(&mut *prot)?;
514                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
515                column.repetition_level_histogram =
516                    val.repetition_level_histogram.map(LevelHistogram::from);
517                column.definition_level_histogram =
518                    val.definition_level_histogram.map(LevelHistogram::from);
519            }
520            17 => {
521                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
522                column.geo_statistics = convert_geo_stats(Some(val));
523            }
524            _ => {
525                prot.skip(field_ident.field_type)?;
526            }
527        };
528        last_field_id = field_ident.id;
529    }
530
531    Ok(seen_mask)
532}
533
534// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait because
535// these are all internal and operate on slices.
536fn read_column_chunk<'a>(
537    prot: &mut ThriftSliceInputProtocol<'a>,
538    column_descr: &Arc<ColumnDescriptor>,
539    col_index: usize,
540    options: Option<&ParquetMetaDataOptions>,
541) -> Result<ColumnChunkMetaData> {
542    // create a default initialized ColumnMetaData
543    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
544
545    // seen flag for file_offset
546    let mut has_file_offset = false;
547
548    // mask of seen flags for ColumnMetaData
549    let mut col_meta_mask = 0u16;
550
551    // struct ColumnChunk {
552    //   1: optional string file_path
553    //   2: required i64 file_offset = 0
554    //   3: optional ColumnMetaData meta_data
555    //   4: optional i64 offset_index_offset
556    //   5: optional i32 offset_index_length
557    //   6: optional i64 column_index_offset
558    //   7: optional i32 column_index_length
559    //   8: optional ColumnCryptoMetaData crypto_metadata
560    //   9: optional binary encrypted_column_metadata
561    // }
562    let mut last_field_id = 0i16;
563    loop {
564        let field_ident = prot.read_field_begin(last_field_id)?;
565        if field_ident.field_type == FieldType::Stop {
566            break;
567        }
568        match field_ident.id {
569            1 => {
570                col.file_path = Some(String::read_thrift(&mut *prot)?);
571            }
572            2 => {
573                col.file_offset = i64::read_thrift(&mut *prot)?;
574                has_file_offset = true;
575            }
576            3 => {
577                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
578            }
579            4 => {
580                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
581            }
582            5 => {
583                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
584            }
585            6 => {
586                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
587            }
588            7 => {
589                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
590            }
591            #[cfg(feature = "encryption")]
592            8 => {
593                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
594                col.column_crypto_metadata = Some(Box::new(val));
595            }
596            #[cfg(feature = "encryption")]
597            9 => {
598                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
599            }
600            _ => {
601                prot.skip(field_ident.field_type)?;
602            }
603        };
604        last_field_id = field_ident.id;
605    }
606
607    // the only required field from ColumnChunk
608    if !has_file_offset {
609        return Err(general_err!("Required field file_offset is missing"));
610    };
611
612    // if encrypted just return. we'll decrypt after finishing the footer and populate the rest.
613    #[cfg(feature = "encryption")]
614    if col.encrypted_column_metadata.is_some() {
615        return Ok(col);
616    }
617
618    // not encrypted, so make sure all required fields were read
619    validate_column_metadata(col_meta_mask)?;
620
621    Ok(col)
622}
623
624fn read_row_group(
625    prot: &mut ThriftSliceInputProtocol,
626    schema_descr: &Arc<SchemaDescriptor>,
627    options: Option<&ParquetMetaDataOptions>,
628) -> Result<RowGroupMetaData> {
629    // create default initialized RowGroupMetaData
630    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
631
632    // mask values for required fields
633    const RG_COLUMNS: u8 = 1 << 1;
634    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
635    const RG_NUM_ROWS: u8 = 1 << 3;
636    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;
637
638    let mut mask = 0u8;
639
640    // struct RowGroup {
641    //   1: required list<ColumnChunk> columns
642    //   2: required i64 total_byte_size
643    //   3: required i64 num_rows
644    //   4: optional list<SortingColumn> sorting_columns
645    //   5: optional i64 file_offset
646    //   6: optional i64 total_compressed_size
647    //   7: optional i16 ordinal
648    // }
649    let mut last_field_id = 0i16;
650    loop {
651        let field_ident = prot.read_field_begin(last_field_id)?;
652        if field_ident.field_type == FieldType::Stop {
653            break;
654        }
655        match field_ident.id {
656            1 => {
657                let list_ident = prot.read_list_begin()?;
658                // check for list of struct
659                validate_list_type(ElementType::Struct, &list_ident)?;
660                if schema_descr.num_columns() != list_ident.size as usize {
661                    return Err(general_err!(
662                        "Column count mismatch. Schema has {} columns while Row Group has {}",
663                        schema_descr.num_columns(),
664                        list_ident.size
665                    ));
666                }
667                for i in 0..list_ident.size as usize {
668                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
669                    row_group.columns.push(col);
670                }
671                mask |= RG_COLUMNS;
672            }
673            2 => {
674                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
675                mask |= RG_TOT_BYTE_SIZE;
676            }
677            3 => {
678                row_group.num_rows = i64::read_thrift(&mut *prot)?;
679                mask |= RG_NUM_ROWS;
680            }
681            4 => {
682                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
683                row_group.sorting_columns = Some(val);
684            }
685            5 => {
686                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
687            }
688            // 6: we don't expose total_compressed_size
689            7 => {
690                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
691            }
692            _ => {
693                prot.skip(field_ident.field_type)?;
694            }
695        };
696        last_field_id = field_ident.id;
697    }
698
699    if mask != RG_ALL_REQUIRED {
700        if mask & RG_COLUMNS == 0 {
701            return Err(general_err!("Required field columns is missing"));
702        }
703        if mask & RG_TOT_BYTE_SIZE == 0 {
704            return Err(general_err!("Required field total_byte_size is missing"));
705        }
706        if mask & RG_NUM_ROWS == 0 {
707            return Err(general_err!("Required field num_rows is missing"));
708        }
709    }
710
711    Ok(row_group)
712}
713
714/// Create a [`SchemaDescriptor`] from thrift input. The input buffer must contain a complete
715/// Parquet footer.
716pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
717    let mut prot = ThriftSliceInputProtocol::new(buf);
718
719    let mut last_field_id = 0i16;
720    loop {
721        let field_ident = prot.read_field_begin(last_field_id)?;
722        if field_ident.field_type == FieldType::Stop {
723            break;
724        }
725        match field_ident.id {
726            2 => {
727                // read schema and convert to SchemaDescriptor for use when reading row groups
728                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
729                let val = parquet_schema_from_array(val)?;
730                return Ok(SchemaDescriptor::new(val));
731            }
732            _ => prot.skip(field_ident.field_type)?,
733        }
734        last_field_id = field_ident.id;
735    }
736    Err(general_err!("Input does not contain a schema"))
737}
738
739/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
740/// the Parquet footer. Page indexes will need to be added later.
741pub(crate) fn parquet_metadata_from_bytes(
742    buf: &[u8],
743    options: Option<&ParquetMetaDataOptions>,
744) -> Result<ParquetMetaData> {
745    let mut prot = ThriftSliceInputProtocol::new(buf);
746
747    // begin reading the file metadata
748    let mut version: Option<i32> = None;
749    let mut num_rows: Option<i64> = None;
750    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
751    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
752    let mut created_by: Option<&str> = None;
753    let mut column_orders: Option<Vec<ColumnOrder>> = None;
754    #[cfg(feature = "encryption")]
755    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
756    #[cfg(feature = "encryption")]
757    let mut footer_signing_key_metadata: Option<&[u8]> = None;
758
759    // this will need to be set before parsing row groups
760    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
761
762    // see if we already have a schema.
763    if let Some(options) = options {
764        schema_descr = options.schema().cloned();
765    }
766
767    // struct FileMetaData {
768    //   1: required i32 version
769    //   2: required list<SchemaElement> schema;
770    //   3: required i64 num_rows
771    //   4: required list<RowGroup> row_groups
772    //   5: optional list<KeyValue> key_value_metadata
773    //   6: optional string created_by
774    //   7: optional list<ColumnOrder> column_orders;
775    //   8: optional EncryptionAlgorithm encryption_algorithm
776    //   9: optional binary footer_signing_key_metadata
777    // }
778    let mut last_field_id = 0i16;
779    loop {
780        let field_ident = prot.read_field_begin(last_field_id)?;
781        if field_ident.field_type == FieldType::Stop {
782            break;
783        }
784        match field_ident.id {
785            1 => {
786                version = Some(i32::read_thrift(&mut prot)?);
787            }
788            2 => {
789                // If schema was passed in, skip parsing it
790                if schema_descr.is_some() {
791                    prot.skip(field_ident.field_type)?;
792                } else {
793                    // read schema and convert to SchemaDescriptor for use when reading row groups
794                    let val =
795                        read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
796                    let val = parquet_schema_from_array(val)?;
797                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
798                }
799            }
800            3 => {
801                num_rows = Some(i64::read_thrift(&mut prot)?);
802            }
803            4 => {
804                if schema_descr.is_none() {
805                    return Err(general_err!("Required field schema is missing"));
806                }
807                let schema_descr = schema_descr.as_ref().unwrap();
808                let list_ident = prot.read_list_begin()?;
809                // check for list of struct
810                validate_list_type(ElementType::Struct, &list_ident)?;
811                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
812
813                // Read row groups and handle ordinal assignment
814                let mut assigner = OrdinalAssigner::new();
815                for ordinal in 0..list_ident.size {
816                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
817                        ParquetError::General(format!(
818                            "Row group ordinal {ordinal} exceeds i16 max value",
819                        ))
820                    })?;
821                    let rg = read_row_group(&mut prot, schema_descr, options)?;
822                    rg_vec.push(assigner.ensure(ordinal, rg)?);
823                }
824                row_groups = Some(rg_vec);
825            }
826            5 => {
827                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
828                key_value_metadata = Some(val);
829            }
830            6 => {
831                created_by = Some(<&str>::read_thrift(&mut prot)?);
832            }
833            7 => {
834                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
835                column_orders = Some(val);
836            }
837            #[cfg(feature = "encryption")]
838            8 => {
839                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
840                encryption_algorithm = Some(val);
841            }
842            #[cfg(feature = "encryption")]
843            9 => {
844                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
845            }
846            _ => {
847                prot.skip(field_ident.field_type)?;
848            }
849        };
850        last_field_id = field_ident.id;
851    }
852    let Some(version) = version else {
853        return Err(general_err!("Required field version is missing"));
854    };
855    let Some(num_rows) = num_rows else {
856        return Err(general_err!("Required field num_rows is missing"));
857    };
858    let Some(row_groups) = row_groups else {
859        return Err(general_err!("Required field row_groups is missing"));
860    };
861
862    let created_by = created_by.map(|c| c.to_owned());
863
864    // we've tested for `None` by now so this is safe
865    let schema_descr = schema_descr.unwrap();
866
867    // need to map read column orders to actual values based on the schema
868    if column_orders
869        .as_ref()
870        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
871    {
872        return Err(general_err!("Column order length mismatch"));
873    }
874    // replace default type defined column orders with ones having the correct sort order
875    // TODO(ets): this could instead be done above when decoding
876    let column_orders = column_orders.map(|mut cos| {
877        for (i, column) in schema_descr.columns().iter().enumerate() {
878            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
879                let sort_order = ColumnOrder::sort_order_for_type(
880                    column.logical_type_ref(),
881                    column.converted_type(),
882                    column.physical_type(),
883                );
884                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
885            }
886        }
887        cos
888    });
889
890    #[cfg(not(feature = "encryption"))]
891    let fmd = crate::file::metadata::FileMetaData::new(
892        version,
893        num_rows,
894        created_by,
895        key_value_metadata,
896        schema_descr,
897        column_orders,
898    );
899    #[cfg(feature = "encryption")]
900    let fmd = crate::file::metadata::FileMetaData::new(
901        version,
902        num_rows,
903        created_by,
904        key_value_metadata,
905        schema_descr,
906        column_orders,
907    )
908    .with_encryption_algorithm(encryption_algorithm)
909    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
910
911    Ok(ParquetMetaData::new(fmd, row_groups))
912}
913
914/// Assign [`RowGroupMetaData::ordinal`]  if it is missing.
915#[derive(Debug, Default)]
916pub(crate) struct OrdinalAssigner {
917    first_has_ordinal: Option<bool>,
918}
919
920impl OrdinalAssigner {
921    fn new() -> Self {
922        Default::default()
923    }
924
925    /// Sets [`RowGroupMetaData::ordinal`] if it is missing.
926    ///
927    /// # Arguments
928    /// - actual_ordinal: The ordinal (index) of the row group being processed
929    ///   in the file metadata.
930    /// - rg: The [`RowGroupMetaData`] to potentially modify.
931    ///
932    /// Ensures:
933    /// 1. If the first row group has an ordinal, all subsequent row groups must
934    ///    also have ordinals.
935    /// 2. If the first row group does NOT have an ordinal, all subsequent row
936    ///    groups must also not have ordinals.
937    fn ensure(
938        &mut self,
939        actual_ordinal: i16,
940        mut rg: RowGroupMetaData,
941    ) -> Result<RowGroupMetaData> {
942        let rg_has_ordinal = rg.ordinal.is_some();
943
944        // Only set first_has_ordinal if it's None (first row group that arrives)
945        if self.first_has_ordinal.is_none() {
946            self.first_has_ordinal = Some(rg_has_ordinal);
947        }
948
949        // assign ordinal if missing and consistent with first row group
950        let first_has_ordinal = self.first_has_ordinal.unwrap();
951        if !first_has_ordinal && !rg_has_ordinal {
952            rg.ordinal = Some(actual_ordinal);
953        } else if first_has_ordinal != rg_has_ordinal {
954            return Err(general_err!(
955                "Inconsistent ordinal assignment: first_has_ordinal is set to \
956                {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
957                first_has_ordinal,
958                actual_ordinal,
959                rg_has_ordinal
960            ));
961        }
962        Ok(rg)
963    }
964}
965
// Thrift struct without any fields. It is the type of the optional `index_page_header`
// field (id 6) of `PageHeader`.
thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);
969
// Header for a dictionary page: the number of dictionary values, their encoding and,
// optionally, whether the entries are sorted. It is the type of the optional
// `dictionary_page_header` field (id 7) of `PageHeader`.
thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  /// Number of values in the dictionary
  1: required i32 num_values;

  /// Encoding using this dictionary page
  2: required Encoding encoding

  /// If true, the entries in the dictionary are sorted in ascending order
  3: optional bool is_sorted;
}
);
982
thrift_struct!(
/// Statistics for the page header.
///
/// This is a duplicate of the [`Statistics`] struct above. Because the page reader uses
/// the [`Read`] API, we cannot read the min/max values as slices. This should not be
/// a huge problem since this crate no longer reads the page header statistics by default.
///
/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
pub(crate) struct PageStatistics {
   // All eight fields are optional.
   // NOTE(review): `max`/`min` (1, 2) and `max_value`/`min_value` (5, 6) look like two
   // generations of the same bounds -- confirm against the Parquet specification.
   1: optional binary max;
   2: optional binary min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary max_value;
   6: optional binary min_value;
   // presumably these flag whether `max_value` / `min_value` are exact -- TODO confirm
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
1002
// Header for a data page: a value count, three encodings (values, definition levels,
// repetition levels) and optional page statistics. It is the type of the optional
// `data_page_header` field (id 5) of `PageHeader`.
//
// `DataPageHeader::read_thrift_without_stats` is a hand-written reader for this struct
// that skips field 5 instead of decoding it.
thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);
1012
1013impl DataPageHeader {
1014    // reader that skips decoding page statistics
1015    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1016    where
1017        R: ThriftCompactInputProtocol<'a>,
1018    {
1019        let mut num_values: Option<i32> = None;
1020        let mut encoding: Option<Encoding> = None;
1021        let mut definition_level_encoding: Option<Encoding> = None;
1022        let mut repetition_level_encoding: Option<Encoding> = None;
1023        let statistics: Option<PageStatistics> = None;
1024        let mut last_field_id = 0i16;
1025        loop {
1026            let field_ident = prot.read_field_begin(last_field_id)?;
1027            if field_ident.field_type == FieldType::Stop {
1028                break;
1029            }
1030            match field_ident.id {
1031                1 => {
1032                    let val = i32::read_thrift(&mut *prot)?;
1033                    num_values = Some(val);
1034                }
1035                2 => {
1036                    let val = Encoding::read_thrift(&mut *prot)?;
1037                    encoding = Some(val);
1038                }
1039                3 => {
1040                    let val = Encoding::read_thrift(&mut *prot)?;
1041                    definition_level_encoding = Some(val);
1042                }
1043                4 => {
1044                    let val = Encoding::read_thrift(&mut *prot)?;
1045                    repetition_level_encoding = Some(val);
1046                }
1047                _ => {
1048                    prot.skip(field_ident.field_type)?;
1049                }
1050            };
1051            last_field_id = field_ident.id;
1052        }
1053        let Some(num_values) = num_values else {
1054            return Err(general_err!("Required field num_values is missing"));
1055        };
1056        let Some(encoding) = encoding else {
1057            return Err(general_err!("Required field encoding is missing"));
1058        };
1059        let Some(definition_level_encoding) = definition_level_encoding else {
1060            return Err(general_err!(
1061                "Required field definition_level_encoding is missing"
1062            ));
1063        };
1064        let Some(repetition_level_encoding) = repetition_level_encoding else {
1065            return Err(general_err!(
1066                "Required field repetition_level_encoding is missing"
1067            ));
1068        };
1069        Ok(Self {
1070            num_values,
1071            encoding,
1072            definition_level_encoding,
1073            repetition_level_encoding,
1074            statistics,
1075        })
1076    }
1077}
1078
// Header for a version 2 data page. It is the type of the optional `data_page_header_v2`
// field (id 8) of `PageHeader`.
//
// `DataPageHeaderV2::read_thrift_without_stats` is a hand-written reader for this struct
// that skips field 8 instead of decoding it. That reader leaves `is_compressed` as `None`
// when field 7 is absent from the input.
thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
1091
1092impl DataPageHeaderV2 {
1093    // reader that skips decoding page statistics
1094    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1095    where
1096        R: ThriftCompactInputProtocol<'a>,
1097    {
1098        let mut num_values: Option<i32> = None;
1099        let mut num_nulls: Option<i32> = None;
1100        let mut num_rows: Option<i32> = None;
1101        let mut encoding: Option<Encoding> = None;
1102        let mut definition_levels_byte_length: Option<i32> = None;
1103        let mut repetition_levels_byte_length: Option<i32> = None;
1104        let mut is_compressed: Option<bool> = None;
1105        let statistics: Option<PageStatistics> = None;
1106        let mut last_field_id = 0i16;
1107        loop {
1108            let field_ident = prot.read_field_begin(last_field_id)?;
1109            if field_ident.field_type == FieldType::Stop {
1110                break;
1111            }
1112            match field_ident.id {
1113                1 => {
1114                    let val = i32::read_thrift(&mut *prot)?;
1115                    num_values = Some(val);
1116                }
1117                2 => {
1118                    let val = i32::read_thrift(&mut *prot)?;
1119                    num_nulls = Some(val);
1120                }
1121                3 => {
1122                    let val = i32::read_thrift(&mut *prot)?;
1123                    num_rows = Some(val);
1124                }
1125                4 => {
1126                    let val = Encoding::read_thrift(&mut *prot)?;
1127                    encoding = Some(val);
1128                }
1129                5 => {
1130                    let val = i32::read_thrift(&mut *prot)?;
1131                    definition_levels_byte_length = Some(val);
1132                }
1133                6 => {
1134                    let val = i32::read_thrift(&mut *prot)?;
1135                    repetition_levels_byte_length = Some(val);
1136                }
1137                7 => {
1138                    is_compressed = Some(field_ident.bool_val()?);
1139                }
1140                _ => {
1141                    prot.skip(field_ident.field_type)?;
1142                }
1143            };
1144            last_field_id = field_ident.id;
1145        }
1146        let Some(num_values) = num_values else {
1147            return Err(general_err!("Required field num_values is missing"));
1148        };
1149        let Some(num_nulls) = num_nulls else {
1150            return Err(general_err!("Required field num_nulls is missing"));
1151        };
1152        let Some(num_rows) = num_rows else {
1153            return Err(general_err!("Required field num_rows is missing"));
1154        };
1155        let Some(encoding) = encoding else {
1156            return Err(general_err!("Required field encoding is missing"));
1157        };
1158        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1159            return Err(general_err!(
1160                "Required field definition_levels_byte_length is missing"
1161            ));
1162        };
1163        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1164            return Err(general_err!(
1165                "Required field repetition_levels_byte_length is missing"
1166            ));
1167        };
1168        Ok(Self {
1169            num_values,
1170            num_nulls,
1171            num_rows,
1172            encoding,
1173            definition_levels_byte_length,
1174            repetition_levels_byte_length,
1175            is_compressed,
1176            statistics,
1177        })
1178    }
1179}
1180
// Header that precedes a page. `r#type` says which of the optional `*_header` fields
// (5 through 8) carries the page specific details.
thrift_struct!(
pub(crate) struct PageHeader {
  /// the type of the page: indicates which of the *_header fields is set
  1: required PageType r#type

  /// Uncompressed page size in bytes (not including this header)
  2: required i32 uncompressed_page_size

  /// Compressed (and potentially encrypted) page size in bytes, not including this header
  3: required i32 compressed_page_size

  /// The 32-bit CRC checksum for the page (see the Parquet specification for how it is
  /// calculated)
  4: optional i32 crc

  // Headers for page specific data. Only one will be set.
  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1202
1203impl PageHeader {
1204    // reader that skips reading page statistics. obtained by running
1205    // `cargo expand -p parquet --all-features --lib file::metadata::thrift`
1206    // and modifying the impl of `read_thrift`
1207    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1208    where
1209        R: ThriftCompactInputProtocol<'a>,
1210    {
1211        let mut type_: Option<PageType> = None;
1212        let mut uncompressed_page_size: Option<i32> = None;
1213        let mut compressed_page_size: Option<i32> = None;
1214        let mut crc: Option<i32> = None;
1215        let mut data_page_header: Option<DataPageHeader> = None;
1216        let mut index_page_header: Option<IndexPageHeader> = None;
1217        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1218        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1219        let mut last_field_id = 0i16;
1220        loop {
1221            let field_ident = prot.read_field_begin(last_field_id)?;
1222            if field_ident.field_type == FieldType::Stop {
1223                break;
1224            }
1225            match field_ident.id {
1226                1 => {
1227                    let val = PageType::read_thrift(&mut *prot)?;
1228                    type_ = Some(val);
1229                }
1230                2 => {
1231                    let val = i32::read_thrift(&mut *prot)?;
1232                    uncompressed_page_size = Some(val);
1233                }
1234                3 => {
1235                    let val = i32::read_thrift(&mut *prot)?;
1236                    compressed_page_size = Some(val);
1237                }
1238                4 => {
1239                    let val = i32::read_thrift(&mut *prot)?;
1240                    crc = Some(val);
1241                }
1242                5 => {
1243                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1244                    data_page_header = Some(val);
1245                }
1246                6 => {
1247                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
1248                    index_page_header = Some(val);
1249                }
1250                7 => {
1251                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1252                    dictionary_page_header = Some(val);
1253                }
1254                8 => {
1255                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1256                    data_page_header_v2 = Some(val);
1257                }
1258                _ => {
1259                    prot.skip(field_ident.field_type)?;
1260                }
1261            };
1262            last_field_id = field_ident.id;
1263        }
1264        let Some(type_) = type_ else {
1265            return Err(general_err!("Required field type_ is missing"));
1266        };
1267        let Some(uncompressed_page_size) = uncompressed_page_size else {
1268            return Err(general_err!(
1269                "Required field uncompressed_page_size is missing"
1270            ));
1271        };
1272        let Some(compressed_page_size) = compressed_page_size else {
1273            return Err(general_err!(
1274                "Required field compressed_page_size is missing"
1275            ));
1276        };
1277        Ok(Self {
1278            r#type: type_,
1279            uncompressed_page_size,
1280            compressed_page_size,
1281            crc,
1282            data_page_header,
1283            index_page_header,
1284            dictionary_page_header,
1285            data_page_header_v2,
1286        })
1287    }
1288}
1289
1290/////////////////////////////////////////////////
1291// helper functions for writing file meta data
1292
/// Decides whether `serialize_column_meta_data` writes the statistics related fields
/// of `column_chunk` (`encryption` feature enabled).
///
/// Returns `true` only when the column chunk has no encrypted column metadata.
#[cfg(feature = "encryption")]
fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
    // If there is encrypted column metadata present,
    // the column is encrypted with a different key to the footer or a plaintext footer is used,
    // so the statistics are sensitive and shouldn't be written.
    column_chunk.encrypted_column_metadata.is_none()
}
1300
/// Decides whether `serialize_column_meta_data` writes the statistics related fields
/// of a column chunk (`encryption` feature disabled).
///
/// Always returns `true`; the argument is unused in this configuration.
#[cfg(not(feature = "encryption"))]
fn should_write_column_stats(_column_chunk: &ColumnChunkMetaData) -> bool {
    true
}
1305
1306// serialize the bits of the column chunk needed for a thrift ColumnMetaData
1307// struct ColumnMetaData {
1308//   1: required Type type
1309//   2: required list<Encoding> encodings
1310//   3: required list<string> path_in_schema
1311//   4: required CompressionCodec codec
1312//   5: required i64 num_values
1313//   6: required i64 total_uncompressed_size
1314//   7: required i64 total_compressed_size
1315//   8: optional list<KeyValue> key_value_metadata
1316//   9: required i64 data_page_offset
1317//   10: optional i64 index_page_offset
1318//   11: optional i64 dictionary_page_offset
1319//   12: optional Statistics statistics;
1320//   13: optional list<PageEncodingStats> encoding_stats;
1321//   14: optional i64 bloom_filter_offset;
1322//   15: optional i32 bloom_filter_length;
1323//   16: optional SizeStatistics size_statistics;
1324//   17: optional GeospatialStatistics geospatial_statistics;
1325// }
1326pub(super) fn serialize_column_meta_data<W: Write>(
1327    column_chunk: &ColumnChunkMetaData,
1328    w: &mut ThriftCompactOutputProtocol<W>,
1329) -> Result<()> {
1330    use crate::file::statistics::page_stats_to_thrift;
1331
1332    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1333    column_chunk
1334        .encodings()
1335        .collect::<Vec<_>>()
1336        .write_thrift_field(w, 2, 1)?;
1337    if w.write_path_in_schema() {
1338        let path = column_chunk.column_descr.path().parts();
1339        let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1340        path.write_thrift_field(w, 3, 2)?;
1341        column_chunk.compression.write_thrift_field(w, 4, 3)?;
1342    } else {
1343        column_chunk.compression.write_thrift_field(w, 4, 2)?;
1344    }
1345
1346    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1347    column_chunk
1348        .total_uncompressed_size
1349        .write_thrift_field(w, 6, 5)?;
1350    column_chunk
1351        .total_compressed_size
1352        .write_thrift_field(w, 7, 6)?;
1353    // no key_value_metadata here
1354    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1355    if let Some(index_page_offset) = column_chunk.index_page_offset {
1356        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1357    }
1358    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1359        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1360    }
1361
1362    if should_write_column_stats(column_chunk) {
1363        // PageStatistics is the same as thrift Statistics, but writable
1364        let stats = page_stats_to_thrift(column_chunk.statistics());
1365        if let Some(stats) = stats {
1366            last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1367        }
1368        if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1369            last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1370        }
1371        if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1372            last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1373        }
1374        if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1375            last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1376        }
1377
1378        // SizeStatistics
1379        let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1380            || column_chunk.repetition_level_histogram.is_some()
1381            || column_chunk.definition_level_histogram.is_some()
1382        {
1383            let repetition_level_histogram = column_chunk
1384                .repetition_level_histogram()
1385                .map(|hist| hist.clone().into_inner());
1386
1387            let definition_level_histogram = column_chunk
1388                .definition_level_histogram()
1389                .map(|hist| hist.clone().into_inner());
1390
1391            Some(SizeStatistics {
1392                unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1393                repetition_level_histogram,
1394                definition_level_histogram,
1395            })
1396        } else {
1397            None
1398        };
1399        if let Some(size_stats) = size_stats {
1400            last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1401        }
1402
1403        if let Some(geo_stats) = column_chunk.geo_statistics() {
1404            geo_stats.write_thrift_field(w, 17, last_field_id)?;
1405        }
1406    }
1407
1408    w.write_struct_end()
1409}
1410
/// Temporary struct used for writing: a borrowed view of the file metadata and row groups
/// whose `WriteThrift` impl serializes them as a thrift `FileMetaData` struct.
pub(super) struct FileMeta<'a> {
    /// File level metadata (version, schema, row count, ...).
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    /// Row groups written as field 4 of the thrift struct.
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
    /// If true, then write the `path_in_schema` field in the ColumnMetaData struct.
    pub(super) write_path_in_schema: bool,
}
1418
1419// struct FileMetaData {
1420//   1: required i32 version
1421//   2: required list<SchemaElement> schema;
1422//   3: required i64 num_rows
1423//   4: required list<RowGroup> row_groups
1424//   5: optional list<KeyValue> key_value_metadata
1425//   6: optional string created_by
1426//   7: optional list<ColumnOrder> column_orders;
1427//   8: optional EncryptionAlgorithm encryption_algorithm
1428//   9: optional binary footer_signing_key_metadata
1429// }
1430impl<'a> WriteThrift for FileMeta<'a> {
1431    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1432
1433    // needed for last_field_id w/o encryption
1434    #[allow(unused_assignments)]
1435    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1436        writer.set_write_path_in_schema(self.write_path_in_schema);
1437
1438        self.file_metadata
1439            .version
1440            .write_thrift_field(writer, 1, 0)?;
1441
1442        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
1443        // writing along the way.
1444        let root = self.file_metadata.schema_descr().root_schema_ptr();
1445        let schema_len = num_nodes(&root)?;
1446        writer.write_field_begin(FieldType::List, 2, 1)?;
1447        writer.write_list_begin(ElementType::Struct, schema_len)?;
1448        // recursively write Type nodes as SchemaElements
1449        write_schema(&root, writer)?;
1450
1451        self.file_metadata
1452            .num_rows
1453            .write_thrift_field(writer, 3, 2)?;
1454
1455        // this will call RowGroupMetaData::write_thrift
1456        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1457
1458        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1459            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1460        }
1461        if let Some(created_by) = self.file_metadata.created_by() {
1462            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1463        }
1464        if let Some(column_orders) = self.file_metadata.column_orders() {
1465            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1466        }
1467        #[cfg(feature = "encryption")]
1468        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1469            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1470        }
1471        #[cfg(feature = "encryption")]
1472        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1473            key.as_slice()
1474                .write_thrift_field(writer, 9, last_field_id)?;
1475        }
1476
1477        writer.write_struct_end()
1478    }
1479}
1480
1481fn write_schema<W: Write>(
1482    schema: &TypePtr,
1483    writer: &mut ThriftCompactOutputProtocol<W>,
1484) -> Result<()> {
1485    if !schema.is_group() {
1486        return Err(general_err!("Root schema must be Group type"));
1487    }
1488    write_schema_helper(schema, writer)
1489}
1490
1491fn write_schema_helper<W: Write>(
1492    node: &TypePtr,
1493    writer: &mut ThriftCompactOutputProtocol<W>,
1494) -> Result<()> {
1495    match node.as_ref() {
1496        crate::schema::types::Type::PrimitiveType {
1497            basic_info,
1498            physical_type,
1499            type_length,
1500            scale,
1501            precision,
1502        } => {
1503            let element = SchemaElement {
1504                r#type: Some(*physical_type),
1505                type_length: if *type_length >= 0 {
1506                    Some(*type_length)
1507                } else {
1508                    None
1509                },
1510                repetition_type: Some(basic_info.repetition()),
1511                name: basic_info.name(),
1512                num_children: None,
1513                converted_type: match basic_info.converted_type() {
1514                    ConvertedType::NONE => None,
1515                    other => Some(other),
1516                },
1517                scale: if *scale >= 0 { Some(*scale) } else { None },
1518                precision: if *precision >= 0 {
1519                    Some(*precision)
1520                } else {
1521                    None
1522                },
1523                field_id: if basic_info.has_id() {
1524                    Some(basic_info.id())
1525                } else {
1526                    None
1527                },
1528                logical_type: basic_info.logical_type_ref().cloned(),
1529            };
1530            element.write_thrift(writer)
1531        }
1532        crate::schema::types::Type::GroupType { basic_info, fields } => {
1533            let repetition = if basic_info.has_repetition() {
1534                Some(basic_info.repetition())
1535            } else {
1536                None
1537            };
1538
1539            let element = SchemaElement {
1540                r#type: None,
1541                type_length: None,
1542                repetition_type: repetition,
1543                name: basic_info.name(),
1544                num_children: Some(fields.len().try_into()?),
1545                converted_type: match basic_info.converted_type() {
1546                    ConvertedType::NONE => None,
1547                    other => Some(other),
1548                },
1549                scale: None,
1550                precision: None,
1551                field_id: if basic_info.has_id() {
1552                    Some(basic_info.id())
1553                } else {
1554                    None
1555                },
1556                logical_type: basic_info.logical_type_ref().cloned(),
1557            };
1558
1559            element.write_thrift(writer)?;
1560
1561            // Add child elements for a group
1562            for field in fields {
1563                write_schema_helper(field, writer)?;
1564            }
1565            Ok(())
1566        }
1567    }
1568}
1569
1570// struct RowGroup {
1571//   1: required list<ColumnChunk> columns
1572//   2: required i64 total_byte_size
1573//   3: required i64 num_rows
1574//   4: optional list<SortingColumn> sorting_columns
1575//   5: optional i64 file_offset
1576//   6: optional i64 total_compressed_size
1577//   7: optional i16 ordinal
1578// }
1579impl WriteThrift for RowGroupMetaData {
1580    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1581
1582    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1583        // this will call ColumnChunkMetaData::write_thrift
1584        self.columns.write_thrift_field(writer, 1, 0)?;
1585        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1586        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1587        if let Some(sorting_columns) = self.sorting_columns() {
1588            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1589        }
1590        if let Some(file_offset) = self.file_offset() {
1591            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1592        }
1593        // this is optional, but we'll always write it
1594        last_field_id = self
1595            .compressed_size()
1596            .write_thrift_field(writer, 6, last_field_id)?;
1597        if let Some(ordinal) = self.ordinal() {
1598            ordinal.write_thrift_field(writer, 7, last_field_id)?;
1599        }
1600        writer.write_struct_end()
1601    }
1602}
1603
1604// struct ColumnChunk {
1605//   1: optional string file_path
1606//   2: required i64 file_offset = 0
1607//   3: optional ColumnMetaData meta_data
1608//   4: optional i64 offset_index_offset
1609//   5: optional i32 offset_index_length
1610//   6: optional i64 column_index_offset
1611//   7: optional i32 column_index_length
1612//   8: optional ColumnCryptoMetaData crypto_metadata
1613//   9: optional binary encrypted_column_metadata
1614// }
1615impl WriteThrift for ColumnChunkMetaData {
1616    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1617
1618    #[allow(unused_assignments)]
1619    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1620        let mut last_field_id = 0i16;
1621        if let Some(file_path) = self.file_path() {
1622            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1623        }
1624        last_field_id = self
1625            .file_offset()
1626            .write_thrift_field(writer, 2, last_field_id)?;
1627
1628        #[cfg(feature = "encryption")]
1629        let write_meta_data =
1630            self.encrypted_column_metadata.is_none() || self.plaintext_footer_mode;
1631        #[cfg(not(feature = "encryption"))]
1632        let write_meta_data = true;
1633
1634        // When the footer is encrypted and encrypted_column_metadata is present,
1635        // skip writing the plaintext meta_data field to reduce footer size.
1636        // When the footer is plaintext (plaintext_footer_mode=true), we still write
1637        // meta_data for backward compatibility with readers that expect it, but with
1638        // sensitive fields (statistics, bloom filter info, etc.) stripped out.
1639        if write_meta_data {
1640            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1641            serialize_column_meta_data(self, writer)?;
1642            last_field_id = 3;
1643        }
1644
1645        if let Some(offset_idx_off) = self.offset_index_offset() {
1646            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1647        }
1648        if let Some(offset_idx_len) = self.offset_index_length() {
1649            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1650        }
1651        if let Some(column_idx_off) = self.column_index_offset() {
1652            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1653        }
1654        if let Some(column_idx_len) = self.column_index_length() {
1655            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1656        }
1657        #[cfg(feature = "encryption")]
1658        {
1659            if let Some(crypto_metadata) = self.crypto_metadata() {
1660                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1661            }
1662            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1663                encrypted_meta
1664                    .as_slice()
1665                    .write_thrift_field(writer, 9, last_field_id)?;
1666            }
1667        }
1668
1669        writer.write_struct_end()
1670    }
1671}
1672
1673// struct GeospatialStatistics {
1674//   1: optional BoundingBox bbox;
1675//   2: optional list<i32> geospatial_types;
1676// }
1677impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1678    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1679
1680    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1681        let mut last_field_id = 0i16;
1682        if let Some(bbox) = self.bounding_box() {
1683            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1684        }
1685        if let Some(geo_types) = self.geospatial_types() {
1686            geo_types.write_thrift_field(writer, 2, last_field_id)?;
1687        }
1688
1689        writer.write_struct_end()
1690    }
1691}
1692
1693// macro cannot handle qualified names
1694use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
1695write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1696
1697// struct BoundingBox {
1698//   1: required double xmin;
1699//   2: required double xmax;
1700//   3: required double ymin;
1701//   4: required double ymax;
1702//   5: optional double zmin;
1703//   6: optional double zmax;
1704//   7: optional double mmin;
1705//   8: optional double mmax;
1706// }
1707impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1708    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1709
1710    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1711        self.get_xmin().write_thrift_field(writer, 1, 0)?;
1712        self.get_xmax().write_thrift_field(writer, 2, 1)?;
1713        self.get_ymin().write_thrift_field(writer, 3, 2)?;
1714        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1715
1716        if let Some(zmin) = self.get_zmin() {
1717            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1718        }
1719        if let Some(zmax) = self.get_zmax() {
1720            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1721        }
1722        if let Some(mmin) = self.get_mmin() {
1723            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1724        }
1725        if let Some(mmax) = self.get_mmax() {
1726            mmax.write_thrift_field(writer, 8, last_field_id)?;
1727        }
1728
1729        writer.write_struct_end()
1730    }
1731}
1732
1733// macro cannot handle qualified names
1734use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
1735write_thrift_field!(RustBoundingBox, FieldType::Struct);
1736
#[cfg(test)]
pub(crate) mod tests {
    use crate::basic::{Encoding, PageType, Type as PhysicalType};
    use crate::errors::Result;
    use crate::file::metadata::thrift::{
        BoundingBox, DataPageHeaderV2, DictionaryPageHeader, PageHeader, SchemaElement,
        write_schema,
    };
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
        read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, ColumnPath, SchemaDescriptor, TypePtr, num_nodes,
        parquet_schema_from_array,
    };
    use std::sync::Arc;

    /// Test helper: decodes a Thrift encoded `RowGroup` from `buf` against the given
    /// schema descriptor.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut prot = ThriftSliceInputProtocol::new(buf);
        super::read_row_group(&mut prot, &schema_descr, None)
    }

    /// Test helper: decodes a Thrift encoded `ColumnChunk` from `buf` without any
    /// metadata options.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        read_column_chunk_with_options(buf, column_descr, None)
    }

    /// Test helper: decodes a Thrift encoded `ColumnChunk` from `buf`, passing along
    /// the optional metadata `options`.
    pub(crate) fn read_column_chunk_with_options(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ColumnChunkMetaData> {
        let mut prot = ThriftSliceInputProtocol::new(buf);
        super::read_column_chunk(&mut prot, &column_descr, 0, options)
    }

    /// Serializes `schema` to a list of Thrift `SchemaElement`s, then reads that list
    /// back and rebuilds a schema from it.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let encoded = schema_to_buf(&schema)?;

        let mut prot = ThriftSliceInputProtocol::new(&encoded);
        let elements: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(elements)
    }

    /// Serializes `schema` to a Thrift list of `SchemaElement`s and returns the bytes.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let node_count = num_nodes(schema)?;
        let mut encoded = Vec::new();
        {
            let mut out = ThriftCompactOutputProtocol::new(&mut encoded);
            // The list header comes first, followed by the SchemaElements themselves.
            out.write_list_begin(ElementType::Struct, node_count)?;
            write_schema(schema, &mut out)?;
        }
        Ok(encoded)
    }

    /// Reads a Thrift list of `SchemaElement`s out of `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut input = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut input)
    }

    /// Serializes `value` with the compact output protocol, panicking on failure.
    fn thrift_bytes<T: WriteThrift>(value: &T) -> Vec<u8> {
        let mut encoded = Vec::new();
        value
            .write_thrift(&mut ThriftCompactOutputProtocol::new(&mut encoded))
            .unwrap();
        encoded
    }

    /// Rewrites the last `0x12` byte in `buf` (treated as a BOOL_FALSE field header)
    /// to `0x15`, so the field claims to be an i32 instead of a bool.
    fn change_false_bool_field_to_i32(buf: &mut [u8]) {
        const BOOL_FALSE_HEADER: u8 = 0x12;
        const I32_HEADER: u8 = 0x15;

        let header = buf
            .iter_mut()
            .rev()
            .find(|byte| **byte == BOOL_FALSE_HEADER)
            .expect("expected BOOL_FALSE field header byte");
        *header = I32_HEADER;
    }

    /// Asserts that `err` reports an unexpected struct field type.
    fn assert_malformed_bool_error(err: crate::errors::ParquetError) {
        let rendered = err.to_string();
        assert!(
            rendered.contains("Unexpected struct field type"),
            "unexpected error message: {rendered}"
        );
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        let cases = [
            // x/y bounds only
            BoundingBox {
                xmin: 0.1.into(),
                xmax: 10.3.into(),
                ymin: 0.001.into(),
                ymax: 128.5.into(),
                zmin: None,
                zmax: None,
                mmin: None,
                mmax: None,
            },
            // x/y plus z bounds
            BoundingBox {
                xmin: 0.1.into(),
                xmax: 10.3.into(),
                ymin: 0.001.into(),
                ymax: 128.5.into(),
                zmin: Some(11.0.into()),
                zmax: Some(1300.0.into()),
                mmin: None,
                mmax: None,
            },
            // every bound populated
            BoundingBox {
                xmin: 0.1.into(),
                xmax: 10.3.into(),
                ymin: 0.001.into(),
                ymax: 128.5.into(),
                zmin: Some(11.0.into()),
                zmax: Some(1300.0.into()),
                mmin: Some(3.7.into()),
                mmax: Some(42.0.into()),
            },
        ];

        for bbox in cases {
            test_roundtrip(bbox);
        }
    }

    #[test]
    fn test_convert_stats_preserves_missing_null_count() {
        let primitive_ptr = Arc::new(
            crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT32)
                .build()
                .unwrap(),
        );
        let column_descr = Arc::new(ColumnDescriptor::new(
            primitive_ptr,
            0,
            0,
            ColumnPath::new(vec![]),
        ));

        // (encoded null_count, null_count expected after conversion): a missing count
        // must stay missing, and an explicit zero must stay zero.
        for (null_count, expected) in [(None, None), (Some(0), Some(0))] {
            let thrift_stats = super::Statistics {
                max: None,
                min: None,
                null_count,
                distinct_count: None,
                max_value: None,
                min_value: None,
                is_max_value_exact: None,
                is_min_value_exact: None,
            };
            let decoded = super::convert_stats(&column_descr, Some(thrift_stats))
                .unwrap()
                .unwrap();
            assert_eq!(decoded.null_count_opt(), expected);
        }
    }

    #[test]
    fn malformed_bool_field_returns_error_not_panic() {
        let dictionary_header = DictionaryPageHeader {
            num_values: 1,
            encoding: Encoding::PLAIN,
            is_sorted: Some(false),
        };
        let header = PageHeader {
            r#type: PageType::DICTIONARY_PAGE,
            uncompressed_page_size: 1,
            compressed_page_size: 1,
            crc: None,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: Some(dictionary_header),
            data_page_header_v2: None,
        };

        // Corrupt the `is_sorted` field header so it no longer advertises a bool.
        let mut bytes = thrift_bytes(&header);
        change_false_bool_field_to_i32(&mut bytes);

        let mut prot = ThriftSliceInputProtocol::new(&bytes);
        let outcome = PageHeader::read_thrift_without_stats(&mut prot);
        assert_malformed_bool_error(
            outcome.expect_err("malformed bool field should return an error"),
        );
    }

    #[test]
    fn malformed_data_page_v2_bool_field_returns_error_not_panic() {
        let header = DataPageHeaderV2 {
            num_values: 1,
            num_nulls: 0,
            num_rows: 1,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 0,
            repetition_levels_byte_length: 0,
            is_compressed: Some(false),
            statistics: None,
        };

        // Corrupt the `is_compressed` field header so it no longer advertises a bool.
        let mut bytes = thrift_bytes(&header);
        change_false_bool_field_to_i32(&mut bytes);

        let mut prot = ThriftSliceInputProtocol::new(&bytes);
        let outcome = DataPageHeaderV2::read_thrift_without_stats(&mut prot);
        assert_malformed_bool_error(
            outcome.expect_err("malformed bool field should return an error"),
        );
    }
}