Skip to main content

parquet/file/metadata/thrift/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module is the bridge between a Parquet file's thrift encoded metadata
19//! and this crate's [Parquet metadata API]. It contains objects and functions used
20//! to serialize/deserialize metadata objects into/from the Thrift compact protocol
21//! format as defined by the [Parquet specification].
22//!
23//! [Parquet metadata API]: crate::file::metadata
24//! [Parquet specification]: https://github.com/apache/parquet-format/tree/master
25
26use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37    basic::{
38        ColumnOrder, CompressionCodec, ConvertedType, Encoding, EncodingMask, LogicalType,
39        PageType, Repetition, Type,
40    },
41    data_type::{ByteArray, FixedLenByteArray, Int96},
42    errors::{ParquetError, Result},
43    file::{
44        metadata::{
45            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47            RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48        },
49        statistics::ValueStatistics,
50    },
51    parquet_thrift::{
52        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54        read_thrift_vec, validate_list_type,
55    },
56    schema::types::{
57        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58    },
59    thrift_struct,
60    util::bit_util::FromBytes,
61    write_thrift_field,
62};
63
64// this needs to be visible to the schema conversion code
65thrift_struct!(
66pub(crate) struct SchemaElement<'a> {
67  /// Data type for this field. Not set if the current element is a non-leaf node
68  1: optional Type r#type;
69  /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
70  /// Otherwise, if specified, this is the maximum bit length to store any of the values.
71  /// (e.g. a low cardinality INT col could have this set to 3).  Note that this is
72  /// in the schema, and therefore fixed for the entire file.
73  2: optional i32 type_length;
74  /// Repetition of the field. The root of the schema does not have a repetition_type.
75  /// All other nodes must have one.
76  3: optional Repetition repetition_type;
77  /// Name of the field in the schema
78  4: required string<'a> name;
79  /// Nested fields. Since thrift does not support nested fields,
80  /// the nesting is flattened to a single list by a depth-first traversal.
81  /// The children count is used to construct the nested relationship.
82  /// This field is not set when the element is a primitive type.
83  5: optional i32 num_children;
84  /// DEPRECATED: When the schema is the result of a conversion from another model.
85  /// Used to record the original type to help with cross conversion.
86  ///
87  /// This is superseded by logical_type.
88  6: optional ConvertedType converted_type;
89  /// DEPRECATED: Used when this column contains decimal data.
90  /// See the DECIMAL converted type for more details.
91  ///
92  /// This is superseded by using the DecimalType annotation in logical_type.
93  7: optional i32 scale
94  8: optional i32 precision
95  /// When the original schema supports field ids, this will save the
96  /// original field id in the parquet schema
97  9: optional i32 field_id;
98  /// The logical type of this SchemaElement
99  ///
100  /// LogicalType replaces ConvertedType, but ConvertedType is still required
101  /// for some logical types to ensure forward-compatibility in format v1.
102  10: optional LogicalType logical_type
103}
104);
105
106thrift_struct!(
107struct Statistics<'a> {
108   1: optional binary<'a> max;
109   2: optional binary<'a> min;
110   3: optional i64 null_count;
111   4: optional i64 distinct_count;
112   5: optional binary<'a> max_value;
113   6: optional binary<'a> min_value;
114   7: optional bool is_max_value_exact;
115   8: optional bool is_min_value_exact;
116}
117);
118
119thrift_struct!(
120struct BoundingBox {
121  1: required double xmin;
122  2: required double xmax;
123  3: required double ymin;
124  4: required double ymax;
125  5: optional double zmin;
126  6: optional double zmax;
127  7: optional double mmin;
128  8: optional double mmax;
129}
130);
131
132thrift_struct!(
133struct GeospatialStatistics {
134  1: optional BoundingBox bbox;
135  2: optional list<i32> geospatial_types;
136}
137);
138
139thrift_struct!(
140struct SizeStatistics {
141   1: optional i64 unencoded_byte_array_data_bytes;
142   2: optional list<i64> repetition_level_histogram;
143   3: optional list<i64> definition_level_histogram;
144}
145);
146
147fn convert_geo_stats(
148    stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150    stats.map(|st| {
151        let bbox = convert_bounding_box(st.bbox);
152        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154            bbox,
155            geospatial_types,
156        ))
157    })
158}
159
160fn convert_bounding_box(
161    bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163    bbox.map(|bb| {
164        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165            bb.xmin.into(),
166            bb.xmax.into(),
167            bb.ymin.into(),
168            bb.ymax.into(),
169        );
170
171        newbb = match (bb.zmin, bb.zmax) {
172            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173            // If either None or mismatch, leave it as None and don't error
174            _ => newbb,
175        };
176
177        newbb = match (bb.mmin, bb.mmax) {
178            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179            // If either None or mismatch, leave it as None and don't error
180            _ => newbb,
181        };
182
183        newbb
184    })
185}
186
187/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
188fn convert_stats(
189    column_descr: &Arc<ColumnDescriptor>,
190    thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192    use crate::file::statistics::Statistics as FStatistics;
193    Ok(match thrift_stats {
194        Some(stats) => {
195            // Generic null count.
196            let null_count = stats
197                .null_count
198                .map(|null_count| {
199                    if null_count < 0 {
200                        return Err(general_err!(
201                            "Statistics null count is negative {}",
202                            null_count
203                        ));
204                    }
205                    Ok(null_count as u64)
206                })
207                .transpose()?;
208            // Generic distinct count (count of distinct values occurring)
209            let distinct_count = stats.distinct_count.map(|value| value as u64);
210            // Whether or not statistics use deprecated min/max fields.
211            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
212            // Generic min value as bytes.
213            let min = if old_format {
214                stats.min
215            } else {
216                stats.min_value
217            };
218            // Generic max value as bytes.
219            let max = if old_format {
220                stats.max
221            } else {
222                stats.max_value
223            };
224
225            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
226                if let Some(min) = min {
227                    if min.len() < len {
228                        return Err(general_err!("Insufficient bytes to parse min statistic",));
229                    }
230                }
231                if let Some(max) = max {
232                    if max.len() < len {
233                        return Err(general_err!("Insufficient bytes to parse max statistic",));
234                    }
235                }
236                Ok(())
237            }
238
239            let physical_type = column_descr.physical_type();
240            match physical_type {
241                Type::BOOLEAN => check_len(&min, &max, 1),
242                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
243                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
244                Type::INT96 => check_len(&min, &max, 12),
245                _ => Ok(()),
246            }?;
247
248            // Values are encoded using PLAIN encoding definition, except that
249            // variable-length byte arrays do not include a length prefix.
250            //
251            // Instead of using actual decoder, we manually convert values.
252            let res = match physical_type {
253                Type::BOOLEAN => FStatistics::boolean(
254                    min.map(|data| data[0] != 0),
255                    max.map(|data| data[0] != 0),
256                    distinct_count,
257                    null_count,
258                    old_format,
259                ),
260                Type::INT32 => FStatistics::int32(
261                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
262                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263                    distinct_count,
264                    null_count,
265                    old_format,
266                ),
267                Type::INT64 => FStatistics::int64(
268                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
269                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270                    distinct_count,
271                    null_count,
272                    old_format,
273                ),
274                Type::INT96 => {
275                    // INT96 statistics may not be correct, because comparison is signed
276                    let min = if let Some(data) = min {
277                        if data.len() != 12 {
278                            return Err(general_err!("Incorrect Int96 min statistics"));
279                        }
280                        Some(Int96::try_from_le_slice(data)?)
281                    } else {
282                        None
283                    };
284                    let max = if let Some(data) = max {
285                        if data.len() != 12 {
286                            return Err(general_err!("Incorrect Int96 max statistics"));
287                        }
288                        Some(Int96::try_from_le_slice(data)?)
289                    } else {
290                        None
291                    };
292                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
293                }
294                Type::FLOAT => FStatistics::float(
295                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
296                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
297                    distinct_count,
298                    null_count,
299                    old_format,
300                ),
301                Type::DOUBLE => FStatistics::double(
302                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
303                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
304                    distinct_count,
305                    null_count,
306                    old_format,
307                ),
308                Type::BYTE_ARRAY => FStatistics::ByteArray(
309                    ValueStatistics::new(
310                        min.map(ByteArray::from),
311                        max.map(ByteArray::from),
312                        distinct_count,
313                        null_count,
314                        old_format,
315                    )
316                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
317                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
318                ),
319                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
320                    ValueStatistics::new(
321                        min.map(ByteArray::from).map(FixedLenByteArray::from),
322                        max.map(ByteArray::from).map(FixedLenByteArray::from),
323                        distinct_count,
324                        null_count,
325                        old_format,
326                    )
327                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
328                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
329                ),
330            };
331
332            Some(res)
333        }
334        None => None,
335    })
336}
337
338// bit positions for required fields in the Thrift ColumnMetaData struct
339const COL_META_TYPE: u16 = 1 << 1;
340const COL_META_ENCODINGS: u16 = 1 << 2;
341const COL_META_CODEC: u16 = 1 << 4;
342const COL_META_NUM_VALUES: u16 = 1 << 5;
343const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
344const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
345const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;
346
347// a mask where all required fields' bits are set
348const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
349    | COL_META_ENCODINGS
350    | COL_META_CODEC
351    | COL_META_NUM_VALUES
352    | COL_META_TOTAL_UNCOMP_SZ
353    | COL_META_TOTAL_COMP_SZ
354    | COL_META_DATA_PAGE_OFFSET;
355
356// check mask to see if all required fields are set. return an appropriate error if
357// any are missing.
358fn validate_column_metadata(mask: u16) -> Result<()> {
359    if mask != COL_META_ALL_REQUIRED {
360        if mask & COL_META_ENCODINGS == 0 {
361            return Err(general_err!("Required field encodings is missing"));
362        }
363
364        if mask & COL_META_CODEC == 0 {
365            return Err(general_err!("Required field codec is missing"));
366        }
367        if mask & COL_META_NUM_VALUES == 0 {
368            return Err(general_err!("Required field num_values is missing"));
369        }
370        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
371            return Err(general_err!(
372                "Required field total_uncompressed_size is missing"
373            ));
374        }
375        if mask & COL_META_TOTAL_COMP_SZ == 0 {
376            return Err(general_err!(
377                "Required field total_compressed_size is missing"
378            ));
379        }
380        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
381            return Err(general_err!("Required field data_page_offset is missing"));
382        }
383    }
384
385    Ok(())
386}
387
388fn read_encoding_stats_as_mask<'a>(
389    prot: &mut ThriftSliceInputProtocol<'a>,
390) -> Result<EncodingMask> {
391    // read the vector of stats, setting mask bits for data pages
392    let mut mask = 0i32;
393    let list_ident = prot.read_list_begin()?;
394    // check for PageEncodingStats struct
395    validate_list_type(ElementType::Struct, &list_ident)?;
396
397    for _ in 0..list_ident.size {
398        let pes = PageEncodingStats::read_thrift(prot)?;
399        match pes.page_type {
400            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
401            _ => {}
402        }
403    }
404    EncodingMask::try_new(mask)
405}
406
407// Decode `ColumnMetaData`. Returns a mask of all required fields that were observed.
408// This mask can be passed to `validate_column_metadata`.
409fn read_column_metadata<'a>(
410    prot: &mut ThriftSliceInputProtocol<'a>,
411    column: &mut ColumnChunkMetaData,
412    col_index: usize,
413    options: Option<&ParquetMetaDataOptions>,
414) -> Result<u16> {
415    // mask for seen required fields in ColumnMetaData
416    let mut seen_mask = 0u16;
417
418    let mut skip_pes = false;
419    let mut pes_mask = true;
420    let mut skip_col_stats = false;
421    let mut skip_size_stats = false;
422
423    if let Some(opts) = options {
424        skip_pes = opts.skip_encoding_stats(col_index);
425        pes_mask = opts.encoding_stats_as_mask();
426        skip_col_stats = opts.skip_column_stats(col_index);
427        skip_size_stats = opts.skip_size_stats(col_index);
428    }
429
430    // struct ColumnMetaData {
431    //   1: required Type type
432    //   2: required list<Encoding> encodings
433    //   3: required list<string> path_in_schema
434    //   4: required CompressionCodec codec
435    //   5: required i64 num_values
436    //   6: required i64 total_uncompressed_size
437    //   7: required i64 total_compressed_size
438    //   8: optional list<KeyValue> key_value_metadata
439    //   9: required i64 data_page_offset
440    //   10: optional i64 index_page_offset
441    //   11: optional i64 dictionary_page_offset
442    //   12: optional Statistics statistics;
443    //   13: optional list<PageEncodingStats> encoding_stats;
444    //   14: optional i64 bloom_filter_offset;
445    //   15: optional i32 bloom_filter_length;
446    //   16: optional SizeStatistics size_statistics;
447    //   17: optional GeospatialStatistics geospatial_statistics;
448    // }
449    let column_descr = &column.column_descr;
450
451    let mut last_field_id = 0i16;
452    loop {
453        let field_ident = prot.read_field_begin(last_field_id)?;
454        if field_ident.field_type == FieldType::Stop {
455            break;
456        }
457        match field_ident.id {
458            // 1: type is never used, we can use the column descriptor
459            1 => {
460                // read for error handling
461                Type::read_thrift(&mut *prot)?;
462                seen_mask |= COL_META_TYPE;
463            }
464            2 => {
465                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
466                seen_mask |= COL_META_ENCODINGS;
467            }
468            // 3: path_in_schema is redundant
469            4 => {
470                column.compression = CompressionCodec::read_thrift(&mut *prot)?;
471                seen_mask |= COL_META_CODEC;
472            }
473            5 => {
474                column.num_values = i64::read_thrift(&mut *prot)?;
475                seen_mask |= COL_META_NUM_VALUES;
476            }
477            6 => {
478                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
479                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
480            }
481            7 => {
482                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
483                seen_mask |= COL_META_TOTAL_COMP_SZ;
484            }
485            // 8: we don't expose this key value
486            9 => {
487                column.data_page_offset = i64::read_thrift(&mut *prot)?;
488                seen_mask |= COL_META_DATA_PAGE_OFFSET;
489            }
490            10 => {
491                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
492            }
493            11 => {
494                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
495            }
496            12 if !skip_col_stats => {
497                column.statistics =
498                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
499            }
500            13 if !skip_pes => {
501                if pes_mask {
502                    let val = read_encoding_stats_as_mask(&mut *prot)?;
503                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
504                } else {
505                    let val =
506                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
507                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
508                }
509            }
510            14 => {
511                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
512            }
513            15 => {
514                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
515            }
516            16 if !skip_size_stats => {
517                let val = SizeStatistics::read_thrift(&mut *prot)?;
518                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
519                column.repetition_level_histogram =
520                    val.repetition_level_histogram.map(LevelHistogram::from);
521                column.definition_level_histogram =
522                    val.definition_level_histogram.map(LevelHistogram::from);
523            }
524            17 => {
525                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
526                column.geo_statistics = convert_geo_stats(Some(val));
527            }
528            _ => {
529                prot.skip(field_ident.field_type)?;
530            }
531        };
532        last_field_id = field_ident.id;
533    }
534
535    Ok(seen_mask)
536}
537
538// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait because
539// these are all internal and operate on slices.
540fn read_column_chunk<'a>(
541    prot: &mut ThriftSliceInputProtocol<'a>,
542    column_descr: &Arc<ColumnDescriptor>,
543    col_index: usize,
544    options: Option<&ParquetMetaDataOptions>,
545) -> Result<ColumnChunkMetaData> {
546    // create a default initialized ColumnMetaData
547    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
548
549    // seen flag for file_offset
550    let mut has_file_offset = false;
551
552    // mask of seen flags for ColumnMetaData
553    let mut col_meta_mask = 0u16;
554
555    // struct ColumnChunk {
556    //   1: optional string file_path
557    //   2: required i64 file_offset = 0
558    //   3: optional ColumnMetaData meta_data
559    //   4: optional i64 offset_index_offset
560    //   5: optional i32 offset_index_length
561    //   6: optional i64 column_index_offset
562    //   7: optional i32 column_index_length
563    //   8: optional ColumnCryptoMetaData crypto_metadata
564    //   9: optional binary encrypted_column_metadata
565    // }
566    let mut last_field_id = 0i16;
567    loop {
568        let field_ident = prot.read_field_begin(last_field_id)?;
569        if field_ident.field_type == FieldType::Stop {
570            break;
571        }
572        match field_ident.id {
573            1 => {
574                col.file_path = Some(String::read_thrift(&mut *prot)?);
575            }
576            2 => {
577                col.file_offset = i64::read_thrift(&mut *prot)?;
578                has_file_offset = true;
579            }
580            3 => {
581                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
582            }
583            4 => {
584                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
585            }
586            5 => {
587                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
588            }
589            6 => {
590                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
591            }
592            7 => {
593                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
594            }
595            #[cfg(feature = "encryption")]
596            8 => {
597                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
598                col.column_crypto_metadata = Some(Box::new(val));
599            }
600            #[cfg(feature = "encryption")]
601            9 => {
602                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
603            }
604            _ => {
605                prot.skip(field_ident.field_type)?;
606            }
607        };
608        last_field_id = field_ident.id;
609    }
610
611    // the only required field from ColumnChunk
612    if !has_file_offset {
613        return Err(general_err!("Required field file_offset is missing"));
614    };
615
616    // if encrypted just return. we'll decrypt after finishing the footer and populate the rest.
617    #[cfg(feature = "encryption")]
618    if col.encrypted_column_metadata.is_some() {
619        return Ok(col);
620    }
621
622    // not encrypted, so make sure all required fields were read
623    validate_column_metadata(col_meta_mask)?;
624
625    Ok(col)
626}
627
628fn read_row_group(
629    prot: &mut ThriftSliceInputProtocol,
630    schema_descr: &Arc<SchemaDescriptor>,
631    options: Option<&ParquetMetaDataOptions>,
632) -> Result<RowGroupMetaData> {
633    // create default initialized RowGroupMetaData
634    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
635
636    // mask values for required fields
637    const RG_COLUMNS: u8 = 1 << 1;
638    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
639    const RG_NUM_ROWS: u8 = 1 << 3;
640    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;
641
642    let mut mask = 0u8;
643
644    // struct RowGroup {
645    //   1: required list<ColumnChunk> columns
646    //   2: required i64 total_byte_size
647    //   3: required i64 num_rows
648    //   4: optional list<SortingColumn> sorting_columns
649    //   5: optional i64 file_offset
650    //   6: optional i64 total_compressed_size
651    //   7: optional i16 ordinal
652    // }
653    let mut last_field_id = 0i16;
654    loop {
655        let field_ident = prot.read_field_begin(last_field_id)?;
656        if field_ident.field_type == FieldType::Stop {
657            break;
658        }
659        match field_ident.id {
660            1 => {
661                let list_ident = prot.read_list_begin()?;
662                // check for list of struct
663                validate_list_type(ElementType::Struct, &list_ident)?;
664                if schema_descr.num_columns() != list_ident.size as usize {
665                    return Err(general_err!(
666                        "Column count mismatch. Schema has {} columns while Row Group has {}",
667                        schema_descr.num_columns(),
668                        list_ident.size
669                    ));
670                }
671                for i in 0..list_ident.size as usize {
672                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
673                    row_group.columns.push(col);
674                }
675                mask |= RG_COLUMNS;
676            }
677            2 => {
678                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
679                mask |= RG_TOT_BYTE_SIZE;
680            }
681            3 => {
682                row_group.num_rows = i64::read_thrift(&mut *prot)?;
683                mask |= RG_NUM_ROWS;
684            }
685            4 => {
686                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
687                row_group.sorting_columns = Some(val);
688            }
689            5 => {
690                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
691            }
692            // 6: we don't expose total_compressed_size
693            7 => {
694                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
695            }
696            _ => {
697                prot.skip(field_ident.field_type)?;
698            }
699        };
700        last_field_id = field_ident.id;
701    }
702
703    if mask != RG_ALL_REQUIRED {
704        if mask & RG_COLUMNS == 0 {
705            return Err(general_err!("Required field columns is missing"));
706        }
707        if mask & RG_TOT_BYTE_SIZE == 0 {
708            return Err(general_err!("Required field total_byte_size is missing"));
709        }
710        if mask & RG_NUM_ROWS == 0 {
711            return Err(general_err!("Required field num_rows is missing"));
712        }
713    }
714
715    Ok(row_group)
716}
717
718/// Create a [`SchemaDescriptor`] from thrift input. The input buffer must contain a complete
719/// Parquet footer.
720pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
721    let mut prot = ThriftSliceInputProtocol::new(buf);
722
723    let mut last_field_id = 0i16;
724    loop {
725        let field_ident = prot.read_field_begin(last_field_id)?;
726        if field_ident.field_type == FieldType::Stop {
727            break;
728        }
729        match field_ident.id {
730            2 => {
731                // read schema and convert to SchemaDescriptor for use when reading row groups
732                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
733                let val = parquet_schema_from_array(val)?;
734                return Ok(SchemaDescriptor::new(val));
735            }
736            _ => prot.skip(field_ident.field_type)?,
737        }
738        last_field_id = field_ident.id;
739    }
740    Err(general_err!("Input does not contain a schema"))
741}
742
743/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
744/// the Parquet footer. Page indexes will need to be added later.
745pub(crate) fn parquet_metadata_from_bytes(
746    buf: &[u8],
747    options: Option<&ParquetMetaDataOptions>,
748) -> Result<ParquetMetaData> {
749    let mut prot = ThriftSliceInputProtocol::new(buf);
750
751    // begin reading the file metadata
752    let mut version: Option<i32> = None;
753    let mut num_rows: Option<i64> = None;
754    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
755    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
756    let mut created_by: Option<&str> = None;
757    let mut column_orders: Option<Vec<ColumnOrder>> = None;
758    #[cfg(feature = "encryption")]
759    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
760    #[cfg(feature = "encryption")]
761    let mut footer_signing_key_metadata: Option<&[u8]> = None;
762
763    // this will need to be set before parsing row groups
764    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
765
766    // see if we already have a schema.
767    if let Some(options) = options {
768        schema_descr = options.schema().cloned();
769    }
770
771    // struct FileMetaData {
772    //   1: required i32 version
773    //   2: required list<SchemaElement> schema;
774    //   3: required i64 num_rows
775    //   4: required list<RowGroup> row_groups
776    //   5: optional list<KeyValue> key_value_metadata
777    //   6: optional string created_by
778    //   7: optional list<ColumnOrder> column_orders;
779    //   8: optional EncryptionAlgorithm encryption_algorithm
780    //   9: optional binary footer_signing_key_metadata
781    // }
782    let mut last_field_id = 0i16;
783    loop {
784        let field_ident = prot.read_field_begin(last_field_id)?;
785        if field_ident.field_type == FieldType::Stop {
786            break;
787        }
788        match field_ident.id {
789            1 => {
790                version = Some(i32::read_thrift(&mut prot)?);
791            }
792            2 => {
793                // If schema was passed in, skip parsing it
794                if schema_descr.is_some() {
795                    prot.skip(field_ident.field_type)?;
796                } else {
797                    // read schema and convert to SchemaDescriptor for use when reading row groups
798                    let val =
799                        read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
800                    let val = parquet_schema_from_array(val)?;
801                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
802                }
803            }
804            3 => {
805                num_rows = Some(i64::read_thrift(&mut prot)?);
806            }
807            4 => {
808                if schema_descr.is_none() {
809                    return Err(general_err!("Required field schema is missing"));
810                }
811                let schema_descr = schema_descr.as_ref().unwrap();
812                let list_ident = prot.read_list_begin()?;
813                // check for list of struct
814                validate_list_type(ElementType::Struct, &list_ident)?;
815                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
816
817                // Read row groups and handle ordinal assignment
818                let mut assigner = OrdinalAssigner::new();
819                for ordinal in 0..list_ident.size {
820                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
821                        ParquetError::General(format!(
822                            "Row group ordinal {ordinal} exceeds i16 max value",
823                        ))
824                    })?;
825                    let rg = read_row_group(&mut prot, schema_descr, options)?;
826                    rg_vec.push(assigner.ensure(ordinal, rg)?);
827                }
828                row_groups = Some(rg_vec);
829            }
830            5 => {
831                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
832                key_value_metadata = Some(val);
833            }
834            6 => {
835                created_by = Some(<&str>::read_thrift(&mut prot)?);
836            }
837            7 => {
838                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
839                column_orders = Some(val);
840            }
841            #[cfg(feature = "encryption")]
842            8 => {
843                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
844                encryption_algorithm = Some(val);
845            }
846            #[cfg(feature = "encryption")]
847            9 => {
848                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
849            }
850            _ => {
851                prot.skip(field_ident.field_type)?;
852            }
853        };
854        last_field_id = field_ident.id;
855    }
856    let Some(version) = version else {
857        return Err(general_err!("Required field version is missing"));
858    };
859    let Some(num_rows) = num_rows else {
860        return Err(general_err!("Required field num_rows is missing"));
861    };
862    let Some(row_groups) = row_groups else {
863        return Err(general_err!("Required field row_groups is missing"));
864    };
865
866    let created_by = created_by.map(|c| c.to_owned());
867
868    // we've tested for `None` by now so this is safe
869    let schema_descr = schema_descr.unwrap();
870
871    // need to map read column orders to actual values based on the schema
872    if column_orders
873        .as_ref()
874        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
875    {
876        return Err(general_err!("Column order length mismatch"));
877    }
878    // replace default type defined column orders with ones having the correct sort order
879    // TODO(ets): this could instead be done above when decoding
880    let column_orders = column_orders.map(|mut cos| {
881        for (i, column) in schema_descr.columns().iter().enumerate() {
882            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
883                let sort_order = ColumnOrder::sort_order_for_type(
884                    column.logical_type_ref(),
885                    column.converted_type(),
886                    column.physical_type(),
887                );
888                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
889            }
890        }
891        cos
892    });
893
894    #[cfg(not(feature = "encryption"))]
895    let fmd = crate::file::metadata::FileMetaData::new(
896        version,
897        num_rows,
898        created_by,
899        key_value_metadata,
900        schema_descr,
901        column_orders,
902    );
903    #[cfg(feature = "encryption")]
904    let fmd = crate::file::metadata::FileMetaData::new(
905        version,
906        num_rows,
907        created_by,
908        key_value_metadata,
909        schema_descr,
910        column_orders,
911    )
912    .with_encryption_algorithm(encryption_algorithm)
913    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
914
915    Ok(ParquetMetaData::new(fmd, row_groups))
916}
917
918/// Assign [`RowGroupMetaData::ordinal`]  if it is missing.
919#[derive(Debug, Default)]
920pub(crate) struct OrdinalAssigner {
921    first_has_ordinal: Option<bool>,
922}
923
924impl OrdinalAssigner {
925    fn new() -> Self {
926        Default::default()
927    }
928
929    /// Sets [`RowGroupMetaData::ordinal`] if it is missing.
930    ///
931    /// # Arguments
932    /// - actual_ordinal: The ordinal (index) of the row group being processed
933    ///   in the file metadata.
934    /// - rg: The [`RowGroupMetaData`] to potentially modify.
935    ///
936    /// Ensures:
937    /// 1. If the first row group has an ordinal, all subsequent row groups must
938    ///    also have ordinals.
939    /// 2. If the first row group does NOT have an ordinal, all subsequent row
940    ///    groups must also not have ordinals.
941    fn ensure(
942        &mut self,
943        actual_ordinal: i16,
944        mut rg: RowGroupMetaData,
945    ) -> Result<RowGroupMetaData> {
946        let rg_has_ordinal = rg.ordinal.is_some();
947
948        // Only set first_has_ordinal if it's None (first row group that arrives)
949        if self.first_has_ordinal.is_none() {
950            self.first_has_ordinal = Some(rg_has_ordinal);
951        }
952
953        // assign ordinal if missing and consistent with first row group
954        let first_has_ordinal = self.first_has_ordinal.unwrap();
955        if !first_has_ordinal && !rg_has_ordinal {
956            rg.ordinal = Some(actual_ordinal);
957        } else if first_has_ordinal != rg_has_ordinal {
958            return Err(general_err!(
959                "Inconsistent ordinal assignment: first_has_ordinal is set to \
960                {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
961                first_has_ordinal,
962                actual_ordinal,
963                rg_has_ordinal
964            ));
965        }
966        Ok(rg)
967    }
968}
969
970thrift_struct!(
971    pub(crate) struct IndexPageHeader {}
972);
973
974thrift_struct!(
975pub(crate) struct DictionaryPageHeader {
976  /// Number of values in the dictionary
977  1: required i32 num_values;
978
979  /// Encoding using this dictionary page
980  2: required Encoding encoding
981
982  /// If true, the entries in the dictionary are sorted in ascending order
983  3: optional bool is_sorted;
984}
985);
986
987thrift_struct!(
988/// Statistics for the page header.
989///
990/// This is a duplicate of the [`Statistics`] struct above. Because the page reader uses
991/// the [`Read`] API, we cannot read the min/max values as slices. This should not be
992/// a huge problem since this crate no longer reads the page header statistics by default.
993///
994/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
995pub(crate) struct PageStatistics {
996   1: optional binary max;
997   2: optional binary min;
998   3: optional i64 null_count;
999   4: optional i64 distinct_count;
1000   5: optional binary max_value;
1001   6: optional binary min_value;
1002   7: optional bool is_max_value_exact;
1003   8: optional bool is_min_value_exact;
1004}
1005);
1006
1007thrift_struct!(
1008pub(crate) struct DataPageHeader {
1009  1: required i32 num_values
1010  2: required Encoding encoding
1011  3: required Encoding definition_level_encoding;
1012  4: required Encoding repetition_level_encoding;
1013  5: optional PageStatistics statistics;
1014}
1015);
1016
1017impl DataPageHeader {
1018    // reader that skips decoding page statistics
1019    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1020    where
1021        R: ThriftCompactInputProtocol<'a>,
1022    {
1023        let mut num_values: Option<i32> = None;
1024        let mut encoding: Option<Encoding> = None;
1025        let mut definition_level_encoding: Option<Encoding> = None;
1026        let mut repetition_level_encoding: Option<Encoding> = None;
1027        let statistics: Option<PageStatistics> = None;
1028        let mut last_field_id = 0i16;
1029        loop {
1030            let field_ident = prot.read_field_begin(last_field_id)?;
1031            if field_ident.field_type == FieldType::Stop {
1032                break;
1033            }
1034            match field_ident.id {
1035                1 => {
1036                    let val = i32::read_thrift(&mut *prot)?;
1037                    num_values = Some(val);
1038                }
1039                2 => {
1040                    let val = Encoding::read_thrift(&mut *prot)?;
1041                    encoding = Some(val);
1042                }
1043                3 => {
1044                    let val = Encoding::read_thrift(&mut *prot)?;
1045                    definition_level_encoding = Some(val);
1046                }
1047                4 => {
1048                    let val = Encoding::read_thrift(&mut *prot)?;
1049                    repetition_level_encoding = Some(val);
1050                }
1051                _ => {
1052                    prot.skip(field_ident.field_type)?;
1053                }
1054            };
1055            last_field_id = field_ident.id;
1056        }
1057        let Some(num_values) = num_values else {
1058            return Err(general_err!("Required field num_values is missing"));
1059        };
1060        let Some(encoding) = encoding else {
1061            return Err(general_err!("Required field encoding is missing"));
1062        };
1063        let Some(definition_level_encoding) = definition_level_encoding else {
1064            return Err(general_err!(
1065                "Required field definition_level_encoding is missing"
1066            ));
1067        };
1068        let Some(repetition_level_encoding) = repetition_level_encoding else {
1069            return Err(general_err!(
1070                "Required field repetition_level_encoding is missing"
1071            ));
1072        };
1073        Ok(Self {
1074            num_values,
1075            encoding,
1076            definition_level_encoding,
1077            repetition_level_encoding,
1078            statistics,
1079        })
1080    }
1081}
1082
1083thrift_struct!(
1084pub(crate) struct DataPageHeaderV2 {
1085  1: required i32 num_values
1086  2: required i32 num_nulls
1087  3: required i32 num_rows
1088  4: required Encoding encoding
1089  5: required i32 definition_levels_byte_length;
1090  6: required i32 repetition_levels_byte_length;
1091  7: optional bool is_compressed = true;
1092  8: optional PageStatistics statistics;
1093}
1094);
1095
1096impl DataPageHeaderV2 {
1097    // reader that skips decoding page statistics
1098    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1099    where
1100        R: ThriftCompactInputProtocol<'a>,
1101    {
1102        let mut num_values: Option<i32> = None;
1103        let mut num_nulls: Option<i32> = None;
1104        let mut num_rows: Option<i32> = None;
1105        let mut encoding: Option<Encoding> = None;
1106        let mut definition_levels_byte_length: Option<i32> = None;
1107        let mut repetition_levels_byte_length: Option<i32> = None;
1108        let mut is_compressed: Option<bool> = None;
1109        let statistics: Option<PageStatistics> = None;
1110        let mut last_field_id = 0i16;
1111        loop {
1112            let field_ident = prot.read_field_begin(last_field_id)?;
1113            if field_ident.field_type == FieldType::Stop {
1114                break;
1115            }
1116            match field_ident.id {
1117                1 => {
1118                    let val = i32::read_thrift(&mut *prot)?;
1119                    num_values = Some(val);
1120                }
1121                2 => {
1122                    let val = i32::read_thrift(&mut *prot)?;
1123                    num_nulls = Some(val);
1124                }
1125                3 => {
1126                    let val = i32::read_thrift(&mut *prot)?;
1127                    num_rows = Some(val);
1128                }
1129                4 => {
1130                    let val = Encoding::read_thrift(&mut *prot)?;
1131                    encoding = Some(val);
1132                }
1133                5 => {
1134                    let val = i32::read_thrift(&mut *prot)?;
1135                    definition_levels_byte_length = Some(val);
1136                }
1137                6 => {
1138                    let val = i32::read_thrift(&mut *prot)?;
1139                    repetition_levels_byte_length = Some(val);
1140                }
1141                7 => {
1142                    is_compressed = Some(field_ident.bool_val()?);
1143                }
1144                _ => {
1145                    prot.skip(field_ident.field_type)?;
1146                }
1147            };
1148            last_field_id = field_ident.id;
1149        }
1150        let Some(num_values) = num_values else {
1151            return Err(general_err!("Required field num_values is missing"));
1152        };
1153        let Some(num_nulls) = num_nulls else {
1154            return Err(general_err!("Required field num_nulls is missing"));
1155        };
1156        let Some(num_rows) = num_rows else {
1157            return Err(general_err!("Required field num_rows is missing"));
1158        };
1159        let Some(encoding) = encoding else {
1160            return Err(general_err!("Required field encoding is missing"));
1161        };
1162        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1163            return Err(general_err!(
1164                "Required field definition_levels_byte_length is missing"
1165            ));
1166        };
1167        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1168            return Err(general_err!(
1169                "Required field repetition_levels_byte_length is missing"
1170            ));
1171        };
1172        Ok(Self {
1173            num_values,
1174            num_nulls,
1175            num_rows,
1176            encoding,
1177            definition_levels_byte_length,
1178            repetition_levels_byte_length,
1179            is_compressed,
1180            statistics,
1181        })
1182    }
1183}
1184
1185thrift_struct!(
1186pub(crate) struct PageHeader {
1187  /// the type of the page: indicates which of the *_header fields is set
1188  1: required PageType r#type
1189
1190  /// Uncompressed page size in bytes (not including this header)
1191  2: required i32 uncompressed_page_size
1192
1193  /// Compressed (and potentially encrypted) page size in bytes, not including this header
1194  3: required i32 compressed_page_size
1195
1196  /// The 32-bit CRC checksum for the page, to be be calculated as follows:
1197  4: optional i32 crc
1198
1199  // Headers for page specific data.  One only will be set.
1200  5: optional DataPageHeader data_page_header;
1201  6: optional IndexPageHeader index_page_header;
1202  7: optional DictionaryPageHeader dictionary_page_header;
1203  8: optional DataPageHeaderV2 data_page_header_v2;
1204}
1205);
1206
1207impl PageHeader {
1208    // reader that skips reading page statistics. obtained by running
1209    // `cargo expand -p parquet --all-features --lib file::metadata::thrift`
1210    // and modifying the impl of `read_thrift`
1211    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1212    where
1213        R: ThriftCompactInputProtocol<'a>,
1214    {
1215        let mut type_: Option<PageType> = None;
1216        let mut uncompressed_page_size: Option<i32> = None;
1217        let mut compressed_page_size: Option<i32> = None;
1218        let mut crc: Option<i32> = None;
1219        let mut data_page_header: Option<DataPageHeader> = None;
1220        let mut index_page_header: Option<IndexPageHeader> = None;
1221        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1222        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1223        let mut last_field_id = 0i16;
1224        loop {
1225            let field_ident = prot.read_field_begin(last_field_id)?;
1226            if field_ident.field_type == FieldType::Stop {
1227                break;
1228            }
1229            match field_ident.id {
1230                1 => {
1231                    let val = PageType::read_thrift(&mut *prot)?;
1232                    type_ = Some(val);
1233                }
1234                2 => {
1235                    let val = i32::read_thrift(&mut *prot)?;
1236                    uncompressed_page_size = Some(val);
1237                }
1238                3 => {
1239                    let val = i32::read_thrift(&mut *prot)?;
1240                    compressed_page_size = Some(val);
1241                }
1242                4 => {
1243                    let val = i32::read_thrift(&mut *prot)?;
1244                    crc = Some(val);
1245                }
1246                5 => {
1247                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1248                    data_page_header = Some(val);
1249                }
1250                6 => {
1251                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
1252                    index_page_header = Some(val);
1253                }
1254                7 => {
1255                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1256                    dictionary_page_header = Some(val);
1257                }
1258                8 => {
1259                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1260                    data_page_header_v2 = Some(val);
1261                }
1262                _ => {
1263                    prot.skip(field_ident.field_type)?;
1264                }
1265            };
1266            last_field_id = field_ident.id;
1267        }
1268        let Some(type_) = type_ else {
1269            return Err(general_err!("Required field type_ is missing"));
1270        };
1271        let Some(uncompressed_page_size) = uncompressed_page_size else {
1272            return Err(general_err!(
1273                "Required field uncompressed_page_size is missing"
1274            ));
1275        };
1276        let Some(compressed_page_size) = compressed_page_size else {
1277            return Err(general_err!(
1278                "Required field compressed_page_size is missing"
1279            ));
1280        };
1281        Ok(Self {
1282            r#type: type_,
1283            uncompressed_page_size,
1284            compressed_page_size,
1285            crc,
1286            data_page_header,
1287            index_page_header,
1288            dictionary_page_header,
1289            data_page_header_v2,
1290        })
1291    }
1292}
1293
1294/////////////////////////////////////////////////
1295// helper functions for writing file meta data
1296
1297#[cfg(feature = "encryption")]
1298fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
1299    // If there is encrypted column metadata present,
1300    // the column is encrypted with a different key to the footer or a plaintext footer is used,
1301    // so the statistics are sensitive and shouldn't be written.
1302    column_chunk.encrypted_column_metadata.is_none()
1303}
1304
1305#[cfg(not(feature = "encryption"))]
1306fn should_write_column_stats(_column_chunk: &ColumnChunkMetaData) -> bool {
1307    true
1308}
1309
1310// serialize the bits of the column chunk needed for a thrift ColumnMetaData
1311// struct ColumnMetaData {
1312//   1: required Type type
1313//   2: required list<Encoding> encodings
1314//   3: required list<string> path_in_schema
1315//   4: required CompressionCodec codec
1316//   5: required i64 num_values
1317//   6: required i64 total_uncompressed_size
1318//   7: required i64 total_compressed_size
1319//   8: optional list<KeyValue> key_value_metadata
1320//   9: required i64 data_page_offset
1321//   10: optional i64 index_page_offset
1322//   11: optional i64 dictionary_page_offset
1323//   12: optional Statistics statistics;
1324//   13: optional list<PageEncodingStats> encoding_stats;
1325//   14: optional i64 bloom_filter_offset;
1326//   15: optional i32 bloom_filter_length;
1327//   16: optional SizeStatistics size_statistics;
1328//   17: optional GeospatialStatistics geospatial_statistics;
1329// }
1330pub(super) fn serialize_column_meta_data<W: Write>(
1331    column_chunk: &ColumnChunkMetaData,
1332    w: &mut ThriftCompactOutputProtocol<W>,
1333) -> Result<()> {
1334    use crate::file::statistics::page_stats_to_thrift;
1335
1336    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1337    column_chunk
1338        .encodings()
1339        .collect::<Vec<_>>()
1340        .write_thrift_field(w, 2, 1)?;
1341    if w.write_path_in_schema() {
1342        let path = column_chunk.column_descr.path().parts();
1343        let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1344        path.write_thrift_field(w, 3, 2)?;
1345        column_chunk.compression.write_thrift_field(w, 4, 3)?;
1346    } else {
1347        column_chunk.compression.write_thrift_field(w, 4, 2)?;
1348    }
1349
1350    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1351    column_chunk
1352        .total_uncompressed_size
1353        .write_thrift_field(w, 6, 5)?;
1354    column_chunk
1355        .total_compressed_size
1356        .write_thrift_field(w, 7, 6)?;
1357    // no key_value_metadata here
1358    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1359    if let Some(index_page_offset) = column_chunk.index_page_offset {
1360        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1361    }
1362    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1363        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1364    }
1365
1366    if should_write_column_stats(column_chunk) {
1367        // PageStatistics is the same as thrift Statistics, but writable
1368        let stats = page_stats_to_thrift(column_chunk.statistics());
1369        if let Some(stats) = stats {
1370            last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1371        }
1372        if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1373            last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1374        }
1375        if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1376            last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1377        }
1378        if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1379            last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1380        }
1381
1382        // SizeStatistics
1383        let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1384            || column_chunk.repetition_level_histogram.is_some()
1385            || column_chunk.definition_level_histogram.is_some()
1386        {
1387            let repetition_level_histogram = column_chunk
1388                .repetition_level_histogram()
1389                .map(|hist| hist.clone().into_inner());
1390
1391            let definition_level_histogram = column_chunk
1392                .definition_level_histogram()
1393                .map(|hist| hist.clone().into_inner());
1394
1395            Some(SizeStatistics {
1396                unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1397                repetition_level_histogram,
1398                definition_level_histogram,
1399            })
1400        } else {
1401            None
1402        };
1403        if let Some(size_stats) = size_stats {
1404            last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1405        }
1406
1407        if let Some(geo_stats) = column_chunk.geo_statistics() {
1408            geo_stats.write_thrift_field(w, 17, last_field_id)?;
1409        }
1410    }
1411
1412    w.write_struct_end()
1413}
1414
1415// temp struct used for writing
1416pub(super) struct FileMeta<'a> {
1417    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
1418    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
1419    // If true, then write the `path_in_schema` field in the ColumnMetaData struct.
1420    pub(super) write_path_in_schema: bool,
1421}
1422
1423// struct FileMetaData {
1424//   1: required i32 version
1425//   2: required list<SchemaElement> schema;
1426//   3: required i64 num_rows
1427//   4: required list<RowGroup> row_groups
1428//   5: optional list<KeyValue> key_value_metadata
1429//   6: optional string created_by
1430//   7: optional list<ColumnOrder> column_orders;
1431//   8: optional EncryptionAlgorithm encryption_algorithm
1432//   9: optional binary footer_signing_key_metadata
1433// }
1434impl<'a> WriteThrift for FileMeta<'a> {
1435    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1436
1437    // needed for last_field_id w/o encryption
1438    #[allow(unused_assignments)]
1439    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1440        writer.set_write_path_in_schema(self.write_path_in_schema);
1441
1442        self.file_metadata
1443            .version
1444            .write_thrift_field(writer, 1, 0)?;
1445
1446        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
1447        // writing along the way.
1448        let root = self.file_metadata.schema_descr().root_schema_ptr();
1449        let schema_len = num_nodes(&root)?;
1450        writer.write_field_begin(FieldType::List, 2, 1)?;
1451        writer.write_list_begin(ElementType::Struct, schema_len)?;
1452        // recursively write Type nodes as SchemaElements
1453        write_schema(&root, writer)?;
1454
1455        self.file_metadata
1456            .num_rows
1457            .write_thrift_field(writer, 3, 2)?;
1458
1459        // this will call RowGroupMetaData::write_thrift
1460        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1461
1462        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1463            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1464        }
1465        if let Some(created_by) = self.file_metadata.created_by() {
1466            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1467        }
1468        if let Some(column_orders) = self.file_metadata.column_orders() {
1469            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1470        }
1471        #[cfg(feature = "encryption")]
1472        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1473            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1474        }
1475        #[cfg(feature = "encryption")]
1476        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1477            key.as_slice()
1478                .write_thrift_field(writer, 9, last_field_id)?;
1479        }
1480
1481        writer.write_struct_end()
1482    }
1483}
1484
1485fn write_schema<W: Write>(
1486    schema: &TypePtr,
1487    writer: &mut ThriftCompactOutputProtocol<W>,
1488) -> Result<()> {
1489    if !schema.is_group() {
1490        return Err(general_err!("Root schema must be Group type"));
1491    }
1492    write_schema_helper(schema, writer)
1493}
1494
1495fn write_schema_helper<W: Write>(
1496    node: &TypePtr,
1497    writer: &mut ThriftCompactOutputProtocol<W>,
1498) -> Result<()> {
1499    match node.as_ref() {
1500        crate::schema::types::Type::PrimitiveType {
1501            basic_info,
1502            physical_type,
1503            type_length,
1504            scale,
1505            precision,
1506        } => {
1507            let element = SchemaElement {
1508                r#type: Some(*physical_type),
1509                type_length: if *type_length >= 0 {
1510                    Some(*type_length)
1511                } else {
1512                    None
1513                },
1514                repetition_type: Some(basic_info.repetition()),
1515                name: basic_info.name(),
1516                num_children: None,
1517                converted_type: match basic_info.converted_type() {
1518                    ConvertedType::NONE => None,
1519                    other => Some(other),
1520                },
1521                scale: if *scale >= 0 { Some(*scale) } else { None },
1522                precision: if *precision >= 0 {
1523                    Some(*precision)
1524                } else {
1525                    None
1526                },
1527                field_id: if basic_info.has_id() {
1528                    Some(basic_info.id())
1529                } else {
1530                    None
1531                },
1532                logical_type: basic_info.logical_type_ref().cloned(),
1533            };
1534            element.write_thrift(writer)
1535        }
1536        crate::schema::types::Type::GroupType { basic_info, fields } => {
1537            let repetition = if basic_info.has_repetition() {
1538                Some(basic_info.repetition())
1539            } else {
1540                None
1541            };
1542
1543            let element = SchemaElement {
1544                r#type: None,
1545                type_length: None,
1546                repetition_type: repetition,
1547                name: basic_info.name(),
1548                num_children: Some(fields.len().try_into()?),
1549                converted_type: match basic_info.converted_type() {
1550                    ConvertedType::NONE => None,
1551                    other => Some(other),
1552                },
1553                scale: None,
1554                precision: None,
1555                field_id: if basic_info.has_id() {
1556                    Some(basic_info.id())
1557                } else {
1558                    None
1559                },
1560                logical_type: basic_info.logical_type_ref().cloned(),
1561            };
1562
1563            element.write_thrift(writer)?;
1564
1565            // Add child elements for a group
1566            for field in fields {
1567                write_schema_helper(field, writer)?;
1568            }
1569            Ok(())
1570        }
1571    }
1572}
1573
1574// struct RowGroup {
1575//   1: required list<ColumnChunk> columns
1576//   2: required i64 total_byte_size
1577//   3: required i64 num_rows
1578//   4: optional list<SortingColumn> sorting_columns
1579//   5: optional i64 file_offset
1580//   6: optional i64 total_compressed_size
1581//   7: optional i16 ordinal
1582// }
1583impl WriteThrift for RowGroupMetaData {
1584    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1585
1586    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1587        // this will call ColumnChunkMetaData::write_thrift
1588        self.columns.write_thrift_field(writer, 1, 0)?;
1589        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1590        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1591        if let Some(sorting_columns) = self.sorting_columns() {
1592            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1593        }
1594        if let Some(file_offset) = self.file_offset() {
1595            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1596        }
1597        // this is optional, but we'll always write it
1598        last_field_id = self
1599            .compressed_size()
1600            .write_thrift_field(writer, 6, last_field_id)?;
1601        if let Some(ordinal) = self.ordinal() {
1602            ordinal.write_thrift_field(writer, 7, last_field_id)?;
1603        }
1604        writer.write_struct_end()
1605    }
1606}
1607
1608// struct ColumnChunk {
1609//   1: optional string file_path
1610//   2: required i64 file_offset = 0
1611//   3: optional ColumnMetaData meta_data
1612//   4: optional i64 offset_index_offset
1613//   5: optional i32 offset_index_length
1614//   6: optional i64 column_index_offset
1615//   7: optional i32 column_index_length
1616//   8: optional ColumnCryptoMetaData crypto_metadata
1617//   9: optional binary encrypted_column_metadata
1618// }
1619impl WriteThrift for ColumnChunkMetaData {
1620    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1621
1622    #[allow(unused_assignments)]
1623    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1624        let mut last_field_id = 0i16;
1625        if let Some(file_path) = self.file_path() {
1626            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1627        }
1628        last_field_id = self
1629            .file_offset()
1630            .write_thrift_field(writer, 2, last_field_id)?;
1631
1632        #[cfg(feature = "encryption")]
1633        let write_meta_data =
1634            self.encrypted_column_metadata.is_none() || self.plaintext_footer_mode;
1635        #[cfg(not(feature = "encryption"))]
1636        let write_meta_data = true;
1637
1638        // When the footer is encrypted and encrypted_column_metadata is present,
1639        // skip writing the plaintext meta_data field to reduce footer size.
1640        // When the footer is plaintext (plaintext_footer_mode=true), we still write
1641        // meta_data for backward compatibility with readers that expect it, but with
1642        // sensitive fields (statistics, bloom filter info, etc.) stripped out.
1643        if write_meta_data {
1644            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1645            serialize_column_meta_data(self, writer)?;
1646            last_field_id = 3;
1647        }
1648
1649        if let Some(offset_idx_off) = self.offset_index_offset() {
1650            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1651        }
1652        if let Some(offset_idx_len) = self.offset_index_length() {
1653            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1654        }
1655        if let Some(column_idx_off) = self.column_index_offset() {
1656            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1657        }
1658        if let Some(column_idx_len) = self.column_index_length() {
1659            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1660        }
1661        #[cfg(feature = "encryption")]
1662        {
1663            if let Some(crypto_metadata) = self.crypto_metadata() {
1664                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1665            }
1666            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1667                encrypted_meta
1668                    .as_slice()
1669                    .write_thrift_field(writer, 9, last_field_id)?;
1670            }
1671        }
1672
1673        writer.write_struct_end()
1674    }
1675}
1676
1677// struct GeospatialStatistics {
1678//   1: optional BoundingBox bbox;
1679//   2: optional list<i32> geospatial_types;
1680// }
1681impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1682    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1683
1684    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1685        let mut last_field_id = 0i16;
1686        if let Some(bbox) = self.bounding_box() {
1687            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1688        }
1689        if let Some(geo_types) = self.geospatial_types() {
1690            geo_types.write_thrift_field(writer, 2, last_field_id)?;
1691        }
1692
1693        writer.write_struct_end()
1694    }
1695}
1696
1697// macro cannot handle qualified names
1698use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
1699write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1700
1701// struct BoundingBox {
1702//   1: required double xmin;
1703//   2: required double xmax;
1704//   3: required double ymin;
1705//   4: required double ymax;
1706//   5: optional double zmin;
1707//   6: optional double zmax;
1708//   7: optional double mmin;
1709//   8: optional double mmax;
1710// }
1711impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1712    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1713
1714    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1715        self.get_xmin().write_thrift_field(writer, 1, 0)?;
1716        self.get_xmax().write_thrift_field(writer, 2, 1)?;
1717        self.get_ymin().write_thrift_field(writer, 3, 2)?;
1718        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1719
1720        if let Some(zmin) = self.get_zmin() {
1721            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1722        }
1723        if let Some(zmax) = self.get_zmax() {
1724            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1725        }
1726        if let Some(mmin) = self.get_mmin() {
1727            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1728        }
1729        if let Some(mmax) = self.get_mmax() {
1730            mmax.write_thrift_field(writer, 8, last_field_id)?;
1731        }
1732
1733        writer.write_struct_end()
1734    }
1735}
1736
1737// macro cannot handle qualified names
1738use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
1739write_thrift_field!(RustBoundingBox, FieldType::Struct);
1740
1741#[cfg(test)]
1742pub(crate) mod tests {
1743    use crate::basic::{Encoding, PageType, Type as PhysicalType};
1744    use crate::errors::Result;
1745    use crate::file::metadata::thrift::{
1746        BoundingBox, DataPageHeaderV2, DictionaryPageHeader, PageHeader, SchemaElement,
1747        write_schema,
1748    };
1749    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
1750    use crate::parquet_thrift::tests::test_roundtrip;
1751    use crate::parquet_thrift::{
1752        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
1753        read_thrift_vec,
1754    };
1755    use crate::schema::types::{
1756        ColumnDescriptor, ColumnPath, SchemaDescriptor, TypePtr, num_nodes,
1757        parquet_schema_from_array,
1758    };
1759    use std::sync::Arc;
1760
1761    // for testing. decode thrift encoded RowGroup
1762    pub(crate) fn read_row_group(
1763        buf: &mut [u8],
1764        schema_descr: Arc<SchemaDescriptor>,
1765    ) -> Result<RowGroupMetaData> {
1766        let mut reader = ThriftSliceInputProtocol::new(buf);
1767        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
1768    }
1769
1770    pub(crate) fn read_column_chunk(
1771        buf: &mut [u8],
1772        column_descr: Arc<ColumnDescriptor>,
1773    ) -> Result<ColumnChunkMetaData> {
1774        read_column_chunk_with_options(buf, column_descr, None)
1775    }
1776
1777    pub(crate) fn read_column_chunk_with_options(
1778        buf: &mut [u8],
1779        column_descr: Arc<ColumnDescriptor>,
1780        options: Option<&ParquetMetaDataOptions>,
1781    ) -> Result<ColumnChunkMetaData> {
1782        let mut reader = ThriftSliceInputProtocol::new(buf);
1783        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, options)
1784    }
1785
1786    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
1787        let num_nodes = num_nodes(&schema)?;
1788        let mut buf = Vec::new();
1789        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1790
1791        // kick off writing list
1792        writer.write_list_begin(ElementType::Struct, num_nodes)?;
1793
1794        // write SchemaElements
1795        write_schema(&schema, &mut writer)?;
1796
1797        let mut prot = ThriftSliceInputProtocol::new(&buf);
1798        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
1799        parquet_schema_from_array(se)
1800    }
1801
1802    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
1803        let num_nodes = num_nodes(schema)?;
1804        let mut buf = Vec::new();
1805        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1806
1807        // kick off writing list
1808        writer.write_list_begin(ElementType::Struct, num_nodes)?;
1809
1810        // write SchemaElements
1811        write_schema(schema, &mut writer)?;
1812        Ok(buf)
1813    }
1814
1815    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
1816        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
1817        read_thrift_vec(&mut prot)
1818    }
1819
1820    fn thrift_bytes<T: WriteThrift>(value: &T) -> Vec<u8> {
1821        let mut buf = Vec::new();
1822        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1823        value.write_thrift(&mut writer).unwrap();
1824        buf
1825    }
1826
1827    fn change_false_bool_field_to_i32(buf: &mut [u8]) {
1828        let pos = buf
1829            .iter()
1830            .rposition(|byte| *byte == 0x12)
1831            .expect("expected BOOL_FALSE field header byte");
1832        buf[pos] = 0x15;
1833    }
1834
1835    fn assert_malformed_bool_error(err: crate::errors::ParquetError) {
1836        let msg = err.to_string();
1837        assert!(
1838            msg.contains("Unexpected struct field type"),
1839            "unexpected error message: {msg}"
1840        );
1841    }
1842
1843    #[test]
1844    fn test_bounding_box_roundtrip() {
1845        test_roundtrip(BoundingBox {
1846            xmin: 0.1.into(),
1847            xmax: 10.3.into(),
1848            ymin: 0.001.into(),
1849            ymax: 128.5.into(),
1850            zmin: None,
1851            zmax: None,
1852            mmin: None,
1853            mmax: None,
1854        });
1855
1856        test_roundtrip(BoundingBox {
1857            xmin: 0.1.into(),
1858            xmax: 10.3.into(),
1859            ymin: 0.001.into(),
1860            ymax: 128.5.into(),
1861            zmin: Some(11.0.into()),
1862            zmax: Some(1300.0.into()),
1863            mmin: None,
1864            mmax: None,
1865        });
1866
1867        test_roundtrip(BoundingBox {
1868            xmin: 0.1.into(),
1869            xmax: 10.3.into(),
1870            ymin: 0.001.into(),
1871            ymax: 128.5.into(),
1872            zmin: Some(11.0.into()),
1873            zmax: Some(1300.0.into()),
1874            mmin: Some(3.7.into()),
1875            mmax: Some(42.0.into()),
1876        });
1877    }
1878
1879    #[test]
1880    fn test_convert_stats_preserves_missing_null_count() {
1881        let primitive =
1882            crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT32)
1883                .build()
1884                .unwrap();
1885        let column_descr = Arc::new(ColumnDescriptor::new(
1886            Arc::new(primitive),
1887            0,
1888            0,
1889            ColumnPath::new(vec![]),
1890        ));
1891
1892        let none_null_count = super::Statistics {
1893            max: None,
1894            min: None,
1895            null_count: None,
1896            distinct_count: None,
1897            max_value: None,
1898            min_value: None,
1899            is_max_value_exact: None,
1900            is_min_value_exact: None,
1901        };
1902        let decoded_none = super::convert_stats(&column_descr, Some(none_null_count))
1903            .unwrap()
1904            .unwrap();
1905        assert_eq!(decoded_none.null_count_opt(), None);
1906
1907        let zero_null_count = super::Statistics {
1908            max: None,
1909            min: None,
1910            null_count: Some(0),
1911            distinct_count: None,
1912            max_value: None,
1913            min_value: None,
1914            is_max_value_exact: None,
1915            is_min_value_exact: None,
1916        };
1917        let decoded_zero = super::convert_stats(&column_descr, Some(zero_null_count))
1918            .unwrap()
1919            .unwrap();
1920        assert_eq!(decoded_zero.null_count_opt(), Some(0));
1921    }
1922
1923    #[test]
1924    fn test_convert_stats_returns_error_for_overlong_int96_statistics() {
1925        let primitive =
1926            crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT96)
1927                .build()
1928                .unwrap();
1929        let column_descr = Arc::new(ColumnDescriptor::new(
1930            Arc::new(primitive),
1931            0,
1932            0,
1933            ColumnPath::new(vec![]),
1934        ));
1935        let invalid = (0..13).collect::<Vec<_>>();
1936
1937        let make_stats = |min, max| super::Statistics {
1938            max,
1939            min,
1940            null_count: Some(0),
1941            distinct_count: None,
1942            max_value: None,
1943            min_value: None,
1944            is_max_value_exact: None,
1945            is_min_value_exact: None,
1946        };
1947
1948        let err = super::convert_stats(&column_descr, Some(make_stats(Some(&invalid), None)))
1949            .unwrap_err();
1950        assert_eq!(
1951            err.to_string(),
1952            "Parquet error: Incorrect Int96 min statistics"
1953        );
1954
1955        let err = super::convert_stats(&column_descr, Some(make_stats(None, Some(&invalid))))
1956            .unwrap_err();
1957        assert_eq!(
1958            err.to_string(),
1959            "Parquet error: Incorrect Int96 max statistics"
1960        );
1961    }
1962
1963    #[test]
1964    fn malformed_bool_field_returns_error_not_panic() {
1965        let page_header = PageHeader {
1966            r#type: PageType::DICTIONARY_PAGE,
1967            uncompressed_page_size: 1,
1968            compressed_page_size: 1,
1969            crc: None,
1970            data_page_header: None,
1971            index_page_header: None,
1972            dictionary_page_header: Some(DictionaryPageHeader {
1973                num_values: 1,
1974                encoding: Encoding::PLAIN,
1975                is_sorted: Some(false),
1976            }),
1977            data_page_header_v2: None,
1978        };
1979
1980        let mut buf = thrift_bytes(&page_header);
1981        change_false_bool_field_to_i32(&mut buf);
1982
1983        let mut prot = ThriftSliceInputProtocol::new(&buf);
1984        let err = PageHeader::read_thrift_without_stats(&mut prot)
1985            .expect_err("malformed bool field should return an error");
1986        assert_malformed_bool_error(err);
1987    }
1988
1989    #[test]
1990    fn malformed_data_page_v2_bool_field_returns_error_not_panic() {
1991        let data_page_header_v2 = DataPageHeaderV2 {
1992            num_values: 1,
1993            num_nulls: 0,
1994            num_rows: 1,
1995            encoding: Encoding::PLAIN,
1996            definition_levels_byte_length: 0,
1997            repetition_levels_byte_length: 0,
1998            is_compressed: Some(false),
1999            statistics: None,
2000        };
2001
2002        let mut buf = thrift_bytes(&data_page_header_v2);
2003        change_false_bool_field_to_i32(&mut buf);
2004
2005        let mut prot = ThriftSliceInputProtocol::new(&buf);
2006        let err = DataPageHeaderV2::read_thrift_without_stats(&mut prot)
2007            .expect_err("malformed bool field should return an error");
2008        assert_malformed_bool_error(err);
2009    }
2010}