parquet/file/metadata/thrift/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module is the bridge between a Parquet file's thrift encoded metadata
19//! and this crate's [Parquet metadata API]. It contains objects and functions used
20//! to serialize/deserialize metadata objects into/from the Thrift compact protocol
21//! format as defined by the [Parquet specification].
22//!
23//! [Parquet metadata API]: crate::file::metadata
24//! [Parquet specification]: https://github.com/apache/parquet-format/tree/master
25
26use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37    basic::{
38        ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
39        Repetition, Type,
40    },
41    data_type::{ByteArray, FixedLenByteArray, Int96},
42    errors::{ParquetError, Result},
43    file::{
44        metadata::{
45            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
47            RowGroupMetaDataBuilder, SortingColumn,
48        },
49        statistics::ValueStatistics,
50    },
51    parquet_thrift::{
52        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54        read_thrift_vec,
55    },
56    schema::types::{
57        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58    },
59    thrift_struct,
60    util::bit_util::FromBytes,
61    write_thrift_field,
62};
63
// Thrift `SchemaElement`: one node of the Parquet schema, which the file stores as a single
// list flattened by a depth-first traversal (see `num_children` below). A list of these is
// decoded with `read_thrift_vec` and turned into a schema tree by `parquet_schema_from_array`.
//
// This needs to be visible to the schema conversion code.
//
// NOTE(review): the `'a` lifetime presumably lets `name` borrow from the input buffer rather
// than allocate — confirm against the `thrift_struct!` mapping for `string<'a>`.
// Field separators (`;`) are optional in this DSL; both forms appear below.
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  /// Data type for this field. Not set if the current element is a non-leaf node
  1: optional Type r#type;
  /// If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
  /// Otherwise, if specified, this is the maximum bit length to store any of the values.
  /// (e.g. a low cardinality INT col could have this set to 3).  Note that this is
  /// in the schema, and therefore fixed for the entire file.
  2: optional i32 type_length;
  /// Repetition of the field. The root of the schema does not have a repetition_type.
  /// All other nodes must have one.
  3: optional Repetition repetition_type;
  /// Name of the field in the schema
  4: required string<'a> name;
  /// Nested fields. Since thrift does not support nested fields,
  /// the nesting is flattened to a single list by a depth-first traversal.
  /// The children count is used to construct the nested relationship.
  /// This field is not set when the element is a primitive type.
  5: optional i32 num_children;
  /// DEPRECATED: When the schema is the result of a conversion from another model.
  /// Used to record the original type to help with cross conversion.
  ///
  /// This is superseded by logical_type.
  6: optional ConvertedType converted_type;
  /// DEPRECATED: Used when this column contains decimal data.
  /// See the DECIMAL converted type for more details.
  ///
  /// This is superseded by using the DecimalType annotation in logical_type.
  7: optional i32 scale
  /// Used together with `scale` for decimal data; see the note on `scale` above.
  8: optional i32 precision
  /// When the original schema supports field ids, this will save the
  /// original field id in the parquet schema
  9: optional i32 field_id;
  /// The logical type of this SchemaElement
  ///
  /// LogicalType replaces ConvertedType, but ConvertedType is still required
  /// for some logical types to ensure forward-compatibility in format v1.
  10: optional LogicalType logical_type
}
);
105
// Thrift `Statistics` for a column chunk. Min/max values are raw byte slices (`binary<'a>`,
// borrowed from the input) that `convert_stats` decodes according to the column's physical
// type.
thrift_struct!(
struct Statistics<'a> {
   // Deprecated min/max fields. `convert_stats` only uses them when both `max_value` and
   // `min_value` are absent, and then marks the resulting statistics as old format.
   1: optional binary<'a> max;
   2: optional binary<'a> min;
   // `convert_stats` treats an absent null count as 0 and rejects negative values.
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   // Current min/max fields; preferred over `max`/`min` whenever either of them is present.
   5: optional binary<'a> max_value;
   6: optional binary<'a> min_value;
   // Exactness flags. `convert_stats` only applies them to BYTE_ARRAY and
   // FIXED_LEN_BYTE_ARRAY columns, treating an absent flag as `false`.
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
118
// Thrift `BoundingBox` used by geospatial statistics. The X and Y ranges are required.
// The Z and M ranges are optional; `convert_bounding_box` only keeps a range when both of
// its bounds are present and silently drops a half-specified range.
thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);
131
// Thrift `GeospatialStatistics` for a column chunk. `convert_geo_stats` converts `bbox` via
// `convert_bounding_box` and treats an empty `geospatial_types` list as absent.
// NOTE(review): `geospatial_types` presumably holds geometry type codes as defined by the
// Parquet format — confirm before relying on their meaning here.
thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);
138
// Thrift `SizeStatistics` for a column chunk. `read_column_metadata` copies
// `unencoded_byte_array_data_bytes` onto the chunk as-is and wraps each level histogram
// in a `LevelHistogram`.
thrift_struct!(
struct SizeStatistics {
   1: optional i64 unencoded_byte_array_data_bytes;
   2: optional list<i64> repetition_level_histogram;
   3: optional list<i64> definition_level_histogram;
}
);
146
147fn convert_geo_stats(
148    stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150    stats.map(|st| {
151        let bbox = convert_bounding_box(st.bbox);
152        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154            bbox,
155            geospatial_types,
156        ))
157    })
158}
159
160fn convert_bounding_box(
161    bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163    bbox.map(|bb| {
164        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165            bb.xmin.into(),
166            bb.xmax.into(),
167            bb.ymin.into(),
168            bb.ymax.into(),
169        );
170
171        newbb = match (bb.zmin, bb.zmax) {
172            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173            // If either None or mismatch, leave it as None and don't error
174            _ => newbb,
175        };
176
177        newbb = match (bb.mmin, bb.mmax) {
178            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179            // If either None or mismatch, leave it as None and don't error
180            _ => newbb,
181        };
182
183        newbb
184    })
185}
186
187/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
188fn convert_stats(
189    column_descr: &Arc<ColumnDescriptor>,
190    thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192    use crate::file::statistics::Statistics as FStatistics;
193    Ok(match thrift_stats {
194        Some(stats) => {
195            // Number of nulls recorded, when it is not available, we just mark it as 0.
196            // TODO this should be `None` if there is no information about NULLS.
197            // see https://github.com/apache/arrow-rs/pull/6216/files
198            let null_count = stats.null_count.unwrap_or(0);
199
200            if null_count < 0 {
201                return Err(general_err!(
202                    "Statistics null count is negative {}",
203                    null_count
204                ));
205            }
206
207            // Generic null count.
208            let null_count = Some(null_count as u64);
209            // Generic distinct count (count of distinct values occurring)
210            let distinct_count = stats.distinct_count.map(|value| value as u64);
211            // Whether or not statistics use deprecated min/max fields.
212            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
213            // Generic min value as bytes.
214            let min = if old_format {
215                stats.min
216            } else {
217                stats.min_value
218            };
219            // Generic max value as bytes.
220            let max = if old_format {
221                stats.max
222            } else {
223                stats.max_value
224            };
225
226            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
227                if let Some(min) = min {
228                    if min.len() < len {
229                        return Err(general_err!("Insufficient bytes to parse min statistic",));
230                    }
231                }
232                if let Some(max) = max {
233                    if max.len() < len {
234                        return Err(general_err!("Insufficient bytes to parse max statistic",));
235                    }
236                }
237                Ok(())
238            }
239
240            let physical_type = column_descr.physical_type();
241            match physical_type {
242                Type::BOOLEAN => check_len(&min, &max, 1),
243                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
244                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
245                Type::INT96 => check_len(&min, &max, 12),
246                _ => Ok(()),
247            }?;
248
249            // Values are encoded using PLAIN encoding definition, except that
250            // variable-length byte arrays do not include a length prefix.
251            //
252            // Instead of using actual decoder, we manually convert values.
253            let res = match physical_type {
254                Type::BOOLEAN => FStatistics::boolean(
255                    min.map(|data| data[0] != 0),
256                    max.map(|data| data[0] != 0),
257                    distinct_count,
258                    null_count,
259                    old_format,
260                ),
261                Type::INT32 => FStatistics::int32(
262                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
264                    distinct_count,
265                    null_count,
266                    old_format,
267                ),
268                Type::INT64 => FStatistics::int64(
269                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
271                    distinct_count,
272                    null_count,
273                    old_format,
274                ),
275                Type::INT96 => {
276                    // INT96 statistics may not be correct, because comparison is signed
277                    let min = if let Some(data) = min {
278                        assert_eq!(data.len(), 12);
279                        Some(Int96::try_from_le_slice(data)?)
280                    } else {
281                        None
282                    };
283                    let max = if let Some(data) = max {
284                        assert_eq!(data.len(), 12);
285                        Some(Int96::try_from_le_slice(data)?)
286                    } else {
287                        None
288                    };
289                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
290                }
291                Type::FLOAT => FStatistics::float(
292                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
294                    distinct_count,
295                    null_count,
296                    old_format,
297                ),
298                Type::DOUBLE => FStatistics::double(
299                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
301                    distinct_count,
302                    null_count,
303                    old_format,
304                ),
305                Type::BYTE_ARRAY => FStatistics::ByteArray(
306                    ValueStatistics::new(
307                        min.map(ByteArray::from),
308                        max.map(ByteArray::from),
309                        distinct_count,
310                        null_count,
311                        old_format,
312                    )
313                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
314                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
315                ),
316                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
317                    ValueStatistics::new(
318                        min.map(ByteArray::from).map(FixedLenByteArray::from),
319                        max.map(ByteArray::from).map(FixedLenByteArray::from),
320                        distinct_count,
321                        null_count,
322                        old_format,
323                    )
324                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
325                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
326                ),
327            };
328
329            Some(res)
330        }
331        None => None,
332    })
333}
334
// Bit positions for required fields in the Thrift ColumnMetaData struct. Each bit index equals
// the field's Thrift field id (e.g. `type` is field 1 and is tracked by `1 << 1`;
// `data_page_offset` is field 9). `path_in_schema` (field 3) is required by the Thrift
// definition but is deliberately not tracked, because `read_column_metadata` skips it.
const COL_META_TYPE: u16 = 1 << 1;
const COL_META_ENCODINGS: u16 = 1 << 2;
const COL_META_CODEC: u16 = 1 << 4;
const COL_META_NUM_VALUES: u16 = 1 << 5;
const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;

// a mask where all required fields' bits are set
const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;
352
353// check mask to see if all required fields are set. return an appropriate error if
354// any are missing.
355fn validate_column_metadata(mask: u16) -> Result<()> {
356    if mask != COL_META_ALL_REQUIRED {
357        if mask & COL_META_ENCODINGS == 0 {
358            return Err(general_err!("Required field encodings is missing"));
359        }
360
361        if mask & COL_META_CODEC == 0 {
362            return Err(general_err!("Required field codec is missing"));
363        }
364        if mask & COL_META_NUM_VALUES == 0 {
365            return Err(general_err!("Required field num_values is missing"));
366        }
367        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
368            return Err(general_err!(
369                "Required field total_uncompressed_size is missing"
370            ));
371        }
372        if mask & COL_META_TOTAL_COMP_SZ == 0 {
373            return Err(general_err!(
374                "Required field total_compressed_size is missing"
375            ));
376        }
377        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
378            return Err(general_err!("Required field data_page_offset is missing"));
379        }
380    }
381
382    Ok(())
383}
384
385// Decode `ColumnMetaData`. Returns a mask of all required fields that were observed.
386// This mask can be passed to `validate_column_metadata`.
387fn read_column_metadata<'a>(
388    prot: &mut ThriftSliceInputProtocol<'a>,
389    column: &mut ColumnChunkMetaData,
390) -> Result<u16> {
391    // mask for seen required fields in ColumnMetaData
392    let mut seen_mask = 0u16;
393
394    // struct ColumnMetaData {
395    //   1: required Type type
396    //   2: required list<Encoding> encodings
397    //   3: required list<string> path_in_schema
398    //   4: required CompressionCodec codec
399    //   5: required i64 num_values
400    //   6: required i64 total_uncompressed_size
401    //   7: required i64 total_compressed_size
402    //   8: optional list<KeyValue> key_value_metadata
403    //   9: required i64 data_page_offset
404    //   10: optional i64 index_page_offset
405    //   11: optional i64 dictionary_page_offset
406    //   12: optional Statistics statistics;
407    //   13: optional list<PageEncodingStats> encoding_stats;
408    //   14: optional i64 bloom_filter_offset;
409    //   15: optional i32 bloom_filter_length;
410    //   16: optional SizeStatistics size_statistics;
411    //   17: optional GeospatialStatistics geospatial_statistics;
412    // }
413    let column_descr = &column.column_descr;
414
415    let mut last_field_id = 0i16;
416    loop {
417        let field_ident = prot.read_field_begin(last_field_id)?;
418        if field_ident.field_type == FieldType::Stop {
419            break;
420        }
421        match field_ident.id {
422            // 1: type is never used, we can use the column descriptor
423            1 => {
424                // read for error handling
425                Type::read_thrift(&mut *prot)?;
426                seen_mask |= COL_META_TYPE;
427            }
428            2 => {
429                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
430                seen_mask |= COL_META_ENCODINGS;
431            }
432            // 3: path_in_schema is redundant
433            4 => {
434                column.compression = Compression::read_thrift(&mut *prot)?;
435                seen_mask |= COL_META_CODEC;
436            }
437            5 => {
438                column.num_values = i64::read_thrift(&mut *prot)?;
439                seen_mask |= COL_META_NUM_VALUES;
440            }
441            6 => {
442                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
443                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
444            }
445            7 => {
446                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
447                seen_mask |= COL_META_TOTAL_COMP_SZ;
448            }
449            // 8: we don't expose this key value
450            9 => {
451                column.data_page_offset = i64::read_thrift(&mut *prot)?;
452                seen_mask |= COL_META_DATA_PAGE_OFFSET;
453            }
454            10 => {
455                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
456            }
457            11 => {
458                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
459            }
460            12 => {
461                column.statistics =
462                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
463            }
464            13 => {
465                let val =
466                    read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
467                column.encoding_stats = Some(val);
468            }
469            14 => {
470                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
471            }
472            15 => {
473                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
474            }
475            16 => {
476                let val = SizeStatistics::read_thrift(&mut *prot)?;
477                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
478                column.repetition_level_histogram =
479                    val.repetition_level_histogram.map(LevelHistogram::from);
480                column.definition_level_histogram =
481                    val.definition_level_histogram.map(LevelHistogram::from);
482            }
483            17 => {
484                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
485                column.geo_statistics = convert_geo_stats(Some(val));
486            }
487            _ => {
488                prot.skip(field_ident.field_type)?;
489            }
490        };
491        last_field_id = field_ident.id;
492    }
493
494    Ok(seen_mask)
495}
496
497// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait because
498// these are all internal and operate on slices.
499fn read_column_chunk<'a>(
500    prot: &mut ThriftSliceInputProtocol<'a>,
501    column_descr: &Arc<ColumnDescriptor>,
502) -> Result<ColumnChunkMetaData> {
503    // create a default initialized ColumnMetaData
504    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
505
506    // seen flag for file_offset
507    let mut has_file_offset = false;
508
509    // mask of seen flags for ColumnMetaData
510    let mut col_meta_mask = 0u16;
511
512    // struct ColumnChunk {
513    //   1: optional string file_path
514    //   2: required i64 file_offset = 0
515    //   3: optional ColumnMetaData meta_data
516    //   4: optional i64 offset_index_offset
517    //   5: optional i32 offset_index_length
518    //   6: optional i64 column_index_offset
519    //   7: optional i32 column_index_length
520    //   8: optional ColumnCryptoMetaData crypto_metadata
521    //   9: optional binary encrypted_column_metadata
522    // }
523    let mut last_field_id = 0i16;
524    loop {
525        let field_ident = prot.read_field_begin(last_field_id)?;
526        if field_ident.field_type == FieldType::Stop {
527            break;
528        }
529        match field_ident.id {
530            1 => {
531                col.file_path = Some(String::read_thrift(&mut *prot)?);
532            }
533            2 => {
534                col.file_offset = i64::read_thrift(&mut *prot)?;
535                has_file_offset = true;
536            }
537            3 => {
538                col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
539            }
540            4 => {
541                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
542            }
543            5 => {
544                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
545            }
546            6 => {
547                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
548            }
549            7 => {
550                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
551            }
552            #[cfg(feature = "encryption")]
553            8 => {
554                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
555                col.column_crypto_metadata = Some(Box::new(val));
556            }
557            #[cfg(feature = "encryption")]
558            9 => {
559                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
560            }
561            _ => {
562                prot.skip(field_ident.field_type)?;
563            }
564        };
565        last_field_id = field_ident.id;
566    }
567
568    // the only required field from ColumnChunk
569    if !has_file_offset {
570        return Err(general_err!("Required field file_offset is missing"));
571    };
572
573    // if encrypted just return. we'll decrypt after finishing the footer and populate the rest.
574    #[cfg(feature = "encryption")]
575    if col.encrypted_column_metadata.is_some() {
576        return Ok(col);
577    }
578
579    // not encrypted, so make sure all required fields were read
580    validate_column_metadata(col_meta_mask)?;
581
582    Ok(col)
583}
584
585fn read_row_group(
586    prot: &mut ThriftSliceInputProtocol,
587    schema_descr: &Arc<SchemaDescriptor>,
588) -> Result<RowGroupMetaData> {
589    // create default initialized RowGroupMetaData
590    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
591
592    // mask values for required fields
593    const RG_COLUMNS: u8 = 1 << 1;
594    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
595    const RG_NUM_ROWS: u8 = 1 << 3;
596    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;
597
598    let mut mask = 0u8;
599
600    // struct RowGroup {
601    //   1: required list<ColumnChunk> columns
602    //   2: required i64 total_byte_size
603    //   3: required i64 num_rows
604    //   4: optional list<SortingColumn> sorting_columns
605    //   5: optional i64 file_offset
606    //   6: optional i64 total_compressed_size
607    //   7: optional i16 ordinal
608    // }
609    let mut last_field_id = 0i16;
610    loop {
611        let field_ident = prot.read_field_begin(last_field_id)?;
612        if field_ident.field_type == FieldType::Stop {
613            break;
614        }
615        match field_ident.id {
616            1 => {
617                let list_ident = prot.read_list_begin()?;
618                if schema_descr.num_columns() != list_ident.size as usize {
619                    return Err(general_err!(
620                        "Column count mismatch. Schema has {} columns while Row Group has {}",
621                        schema_descr.num_columns(),
622                        list_ident.size
623                    ));
624                }
625                for i in 0..list_ident.size as usize {
626                    let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
627                    row_group.columns.push(col);
628                }
629                mask |= RG_COLUMNS;
630            }
631            2 => {
632                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
633                mask |= RG_TOT_BYTE_SIZE;
634            }
635            3 => {
636                row_group.num_rows = i64::read_thrift(&mut *prot)?;
637                mask |= RG_NUM_ROWS;
638            }
639            4 => {
640                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
641                row_group.sorting_columns = Some(val);
642            }
643            5 => {
644                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
645            }
646            // 6: we don't expose total_compressed_size
647            7 => {
648                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
649            }
650            _ => {
651                prot.skip(field_ident.field_type)?;
652            }
653        };
654        last_field_id = field_ident.id;
655    }
656
657    if mask != RG_ALL_REQUIRED {
658        if mask & RG_COLUMNS == 0 {
659            return Err(general_err!("Required field columns is missing"));
660        }
661        if mask & RG_TOT_BYTE_SIZE == 0 {
662            return Err(general_err!("Required field total_byte_size is missing"));
663        }
664        if mask & RG_NUM_ROWS == 0 {
665            return Err(general_err!("Required field num_rows is missing"));
666        }
667    }
668
669    Ok(row_group)
670}
671
672/// Create a [`SchemaDescriptor`] from thrift input. The input buffer must contain a complete
673/// Parquet footer.
674pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
675    let mut prot = ThriftSliceInputProtocol::new(buf);
676
677    let mut last_field_id = 0i16;
678    loop {
679        let field_ident = prot.read_field_begin(last_field_id)?;
680        if field_ident.field_type == FieldType::Stop {
681            break;
682        }
683        match field_ident.id {
684            2 => {
685                // read schema and convert to SchemaDescriptor for use when reading row groups
686                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
687                let val = parquet_schema_from_array(val)?;
688                return Ok(SchemaDescriptor::new(val));
689            }
690            _ => prot.skip(field_ident.field_type)?,
691        }
692        last_field_id = field_ident.id;
693    }
694    Err(general_err!("Input does not contain a schema"))
695}
696
697/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
698/// the Parquet footer. Page indexes will need to be added later.
699pub(crate) fn parquet_metadata_from_bytes(
700    buf: &[u8],
701    options: Option<&ParquetMetaDataOptions>,
702) -> Result<ParquetMetaData> {
703    let mut prot = ThriftSliceInputProtocol::new(buf);
704
705    // begin reading the file metadata
706    let mut version: Option<i32> = None;
707    let mut num_rows: Option<i64> = None;
708    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
709    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
710    let mut created_by: Option<&str> = None;
711    let mut column_orders: Option<Vec<ColumnOrder>> = None;
712    #[cfg(feature = "encryption")]
713    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
714    #[cfg(feature = "encryption")]
715    let mut footer_signing_key_metadata: Option<&[u8]> = None;
716
717    // this will need to be set before parsing row groups
718    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
719
720    // see if we already have a schema.
721    if let Some(options) = options {
722        schema_descr = options.schema().cloned();
723    }
724
725    // struct FileMetaData {
726    //   1: required i32 version
727    //   2: required list<SchemaElement> schema;
728    //   3: required i64 num_rows
729    //   4: required list<RowGroup> row_groups
730    //   5: optional list<KeyValue> key_value_metadata
731    //   6: optional string created_by
732    //   7: optional list<ColumnOrder> column_orders;
733    //   8: optional EncryptionAlgorithm encryption_algorithm
734    //   9: optional binary footer_signing_key_metadata
735    // }
736    let mut last_field_id = 0i16;
737    loop {
738        let field_ident = prot.read_field_begin(last_field_id)?;
739        if field_ident.field_type == FieldType::Stop {
740            break;
741        }
742        match field_ident.id {
743            1 => {
744                version = Some(i32::read_thrift(&mut prot)?);
745            }
746            2 => {
747                // If schema was passed in, skip parsing it
748                if schema_descr.is_some() {
749                    prot.skip(field_ident.field_type)?;
750                } else {
751                    // read schema and convert to SchemaDescriptor for use when reading row groups
752                    let val =
753                        read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
754                    let val = parquet_schema_from_array(val)?;
755                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
756                }
757            }
758            3 => {
759                num_rows = Some(i64::read_thrift(&mut prot)?);
760            }
761            4 => {
762                if schema_descr.is_none() {
763                    return Err(general_err!("Required field schema is missing"));
764                }
765                let schema_descr = schema_descr.as_ref().unwrap();
766                let list_ident = prot.read_list_begin()?;
767                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
768
769                // Read row groups and handle ordinal assignment
770                let mut assigner = OrdinalAssigner::new();
771                for ordinal in 0..list_ident.size {
772                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
773                        ParquetError::General(format!(
774                            "Row group ordinal {ordinal} exceeds i16 max value",
775                        ))
776                    })?;
777                    let rg = read_row_group(&mut prot, schema_descr)?;
778                    rg_vec.push(assigner.ensure(ordinal, rg)?);
779                }
780                row_groups = Some(rg_vec);
781            }
782            5 => {
783                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
784                key_value_metadata = Some(val);
785            }
786            6 => {
787                created_by = Some(<&str>::read_thrift(&mut prot)?);
788            }
789            7 => {
790                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
791                column_orders = Some(val);
792            }
793            #[cfg(feature = "encryption")]
794            8 => {
795                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
796                encryption_algorithm = Some(val);
797            }
798            #[cfg(feature = "encryption")]
799            9 => {
800                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
801            }
802            _ => {
803                prot.skip(field_ident.field_type)?;
804            }
805        };
806        last_field_id = field_ident.id;
807    }
808    let Some(version) = version else {
809        return Err(general_err!("Required field version is missing"));
810    };
811    let Some(num_rows) = num_rows else {
812        return Err(general_err!("Required field num_rows is missing"));
813    };
814    let Some(row_groups) = row_groups else {
815        return Err(general_err!("Required field row_groups is missing"));
816    };
817
818    let created_by = created_by.map(|c| c.to_owned());
819
820    // we've tested for `None` by now so this is safe
821    let schema_descr = schema_descr.unwrap();
822
823    // need to map read column orders to actual values based on the schema
824    if column_orders
825        .as_ref()
826        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
827    {
828        return Err(general_err!("Column order length mismatch"));
829    }
830    // replace default type defined column orders with ones having the correct sort order
831    // TODO(ets): this could instead be done above when decoding
832    let column_orders = column_orders.map(|mut cos| {
833        for (i, column) in schema_descr.columns().iter().enumerate() {
834            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
835                let sort_order = ColumnOrder::sort_order_for_type(
836                    column.logical_type_ref(),
837                    column.converted_type(),
838                    column.physical_type(),
839                );
840                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
841            }
842        }
843        cos
844    });
845
846    #[cfg(not(feature = "encryption"))]
847    let fmd = crate::file::metadata::FileMetaData::new(
848        version,
849        num_rows,
850        created_by,
851        key_value_metadata,
852        schema_descr,
853        column_orders,
854    );
855    #[cfg(feature = "encryption")]
856    let fmd = crate::file::metadata::FileMetaData::new(
857        version,
858        num_rows,
859        created_by,
860        key_value_metadata,
861        schema_descr,
862        column_orders,
863    )
864    .with_encryption_algorithm(encryption_algorithm)
865    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
866
867    Ok(ParquetMetaData::new(fmd, row_groups))
868}
869
870/// Assign [`RowGroupMetaData::ordinal`]  if it is missing.
871#[derive(Debug, Default)]
872pub(crate) struct OrdinalAssigner {
873    first_has_ordinal: Option<bool>,
874}
875
876impl OrdinalAssigner {
877    fn new() -> Self {
878        Default::default()
879    }
880
881    /// Sets [`RowGroupMetaData::ordinal`] if it is missing.
882    ///
883    /// # Arguments
884    /// - actual_ordinal: The ordinal (index) of the row group being processed
885    ///   in the file metadata.
886    /// - rg: The [`RowGroupMetaData`] to potentially modify.
887    ///
888    /// Ensures:
889    /// 1. If the first row group has an ordinal, all subsequent row groups must
890    ///    also have ordinals.
891    /// 2. If the first row group does NOT have an ordinal, all subsequent row
892    ///    groups must also not have ordinals.
893    fn ensure(
894        &mut self,
895        actual_ordinal: i16,
896        mut rg: RowGroupMetaData,
897    ) -> Result<RowGroupMetaData> {
898        let rg_has_ordinal = rg.ordinal.is_some();
899
900        // Only set first_has_ordinal if it's None (first row group that arrives)
901        if self.first_has_ordinal.is_none() {
902            self.first_has_ordinal = Some(rg_has_ordinal);
903        }
904
905        // assign ordinal if missing and consistent with first row group
906        let first_has_ordinal = self.first_has_ordinal.unwrap();
907        if !first_has_ordinal && !rg_has_ordinal {
908            rg.ordinal = Some(actual_ordinal);
909        } else if first_has_ordinal != rg_has_ordinal {
910            return Err(general_err!(
911                "Inconsistent ordinal assignment: first_has_ordinal is set to \
912                {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
913                first_has_ordinal,
914                actual_ordinal,
915                rg_has_ordinal
916            ));
917        }
918        Ok(rg)
919    }
920}
921
thrift_struct!(
    /// Header for an index page, decoded from field 6 of `PageHeader`.
    ///
    /// Declared here with no fields.
    pub(crate) struct IndexPageHeader {}
);
925
thrift_struct!(
/// Header for a dictionary page, decoded from field 7 of `PageHeader`.
pub(crate) struct DictionaryPageHeader {
  /// Number of values in the dictionary
  1: required i32 num_values;

  /// Encoding used by this dictionary page
  2: required Encoding encoding

  /// If true, the entries in the dictionary are sorted in ascending order
  3: optional bool is_sorted;
}
);
938
thrift_struct!(
/// Statistics for the page header.
///
/// This is a duplicate of the [`Statistics`] struct above. Because the page reader uses
/// the [`Read`] API, we cannot read the min/max values as slices. This should not be
/// a huge problem since this crate no longer reads the page header statistics by default.
///
/// The `read_thrift_without_stats` readers for the data page headers skip this struct
/// without decoding it. When writing, this type is also used for the column chunk
/// `Statistics` (field 12 of `ColumnMetaData`), since it has the same layout.
///
/// [`Read`]: crate::parquet_thrift::ThriftReadInputProtocol
pub(crate) struct PageStatistics {
   // Field ids and types mirror the thrift `Statistics` struct.
   1: optional binary max;
   2: optional binary min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary max_value;
   6: optional binary min_value;
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
958
thrift_struct!(
/// Header for a (V1) data page, decoded from field 5 of `PageHeader`.
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  /// Optional page statistics. Always `None` when decoded by
  /// `DataPageHeader::read_thrift_without_stats`, which skips this field.
  5: optional PageStatistics statistics;
}
);
968
969impl DataPageHeader {
970    // reader that skips decoding page statistics
971    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
972    where
973        R: ThriftCompactInputProtocol<'a>,
974    {
975        let mut num_values: Option<i32> = None;
976        let mut encoding: Option<Encoding> = None;
977        let mut definition_level_encoding: Option<Encoding> = None;
978        let mut repetition_level_encoding: Option<Encoding> = None;
979        let statistics: Option<PageStatistics> = None;
980        let mut last_field_id = 0i16;
981        loop {
982            let field_ident = prot.read_field_begin(last_field_id)?;
983            if field_ident.field_type == FieldType::Stop {
984                break;
985            }
986            match field_ident.id {
987                1 => {
988                    let val = i32::read_thrift(&mut *prot)?;
989                    num_values = Some(val);
990                }
991                2 => {
992                    let val = Encoding::read_thrift(&mut *prot)?;
993                    encoding = Some(val);
994                }
995                3 => {
996                    let val = Encoding::read_thrift(&mut *prot)?;
997                    definition_level_encoding = Some(val);
998                }
999                4 => {
1000                    let val = Encoding::read_thrift(&mut *prot)?;
1001                    repetition_level_encoding = Some(val);
1002                }
1003                _ => {
1004                    prot.skip(field_ident.field_type)?;
1005                }
1006            };
1007            last_field_id = field_ident.id;
1008        }
1009        let Some(num_values) = num_values else {
1010            return Err(general_err!("Required field num_values is missing"));
1011        };
1012        let Some(encoding) = encoding else {
1013            return Err(general_err!("Required field encoding is missing"));
1014        };
1015        let Some(definition_level_encoding) = definition_level_encoding else {
1016            return Err(general_err!(
1017                "Required field definition_level_encoding is missing"
1018            ));
1019        };
1020        let Some(repetition_level_encoding) = repetition_level_encoding else {
1021            return Err(general_err!(
1022                "Required field repetition_level_encoding is missing"
1023            ));
1024        };
1025        Ok(Self {
1026            num_values,
1027            encoding,
1028            definition_level_encoding,
1029            repetition_level_encoding,
1030            statistics,
1031        })
1032    }
1033}
1034
thrift_struct!(
/// Header for a V2 data page, decoded from field 8 of `PageHeader`.
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  /// The thrift-declared default for this field is `true`.
  7: optional bool is_compressed = true;
  /// Optional page statistics. Always `None` when decoded by
  /// `DataPageHeaderV2::read_thrift_without_stats`, which skips this field.
  8: optional PageStatistics statistics;
}
);
1047
1048impl DataPageHeaderV2 {
1049    // reader that skips decoding page statistics
1050    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1051    where
1052        R: ThriftCompactInputProtocol<'a>,
1053    {
1054        let mut num_values: Option<i32> = None;
1055        let mut num_nulls: Option<i32> = None;
1056        let mut num_rows: Option<i32> = None;
1057        let mut encoding: Option<Encoding> = None;
1058        let mut definition_levels_byte_length: Option<i32> = None;
1059        let mut repetition_levels_byte_length: Option<i32> = None;
1060        let mut is_compressed: Option<bool> = None;
1061        let statistics: Option<PageStatistics> = None;
1062        let mut last_field_id = 0i16;
1063        loop {
1064            let field_ident = prot.read_field_begin(last_field_id)?;
1065            if field_ident.field_type == FieldType::Stop {
1066                break;
1067            }
1068            match field_ident.id {
1069                1 => {
1070                    let val = i32::read_thrift(&mut *prot)?;
1071                    num_values = Some(val);
1072                }
1073                2 => {
1074                    let val = i32::read_thrift(&mut *prot)?;
1075                    num_nulls = Some(val);
1076                }
1077                3 => {
1078                    let val = i32::read_thrift(&mut *prot)?;
1079                    num_rows = Some(val);
1080                }
1081                4 => {
1082                    let val = Encoding::read_thrift(&mut *prot)?;
1083                    encoding = Some(val);
1084                }
1085                5 => {
1086                    let val = i32::read_thrift(&mut *prot)?;
1087                    definition_levels_byte_length = Some(val);
1088                }
1089                6 => {
1090                    let val = i32::read_thrift(&mut *prot)?;
1091                    repetition_levels_byte_length = Some(val);
1092                }
1093                7 => {
1094                    let val = field_ident.bool_val.unwrap();
1095                    is_compressed = Some(val);
1096                }
1097                _ => {
1098                    prot.skip(field_ident.field_type)?;
1099                }
1100            };
1101            last_field_id = field_ident.id;
1102        }
1103        let Some(num_values) = num_values else {
1104            return Err(general_err!("Required field num_values is missing"));
1105        };
1106        let Some(num_nulls) = num_nulls else {
1107            return Err(general_err!("Required field num_nulls is missing"));
1108        };
1109        let Some(num_rows) = num_rows else {
1110            return Err(general_err!("Required field num_rows is missing"));
1111        };
1112        let Some(encoding) = encoding else {
1113            return Err(general_err!("Required field encoding is missing"));
1114        };
1115        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1116            return Err(general_err!(
1117                "Required field definition_levels_byte_length is missing"
1118            ));
1119        };
1120        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1121            return Err(general_err!(
1122                "Required field repetition_levels_byte_length is missing"
1123            ));
1124        };
1125        Ok(Self {
1126            num_values,
1127            num_nulls,
1128            num_rows,
1129            encoding,
1130            definition_levels_byte_length,
1131            repetition_levels_byte_length,
1132            is_compressed,
1133            statistics,
1134        })
1135    }
1136}
1137
thrift_struct!(
/// Header describing a single page; `type` indicates which `*_header` field is set.
pub(crate) struct PageHeader {
  /// the type of the page: indicates which of the *_header fields is set
  1: required PageType r#type

  /// Uncompressed page size in bytes (not including this header)
  2: required i32 uncompressed_page_size

  /// Compressed (and potentially encrypted) page size in bytes, not including this header
  3: required i32 compressed_page_size

  /// Optional 32-bit CRC checksum for the page. The exact bytes it covers are defined
  /// by the Parquet format specification.
  4: optional i32 crc

  // Headers for page-specific data. Only one of these is set, as indicated by `type`.
  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1159
1160impl PageHeader {
1161    // reader that skips reading page statistics. obtained by running
1162    // `cargo expand -p parquet --all-features --lib file::metadata::thrift`
1163    // and modifying the impl of `read_thrift`
1164    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1165    where
1166        R: ThriftCompactInputProtocol<'a>,
1167    {
1168        let mut type_: Option<PageType> = None;
1169        let mut uncompressed_page_size: Option<i32> = None;
1170        let mut compressed_page_size: Option<i32> = None;
1171        let mut crc: Option<i32> = None;
1172        let mut data_page_header: Option<DataPageHeader> = None;
1173        let mut index_page_header: Option<IndexPageHeader> = None;
1174        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1175        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1176        let mut last_field_id = 0i16;
1177        loop {
1178            let field_ident = prot.read_field_begin(last_field_id)?;
1179            if field_ident.field_type == FieldType::Stop {
1180                break;
1181            }
1182            match field_ident.id {
1183                1 => {
1184                    let val = PageType::read_thrift(&mut *prot)?;
1185                    type_ = Some(val);
1186                }
1187                2 => {
1188                    let val = i32::read_thrift(&mut *prot)?;
1189                    uncompressed_page_size = Some(val);
1190                }
1191                3 => {
1192                    let val = i32::read_thrift(&mut *prot)?;
1193                    compressed_page_size = Some(val);
1194                }
1195                4 => {
1196                    let val = i32::read_thrift(&mut *prot)?;
1197                    crc = Some(val);
1198                }
1199                5 => {
1200                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1201                    data_page_header = Some(val);
1202                }
1203                6 => {
1204                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
1205                    index_page_header = Some(val);
1206                }
1207                7 => {
1208                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1209                    dictionary_page_header = Some(val);
1210                }
1211                8 => {
1212                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1213                    data_page_header_v2 = Some(val);
1214                }
1215                _ => {
1216                    prot.skip(field_ident.field_type)?;
1217                }
1218            };
1219            last_field_id = field_ident.id;
1220        }
1221        let Some(type_) = type_ else {
1222            return Err(general_err!("Required field type_ is missing"));
1223        };
1224        let Some(uncompressed_page_size) = uncompressed_page_size else {
1225            return Err(general_err!(
1226                "Required field uncompressed_page_size is missing"
1227            ));
1228        };
1229        let Some(compressed_page_size) = compressed_page_size else {
1230            return Err(general_err!(
1231                "Required field compressed_page_size is missing"
1232            ));
1233        };
1234        Ok(Self {
1235            r#type: type_,
1236            uncompressed_page_size,
1237            compressed_page_size,
1238            crc,
1239            data_page_header,
1240            index_page_header,
1241            dictionary_page_header,
1242            data_page_header_v2,
1243        })
1244    }
1245}
1246
1247/////////////////////////////////////////////////
1248// helper functions for writing file meta data
1249
1250// serialize the bits of the column chunk needed for a thrift ColumnMetaData
1251// struct ColumnMetaData {
1252//   1: required Type type
1253//   2: required list<Encoding> encodings
1254//   3: required list<string> path_in_schema
1255//   4: required CompressionCodec codec
1256//   5: required i64 num_values
1257//   6: required i64 total_uncompressed_size
1258//   7: required i64 total_compressed_size
1259//   8: optional list<KeyValue> key_value_metadata
1260//   9: required i64 data_page_offset
1261//   10: optional i64 index_page_offset
1262//   11: optional i64 dictionary_page_offset
1263//   12: optional Statistics statistics;
1264//   13: optional list<PageEncodingStats> encoding_stats;
1265//   14: optional i64 bloom_filter_offset;
1266//   15: optional i32 bloom_filter_length;
1267//   16: optional SizeStatistics size_statistics;
1268//   17: optional GeospatialStatistics geospatial_statistics;
1269// }
1270pub(super) fn serialize_column_meta_data<W: Write>(
1271    column_chunk: &ColumnChunkMetaData,
1272    w: &mut ThriftCompactOutputProtocol<W>,
1273) -> Result<()> {
1274    use crate::file::statistics::page_stats_to_thrift;
1275
1276    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1277    column_chunk
1278        .encodings()
1279        .collect::<Vec<_>>()
1280        .write_thrift_field(w, 2, 1)?;
1281    let path = column_chunk.column_descr.path().parts();
1282    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1283    path.write_thrift_field(w, 3, 2)?;
1284    column_chunk.compression.write_thrift_field(w, 4, 3)?;
1285    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1286    column_chunk
1287        .total_uncompressed_size
1288        .write_thrift_field(w, 6, 5)?;
1289    column_chunk
1290        .total_compressed_size
1291        .write_thrift_field(w, 7, 6)?;
1292    // no key_value_metadata here
1293    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1294    if let Some(index_page_offset) = column_chunk.index_page_offset {
1295        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1296    }
1297    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1298        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1299    }
1300    // PageStatistics is the same as thrift Statistics, but writable
1301    let stats = page_stats_to_thrift(column_chunk.statistics());
1302    if let Some(stats) = stats {
1303        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1304    }
1305    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1306        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1307    }
1308    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1309        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1310    }
1311    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1312        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1313    }
1314
1315    // SizeStatistics
1316    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1317        || column_chunk.repetition_level_histogram.is_some()
1318        || column_chunk.definition_level_histogram.is_some()
1319    {
1320        let repetition_level_histogram = column_chunk
1321            .repetition_level_histogram()
1322            .map(|hist| hist.clone().into_inner());
1323
1324        let definition_level_histogram = column_chunk
1325            .definition_level_histogram()
1326            .map(|hist| hist.clone().into_inner());
1327
1328        Some(SizeStatistics {
1329            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1330            repetition_level_histogram,
1331            definition_level_histogram,
1332        })
1333    } else {
1334        None
1335    };
1336    if let Some(size_stats) = size_stats {
1337        last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1338    }
1339
1340    if let Some(geo_stats) = column_chunk.geo_statistics() {
1341        geo_stats.write_thrift_field(w, 17, last_field_id)?;
1342    }
1343
1344    w.write_struct_end()
1345}
1346
1347// temp struct used for writing
1348pub(super) struct FileMeta<'a> {
1349    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
1350    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
1351}
1352
1353// struct FileMetaData {
1354//   1: required i32 version
1355//   2: required list<SchemaElement> schema;
1356//   3: required i64 num_rows
1357//   4: required list<RowGroup> row_groups
1358//   5: optional list<KeyValue> key_value_metadata
1359//   6: optional string created_by
1360//   7: optional list<ColumnOrder> column_orders;
1361//   8: optional EncryptionAlgorithm encryption_algorithm
1362//   9: optional binary footer_signing_key_metadata
1363// }
1364impl<'a> WriteThrift for FileMeta<'a> {
1365    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1366
1367    // needed for last_field_id w/o encryption
1368    #[allow(unused_assignments)]
1369    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1370        self.file_metadata
1371            .version
1372            .write_thrift_field(writer, 1, 0)?;
1373
1374        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
1375        // writing along the way.
1376        let root = self.file_metadata.schema_descr().root_schema_ptr();
1377        let schema_len = num_nodes(&root)?;
1378        writer.write_field_begin(FieldType::List, 2, 1)?;
1379        writer.write_list_begin(ElementType::Struct, schema_len)?;
1380        // recursively write Type nodes as SchemaElements
1381        write_schema(&root, writer)?;
1382
1383        self.file_metadata
1384            .num_rows
1385            .write_thrift_field(writer, 3, 2)?;
1386
1387        // this will call RowGroupMetaData::write_thrift
1388        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1389
1390        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1391            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1392        }
1393        if let Some(created_by) = self.file_metadata.created_by() {
1394            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1395        }
1396        if let Some(column_orders) = self.file_metadata.column_orders() {
1397            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1398        }
1399        #[cfg(feature = "encryption")]
1400        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1401            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1402        }
1403        #[cfg(feature = "encryption")]
1404        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1405            key.as_slice()
1406                .write_thrift_field(writer, 9, last_field_id)?;
1407        }
1408
1409        writer.write_struct_end()
1410    }
1411}
1412
1413fn write_schema<W: Write>(
1414    schema: &TypePtr,
1415    writer: &mut ThriftCompactOutputProtocol<W>,
1416) -> Result<()> {
1417    if !schema.is_group() {
1418        return Err(general_err!("Root schema must be Group type"));
1419    }
1420    write_schema_helper(schema, writer)
1421}
1422
1423fn write_schema_helper<W: Write>(
1424    node: &TypePtr,
1425    writer: &mut ThriftCompactOutputProtocol<W>,
1426) -> Result<()> {
1427    match node.as_ref() {
1428        crate::schema::types::Type::PrimitiveType {
1429            basic_info,
1430            physical_type,
1431            type_length,
1432            scale,
1433            precision,
1434        } => {
1435            let element = SchemaElement {
1436                r#type: Some(*physical_type),
1437                type_length: if *type_length >= 0 {
1438                    Some(*type_length)
1439                } else {
1440                    None
1441                },
1442                repetition_type: Some(basic_info.repetition()),
1443                name: basic_info.name(),
1444                num_children: None,
1445                converted_type: match basic_info.converted_type() {
1446                    ConvertedType::NONE => None,
1447                    other => Some(other),
1448                },
1449                scale: if *scale >= 0 { Some(*scale) } else { None },
1450                precision: if *precision >= 0 {
1451                    Some(*precision)
1452                } else {
1453                    None
1454                },
1455                field_id: if basic_info.has_id() {
1456                    Some(basic_info.id())
1457                } else {
1458                    None
1459                },
1460                logical_type: basic_info.logical_type_ref().cloned(),
1461            };
1462            element.write_thrift(writer)
1463        }
1464        crate::schema::types::Type::GroupType { basic_info, fields } => {
1465            let repetition = if basic_info.has_repetition() {
1466                Some(basic_info.repetition())
1467            } else {
1468                None
1469            };
1470
1471            let element = SchemaElement {
1472                r#type: None,
1473                type_length: None,
1474                repetition_type: repetition,
1475                name: basic_info.name(),
1476                num_children: Some(fields.len().try_into()?),
1477                converted_type: match basic_info.converted_type() {
1478                    ConvertedType::NONE => None,
1479                    other => Some(other),
1480                },
1481                scale: None,
1482                precision: None,
1483                field_id: if basic_info.has_id() {
1484                    Some(basic_info.id())
1485                } else {
1486                    None
1487                },
1488                logical_type: basic_info.logical_type_ref().cloned(),
1489            };
1490
1491            element.write_thrift(writer)?;
1492
1493            // Add child elements for a group
1494            for field in fields {
1495                write_schema_helper(field, writer)?;
1496            }
1497            Ok(())
1498        }
1499    }
1500}
1501
1502// struct RowGroup {
1503//   1: required list<ColumnChunk> columns
1504//   2: required i64 total_byte_size
1505//   3: required i64 num_rows
1506//   4: optional list<SortingColumn> sorting_columns
1507//   5: optional i64 file_offset
1508//   6: optional i64 total_compressed_size
1509//   7: optional i16 ordinal
1510// }
1511impl WriteThrift for RowGroupMetaData {
1512    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1513
1514    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1515        // this will call ColumnChunkMetaData::write_thrift
1516        self.columns.write_thrift_field(writer, 1, 0)?;
1517        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1518        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1519        if let Some(sorting_columns) = self.sorting_columns() {
1520            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1521        }
1522        if let Some(file_offset) = self.file_offset() {
1523            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1524        }
1525        // this is optional, but we'll always write it
1526        last_field_id = self
1527            .compressed_size()
1528            .write_thrift_field(writer, 6, last_field_id)?;
1529        if let Some(ordinal) = self.ordinal() {
1530            ordinal.write_thrift_field(writer, 7, last_field_id)?;
1531        }
1532        writer.write_struct_end()
1533    }
1534}
1535
1536// struct ColumnChunk {
1537//   1: optional string file_path
1538//   2: required i64 file_offset = 0
1539//   3: optional ColumnMetaData meta_data
1540//   4: optional i64 offset_index_offset
1541//   5: optional i32 offset_index_length
1542//   6: optional i64 column_index_offset
1543//   7: optional i32 column_index_length
1544//   8: optional ColumnCryptoMetaData crypto_metadata
1545//   9: optional binary encrypted_column_metadata
1546// }
1547impl WriteThrift for ColumnChunkMetaData {
1548    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1549
1550    #[allow(unused_assignments)]
1551    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1552        let mut last_field_id = 0i16;
1553        if let Some(file_path) = self.file_path() {
1554            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1555        }
1556        last_field_id = self
1557            .file_offset()
1558            .write_thrift_field(writer, 2, last_field_id)?;
1559
1560        #[cfg(feature = "encryption")]
1561        {
1562            // only write the ColumnMetaData if we haven't already encrypted it
1563            if self.encrypted_column_metadata.is_none() {
1564                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1565                serialize_column_meta_data(self, writer)?;
1566                last_field_id = 3;
1567            }
1568        }
1569        #[cfg(not(feature = "encryption"))]
1570        {
1571            // always write the ColumnMetaData
1572            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1573            serialize_column_meta_data(self, writer)?;
1574            last_field_id = 3;
1575        }
1576
1577        if let Some(offset_idx_off) = self.offset_index_offset() {
1578            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1579        }
1580        if let Some(offset_idx_len) = self.offset_index_length() {
1581            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1582        }
1583        if let Some(column_idx_off) = self.column_index_offset() {
1584            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1585        }
1586        if let Some(column_idx_len) = self.column_index_length() {
1587            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1588        }
1589        #[cfg(feature = "encryption")]
1590        {
1591            if let Some(crypto_metadata) = self.crypto_metadata() {
1592                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1593            }
1594            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1595                encrypted_meta
1596                    .as_slice()
1597                    .write_thrift_field(writer, 9, last_field_id)?;
1598            }
1599        }
1600
1601        writer.write_struct_end()
1602    }
1603}
1604
1605// struct GeospatialStatistics {
1606//   1: optional BoundingBox bbox;
1607//   2: optional list<i32> geospatial_types;
1608// }
1609impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1610    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1611
1612    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1613        let mut last_field_id = 0i16;
1614        if let Some(bbox) = self.bounding_box() {
1615            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1616        }
1617        if let Some(geo_types) = self.geospatial_types() {
1618            geo_types.write_thrift_field(writer, 2, last_field_id)?;
1619        }
1620
1621        writer.write_struct_end()
1622    }
1623}
1624
// `write_thrift_field!` cannot accept a path-qualified type name, so the type is first
// brought into scope under a plain alias. The invocation presumably supplies the
// `write_thrift_field` helper for this type, using the thrift `Struct` field type.
use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1628
1629// struct BoundingBox {
1630//   1: required double xmin;
1631//   2: required double xmax;
1632//   3: required double ymin;
1633//   4: required double ymax;
1634//   5: optional double zmin;
1635//   6: optional double zmax;
1636//   7: optional double mmin;
1637//   8: optional double mmax;
1638// }
1639impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1640    const ELEMENT_TYPE: ElementType = ElementType::Struct;
1641
1642    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1643        self.get_xmin().write_thrift_field(writer, 1, 0)?;
1644        self.get_xmax().write_thrift_field(writer, 2, 1)?;
1645        self.get_ymin().write_thrift_field(writer, 3, 2)?;
1646        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1647
1648        if let Some(zmin) = self.get_zmin() {
1649            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1650        }
1651        if let Some(zmax) = self.get_zmax() {
1652            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1653        }
1654        if let Some(mmin) = self.get_mmin() {
1655            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1656        }
1657        if let Some(mmax) = self.get_mmax() {
1658            mmax.write_thrift_field(writer, 8, last_field_id)?;
1659        }
1660
1661        writer.write_struct_end()
1662    }
1663}
1664
// `write_thrift_field!` cannot accept a path-qualified type name, so the type is first
// brought into scope under a plain alias. The invocation presumably supplies the
// `write_thrift_field` helper (thrift `Struct` field type) that lets a bounding box be
// written as a field of an enclosing struct.
use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
write_thrift_field!(RustBoundingBox, FieldType::Struct);
1668
#[cfg(test)]
pub(crate) mod tests {
    use crate::errors::Result;
    use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
    use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
    };
    use std::sync::Arc;

    /// Test helper: decodes a thrift-encoded `RowGroup` from `buf` using `schema_descr`.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut prot = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_row_group(&mut prot, &schema_descr)
    }

    /// Test helper: decodes a thrift-encoded `ColumnChunk` from `buf` using `column_descr`.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        let mut prot = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_column_chunk(&mut prot, &column_descr)
    }

    /// Test helper: encodes `schema` as a thrift list of `SchemaElement`s, then parses that
    /// list back into a schema tree.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let encoded = schema_to_buf(&schema)?;
        let mut prot = ThriftSliceInputProtocol::new(&encoded);
        let elements: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(elements)
    }

    /// Test helper: encodes `schema` into a new buffer as a thrift list header followed by
    /// one `SchemaElement` per schema node.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let node_count = num_nodes(schema)?;
        let mut out = Vec::new();
        {
            let mut writer = ThriftCompactOutputProtocol::new(&mut out);
            writer.write_list_begin(ElementType::Struct, node_count)?;
            write_schema(schema, &mut writer)?;
        }
        Ok(out)
    }

    /// Test helper: decodes a thrift list of `SchemaElement`s that borrow from `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut input = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut input)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        // Every case shares the same x/y extents; only the optional z and m ranges vary.
        let make_box = |z: Option<(f64, f64)>, m: Option<(f64, f64)>| BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: z.map(|(lo, _)| lo.into()),
            zmax: z.map(|(_, hi)| hi.into()),
            mmin: m.map(|(lo, _)| lo.into()),
            mmax: m.map(|(_, hi)| hi.into()),
        };

        test_roundtrip(make_box(None, None));
        test_roundtrip(make_box(Some((11.0, 1300.0)), None));
        test_roundtrip(make_box(Some((11.0, 1300.0)), Some((3.7, 42.0))));
    }
}