parquet/file/metadata/thrift/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module is the bridge between a Parquet file's thrift encoded metadata
19//! and this crate's [Parquet metadata API]. It contains objects and functions used
20//! to serialize/deserialize metadata objects into/from the Thrift compact protocol
21//! format as defined by the [Parquet specification].
22//!
23//! [Parquet metadata API]: crate::file::metadata
24//! [Parquet specification]: https://github.com/apache/parquet-format/tree/master
25
26use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37    basic::{
38        ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
39        Repetition, Type,
40    },
41    data_type::{ByteArray, FixedLenByteArray, Int96},
42    errors::{ParquetError, Result},
43    file::{
44        metadata::{
45            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46            PageEncodingStats, ParquetMetaData, RowGroupMetaData, RowGroupMetaDataBuilder,
47            SortingColumn,
48        },
49        statistics::ValueStatistics,
50    },
51    parquet_thrift::{
52        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54        read_thrift_vec,
55    },
56    schema::types::{
57        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58    },
59    thrift_struct,
60    util::bit_util::FromBytes,
61};
62
63// this needs to be visible to the schema conversion code
64thrift_struct!(
65pub(crate) struct SchemaElement<'a> {
66  /// Data type for this field. Not set if the current element is a non-leaf node
67  1: optional Type r#type;
68  /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
69  /// Otherwise, if specified, this is the maximum bit length to store any of the values.
70  /// (e.g. a low cardinality INT col could have this set to 3).  Note that this is
71  /// in the schema, and therefore fixed for the entire file.
72  2: optional i32 type_length;
73  /// Repetition of the field. The root of the schema does not have a repetition_type.
74  /// All other nodes must have one.
75  3: optional Repetition repetition_type;
76  /// Name of the field in the schema
77  4: required string<'a> name;
78  /// Nested fields. Since thrift does not support nested fields,
79  /// the nesting is flattened to a single list by a depth-first traversal.
80  /// The children count is used to construct the nested relationship.
81  /// This field is not set when the element is a primitive type.
82  5: optional i32 num_children;
83  /// DEPRECATED: When the schema is the result of a conversion from another model.
84  /// Used to record the original type to help with cross conversion.
85  ///
86  /// This is superseded by logical_type.
87  6: optional ConvertedType converted_type;
88  /// DEPRECATED: Used when this column contains decimal data.
89  /// See the DECIMAL converted type for more details.
90  ///
91  /// This is superseded by using the DecimalType annotation in logical_type.
92  7: optional i32 scale
93  8: optional i32 precision
94  /// When the original schema supports field ids, this will save the
95  /// original field id in the parquet schema
96  9: optional i32 field_id;
97  /// The logical type of this SchemaElement
98  ///
99  /// LogicalType replaces ConvertedType, but ConvertedType is still required
100  /// for some logical types to ensure forward-compatibility in format v1.
101  10: optional LogicalType logical_type
102}
103);
104
105thrift_struct!(
106struct Statistics<'a> {
107   1: optional binary<'a> max;
108   2: optional binary<'a> min;
109   3: optional i64 null_count;
110   4: optional i64 distinct_count;
111   5: optional binary<'a> max_value;
112   6: optional binary<'a> min_value;
113   7: optional bool is_max_value_exact;
114   8: optional bool is_min_value_exact;
115}
116);
117
118thrift_struct!(
119struct BoundingBox {
120  1: required double xmin;
121  2: required double xmax;
122  3: required double ymin;
123  4: required double ymax;
124  5: optional double zmin;
125  6: optional double zmax;
126  7: optional double mmin;
127  8: optional double mmax;
128}
129);
130
131thrift_struct!(
132struct GeospatialStatistics {
133  1: optional BoundingBox bbox;
134  2: optional list<i32> geospatial_types;
135}
136);
137
138thrift_struct!(
139struct SizeStatistics {
140   1: optional i64 unencoded_byte_array_data_bytes;
141   2: optional list<i64> repetition_level_histogram;
142   3: optional list<i64> definition_level_histogram;
143}
144);
145
146fn convert_geo_stats(
147    stats: Option<GeospatialStatistics>,
148) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
149    stats.map(|st| {
150        let bbox = convert_bounding_box(st.bbox);
151        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
152        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
153            bbox,
154            geospatial_types,
155        ))
156    })
157}
158
159fn convert_bounding_box(
160    bbox: Option<BoundingBox>,
161) -> Option<crate::geospatial::bounding_box::BoundingBox> {
162    bbox.map(|bb| {
163        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
164            bb.xmin.into(),
165            bb.xmax.into(),
166            bb.ymin.into(),
167            bb.ymax.into(),
168        );
169
170        newbb = match (bb.zmin, bb.zmax) {
171            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
172            // If either None or mismatch, leave it as None and don't error
173            _ => newbb,
174        };
175
176        newbb = match (bb.mmin, bb.mmax) {
177            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
178            // If either None or mismatch, leave it as None and don't error
179            _ => newbb,
180        };
181
182        newbb
183    })
184}
185
186/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
187fn convert_stats(
188    column_descr: &Arc<ColumnDescriptor>,
189    thrift_stats: Option<Statistics>,
190) -> Result<Option<crate::file::statistics::Statistics>> {
191    use crate::file::statistics::Statistics as FStatistics;
192    Ok(match thrift_stats {
193        Some(stats) => {
194            // Number of nulls recorded, when it is not available, we just mark it as 0.
195            // TODO this should be `None` if there is no information about NULLS.
196            // see https://github.com/apache/arrow-rs/pull/6216/files
197            let null_count = stats.null_count.unwrap_or(0);
198
199            if null_count < 0 {
200                return Err(general_err!(
201                    "Statistics null count is negative {}",
202                    null_count
203                ));
204            }
205
206            // Generic null count.
207            let null_count = Some(null_count as u64);
208            // Generic distinct count (count of distinct values occurring)
209            let distinct_count = stats.distinct_count.map(|value| value as u64);
210            // Whether or not statistics use deprecated min/max fields.
211            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
212            // Generic min value as bytes.
213            let min = if old_format {
214                stats.min
215            } else {
216                stats.min_value
217            };
218            // Generic max value as bytes.
219            let max = if old_format {
220                stats.max
221            } else {
222                stats.max_value
223            };
224
225            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
226                if let Some(min) = min {
227                    if min.len() < len {
228                        return Err(general_err!("Insufficient bytes to parse min statistic",));
229                    }
230                }
231                if let Some(max) = max {
232                    if max.len() < len {
233                        return Err(general_err!("Insufficient bytes to parse max statistic",));
234                    }
235                }
236                Ok(())
237            }
238
239            let physical_type = column_descr.physical_type();
240            match physical_type {
241                Type::BOOLEAN => check_len(&min, &max, 1),
242                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
243                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
244                Type::INT96 => check_len(&min, &max, 12),
245                _ => Ok(()),
246            }?;
247
248            // Values are encoded using PLAIN encoding definition, except that
249            // variable-length byte arrays do not include a length prefix.
250            //
251            // Instead of using actual decoder, we manually convert values.
252            let res = match physical_type {
253                Type::BOOLEAN => FStatistics::boolean(
254                    min.map(|data| data[0] != 0),
255                    max.map(|data| data[0] != 0),
256                    distinct_count,
257                    null_count,
258                    old_format,
259                ),
260                Type::INT32 => FStatistics::int32(
261                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
262                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263                    distinct_count,
264                    null_count,
265                    old_format,
266                ),
267                Type::INT64 => FStatistics::int64(
268                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
269                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270                    distinct_count,
271                    null_count,
272                    old_format,
273                ),
274                Type::INT96 => {
275                    // INT96 statistics may not be correct, because comparison is signed
276                    let min = if let Some(data) = min {
277                        assert_eq!(data.len(), 12);
278                        Some(Int96::try_from_le_slice(data)?)
279                    } else {
280                        None
281                    };
282                    let max = if let Some(data) = max {
283                        assert_eq!(data.len(), 12);
284                        Some(Int96::try_from_le_slice(data)?)
285                    } else {
286                        None
287                    };
288                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
289                }
290                Type::FLOAT => FStatistics::float(
291                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
292                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293                    distinct_count,
294                    null_count,
295                    old_format,
296                ),
297                Type::DOUBLE => FStatistics::double(
298                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
299                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300                    distinct_count,
301                    null_count,
302                    old_format,
303                ),
304                Type::BYTE_ARRAY => FStatistics::ByteArray(
305                    ValueStatistics::new(
306                        min.map(ByteArray::from),
307                        max.map(ByteArray::from),
308                        distinct_count,
309                        null_count,
310                        old_format,
311                    )
312                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
313                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
314                ),
315                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
316                    ValueStatistics::new(
317                        min.map(ByteArray::from).map(FixedLenByteArray::from),
318                        max.map(ByteArray::from).map(FixedLenByteArray::from),
319                        distinct_count,
320                        null_count,
321                        old_format,
322                    )
323                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
324                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
325                ),
326            };
327
328            Some(res)
329        }
330        None => None,
331    })
332}
333
// Bit flags recording which required fields of the Thrift `ColumnMetaData` struct have been
// decoded. Each flag's bit index equals the Thrift field id of the field it tracks. There is
// no flag for field 3 (`path_in_schema`), which the decoder does not read.

// field 1: type
const COL_META_TYPE: u16 = 1 << 1;
// field 2: encodings
const COL_META_ENCODINGS: u16 = 1 << 2;
// field 4: codec
const COL_META_CODEC: u16 = 1 << 4;
// field 5: num_values
const COL_META_NUM_VALUES: u16 = 1 << 5;
// field 6: total_uncompressed_size
const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
// field 7: total_compressed_size
const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
// field 9: data_page_offset
const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;

// Union of all of the flags above, i.e. the mask value observed once every tracked
// required field has been seen.
const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;
351
352// check mask to see if all required fields are set. return an appropriate error if
353// any are missing.
354fn validate_column_metadata(mask: u16) -> Result<()> {
355    if mask != COL_META_ALL_REQUIRED {
356        if mask & COL_META_ENCODINGS == 0 {
357            return Err(general_err!("Required field encodings is missing"));
358        }
359
360        if mask & COL_META_CODEC == 0 {
361            return Err(general_err!("Required field codec is missing"));
362        }
363        if mask & COL_META_NUM_VALUES == 0 {
364            return Err(general_err!("Required field num_values is missing"));
365        }
366        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
367            return Err(general_err!(
368                "Required field total_uncompressed_size is missing"
369            ));
370        }
371        if mask & COL_META_TOTAL_COMP_SZ == 0 {
372            return Err(general_err!(
373                "Required field total_compressed_size is missing"
374            ));
375        }
376        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
377            return Err(general_err!("Required field data_page_offset is missing"));
378        }
379    }
380
381    Ok(())
382}
383
384// Decode `ColumnMetaData`. Returns a mask of all required fields that were observed.
385// This mask can be passed to `validate_column_metadata`.
386fn read_column_metadata<'a>(
387    prot: &mut ThriftSliceInputProtocol<'a>,
388    column: &mut ColumnChunkMetaData,
389) -> Result<u16> {
390    // mask for seen required fields in ColumnMetaData
391    let mut seen_mask = 0u16;
392
393    // struct ColumnMetaData {
394    //   1: required Type type
395    //   2: required list<Encoding> encodings
396    //   3: required list<string> path_in_schema
397    //   4: required CompressionCodec codec
398    //   5: required i64 num_values
399    //   6: required i64 total_uncompressed_size
400    //   7: required i64 total_compressed_size
401    //   8: optional list<KeyValue> key_value_metadata
402    //   9: required i64 data_page_offset
403    //   10: optional i64 index_page_offset
404    //   11: optional i64 dictionary_page_offset
405    //   12: optional Statistics statistics;
406    //   13: optional list<PageEncodingStats> encoding_stats;
407    //   14: optional i64 bloom_filter_offset;
408    //   15: optional i32 bloom_filter_length;
409    //   16: optional SizeStatistics size_statistics;
410    //   17: optional GeospatialStatistics geospatial_statistics;
411    // }
412    let column_descr = &column.column_descr;
413
414    let mut last_field_id = 0i16;
415    loop {
416        let field_ident = prot.read_field_begin(last_field_id)?;
417        if field_ident.field_type == FieldType::Stop {
418            break;
419        }
420        match field_ident.id {
421            // 1: type is never used, we can use the column descriptor
422            1 => {
423                // read for error handling
424                Type::read_thrift(&mut *prot)?;
425                seen_mask |= COL_META_TYPE;
426            }
427            2 => {
428                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
429                seen_mask |= COL_META_ENCODINGS;
430            }
431            // 3: path_in_schema is redundant
432            4 => {
433                column.compression = Compression::read_thrift(&mut *prot)?;
434                seen_mask |= COL_META_CODEC;
435            }
436            5 => {
437                column.num_values = i64::read_thrift(&mut *prot)?;
438                seen_mask |= COL_META_NUM_VALUES;
439            }
440            6 => {
441                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
442                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
443            }
444            7 => {
445                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
446                seen_mask |= COL_META_TOTAL_COMP_SZ;
447            }
448            // 8: we don't expose this key value
449            9 => {
450                column.data_page_offset = i64::read_thrift(&mut *prot)?;
451                seen_mask |= COL_META_DATA_PAGE_OFFSET;
452            }
453            10 => {
454                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
455            }
456            11 => {
457                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
458            }
459            12 => {
460                column.statistics =
461                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
462            }
463            13 => {
464                let val =
465                    read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
466                column.encoding_stats = Some(val);
467            }
468            14 => {
469                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
470            }
471            15 => {
472                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
473            }
474            16 => {
475                let val = SizeStatistics::read_thrift(&mut *prot)?;
476                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
477                column.repetition_level_histogram =
478                    val.repetition_level_histogram.map(LevelHistogram::from);
479                column.definition_level_histogram =
480                    val.definition_level_histogram.map(LevelHistogram::from);
481            }
482            17 => {
483                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
484                column.geo_statistics = convert_geo_stats(Some(val));
485            }
486            _ => {
487                prot.skip(field_ident.field_type)?;
488            }
489        };
490        last_field_id = field_ident.id;
491    }
492
493    Ok(seen_mask)
494}
495
496// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait because
497// these are all internal and operate on slices.
498fn read_column_chunk<'a>(
499    prot: &mut ThriftSliceInputProtocol<'a>,
500    column_descr: &Arc<ColumnDescriptor>,
501) -> Result<ColumnChunkMetaData> {
502    // create a default initialized ColumnMetaData
503    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
504
505    // seen flag for file_offset
506    let mut has_file_offset = false;
507
508    // mask of seen flags for ColumnMetaData
509    let mut col_meta_mask = 0u16;
510
511    // struct ColumnChunk {
512    //   1: optional string file_path
513    //   2: required i64 file_offset = 0
514    //   3: optional ColumnMetaData meta_data
515    //   4: optional i64 offset_index_offset
516    //   5: optional i32 offset_index_length
517    //   6: optional i64 column_index_offset
518    //   7: optional i32 column_index_length
519    //   8: optional ColumnCryptoMetaData crypto_metadata
520    //   9: optional binary encrypted_column_metadata
521    // }
522    let mut last_field_id = 0i16;
523    loop {
524        let field_ident = prot.read_field_begin(last_field_id)?;
525        if field_ident.field_type == FieldType::Stop {
526            break;
527        }
528        match field_ident.id {
529            1 => {
530                col.file_path = Some(String::read_thrift(&mut *prot)?);
531            }
532            2 => {
533                col.file_offset = i64::read_thrift(&mut *prot)?;
534                has_file_offset = true;
535            }
536            3 => {
537                col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
538            }
539            4 => {
540                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
541            }
542            5 => {
543                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
544            }
545            6 => {
546                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
547            }
548            7 => {
549                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
550            }
551            #[cfg(feature = "encryption")]
552            8 => {
553                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
554                col.column_crypto_metadata = Some(Box::new(val));
555            }
556            #[cfg(feature = "encryption")]
557            9 => {
558                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
559            }
560            _ => {
561                prot.skip(field_ident.field_type)?;
562            }
563        };
564        last_field_id = field_ident.id;
565    }
566
567    // the only required field from ColumnChunk
568    if !has_file_offset {
569        return Err(general_err!("Required field file_offset is missing"));
570    };
571
572    // if encrypted just return. we'll decrypt after finishing the footer and populate the rest.
573    #[cfg(feature = "encryption")]
574    if col.encrypted_column_metadata.is_some() {
575        return Ok(col);
576    }
577
578    // not encrypted, so make sure all required fields were read
579    validate_column_metadata(col_meta_mask)?;
580
581    Ok(col)
582}
583
584fn read_row_group(
585    prot: &mut ThriftSliceInputProtocol,
586    schema_descr: &Arc<SchemaDescriptor>,
587) -> Result<RowGroupMetaData> {
588    // create default initialized RowGroupMetaData
589    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
590
591    // mask values for required fields
592    const RG_COLUMNS: u8 = 1 << 1;
593    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
594    const RG_NUM_ROWS: u8 = 1 << 3;
595    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;
596
597    let mut mask = 0u8;
598
599    // struct RowGroup {
600    //   1: required list<ColumnChunk> columns
601    //   2: required i64 total_byte_size
602    //   3: required i64 num_rows
603    //   4: optional list<SortingColumn> sorting_columns
604    //   5: optional i64 file_offset
605    //   6: optional i64 total_compressed_size
606    //   7: optional i16 ordinal
607    // }
608    let mut last_field_id = 0i16;
609    loop {
610        let field_ident = prot.read_field_begin(last_field_id)?;
611        if field_ident.field_type == FieldType::Stop {
612            break;
613        }
614        match field_ident.id {
615            1 => {
616                let list_ident = prot.read_list_begin()?;
617                if schema_descr.num_columns() != list_ident.size as usize {
618                    return Err(general_err!(
619                        "Column count mismatch. Schema has {} columns while Row Group has {}",
620                        schema_descr.num_columns(),
621                        list_ident.size
622                    ));
623                }
624                for i in 0..list_ident.size as usize {
625                    let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
626                    row_group.columns.push(col);
627                }
628                mask |= RG_COLUMNS;
629            }
630            2 => {
631                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
632                mask |= RG_TOT_BYTE_SIZE;
633            }
634            3 => {
635                row_group.num_rows = i64::read_thrift(&mut *prot)?;
636                mask |= RG_NUM_ROWS;
637            }
638            4 => {
639                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
640                row_group.sorting_columns = Some(val);
641            }
642            5 => {
643                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
644            }
645            // 6: we don't expose total_compressed_size
646            7 => {
647                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
648            }
649            _ => {
650                prot.skip(field_ident.field_type)?;
651            }
652        };
653        last_field_id = field_ident.id;
654    }
655
656    if mask != RG_ALL_REQUIRED {
657        if mask & RG_COLUMNS == 0 {
658            return Err(general_err!("Required field columns is missing"));
659        }
660        if mask & RG_TOT_BYTE_SIZE == 0 {
661            return Err(general_err!("Required field total_byte_size is missing"));
662        }
663        if mask & RG_NUM_ROWS == 0 {
664            return Err(general_err!("Required field num_rows is missing"));
665        }
666    }
667
668    Ok(row_group)
669}
670
671/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
672/// the Parquet footer. Page indexes will need to be added later.
673pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData> {
674    let mut prot = ThriftSliceInputProtocol::new(buf);
675
676    // begin reading the file metadata
677    let mut version: Option<i32> = None;
678    let mut num_rows: Option<i64> = None;
679    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
680    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
681    let mut created_by: Option<&str> = None;
682    let mut column_orders: Option<Vec<ColumnOrder>> = None;
683    #[cfg(feature = "encryption")]
684    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
685    #[cfg(feature = "encryption")]
686    let mut footer_signing_key_metadata: Option<&[u8]> = None;
687
688    // this will need to be set before parsing row groups
689    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
690
691    // struct FileMetaData {
692    //   1: required i32 version
693    //   2: required list<SchemaElement> schema;
694    //   3: required i64 num_rows
695    //   4: required list<RowGroup> row_groups
696    //   5: optional list<KeyValue> key_value_metadata
697    //   6: optional string created_by
698    //   7: optional list<ColumnOrder> column_orders;
699    //   8: optional EncryptionAlgorithm encryption_algorithm
700    //   9: optional binary footer_signing_key_metadata
701    // }
702    let mut last_field_id = 0i16;
703    loop {
704        let field_ident = prot.read_field_begin(last_field_id)?;
705        if field_ident.field_type == FieldType::Stop {
706            break;
707        }
708        match field_ident.id {
709            1 => {
710                version = Some(i32::read_thrift(&mut prot)?);
711            }
712            2 => {
713                // read schema and convert to SchemaDescriptor for use when reading row groups
714                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
715                let val = parquet_schema_from_array(val)?;
716                schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
717            }
718            3 => {
719                num_rows = Some(i64::read_thrift(&mut prot)?);
720            }
721            4 => {
722                if schema_descr.is_none() {
723                    return Err(general_err!("Required field schema is missing"));
724                }
725                let schema_descr = schema_descr.as_ref().unwrap();
726                let list_ident = prot.read_list_begin()?;
727                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
728                for _ in 0..list_ident.size {
729                    rg_vec.push(read_row_group(&mut prot, schema_descr)?);
730                }
731                row_groups = Some(rg_vec);
732            }
733            5 => {
734                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
735                key_value_metadata = Some(val);
736            }
737            6 => {
738                created_by = Some(<&str>::read_thrift(&mut prot)?);
739            }
740            7 => {
741                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
742                column_orders = Some(val);
743            }
744            #[cfg(feature = "encryption")]
745            8 => {
746                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
747                encryption_algorithm = Some(val);
748            }
749            #[cfg(feature = "encryption")]
750            9 => {
751                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
752            }
753            _ => {
754                prot.skip(field_ident.field_type)?;
755            }
756        };
757        last_field_id = field_ident.id;
758    }
759    let Some(version) = version else {
760        return Err(general_err!("Required field version is missing"));
761    };
762    let Some(num_rows) = num_rows else {
763        return Err(general_err!("Required field num_rows is missing"));
764    };
765    let Some(row_groups) = row_groups else {
766        return Err(general_err!("Required field row_groups is missing"));
767    };
768
769    let created_by = created_by.map(|c| c.to_owned());
770
771    // we've tested for `None` by now so this is safe
772    let schema_descr = schema_descr.unwrap();
773
774    // need to map read column orders to actual values based on the schema
775    if column_orders
776        .as_ref()
777        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
778    {
779        return Err(general_err!("Column order length mismatch"));
780    }
781    // replace default type defined column orders with ones having the correct sort order
782    // TODO(ets): this could instead be done above when decoding
783    let column_orders = column_orders.map(|mut cos| {
784        for (i, column) in schema_descr.columns().iter().enumerate() {
785            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
786                let sort_order = ColumnOrder::get_sort_order(
787                    column.logical_type(),
788                    column.converted_type(),
789                    column.physical_type(),
790                );
791                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
792            }
793        }
794        cos
795    });
796
797    #[cfg(not(feature = "encryption"))]
798    let fmd = crate::file::metadata::FileMetaData::new(
799        version,
800        num_rows,
801        created_by,
802        key_value_metadata,
803        schema_descr,
804        column_orders,
805    );
806    #[cfg(feature = "encryption")]
807    let fmd = crate::file::metadata::FileMetaData::new(
808        version,
809        num_rows,
810        created_by,
811        key_value_metadata,
812        schema_descr,
813        column_orders,
814    )
815    .with_encryption_algorithm(encryption_algorithm)
816    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
817
818    Ok(ParquetMetaData::new(fmd, row_groups))
819}
820
821thrift_struct!(
822    pub(crate) struct IndexPageHeader {}
823);
824
825thrift_struct!(
826pub(crate) struct DictionaryPageHeader {
827  /// Number of values in the dictionary
828  1: required i32 num_values;
829
830  /// Encoding using this dictionary page
831  2: required Encoding encoding
832
833  /// If true, the entries in the dictionary are sorted in ascending order
834  3: optional bool is_sorted;
835}
836);
837
838thrift_struct!(
839/// Statistics for the page header.
840///
841/// This is a duplicate of the [`Statistics`] struct above. Because the page reader uses
842/// the [`Read`] API, we cannot read the min/max values as slices. This should not be
843/// a huge problem since this crate no longer reads the page header statistics by default.
844///
845/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
846pub(crate) struct PageStatistics {
847   1: optional binary max;
848   2: optional binary min;
849   3: optional i64 null_count;
850   4: optional i64 distinct_count;
851   5: optional binary max_value;
852   6: optional binary min_value;
853   7: optional bool is_max_value_exact;
854   8: optional bool is_min_value_exact;
855}
856);
857
858thrift_struct!(
859pub(crate) struct DataPageHeader {
860  1: required i32 num_values
861  2: required Encoding encoding
862  3: required Encoding definition_level_encoding;
863  4: required Encoding repetition_level_encoding;
864  5: optional PageStatistics statistics;
865}
866);
867
868impl DataPageHeader {
869    // reader that skips decoding page statistics
870    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
871    where
872        R: ThriftCompactInputProtocol<'a>,
873    {
874        let mut num_values: Option<i32> = None;
875        let mut encoding: Option<Encoding> = None;
876        let mut definition_level_encoding: Option<Encoding> = None;
877        let mut repetition_level_encoding: Option<Encoding> = None;
878        let statistics: Option<PageStatistics> = None;
879        let mut last_field_id = 0i16;
880        loop {
881            let field_ident = prot.read_field_begin(last_field_id)?;
882            if field_ident.field_type == FieldType::Stop {
883                break;
884            }
885            match field_ident.id {
886                1 => {
887                    let val = i32::read_thrift(&mut *prot)?;
888                    num_values = Some(val);
889                }
890                2 => {
891                    let val = Encoding::read_thrift(&mut *prot)?;
892                    encoding = Some(val);
893                }
894                3 => {
895                    let val = Encoding::read_thrift(&mut *prot)?;
896                    definition_level_encoding = Some(val);
897                }
898                4 => {
899                    let val = Encoding::read_thrift(&mut *prot)?;
900                    repetition_level_encoding = Some(val);
901                }
902                _ => {
903                    prot.skip(field_ident.field_type)?;
904                }
905            };
906            last_field_id = field_ident.id;
907        }
908        let Some(num_values) = num_values else {
909            return Err(general_err!("Required field num_values is missing"));
910        };
911        let Some(encoding) = encoding else {
912            return Err(general_err!("Required field encoding is missing"));
913        };
914        let Some(definition_level_encoding) = definition_level_encoding else {
915            return Err(general_err!(
916                "Required field definition_level_encoding is missing"
917            ));
918        };
919        let Some(repetition_level_encoding) = repetition_level_encoding else {
920            return Err(general_err!(
921                "Required field repetition_level_encoding is missing"
922            ));
923        };
924        Ok(Self {
925            num_values,
926            encoding,
927            definition_level_encoding,
928            repetition_level_encoding,
929            statistics,
930        })
931    }
932}
933
934thrift_struct!(
935pub(crate) struct DataPageHeaderV2 {
936  1: required i32 num_values
937  2: required i32 num_nulls
938  3: required i32 num_rows
939  4: required Encoding encoding
940  5: required i32 definition_levels_byte_length;
941  6: required i32 repetition_levels_byte_length;
942  7: optional bool is_compressed = true;
943  8: optional PageStatistics statistics;
944}
945);
946
947impl DataPageHeaderV2 {
948    // reader that skips decoding page statistics
949    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
950    where
951        R: ThriftCompactInputProtocol<'a>,
952    {
953        let mut num_values: Option<i32> = None;
954        let mut num_nulls: Option<i32> = None;
955        let mut num_rows: Option<i32> = None;
956        let mut encoding: Option<Encoding> = None;
957        let mut definition_levels_byte_length: Option<i32> = None;
958        let mut repetition_levels_byte_length: Option<i32> = None;
959        let mut is_compressed: Option<bool> = None;
960        let statistics: Option<PageStatistics> = None;
961        let mut last_field_id = 0i16;
962        loop {
963            let field_ident = prot.read_field_begin(last_field_id)?;
964            if field_ident.field_type == FieldType::Stop {
965                break;
966            }
967            match field_ident.id {
968                1 => {
969                    let val = i32::read_thrift(&mut *prot)?;
970                    num_values = Some(val);
971                }
972                2 => {
973                    let val = i32::read_thrift(&mut *prot)?;
974                    num_nulls = Some(val);
975                }
976                3 => {
977                    let val = i32::read_thrift(&mut *prot)?;
978                    num_rows = Some(val);
979                }
980                4 => {
981                    let val = Encoding::read_thrift(&mut *prot)?;
982                    encoding = Some(val);
983                }
984                5 => {
985                    let val = i32::read_thrift(&mut *prot)?;
986                    definition_levels_byte_length = Some(val);
987                }
988                6 => {
989                    let val = i32::read_thrift(&mut *prot)?;
990                    repetition_levels_byte_length = Some(val);
991                }
992                7 => {
993                    let val = field_ident.bool_val.unwrap();
994                    is_compressed = Some(val);
995                }
996                _ => {
997                    prot.skip(field_ident.field_type)?;
998                }
999            };
1000            last_field_id = field_ident.id;
1001        }
1002        let Some(num_values) = num_values else {
1003            return Err(general_err!("Required field num_values is missing"));
1004        };
1005        let Some(num_nulls) = num_nulls else {
1006            return Err(general_err!("Required field num_nulls is missing"));
1007        };
1008        let Some(num_rows) = num_rows else {
1009            return Err(general_err!("Required field num_rows is missing"));
1010        };
1011        let Some(encoding) = encoding else {
1012            return Err(general_err!("Required field encoding is missing"));
1013        };
1014        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1015            return Err(general_err!(
1016                "Required field definition_levels_byte_length is missing"
1017            ));
1018        };
1019        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1020            return Err(general_err!(
1021                "Required field repetition_levels_byte_length is missing"
1022            ));
1023        };
1024        Ok(Self {
1025            num_values,
1026            num_nulls,
1027            num_rows,
1028            encoding,
1029            definition_levels_byte_length,
1030            repetition_levels_byte_length,
1031            is_compressed,
1032            statistics,
1033        })
1034    }
1035}
1036
1037thrift_struct!(
1038pub(crate) struct PageHeader {
1039  /// the type of the page: indicates which of the *_header fields is set
1040  1: required PageType r#type
1041
1042  /// Uncompressed page size in bytes (not including this header)
1043  2: required i32 uncompressed_page_size
1044
1045  /// Compressed (and potentially encrypted) page size in bytes, not including this header
1046  3: required i32 compressed_page_size
1047
1048  /// The 32-bit CRC checksum for the page, to be be calculated as follows:
1049  4: optional i32 crc
1050
1051  // Headers for page specific data.  One only will be set.
1052  5: optional DataPageHeader data_page_header;
1053  6: optional IndexPageHeader index_page_header;
1054  7: optional DictionaryPageHeader dictionary_page_header;
1055  8: optional DataPageHeaderV2 data_page_header_v2;
1056}
1057);
1058
1059impl PageHeader {
1060    // reader that skips reading page statistics. obtained by running
1061    // `cargo expand -p parquet --all-features --lib file::metadata::thrift`
1062    // and modifying the impl of `read_thrift`
1063    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1064    where
1065        R: ThriftCompactInputProtocol<'a>,
1066    {
1067        let mut type_: Option<PageType> = None;
1068        let mut uncompressed_page_size: Option<i32> = None;
1069        let mut compressed_page_size: Option<i32> = None;
1070        let mut crc: Option<i32> = None;
1071        let mut data_page_header: Option<DataPageHeader> = None;
1072        let mut index_page_header: Option<IndexPageHeader> = None;
1073        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1074        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1075        let mut last_field_id = 0i16;
1076        loop {
1077            let field_ident = prot.read_field_begin(last_field_id)?;
1078            if field_ident.field_type == FieldType::Stop {
1079                break;
1080            }
1081            match field_ident.id {
1082                1 => {
1083                    let val = PageType::read_thrift(&mut *prot)?;
1084                    type_ = Some(val);
1085                }
1086                2 => {
1087                    let val = i32::read_thrift(&mut *prot)?;
1088                    uncompressed_page_size = Some(val);
1089                }
1090                3 => {
1091                    let val = i32::read_thrift(&mut *prot)?;
1092                    compressed_page_size = Some(val);
1093                }
1094                4 => {
1095                    let val = i32::read_thrift(&mut *prot)?;
1096                    crc = Some(val);
1097                }
1098                5 => {
1099                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1100                    data_page_header = Some(val);
1101                }
1102                6 => {
1103                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
1104                    index_page_header = Some(val);
1105                }
1106                7 => {
1107                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1108                    dictionary_page_header = Some(val);
1109                }
1110                8 => {
1111                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1112                    data_page_header_v2 = Some(val);
1113                }
1114                _ => {
1115                    prot.skip(field_ident.field_type)?;
1116                }
1117            };
1118            last_field_id = field_ident.id;
1119        }
1120        let Some(type_) = type_ else {
1121            return Err(general_err!("Required field type_ is missing"));
1122        };
1123        let Some(uncompressed_page_size) = uncompressed_page_size else {
1124            return Err(general_err!(
1125                "Required field uncompressed_page_size is missing"
1126            ));
1127        };
1128        let Some(compressed_page_size) = compressed_page_size else {
1129            return Err(general_err!(
1130                "Required field compressed_page_size is missing"
1131            ));
1132        };
1133        Ok(Self {
1134            r#type: type_,
1135            uncompressed_page_size,
1136            compressed_page_size,
1137            crc,
1138            data_page_header,
1139            index_page_header,
1140            dictionary_page_header,
1141            data_page_header_v2,
1142        })
1143    }
1144}
1145
1146/////////////////////////////////////////////////
1147// helper functions for writing file meta data
1148
1149// serialize the bits of the column chunk needed for a thrift ColumnMetaData
1150// struct ColumnMetaData {
1151//   1: required Type type
1152//   2: required list<Encoding> encodings
1153//   3: required list<string> path_in_schema
1154//   4: required CompressionCodec codec
1155//   5: required i64 num_values
1156//   6: required i64 total_uncompressed_size
1157//   7: required i64 total_compressed_size
1158//   8: optional list<KeyValue> key_value_metadata
1159//   9: required i64 data_page_offset
1160//   10: optional i64 index_page_offset
1161//   11: optional i64 dictionary_page_offset
1162//   12: optional Statistics statistics;
1163//   13: optional list<PageEncodingStats> encoding_stats;
1164//   14: optional i64 bloom_filter_offset;
1165//   15: optional i32 bloom_filter_length;
1166//   16: optional SizeStatistics size_statistics;
1167//   17: optional GeospatialStatistics geospatial_statistics;
1168// }
1169pub(super) fn serialize_column_meta_data<W: Write>(
1170    column_chunk: &ColumnChunkMetaData,
1171    w: &mut ThriftCompactOutputProtocol<W>,
1172) -> Result<()> {
1173    use crate::file::statistics::page_stats_to_thrift;
1174
1175    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1176    column_chunk
1177        .encodings()
1178        .collect::<Vec<_>>()
1179        .write_thrift_field(w, 2, 1)?;
1180    let path = column_chunk.column_descr.path().parts();
1181    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1182    path.write_thrift_field(w, 3, 2)?;
1183    column_chunk.compression.write_thrift_field(w, 4, 3)?;
1184    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1185    column_chunk
1186        .total_uncompressed_size
1187        .write_thrift_field(w, 6, 5)?;
1188    column_chunk
1189        .total_compressed_size
1190        .write_thrift_field(w, 7, 6)?;
1191    // no key_value_metadata here
1192    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1193    if let Some(index_page_offset) = column_chunk.index_page_offset {
1194        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1195    }
1196    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1197        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1198    }
1199    // PageStatistics is the same as thrift Statistics, but writable
1200    let stats = page_stats_to_thrift(column_chunk.statistics());
1201    if let Some(stats) = stats {
1202        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1203    }
1204    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1205        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1206    }
1207    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1208        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1209    }
1210    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1211        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1212    }
1213
1214    // SizeStatistics
1215    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1216        || column_chunk.repetition_level_histogram.is_some()
1217        || column_chunk.definition_level_histogram.is_some()
1218    {
1219        let repetition_level_histogram = column_chunk
1220            .repetition_level_histogram()
1221            .map(|hist| hist.clone().into_inner());
1222
1223        let definition_level_histogram = column_chunk
1224            .definition_level_histogram()
1225            .map(|hist| hist.clone().into_inner());
1226
1227        Some(SizeStatistics {
1228            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1229            repetition_level_histogram,
1230            definition_level_histogram,
1231        })
1232    } else {
1233        None
1234    };
1235    if let Some(size_stats) = size_stats {
1236        last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1237    }
1238
1239    if let Some(geo_stats) = column_chunk.geo_statistics() {
1240        geo_stats.write_thrift_field(w, 17, last_field_id)?;
1241    }
1242
1243    w.write_struct_end()
1244}
1245
1246// temp struct used for writing
1247pub(super) struct FileMeta<'a> {
1248    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
1249    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
1250}
1251
1252// struct FileMetaData {
1253//   1: required i32 version
1254//   2: required list<SchemaElement> schema;
1255//   3: required i64 num_rows
1256//   4: required list<RowGroup> row_groups
1257//   5: optional list<KeyValue> key_value_metadata
1258//   6: optional string created_by
1259//   7: optional list<ColumnOrder> column_orders;
1260//   8: optional EncryptionAlgorithm encryption_algorithm
1261//   9: optional binary footer_signing_key_metadata
1262// }
1263impl<'a> WriteThrift for FileMeta<'a> {
1264    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1265
1266    // needed for last_field_id w/o encryption
1267    #[allow(unused_assignments)]
1268    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1269        self.file_metadata
1270            .version
1271            .write_thrift_field(writer, 1, 0)?;
1272
1273        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
1274        // writing along the way.
1275        let root = self.file_metadata.schema_descr().root_schema_ptr();
1276        let schema_len = num_nodes(&root)?;
1277        writer.write_field_begin(FieldType::List, 2, 1)?;
1278        writer.write_list_begin(ElementType::Struct, schema_len)?;
1279        // recursively write Type nodes as SchemaElements
1280        write_schema(&root, writer)?;
1281
1282        self.file_metadata
1283            .num_rows
1284            .write_thrift_field(writer, 3, 2)?;
1285
1286        // this will call RowGroupMetaData::write_thrift
1287        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1288
1289        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1290            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1291        }
1292        if let Some(created_by) = self.file_metadata.created_by() {
1293            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1294        }
1295        if let Some(column_orders) = self.file_metadata.column_orders() {
1296            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1297        }
1298        #[cfg(feature = "encryption")]
1299        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1300            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1301        }
1302        #[cfg(feature = "encryption")]
1303        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1304            key.as_slice()
1305                .write_thrift_field(writer, 9, last_field_id)?;
1306        }
1307
1308        writer.write_struct_end()
1309    }
1310}
1311
1312fn write_schema<W: Write>(
1313    schema: &TypePtr,
1314    writer: &mut ThriftCompactOutputProtocol<W>,
1315) -> Result<()> {
1316    if !schema.is_group() {
1317        return Err(general_err!("Root schema must be Group type"));
1318    }
1319    write_schema_helper(schema, writer)
1320}
1321
1322fn write_schema_helper<W: Write>(
1323    node: &TypePtr,
1324    writer: &mut ThriftCompactOutputProtocol<W>,
1325) -> Result<()> {
1326    match node.as_ref() {
1327        crate::schema::types::Type::PrimitiveType {
1328            basic_info,
1329            physical_type,
1330            type_length,
1331            scale,
1332            precision,
1333        } => {
1334            let element = SchemaElement {
1335                r#type: Some(*physical_type),
1336                type_length: if *type_length >= 0 {
1337                    Some(*type_length)
1338                } else {
1339                    None
1340                },
1341                repetition_type: Some(basic_info.repetition()),
1342                name: basic_info.name(),
1343                num_children: None,
1344                converted_type: match basic_info.converted_type() {
1345                    ConvertedType::NONE => None,
1346                    other => Some(other),
1347                },
1348                scale: if *scale >= 0 { Some(*scale) } else { None },
1349                precision: if *precision >= 0 {
1350                    Some(*precision)
1351                } else {
1352                    None
1353                },
1354                field_id: if basic_info.has_id() {
1355                    Some(basic_info.id())
1356                } else {
1357                    None
1358                },
1359                logical_type: basic_info.logical_type(),
1360            };
1361            element.write_thrift(writer)
1362        }
1363        crate::schema::types::Type::GroupType { basic_info, fields } => {
1364            let repetition = if basic_info.has_repetition() {
1365                Some(basic_info.repetition())
1366            } else {
1367                None
1368            };
1369
1370            let element = SchemaElement {
1371                r#type: None,
1372                type_length: None,
1373                repetition_type: repetition,
1374                name: basic_info.name(),
1375                num_children: Some(fields.len().try_into()?),
1376                converted_type: match basic_info.converted_type() {
1377                    ConvertedType::NONE => None,
1378                    other => Some(other),
1379                },
1380                scale: None,
1381                precision: None,
1382                field_id: if basic_info.has_id() {
1383                    Some(basic_info.id())
1384                } else {
1385                    None
1386                },
1387                logical_type: basic_info.logical_type(),
1388            };
1389
1390            element.write_thrift(writer)?;
1391
1392            // Add child elements for a group
1393            for field in fields {
1394                write_schema_helper(field, writer)?;
1395            }
1396            Ok(())
1397        }
1398    }
1399}
1400
1401// struct RowGroup {
1402//   1: required list<ColumnChunk> columns
1403//   2: required i64 total_byte_size
1404//   3: required i64 num_rows
1405//   4: optional list<SortingColumn> sorting_columns
1406//   5: optional i64 file_offset
1407//   6: optional i64 total_compressed_size
1408//   7: optional i16 ordinal
1409// }
1410impl WriteThrift for RowGroupMetaData {
1411    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1412
1413    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1414        // this will call ColumnChunkMetaData::write_thrift
1415        self.columns.write_thrift_field(writer, 1, 0)?;
1416        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1417        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1418        if let Some(sorting_columns) = self.sorting_columns() {
1419            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1420        }
1421        if let Some(file_offset) = self.file_offset() {
1422            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1423        }
1424        // this is optional, but we'll always write it
1425        last_field_id = self
1426            .compressed_size()
1427            .write_thrift_field(writer, 6, last_field_id)?;
1428        if let Some(ordinal) = self.ordinal() {
1429            ordinal.write_thrift_field(writer, 7, last_field_id)?;
1430        }
1431        writer.write_struct_end()
1432    }
1433}
1434
1435// struct ColumnChunk {
1436//   1: optional string file_path
1437//   2: required i64 file_offset = 0
1438//   3: optional ColumnMetaData meta_data
1439//   4: optional i64 offset_index_offset
1440//   5: optional i32 offset_index_length
1441//   6: optional i64 column_index_offset
1442//   7: optional i32 column_index_length
1443//   8: optional ColumnCryptoMetaData crypto_metadata
1444//   9: optional binary encrypted_column_metadata
1445// }
1446impl WriteThrift for ColumnChunkMetaData {
1447    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1448
1449    #[allow(unused_assignments)]
1450    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1451        let mut last_field_id = 0i16;
1452        if let Some(file_path) = self.file_path() {
1453            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1454        }
1455        last_field_id = self
1456            .file_offset()
1457            .write_thrift_field(writer, 2, last_field_id)?;
1458
1459        #[cfg(feature = "encryption")]
1460        {
1461            // only write the ColumnMetaData if we haven't already encrypted it
1462            if self.encrypted_column_metadata.is_none() {
1463                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1464                serialize_column_meta_data(self, writer)?;
1465                last_field_id = 3;
1466            }
1467        }
1468        #[cfg(not(feature = "encryption"))]
1469        {
1470            // always write the ColumnMetaData
1471            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1472            serialize_column_meta_data(self, writer)?;
1473            last_field_id = 3;
1474        }
1475
1476        if let Some(offset_idx_off) = self.offset_index_offset() {
1477            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1478        }
1479        if let Some(offset_idx_len) = self.offset_index_length() {
1480            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1481        }
1482        if let Some(column_idx_off) = self.column_index_offset() {
1483            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1484        }
1485        if let Some(column_idx_len) = self.column_index_length() {
1486            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1487        }
1488        #[cfg(feature = "encryption")]
1489        {
1490            if let Some(crypto_metadata) = self.crypto_metadata() {
1491                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1492            }
1493            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1494                encrypted_meta
1495                    .as_slice()
1496                    .write_thrift_field(writer, 9, last_field_id)?;
1497            }
1498        }
1499
1500        writer.write_struct_end()
1501    }
1502}
1503
1504// struct GeospatialStatistics {
1505//   1: optional BoundingBox bbox;
1506//   2: optional list<i32> geospatial_types;
1507// }
1508impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1509    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1510
1511    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1512        let mut last_field_id = 0i16;
1513        if let Some(bbox) = self.bounding_box() {
1514            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1515        }
1516        if let Some(geo_types) = self.geospatial_types() {
1517            geo_types.write_thrift_field(writer, 2, last_field_id)?;
1518        }
1519
1520        writer.write_struct_end()
1521    }
1522}
1523
1524impl WriteThriftField for crate::geospatial::statistics::GeospatialStatistics {
1525    fn write_thrift_field<W: Write>(
1526        &self,
1527        writer: &mut ThriftCompactOutputProtocol<W>,
1528        field_id: i16,
1529        last_field_id: i16,
1530    ) -> Result<i16> {
1531        writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
1532        self.write_thrift(writer)?;
1533        Ok(field_id)
1534    }
1535}
1536
1537// struct BoundingBox {
1538//   1: required double xmin;
1539//   2: required double xmax;
1540//   3: required double ymin;
1541//   4: required double ymax;
1542//   5: optional double zmin;
1543//   6: optional double zmax;
1544//   7: optional double mmin;
1545//   8: optional double mmax;
1546// }
1547impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1548    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1549
1550    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1551        self.get_xmin().write_thrift_field(writer, 1, 0)?;
1552        self.get_xmax().write_thrift_field(writer, 2, 1)?;
1553        self.get_ymin().write_thrift_field(writer, 3, 2)?;
1554        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1555
1556        if let Some(zmin) = self.get_zmin() {
1557            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1558        }
1559        if let Some(zmax) = self.get_zmax() {
1560            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1561        }
1562        if let Some(mmin) = self.get_mmin() {
1563            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1564        }
1565        if let Some(mmax) = self.get_mmax() {
1566            mmax.write_thrift_field(writer, 8, last_field_id)?;
1567        }
1568
1569        writer.write_struct_end()
1570    }
1571}
1572
1573impl WriteThriftField for crate::geospatial::bounding_box::BoundingBox {
1574    fn write_thrift_field<W: Write>(
1575        &self,
1576        writer: &mut ThriftCompactOutputProtocol<W>,
1577        field_id: i16,
1578        last_field_id: i16,
1579    ) -> Result<i16> {
1580        writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
1581        self.write_thrift(writer)?;
1582        Ok(field_id)
1583    }
1584}
1585
1586#[cfg(test)]
1587pub(crate) mod tests {
1588    use crate::errors::Result;
1589    use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
1590    use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1591    use crate::parquet_thrift::tests::test_roundtrip;
1592    use crate::parquet_thrift::{
1593        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
1594    };
1595    use crate::schema::types::{
1596        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
1597    };
1598    use std::sync::Arc;
1599
1600    // for testing. decode thrift encoded RowGroup
1601    pub(crate) fn read_row_group(
1602        buf: &mut [u8],
1603        schema_descr: Arc<SchemaDescriptor>,
1604    ) -> Result<RowGroupMetaData> {
1605        let mut reader = ThriftSliceInputProtocol::new(buf);
1606        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
1607    }
1608
1609    pub(crate) fn read_column_chunk(
1610        buf: &mut [u8],
1611        column_descr: Arc<ColumnDescriptor>,
1612    ) -> Result<ColumnChunkMetaData> {
1613        let mut reader = ThriftSliceInputProtocol::new(buf);
1614        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr)
1615    }
1616
1617    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
1618        let num_nodes = num_nodes(&schema)?;
1619        let mut buf = Vec::new();
1620        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1621
1622        // kick off writing list
1623        writer.write_list_begin(ElementType::Struct, num_nodes)?;
1624
1625        // write SchemaElements
1626        write_schema(&schema, &mut writer)?;
1627
1628        let mut prot = ThriftSliceInputProtocol::new(&buf);
1629        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
1630        parquet_schema_from_array(se)
1631    }
1632
1633    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
1634        let num_nodes = num_nodes(schema)?;
1635        let mut buf = Vec::new();
1636        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1637
1638        // kick off writing list
1639        writer.write_list_begin(ElementType::Struct, num_nodes)?;
1640
1641        // write SchemaElements
1642        write_schema(schema, &mut writer)?;
1643        Ok(buf)
1644    }
1645
1646    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
1647        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
1648        read_thrift_vec(&mut prot)
1649    }
1650
1651    #[test]
1652    fn test_bounding_box_roundtrip() {
1653        test_roundtrip(BoundingBox {
1654            xmin: 0.1.into(),
1655            xmax: 10.3.into(),
1656            ymin: 0.001.into(),
1657            ymax: 128.5.into(),
1658            zmin: None,
1659            zmax: None,
1660            mmin: None,
1661            mmax: None,
1662        });
1663
1664        test_roundtrip(BoundingBox {
1665            xmin: 0.1.into(),
1666            xmax: 10.3.into(),
1667            ymin: 0.001.into(),
1668            ymax: 128.5.into(),
1669            zmin: Some(11.0.into()),
1670            zmax: Some(1300.0.into()),
1671            mmin: None,
1672            mmax: None,
1673        });
1674
1675        test_roundtrip(BoundingBox {
1676            xmin: 0.1.into(),
1677            xmax: 10.3.into(),
1678            ymin: 0.001.into(),
1679            ymax: 128.5.into(),
1680            zmin: Some(11.0.into()),
1681            zmax: Some(1300.0.into()),
1682            mmin: Some(3.7.into()),
1683            mmax: Some(42.0.into()),
1684        });
1685    }
1686}