1use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34 column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37 basic::{
38 ColumnOrder, CompressionCodec, ConvertedType, Encoding, EncodingMask, LogicalType,
39 PageType, Repetition, Type,
40 },
41 data_type::{ByteArray, FixedLenByteArray, Int96},
42 errors::{ParquetError, Result},
43 file::{
44 metadata::{
45 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46 PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47 RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48 },
49 statistics::ValueStatistics,
50 },
51 parquet_thrift::{
52 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53 ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54 read_thrift_vec, validate_list_type,
55 },
56 schema::types::{
57 ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58 },
59 thrift_struct,
60 util::bit_util::FromBytes,
61 write_thrift_field,
62};
63
// One element of the schema list stored in the file footer (footer field 2). The list is
// decoded by `parquet_schema_from_bytes` / `parquet_metadata_from_bytes` and turned into a
// schema tree by `parquet_schema_from_array`.
// The numeric prefixes below are Thrift field ids.
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  // Physical type of the element; optional in the encoding (presumably absent for
  // group / non-leaf nodes — TODO confirm against parquet.thrift).
  1: optional Type r#type;
  // Presumably the byte width for FIXED_LEN_BYTE_ARRAY leaves — TODO confirm.
  2: optional i32 type_length;
  3: optional Repetition repetition_type;
  // Element name; `string<'a>` borrows it from the input buffer.
  4: required string<'a> name;
  // Number of child elements; presumably only set on group nodes — TODO confirm.
  5: optional i32 num_children;
  6: optional ConvertedType converted_type;
  // Decimal scale / precision (presumably for the legacy DECIMAL annotation — verify).
  7: optional i32 scale
  8: optional i32 precision
  9: optional i32 field_id;
  10: optional LogicalType logical_type
}
);
105
// Column-chunk statistics as encoded in the footer (ColumnMetaData field 12), with byte
// fields borrowed from the input buffer. `convert_stats` uses the legacy `min`/`max` pair
// (fields 1-2) only when neither `min_value` nor `max_value` (fields 5-6) is present.
thrift_struct!(
struct Statistics<'a> {
  1: optional binary<'a> max;
  2: optional binary<'a> min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary<'a> max_value;
  6: optional binary<'a> min_value;
  // Exactness flags; `convert_stats` only applies them to BYTE_ARRAY and
  // FIXED_LEN_BYTE_ARRAY columns, defaulting to `false` when absent.
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);
118
// Thrift-encoded geospatial bounding box. The x/y ranges are required. The z and m ranges
// are optional, and `convert_bounding_box` applies each one only when both its min and max
// are present.
thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);
131
// Thrift-encoded geospatial statistics (ColumnMetaData field 17). `convert_geo_stats`
// treats an empty `geospatial_types` list the same as an absent one.
thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);
138
// Thrift-encoded size statistics (ColumnMetaData field 16). `read_column_metadata` copies
// these fields onto `ColumnChunkMetaData` unless size stats are skipped via the options.
thrift_struct!(
struct SizeStatistics {
  1: optional i64 unencoded_byte_array_data_bytes;
  2: optional list<i64> repetition_level_histogram;
  3: optional list<i64> definition_level_histogram;
}
);
146
147fn convert_geo_stats(
148 stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150 stats.map(|st| {
151 let bbox = convert_bounding_box(st.bbox);
152 let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153 Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154 bbox,
155 geospatial_types,
156 ))
157 })
158}
159
160fn convert_bounding_box(
161 bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163 bbox.map(|bb| {
164 let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165 bb.xmin.into(),
166 bb.xmax.into(),
167 bb.ymin.into(),
168 bb.ymax.into(),
169 );
170
171 newbb = match (bb.zmin, bb.zmax) {
172 (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173 _ => newbb,
175 };
176
177 newbb = match (bb.mmin, bb.mmax) {
178 (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179 _ => newbb,
181 };
182
183 newbb
184 })
185}
186
187fn convert_stats(
189 column_descr: &Arc<ColumnDescriptor>,
190 thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192 use crate::file::statistics::Statistics as FStatistics;
193 Ok(match thrift_stats {
194 Some(stats) => {
195 let null_count = stats
197 .null_count
198 .map(|null_count| {
199 if null_count < 0 {
200 return Err(general_err!(
201 "Statistics null count is negative {}",
202 null_count
203 ));
204 }
205 Ok(null_count as u64)
206 })
207 .transpose()?;
208 let distinct_count = stats.distinct_count.map(|value| value as u64);
210 let old_format = stats.min_value.is_none() && stats.max_value.is_none();
212 let min = if old_format {
214 stats.min
215 } else {
216 stats.min_value
217 };
218 let max = if old_format {
220 stats.max
221 } else {
222 stats.max_value
223 };
224
225 fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
226 if let Some(min) = min {
227 if min.len() < len {
228 return Err(general_err!("Insufficient bytes to parse min statistic",));
229 }
230 }
231 if let Some(max) = max {
232 if max.len() < len {
233 return Err(general_err!("Insufficient bytes to parse max statistic",));
234 }
235 }
236 Ok(())
237 }
238
239 let physical_type = column_descr.physical_type();
240 match physical_type {
241 Type::BOOLEAN => check_len(&min, &max, 1),
242 Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
243 Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
244 Type::INT96 => check_len(&min, &max, 12),
245 _ => Ok(()),
246 }?;
247
248 let res = match physical_type {
253 Type::BOOLEAN => FStatistics::boolean(
254 min.map(|data| data[0] != 0),
255 max.map(|data| data[0] != 0),
256 distinct_count,
257 null_count,
258 old_format,
259 ),
260 Type::INT32 => FStatistics::int32(
261 min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
262 max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263 distinct_count,
264 null_count,
265 old_format,
266 ),
267 Type::INT64 => FStatistics::int64(
268 min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
269 max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270 distinct_count,
271 null_count,
272 old_format,
273 ),
274 Type::INT96 => {
275 let min = if let Some(data) = min {
277 assert_eq!(data.len(), 12);
278 Some(Int96::try_from_le_slice(data)?)
279 } else {
280 None
281 };
282 let max = if let Some(data) = max {
283 assert_eq!(data.len(), 12);
284 Some(Int96::try_from_le_slice(data)?)
285 } else {
286 None
287 };
288 FStatistics::int96(min, max, distinct_count, null_count, old_format)
289 }
290 Type::FLOAT => FStatistics::float(
291 min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
292 max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293 distinct_count,
294 null_count,
295 old_format,
296 ),
297 Type::DOUBLE => FStatistics::double(
298 min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
299 max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300 distinct_count,
301 null_count,
302 old_format,
303 ),
304 Type::BYTE_ARRAY => FStatistics::ByteArray(
305 ValueStatistics::new(
306 min.map(ByteArray::from),
307 max.map(ByteArray::from),
308 distinct_count,
309 null_count,
310 old_format,
311 )
312 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
313 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
314 ),
315 Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
316 ValueStatistics::new(
317 min.map(ByteArray::from).map(FixedLenByteArray::from),
318 max.map(ByteArray::from).map(FixedLenByteArray::from),
319 distinct_count,
320 null_count,
321 old_format,
322 )
323 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
324 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
325 ),
326 };
327
328 Some(res)
329 }
330 None => None,
331 })
332}
333
/// Seen-mask bit for the `ColumnMetaData` field whose Thrift id is `field_id`.
const fn col_meta_bit(field_id: u16) -> u16 {
    1 << field_id
}

// Seen-mask bits for the required ColumnMetaData fields tracked by `read_column_metadata`.
// Each bit sits at the position of the field's Thrift id.
const COL_META_TYPE: u16 = col_meta_bit(1);
const COL_META_ENCODINGS: u16 = col_meta_bit(2);
const COL_META_CODEC: u16 = col_meta_bit(4);
const COL_META_NUM_VALUES: u16 = col_meta_bit(5);
const COL_META_TOTAL_UNCOMP_SZ: u16 = col_meta_bit(6);
const COL_META_TOTAL_COMP_SZ: u16 = col_meta_bit(7);
const COL_META_DATA_PAGE_OFFSET: u16 = col_meta_bit(9);

/// Seen-mask value when every tracked field has been decoded.
const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;
351
352fn validate_column_metadata(mask: u16) -> Result<()> {
355 if mask != COL_META_ALL_REQUIRED {
356 if mask & COL_META_ENCODINGS == 0 {
357 return Err(general_err!("Required field encodings is missing"));
358 }
359
360 if mask & COL_META_CODEC == 0 {
361 return Err(general_err!("Required field codec is missing"));
362 }
363 if mask & COL_META_NUM_VALUES == 0 {
364 return Err(general_err!("Required field num_values is missing"));
365 }
366 if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
367 return Err(general_err!(
368 "Required field total_uncompressed_size is missing"
369 ));
370 }
371 if mask & COL_META_TOTAL_COMP_SZ == 0 {
372 return Err(general_err!(
373 "Required field total_compressed_size is missing"
374 ));
375 }
376 if mask & COL_META_DATA_PAGE_OFFSET == 0 {
377 return Err(general_err!("Required field data_page_offset is missing"));
378 }
379 }
380
381 Ok(())
382}
383
384fn read_encoding_stats_as_mask<'a>(
385 prot: &mut ThriftSliceInputProtocol<'a>,
386) -> Result<EncodingMask> {
387 let mut mask = 0i32;
389 let list_ident = prot.read_list_begin()?;
390 validate_list_type(ElementType::Struct, &list_ident)?;
392
393 for _ in 0..list_ident.size {
394 let pes = PageEncodingStats::read_thrift(prot)?;
395 match pes.page_type {
396 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
397 _ => {}
398 }
399 }
400 EncodingMask::try_new(mask)
401}
402
/// Decodes a Thrift `ColumnMetaData` struct directly into `column`.
///
/// `col_index` and `options` control how some optional fields are handled:
/// - column statistics (field 12) are skipped when `skip_column_stats(col_index)` is true;
/// - page encoding stats (field 13) are skipped when `skip_encoding_stats(col_index)` is
///   true, and otherwise decoded either as an [`EncodingMask`] or as the full list,
///   depending on `encoding_stats_as_mask()`;
/// - size statistics (field 16) are skipped when `skip_size_stats(col_index)` is true.
///
/// With no `options`, everything is decoded and encoding stats are kept as a mask.
///
/// Returns a bitmask of the tracked required fields that were seen (the `COL_META_*`
/// constants). The caller is responsible for validating it.
fn read_column_metadata<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column: &mut ColumnChunkMetaData,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<u16> {
    let mut seen_mask = 0u16;

    // Defaults used when no options are supplied.
    let mut skip_pes = false;
    let mut pes_mask = true;
    let mut skip_col_stats = false;
    let mut skip_size_stats = false;

    if let Some(opts) = options {
        skip_pes = opts.skip_encoding_stats(col_index);
        pes_mask = opts.encoding_stats_as_mask();
        skip_col_stats = opts.skip_column_stats(col_index);
        skip_size_stats = opts.skip_size_stats(col_index);
    }

    let column_descr = &column.column_descr;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                // The type is decoded only to advance the protocol and mark it as seen;
                // the value itself is discarded.
                Type::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TYPE;
            }
            2 => {
                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_ENCODINGS;
            }
            // Field 3 has no arm and is skipped by the catch-all below (presumably
            // path_in_schema, which is already known from `column_descr` — TODO confirm).
            4 => {
                column.compression = CompressionCodec::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_CODEC;
            }
            5 => {
                column.num_values = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_NUM_VALUES;
            }
            6 => {
                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
            }
            7 => {
                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_COMP_SZ;
            }
            9 => {
                column.data_page_offset = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_DATA_PAGE_OFFSET;
            }
            10 => {
                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            11 => {
                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            // When a guard below is false the field falls through to the skip arm.
            12 if !skip_col_stats => {
                column.statistics =
                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
            }
            13 if !skip_pes => {
                if pes_mask {
                    let val = read_encoding_stats_as_mask(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
                } else {
                    let val =
                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
                }
            }
            14 => {
                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            15 => {
                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
            }
            16 if !skip_size_stats => {
                let val = SizeStatistics::read_thrift(&mut *prot)?;
                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
                column.repetition_level_histogram =
                    val.repetition_level_histogram.map(LevelHistogram::from);
                column.definition_level_histogram =
                    val.definition_level_histogram.map(LevelHistogram::from);
            }
            17 => {
                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
                column.geo_statistics = convert_geo_stats(Some(val));
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    Ok(seen_mask)
}
533
/// Decodes one Thrift `ColumnChunk` struct for the column described by `column_descr`.
///
/// Field 3 (the nested column metadata) is decoded in place by `read_column_metadata`.
/// Its seen-mask is checked afterwards with `validate_column_metadata`.
///
/// # Errors
///
/// Returns an error on malformed protocol data, when `file_offset` (field 2) is missing,
/// or when required column metadata fields are missing. The last check is skipped for
/// chunks that carry encrypted column metadata (encryption feature only).
fn read_column_chunk<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column_descr: &Arc<ColumnDescriptor>,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
    // Start from a builder-default chunk and overwrite fields as they are decoded.
    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;

    // file_offset is the only required field tracked at this level.
    let mut has_file_offset = false;

    // Seen-mask from read_column_metadata; stays 0 if field 3 never appears.
    let mut col_meta_mask = 0u16;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                col.file_path = Some(String::read_thrift(&mut *prot)?);
            }
            2 => {
                col.file_offset = i64::read_thrift(&mut *prot)?;
                has_file_offset = true;
            }
            3 => {
                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
            }
            4 => {
                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            5 => {
                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            6 => {
                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            // Fields 8 and 9 are decoded only with the `encryption` feature; without it
            // they fall through to the skip arm.
            #[cfg(feature = "encryption")]
            8 => {
                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
                col.column_crypto_metadata = Some(Box::new(val));
            }
            #[cfg(feature = "encryption")]
            9 => {
                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if !has_file_offset {
        return Err(general_err!("Required field file_offset is missing"));
    };

    // With encrypted column metadata, the plaintext metadata fields may legitimately be
    // absent (presumably decrypted and decoded elsewhere — TODO confirm), so the mask is
    // not validated.
    #[cfg(feature = "encryption")]
    if col.encrypted_column_metadata.is_some() {
        return Ok(col);
    }

    validate_column_metadata(col_meta_mask)?;

    Ok(col)
}
623
/// Decodes one Thrift `RowGroup` struct, decoding each column chunk against the matching
/// column of `schema_descr`.
///
/// # Errors
///
/// Returns an error on malformed input, when the number of column chunks differs from
/// `schema_descr.num_columns()`, or when `columns`, `total_byte_size` or `num_rows` is
/// missing.
fn read_row_group(
    prot: &mut ThriftSliceInputProtocol,
    schema_descr: &Arc<SchemaDescriptor>,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<RowGroupMetaData> {
    // Built unchecked because fields are filled in as they are decoded. Required fields
    // are verified through `mask` once the whole struct has been read.
    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();

    // One bit per required field, placed at that field's Thrift id.
    const RG_COLUMNS: u8 = 1 << 1;
    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
    const RG_NUM_ROWS: u8 = 1 << 3;
    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;

    let mut mask = 0u8;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                let list_ident = prot.read_list_begin()?;
                validate_list_type(ElementType::Struct, &list_ident)?;
                // Column chunks are matched to schema columns by position, so the counts
                // must agree before indexing `schema_descr.columns()` below.
                if schema_descr.num_columns() != list_ident.size as usize {
                    return Err(general_err!(
                        "Column count mismatch. Schema has {} columns while Row Group has {}",
                        schema_descr.num_columns(),
                        list_ident.size
                    ));
                }
                for i in 0..list_ident.size as usize {
                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
                    row_group.columns.push(col);
                }
                mask |= RG_COLUMNS;
            }
            2 => {
                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
                mask |= RG_TOT_BYTE_SIZE;
            }
            3 => {
                row_group.num_rows = i64::read_thrift(&mut *prot)?;
                mask |= RG_NUM_ROWS;
            }
            4 => {
                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
                row_group.sorting_columns = Some(val);
            }
            5 => {
                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            // Field 6 has no arm and is skipped by the catch-all below.
            7 => {
                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if mask != RG_ALL_REQUIRED {
        if mask & RG_COLUMNS == 0 {
            return Err(general_err!("Required field columns is missing"));
        }
        if mask & RG_TOT_BYTE_SIZE == 0 {
            return Err(general_err!("Required field total_byte_size is missing"));
        }
        if mask & RG_NUM_ROWS == 0 {
            return Err(general_err!("Required field num_rows is missing"));
        }
    }

    Ok(row_group)
}
713
714pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
717 let mut prot = ThriftSliceInputProtocol::new(buf);
718
719 let mut last_field_id = 0i16;
720 loop {
721 let field_ident = prot.read_field_begin(last_field_id)?;
722 if field_ident.field_type == FieldType::Stop {
723 break;
724 }
725 match field_ident.id {
726 2 => {
727 let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
729 let val = parquet_schema_from_array(val)?;
730 return Ok(SchemaDescriptor::new(val));
731 }
732 _ => prot.skip(field_ident.field_type)?,
733 }
734 last_field_id = field_ident.id;
735 }
736 Err(general_err!("Input does not contain a schema"))
737}
738
739pub(crate) fn parquet_metadata_from_bytes(
742 buf: &[u8],
743 options: Option<&ParquetMetaDataOptions>,
744) -> Result<ParquetMetaData> {
745 let mut prot = ThriftSliceInputProtocol::new(buf);
746
747 let mut version: Option<i32> = None;
749 let mut num_rows: Option<i64> = None;
750 let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
751 let mut key_value_metadata: Option<Vec<KeyValue>> = None;
752 let mut created_by: Option<&str> = None;
753 let mut column_orders: Option<Vec<ColumnOrder>> = None;
754 #[cfg(feature = "encryption")]
755 let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
756 #[cfg(feature = "encryption")]
757 let mut footer_signing_key_metadata: Option<&[u8]> = None;
758
759 let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
761
762 if let Some(options) = options {
764 schema_descr = options.schema().cloned();
765 }
766
767 let mut last_field_id = 0i16;
779 loop {
780 let field_ident = prot.read_field_begin(last_field_id)?;
781 if field_ident.field_type == FieldType::Stop {
782 break;
783 }
784 match field_ident.id {
785 1 => {
786 version = Some(i32::read_thrift(&mut prot)?);
787 }
788 2 => {
789 if schema_descr.is_some() {
791 prot.skip(field_ident.field_type)?;
792 } else {
793 let val =
795 read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
796 let val = parquet_schema_from_array(val)?;
797 schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
798 }
799 }
800 3 => {
801 num_rows = Some(i64::read_thrift(&mut prot)?);
802 }
803 4 => {
804 if schema_descr.is_none() {
805 return Err(general_err!("Required field schema is missing"));
806 }
807 let schema_descr = schema_descr.as_ref().unwrap();
808 let list_ident = prot.read_list_begin()?;
809 validate_list_type(ElementType::Struct, &list_ident)?;
811 let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
812
813 let mut assigner = OrdinalAssigner::new();
815 for ordinal in 0..list_ident.size {
816 let ordinal: i16 = ordinal.try_into().map_err(|_| {
817 ParquetError::General(format!(
818 "Row group ordinal {ordinal} exceeds i16 max value",
819 ))
820 })?;
821 let rg = read_row_group(&mut prot, schema_descr, options)?;
822 rg_vec.push(assigner.ensure(ordinal, rg)?);
823 }
824 row_groups = Some(rg_vec);
825 }
826 5 => {
827 let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
828 key_value_metadata = Some(val);
829 }
830 6 => {
831 created_by = Some(<&str>::read_thrift(&mut prot)?);
832 }
833 7 => {
834 let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
835 column_orders = Some(val);
836 }
837 #[cfg(feature = "encryption")]
838 8 => {
839 let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
840 encryption_algorithm = Some(val);
841 }
842 #[cfg(feature = "encryption")]
843 9 => {
844 footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
845 }
846 _ => {
847 prot.skip(field_ident.field_type)?;
848 }
849 };
850 last_field_id = field_ident.id;
851 }
852 let Some(version) = version else {
853 return Err(general_err!("Required field version is missing"));
854 };
855 let Some(num_rows) = num_rows else {
856 return Err(general_err!("Required field num_rows is missing"));
857 };
858 let Some(row_groups) = row_groups else {
859 return Err(general_err!("Required field row_groups is missing"));
860 };
861
862 let created_by = created_by.map(|c| c.to_owned());
863
864 let schema_descr = schema_descr.unwrap();
866
867 if column_orders
869 .as_ref()
870 .is_some_and(|cos| cos.len() != schema_descr.num_columns())
871 {
872 return Err(general_err!("Column order length mismatch"));
873 }
874 let column_orders = column_orders.map(|mut cos| {
877 for (i, column) in schema_descr.columns().iter().enumerate() {
878 if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
879 let sort_order = ColumnOrder::sort_order_for_type(
880 column.logical_type_ref(),
881 column.converted_type(),
882 column.physical_type(),
883 );
884 cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
885 }
886 }
887 cos
888 });
889
890 #[cfg(not(feature = "encryption"))]
891 let fmd = crate::file::metadata::FileMetaData::new(
892 version,
893 num_rows,
894 created_by,
895 key_value_metadata,
896 schema_descr,
897 column_orders,
898 );
899 #[cfg(feature = "encryption")]
900 let fmd = crate::file::metadata::FileMetaData::new(
901 version,
902 num_rows,
903 created_by,
904 key_value_metadata,
905 schema_descr,
906 column_orders,
907 )
908 .with_encryption_algorithm(encryption_algorithm)
909 .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
910
911 Ok(ParquetMetaData::new(fmd, row_groups))
912}
913
914#[derive(Debug, Default)]
916pub(crate) struct OrdinalAssigner {
917 first_has_ordinal: Option<bool>,
918}
919
920impl OrdinalAssigner {
921 fn new() -> Self {
922 Default::default()
923 }
924
925 fn ensure(
938 &mut self,
939 actual_ordinal: i16,
940 mut rg: RowGroupMetaData,
941 ) -> Result<RowGroupMetaData> {
942 let rg_has_ordinal = rg.ordinal.is_some();
943
944 if self.first_has_ordinal.is_none() {
946 self.first_has_ordinal = Some(rg_has_ordinal);
947 }
948
949 let first_has_ordinal = self.first_has_ordinal.unwrap();
951 if !first_has_ordinal && !rg_has_ordinal {
952 rg.ordinal = Some(actual_ordinal);
953 } else if first_has_ordinal != rg_has_ordinal {
954 return Err(general_err!(
955 "Inconsistent ordinal assignment: first_has_ordinal is set to \
956 {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
957 first_has_ordinal,
958 actual_ordinal,
959 rg_has_ordinal
960 ));
961 }
962 Ok(rg)
963 }
964}
965
// Header for index pages; declared with no fields, so nothing is retained from it
// (presumably any encoded fields are skipped by the generated reader — TODO confirm).
thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);
969
// Header for dictionary pages (PageHeader field 7). The numeric prefixes are Thrift ids.
thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  // Number of values in the dictionary page.
  1: required i32 num_values;

  // Encoding of the dictionary values.
  2: required Encoding encoding

  // Presumably whether the dictionary entries are sorted — TODO confirm.
  3: optional bool is_sorted;

}
);
982
// Statistics embedded in data page headers. It has the same field layout as the footer
// `Statistics`, but uses unborrowed `binary` fields. The `read_thrift_without_stats`
// readers below never populate it.
thrift_struct!(
pub(crate) struct PageStatistics {
  1: optional binary max;
  2: optional binary min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary max_value;
  6: optional binary min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);
1002
// Header for V1 data pages (PageHeader field 5). The numeric prefixes are Thrift ids.
thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  // Encoding of the page values.
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  // Skipped (left as None) by `DataPageHeader::read_thrift_without_stats`.
  5: optional PageStatistics statistics;
}
);
1012
1013impl DataPageHeader {
1014 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1016 where
1017 R: ThriftCompactInputProtocol<'a>,
1018 {
1019 let mut num_values: Option<i32> = None;
1020 let mut encoding: Option<Encoding> = None;
1021 let mut definition_level_encoding: Option<Encoding> = None;
1022 let mut repetition_level_encoding: Option<Encoding> = None;
1023 let statistics: Option<PageStatistics> = None;
1024 let mut last_field_id = 0i16;
1025 loop {
1026 let field_ident = prot.read_field_begin(last_field_id)?;
1027 if field_ident.field_type == FieldType::Stop {
1028 break;
1029 }
1030 match field_ident.id {
1031 1 => {
1032 let val = i32::read_thrift(&mut *prot)?;
1033 num_values = Some(val);
1034 }
1035 2 => {
1036 let val = Encoding::read_thrift(&mut *prot)?;
1037 encoding = Some(val);
1038 }
1039 3 => {
1040 let val = Encoding::read_thrift(&mut *prot)?;
1041 definition_level_encoding = Some(val);
1042 }
1043 4 => {
1044 let val = Encoding::read_thrift(&mut *prot)?;
1045 repetition_level_encoding = Some(val);
1046 }
1047 _ => {
1048 prot.skip(field_ident.field_type)?;
1049 }
1050 };
1051 last_field_id = field_ident.id;
1052 }
1053 let Some(num_values) = num_values else {
1054 return Err(general_err!("Required field num_values is missing"));
1055 };
1056 let Some(encoding) = encoding else {
1057 return Err(general_err!("Required field encoding is missing"));
1058 };
1059 let Some(definition_level_encoding) = definition_level_encoding else {
1060 return Err(general_err!(
1061 "Required field definition_level_encoding is missing"
1062 ));
1063 };
1064 let Some(repetition_level_encoding) = repetition_level_encoding else {
1065 return Err(general_err!(
1066 "Required field repetition_level_encoding is missing"
1067 ));
1068 };
1069 Ok(Self {
1070 num_values,
1071 encoding,
1072 definition_level_encoding,
1073 repetition_level_encoding,
1074 statistics,
1075 })
1076 }
1077}
1078
// Header for V2 data pages (PageHeader field 8). The numeric prefixes are Thrift ids.
thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  // Declared with a default of `true` in this definition.
  7: optional bool is_compressed = true;
  // Skipped (left as None) by `DataPageHeaderV2::read_thrift_without_stats`.
  8: optional PageStatistics statistics;
}
);
1091
1092impl DataPageHeaderV2 {
1093 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1095 where
1096 R: ThriftCompactInputProtocol<'a>,
1097 {
1098 let mut num_values: Option<i32> = None;
1099 let mut num_nulls: Option<i32> = None;
1100 let mut num_rows: Option<i32> = None;
1101 let mut encoding: Option<Encoding> = None;
1102 let mut definition_levels_byte_length: Option<i32> = None;
1103 let mut repetition_levels_byte_length: Option<i32> = None;
1104 let mut is_compressed: Option<bool> = None;
1105 let statistics: Option<PageStatistics> = None;
1106 let mut last_field_id = 0i16;
1107 loop {
1108 let field_ident = prot.read_field_begin(last_field_id)?;
1109 if field_ident.field_type == FieldType::Stop {
1110 break;
1111 }
1112 match field_ident.id {
1113 1 => {
1114 let val = i32::read_thrift(&mut *prot)?;
1115 num_values = Some(val);
1116 }
1117 2 => {
1118 let val = i32::read_thrift(&mut *prot)?;
1119 num_nulls = Some(val);
1120 }
1121 3 => {
1122 let val = i32::read_thrift(&mut *prot)?;
1123 num_rows = Some(val);
1124 }
1125 4 => {
1126 let val = Encoding::read_thrift(&mut *prot)?;
1127 encoding = Some(val);
1128 }
1129 5 => {
1130 let val = i32::read_thrift(&mut *prot)?;
1131 definition_levels_byte_length = Some(val);
1132 }
1133 6 => {
1134 let val = i32::read_thrift(&mut *prot)?;
1135 repetition_levels_byte_length = Some(val);
1136 }
1137 7 => {
1138 is_compressed = Some(field_ident.bool_val()?);
1139 }
1140 _ => {
1141 prot.skip(field_ident.field_type)?;
1142 }
1143 };
1144 last_field_id = field_ident.id;
1145 }
1146 let Some(num_values) = num_values else {
1147 return Err(general_err!("Required field num_values is missing"));
1148 };
1149 let Some(num_nulls) = num_nulls else {
1150 return Err(general_err!("Required field num_nulls is missing"));
1151 };
1152 let Some(num_rows) = num_rows else {
1153 return Err(general_err!("Required field num_rows is missing"));
1154 };
1155 let Some(encoding) = encoding else {
1156 return Err(general_err!("Required field encoding is missing"));
1157 };
1158 let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1159 return Err(general_err!(
1160 "Required field definition_levels_byte_length is missing"
1161 ));
1162 };
1163 let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1164 return Err(general_err!(
1165 "Required field repetition_levels_byte_length is missing"
1166 ));
1167 };
1168 Ok(Self {
1169 num_values,
1170 num_nulls,
1171 num_rows,
1172 encoding,
1173 definition_levels_byte_length,
1174 repetition_levels_byte_length,
1175 is_compressed,
1176 statistics,
1177 })
1178 }
1179}
1180
// Header that precedes every page. At most one of the page-specific headers (fields 5-8)
// is expected to be set, presumably matching `type` — TODO confirm.
// The numeric prefixes are Thrift ids.
thrift_struct!(
pub(crate) struct PageHeader {
  1: required PageType r#type

  // Page size in bytes before compression.
  2: required i32 uncompressed_page_size

  // Page size in bytes after compression.
  3: required i32 compressed_page_size

  // Optional checksum of the page (algorithm not shown here — verify against the spec).
  4: optional i32 crc

  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1202
1203impl PageHeader {
1204 pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1208 where
1209 R: ThriftCompactInputProtocol<'a>,
1210 {
1211 let mut type_: Option<PageType> = None;
1212 let mut uncompressed_page_size: Option<i32> = None;
1213 let mut compressed_page_size: Option<i32> = None;
1214 let mut crc: Option<i32> = None;
1215 let mut data_page_header: Option<DataPageHeader> = None;
1216 let mut index_page_header: Option<IndexPageHeader> = None;
1217 let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1218 let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1219 let mut last_field_id = 0i16;
1220 loop {
1221 let field_ident = prot.read_field_begin(last_field_id)?;
1222 if field_ident.field_type == FieldType::Stop {
1223 break;
1224 }
1225 match field_ident.id {
1226 1 => {
1227 let val = PageType::read_thrift(&mut *prot)?;
1228 type_ = Some(val);
1229 }
1230 2 => {
1231 let val = i32::read_thrift(&mut *prot)?;
1232 uncompressed_page_size = Some(val);
1233 }
1234 3 => {
1235 let val = i32::read_thrift(&mut *prot)?;
1236 compressed_page_size = Some(val);
1237 }
1238 4 => {
1239 let val = i32::read_thrift(&mut *prot)?;
1240 crc = Some(val);
1241 }
1242 5 => {
1243 let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1244 data_page_header = Some(val);
1245 }
1246 6 => {
1247 let val = IndexPageHeader::read_thrift(&mut *prot)?;
1248 index_page_header = Some(val);
1249 }
1250 7 => {
1251 let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1252 dictionary_page_header = Some(val);
1253 }
1254 8 => {
1255 let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1256 data_page_header_v2 = Some(val);
1257 }
1258 _ => {
1259 prot.skip(field_ident.field_type)?;
1260 }
1261 };
1262 last_field_id = field_ident.id;
1263 }
1264 let Some(type_) = type_ else {
1265 return Err(general_err!("Required field type_ is missing"));
1266 };
1267 let Some(uncompressed_page_size) = uncompressed_page_size else {
1268 return Err(general_err!(
1269 "Required field uncompressed_page_size is missing"
1270 ));
1271 };
1272 let Some(compressed_page_size) = compressed_page_size else {
1273 return Err(general_err!(
1274 "Required field compressed_page_size is missing"
1275 ));
1276 };
1277 Ok(Self {
1278 r#type: type_,
1279 uncompressed_page_size,
1280 compressed_page_size,
1281 crc,
1282 data_page_header,
1283 index_page_header,
1284 dictionary_page_header,
1285 data_page_header_v2,
1286 })
1287 }
1288}
1289
1290#[cfg(feature = "encryption")]
1294fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
1295 column_chunk.encrypted_column_metadata.is_none()
1299}
1300
1301#[cfg(not(feature = "encryption"))]
1302fn should_write_column_stats(_column_chunk: &ColumnChunkMetaData) -> bool {
1303 true
1304}
1305
1306pub(super) fn serialize_column_meta_data<W: Write>(
1327 column_chunk: &ColumnChunkMetaData,
1328 w: &mut ThriftCompactOutputProtocol<W>,
1329) -> Result<()> {
1330 use crate::file::statistics::page_stats_to_thrift;
1331
1332 column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1333 column_chunk
1334 .encodings()
1335 .collect::<Vec<_>>()
1336 .write_thrift_field(w, 2, 1)?;
1337 if w.write_path_in_schema() {
1338 let path = column_chunk.column_descr.path().parts();
1339 let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1340 path.write_thrift_field(w, 3, 2)?;
1341 column_chunk.compression.write_thrift_field(w, 4, 3)?;
1342 } else {
1343 column_chunk.compression.write_thrift_field(w, 4, 2)?;
1344 }
1345
1346 column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1347 column_chunk
1348 .total_uncompressed_size
1349 .write_thrift_field(w, 6, 5)?;
1350 column_chunk
1351 .total_compressed_size
1352 .write_thrift_field(w, 7, 6)?;
1353 let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1355 if let Some(index_page_offset) = column_chunk.index_page_offset {
1356 last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1357 }
1358 if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1359 last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1360 }
1361
1362 if should_write_column_stats(column_chunk) {
1363 let stats = page_stats_to_thrift(column_chunk.statistics());
1365 if let Some(stats) = stats {
1366 last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1367 }
1368 if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1369 last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1370 }
1371 if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1372 last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1373 }
1374 if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1375 last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1376 }
1377
1378 let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1380 || column_chunk.repetition_level_histogram.is_some()
1381 || column_chunk.definition_level_histogram.is_some()
1382 {
1383 let repetition_level_histogram = column_chunk
1384 .repetition_level_histogram()
1385 .map(|hist| hist.clone().into_inner());
1386
1387 let definition_level_histogram = column_chunk
1388 .definition_level_histogram()
1389 .map(|hist| hist.clone().into_inner());
1390
1391 Some(SizeStatistics {
1392 unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1393 repetition_level_histogram,
1394 definition_level_histogram,
1395 })
1396 } else {
1397 None
1398 };
1399 if let Some(size_stats) = size_stats {
1400 last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1401 }
1402
1403 if let Some(geo_stats) = column_chunk.geo_statistics() {
1404 geo_stats.write_thrift_field(w, 17, last_field_id)?;
1405 }
1406 }
1407
1408 w.write_struct_end()
1409}
1410
1411pub(super) struct FileMeta<'a> {
1413 pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
1414 pub(super) row_groups: &'a Vec<RowGroupMetaData>,
1415 pub(super) write_path_in_schema: bool,
1417}
1418
1419impl<'a> WriteThrift for FileMeta<'a> {
1431 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1432
1433 #[allow(unused_assignments)]
1435 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1436 writer.set_write_path_in_schema(self.write_path_in_schema);
1437
1438 self.file_metadata
1439 .version
1440 .write_thrift_field(writer, 1, 0)?;
1441
1442 let root = self.file_metadata.schema_descr().root_schema_ptr();
1445 let schema_len = num_nodes(&root)?;
1446 writer.write_field_begin(FieldType::List, 2, 1)?;
1447 writer.write_list_begin(ElementType::Struct, schema_len)?;
1448 write_schema(&root, writer)?;
1450
1451 self.file_metadata
1452 .num_rows
1453 .write_thrift_field(writer, 3, 2)?;
1454
1455 let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1457
1458 if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1459 last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1460 }
1461 if let Some(created_by) = self.file_metadata.created_by() {
1462 last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1463 }
1464 if let Some(column_orders) = self.file_metadata.column_orders() {
1465 last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1466 }
1467 #[cfg(feature = "encryption")]
1468 if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1469 last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1470 }
1471 #[cfg(feature = "encryption")]
1472 if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1473 key.as_slice()
1474 .write_thrift_field(writer, 9, last_field_id)?;
1475 }
1476
1477 writer.write_struct_end()
1478 }
1479}
1480
1481fn write_schema<W: Write>(
1482 schema: &TypePtr,
1483 writer: &mut ThriftCompactOutputProtocol<W>,
1484) -> Result<()> {
1485 if !schema.is_group() {
1486 return Err(general_err!("Root schema must be Group type"));
1487 }
1488 write_schema_helper(schema, writer)
1489}
1490
1491fn write_schema_helper<W: Write>(
1492 node: &TypePtr,
1493 writer: &mut ThriftCompactOutputProtocol<W>,
1494) -> Result<()> {
1495 match node.as_ref() {
1496 crate::schema::types::Type::PrimitiveType {
1497 basic_info,
1498 physical_type,
1499 type_length,
1500 scale,
1501 precision,
1502 } => {
1503 let element = SchemaElement {
1504 r#type: Some(*physical_type),
1505 type_length: if *type_length >= 0 {
1506 Some(*type_length)
1507 } else {
1508 None
1509 },
1510 repetition_type: Some(basic_info.repetition()),
1511 name: basic_info.name(),
1512 num_children: None,
1513 converted_type: match basic_info.converted_type() {
1514 ConvertedType::NONE => None,
1515 other => Some(other),
1516 },
1517 scale: if *scale >= 0 { Some(*scale) } else { None },
1518 precision: if *precision >= 0 {
1519 Some(*precision)
1520 } else {
1521 None
1522 },
1523 field_id: if basic_info.has_id() {
1524 Some(basic_info.id())
1525 } else {
1526 None
1527 },
1528 logical_type: basic_info.logical_type_ref().cloned(),
1529 };
1530 element.write_thrift(writer)
1531 }
1532 crate::schema::types::Type::GroupType { basic_info, fields } => {
1533 let repetition = if basic_info.has_repetition() {
1534 Some(basic_info.repetition())
1535 } else {
1536 None
1537 };
1538
1539 let element = SchemaElement {
1540 r#type: None,
1541 type_length: None,
1542 repetition_type: repetition,
1543 name: basic_info.name(),
1544 num_children: Some(fields.len().try_into()?),
1545 converted_type: match basic_info.converted_type() {
1546 ConvertedType::NONE => None,
1547 other => Some(other),
1548 },
1549 scale: None,
1550 precision: None,
1551 field_id: if basic_info.has_id() {
1552 Some(basic_info.id())
1553 } else {
1554 None
1555 },
1556 logical_type: basic_info.logical_type_ref().cloned(),
1557 };
1558
1559 element.write_thrift(writer)?;
1560
1561 for field in fields {
1563 write_schema_helper(field, writer)?;
1564 }
1565 Ok(())
1566 }
1567 }
1568}
1569
1570impl WriteThrift for RowGroupMetaData {
1580 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1581
1582 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1583 self.columns.write_thrift_field(writer, 1, 0)?;
1585 self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1586 let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1587 if let Some(sorting_columns) = self.sorting_columns() {
1588 last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1589 }
1590 if let Some(file_offset) = self.file_offset() {
1591 last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1592 }
1593 last_field_id = self
1595 .compressed_size()
1596 .write_thrift_field(writer, 6, last_field_id)?;
1597 if let Some(ordinal) = self.ordinal() {
1598 ordinal.write_thrift_field(writer, 7, last_field_id)?;
1599 }
1600 writer.write_struct_end()
1601 }
1602}
1603
1604impl WriteThrift for ColumnChunkMetaData {
1616 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1617
1618 #[allow(unused_assignments)]
1619 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1620 let mut last_field_id = 0i16;
1621 if let Some(file_path) = self.file_path() {
1622 last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1623 }
1624 last_field_id = self
1625 .file_offset()
1626 .write_thrift_field(writer, 2, last_field_id)?;
1627
1628 #[cfg(feature = "encryption")]
1629 let write_meta_data =
1630 self.encrypted_column_metadata.is_none() || self.plaintext_footer_mode;
1631 #[cfg(not(feature = "encryption"))]
1632 let write_meta_data = true;
1633
1634 if write_meta_data {
1640 writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1641 serialize_column_meta_data(self, writer)?;
1642 last_field_id = 3;
1643 }
1644
1645 if let Some(offset_idx_off) = self.offset_index_offset() {
1646 last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1647 }
1648 if let Some(offset_idx_len) = self.offset_index_length() {
1649 last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1650 }
1651 if let Some(column_idx_off) = self.column_index_offset() {
1652 last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1653 }
1654 if let Some(column_idx_len) = self.column_index_length() {
1655 last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1656 }
1657 #[cfg(feature = "encryption")]
1658 {
1659 if let Some(crypto_metadata) = self.crypto_metadata() {
1660 last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1661 }
1662 if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1663 encrypted_meta
1664 .as_slice()
1665 .write_thrift_field(writer, 9, last_field_id)?;
1666 }
1667 }
1668
1669 writer.write_struct_end()
1670 }
1671}
1672
1673impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1678 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1679
1680 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1681 let mut last_field_id = 0i16;
1682 if let Some(bbox) = self.bounding_box() {
1683 last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1684 }
1685 if let Some(geo_types) = self.geospatial_types() {
1686 geo_types.write_thrift_field(writer, 2, last_field_id)?;
1687 }
1688
1689 writer.write_struct_end()
1690 }
1691}
1692
1693use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
1695write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1696
1697impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1708 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1709
1710 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1711 self.get_xmin().write_thrift_field(writer, 1, 0)?;
1712 self.get_xmax().write_thrift_field(writer, 2, 1)?;
1713 self.get_ymin().write_thrift_field(writer, 3, 2)?;
1714 let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1715
1716 if let Some(zmin) = self.get_zmin() {
1717 last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1718 }
1719 if let Some(zmax) = self.get_zmax() {
1720 last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1721 }
1722 if let Some(mmin) = self.get_mmin() {
1723 last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1724 }
1725 if let Some(mmax) = self.get_mmax() {
1726 mmax.write_thrift_field(writer, 8, last_field_id)?;
1727 }
1728
1729 writer.write_struct_end()
1730 }
1731}
1732
1733use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
1735write_thrift_field!(RustBoundingBox, FieldType::Struct);
1736
1737#[cfg(test)]
1738pub(crate) mod tests {
1739 use crate::basic::{Encoding, PageType, Type as PhysicalType};
1740 use crate::errors::Result;
1741 use crate::file::metadata::thrift::{
1742 BoundingBox, DataPageHeaderV2, DictionaryPageHeader, PageHeader, SchemaElement,
1743 write_schema,
1744 };
1745 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
1746 use crate::parquet_thrift::tests::test_roundtrip;
1747 use crate::parquet_thrift::{
1748 ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
1749 read_thrift_vec,
1750 };
1751 use crate::schema::types::{
1752 ColumnDescriptor, ColumnPath, SchemaDescriptor, TypePtr, num_nodes,
1753 parquet_schema_from_array,
1754 };
1755 use std::sync::Arc;
1756
1757 pub(crate) fn read_row_group(
1759 buf: &mut [u8],
1760 schema_descr: Arc<SchemaDescriptor>,
1761 ) -> Result<RowGroupMetaData> {
1762 let mut reader = ThriftSliceInputProtocol::new(buf);
1763 crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
1764 }
1765
1766 pub(crate) fn read_column_chunk(
1767 buf: &mut [u8],
1768 column_descr: Arc<ColumnDescriptor>,
1769 ) -> Result<ColumnChunkMetaData> {
1770 read_column_chunk_with_options(buf, column_descr, None)
1771 }
1772
1773 pub(crate) fn read_column_chunk_with_options(
1774 buf: &mut [u8],
1775 column_descr: Arc<ColumnDescriptor>,
1776 options: Option<&ParquetMetaDataOptions>,
1777 ) -> Result<ColumnChunkMetaData> {
1778 let mut reader = ThriftSliceInputProtocol::new(buf);
1779 crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, options)
1780 }
1781
1782 pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
1783 let num_nodes = num_nodes(&schema)?;
1784 let mut buf = Vec::new();
1785 let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1786
1787 writer.write_list_begin(ElementType::Struct, num_nodes)?;
1789
1790 write_schema(&schema, &mut writer)?;
1792
1793 let mut prot = ThriftSliceInputProtocol::new(&buf);
1794 let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
1795 parquet_schema_from_array(se)
1796 }
1797
1798 pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
1799 let num_nodes = num_nodes(schema)?;
1800 let mut buf = Vec::new();
1801 let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1802
1803 writer.write_list_begin(ElementType::Struct, num_nodes)?;
1805
1806 write_schema(schema, &mut writer)?;
1808 Ok(buf)
1809 }
1810
1811 pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
1812 let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
1813 read_thrift_vec(&mut prot)
1814 }
1815
1816 fn thrift_bytes<T: WriteThrift>(value: &T) -> Vec<u8> {
1817 let mut buf = Vec::new();
1818 let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1819 value.write_thrift(&mut writer).unwrap();
1820 buf
1821 }
1822
1823 fn change_false_bool_field_to_i32(buf: &mut [u8]) {
1824 let pos = buf
1825 .iter()
1826 .rposition(|byte| *byte == 0x12)
1827 .expect("expected BOOL_FALSE field header byte");
1828 buf[pos] = 0x15;
1829 }
1830
1831 fn assert_malformed_bool_error(err: crate::errors::ParquetError) {
1832 let msg = err.to_string();
1833 assert!(
1834 msg.contains("Unexpected struct field type"),
1835 "unexpected error message: {msg}"
1836 );
1837 }
1838
1839 #[test]
1840 fn test_bounding_box_roundtrip() {
1841 test_roundtrip(BoundingBox {
1842 xmin: 0.1.into(),
1843 xmax: 10.3.into(),
1844 ymin: 0.001.into(),
1845 ymax: 128.5.into(),
1846 zmin: None,
1847 zmax: None,
1848 mmin: None,
1849 mmax: None,
1850 });
1851
1852 test_roundtrip(BoundingBox {
1853 xmin: 0.1.into(),
1854 xmax: 10.3.into(),
1855 ymin: 0.001.into(),
1856 ymax: 128.5.into(),
1857 zmin: Some(11.0.into()),
1858 zmax: Some(1300.0.into()),
1859 mmin: None,
1860 mmax: None,
1861 });
1862
1863 test_roundtrip(BoundingBox {
1864 xmin: 0.1.into(),
1865 xmax: 10.3.into(),
1866 ymin: 0.001.into(),
1867 ymax: 128.5.into(),
1868 zmin: Some(11.0.into()),
1869 zmax: Some(1300.0.into()),
1870 mmin: Some(3.7.into()),
1871 mmax: Some(42.0.into()),
1872 });
1873 }
1874
1875 #[test]
1876 fn test_convert_stats_preserves_missing_null_count() {
1877 let primitive =
1878 crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT32)
1879 .build()
1880 .unwrap();
1881 let column_descr = Arc::new(ColumnDescriptor::new(
1882 Arc::new(primitive),
1883 0,
1884 0,
1885 ColumnPath::new(vec![]),
1886 ));
1887
1888 let none_null_count = super::Statistics {
1889 max: None,
1890 min: None,
1891 null_count: None,
1892 distinct_count: None,
1893 max_value: None,
1894 min_value: None,
1895 is_max_value_exact: None,
1896 is_min_value_exact: None,
1897 };
1898 let decoded_none = super::convert_stats(&column_descr, Some(none_null_count))
1899 .unwrap()
1900 .unwrap();
1901 assert_eq!(decoded_none.null_count_opt(), None);
1902
1903 let zero_null_count = super::Statistics {
1904 max: None,
1905 min: None,
1906 null_count: Some(0),
1907 distinct_count: None,
1908 max_value: None,
1909 min_value: None,
1910 is_max_value_exact: None,
1911 is_min_value_exact: None,
1912 };
1913 let decoded_zero = super::convert_stats(&column_descr, Some(zero_null_count))
1914 .unwrap()
1915 .unwrap();
1916 assert_eq!(decoded_zero.null_count_opt(), Some(0));
1917 }
1918
1919 #[test]
1920 fn malformed_bool_field_returns_error_not_panic() {
1921 let page_header = PageHeader {
1922 r#type: PageType::DICTIONARY_PAGE,
1923 uncompressed_page_size: 1,
1924 compressed_page_size: 1,
1925 crc: None,
1926 data_page_header: None,
1927 index_page_header: None,
1928 dictionary_page_header: Some(DictionaryPageHeader {
1929 num_values: 1,
1930 encoding: Encoding::PLAIN,
1931 is_sorted: Some(false),
1932 }),
1933 data_page_header_v2: None,
1934 };
1935
1936 let mut buf = thrift_bytes(&page_header);
1937 change_false_bool_field_to_i32(&mut buf);
1938
1939 let mut prot = ThriftSliceInputProtocol::new(&buf);
1940 let err = PageHeader::read_thrift_without_stats(&mut prot)
1941 .expect_err("malformed bool field should return an error");
1942 assert_malformed_bool_error(err);
1943 }
1944
1945 #[test]
1946 fn malformed_data_page_v2_bool_field_returns_error_not_panic() {
1947 let data_page_header_v2 = DataPageHeaderV2 {
1948 num_values: 1,
1949 num_nulls: 0,
1950 num_rows: 1,
1951 encoding: Encoding::PLAIN,
1952 definition_levels_byte_length: 0,
1953 repetition_levels_byte_length: 0,
1954 is_compressed: Some(false),
1955 statistics: None,
1956 };
1957
1958 let mut buf = thrift_bytes(&data_page_header_v2);
1959 change_false_bool_field_to_i32(&mut buf);
1960
1961 let mut prot = ThriftSliceInputProtocol::new(&buf);
1962 let err = DataPageHeaderV2::read_thrift_without_stats(&mut prot)
1963 .expect_err("malformed bool field should return an error");
1964 assert_malformed_bool_error(err);
1965 }
1966}