1use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34 column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37 basic::{
38 ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
39 Repetition, Type,
40 },
41 data_type::{ByteArray, FixedLenByteArray, Int96},
42 errors::{ParquetError, Result},
43 file::{
44 metadata::{
45 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46 PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47 RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48 },
49 statistics::ValueStatistics,
50 },
51 parquet_thrift::{
52 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53 ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54 read_thrift_vec,
55 },
56 schema::types::{
57 ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58 },
59 thrift_struct,
60 util::bit_util::FromBytes,
61 write_thrift_field,
62};
63
// Thrift `SchemaElement`: one flattened node of the Parquet schema tree.
// Field ids mirror parquet.thrift; `name` borrows from the input buffer to
// avoid a copy while decoding.
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
    1: optional Type r#type;
    2: optional i32 type_length;
    3: optional Repetition repetition_type;
    4: required string<'a> name;
    5: optional i32 num_children;
    6: optional ConvertedType converted_type;
    7: optional i32 scale
    8: optional i32 precision
    9: optional i32 field_id;
    10: optional LogicalType logical_type
}
);
105
// Thrift column chunk `Statistics`. Fields 1/2 (`max`/`min`) are the
// deprecated pre-format-2 statistics; fields 5/6 (`max_value`/`min_value`)
// supersede them. All binary values borrow from the input buffer.
thrift_struct!(
struct Statistics<'a> {
    1: optional binary<'a> max;
    2: optional binary<'a> min;
    3: optional i64 null_count;
    4: optional i64 distinct_count;
    5: optional binary<'a> max_value;
    6: optional binary<'a> min_value;
    7: optional bool is_max_value_exact;
    8: optional bool is_min_value_exact;
}
);
118
// Thrift geospatial `BoundingBox`: x/y ranges are required, the z and m
// dimensions are optional (see `convert_bounding_box` for how partial z/m
// ranges are treated).
thrift_struct!(
struct BoundingBox {
    1: required double xmin;
    2: required double xmax;
    3: required double ymin;
    4: required double ymax;
    5: optional double zmin;
    6: optional double zmax;
    7: optional double mmin;
    8: optional double mmax;
}
);
131
// Thrift `GeospatialStatistics` for a column chunk: optional bounding box plus
// the list of geometry type codes observed in the data.
thrift_struct!(
struct GeospatialStatistics {
    1: optional BoundingBox bbox;
    2: optional list<i32> geospatial_types;
}
);
138
// Thrift `SizeStatistics` for a column chunk: unencoded byte counts and
// per-level repetition/definition histograms.
thrift_struct!(
struct SizeStatistics {
    1: optional i64 unencoded_byte_array_data_bytes;
    2: optional list<i64> repetition_level_histogram;
    3: optional list<i64> definition_level_histogram;
}
);
146
147fn convert_geo_stats(
148 stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150 stats.map(|st| {
151 let bbox = convert_bounding_box(st.bbox);
152 let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153 Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154 bbox,
155 geospatial_types,
156 ))
157 })
158}
159
160fn convert_bounding_box(
161 bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163 bbox.map(|bb| {
164 let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165 bb.xmin.into(),
166 bb.xmax.into(),
167 bb.ymin.into(),
168 bb.ymax.into(),
169 );
170
171 newbb = match (bb.zmin, bb.zmax) {
172 (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173 _ => newbb,
175 };
176
177 newbb = match (bb.mmin, bb.mmax) {
178 (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179 _ => newbb,
181 };
182
183 newbb
184 })
185}
186
187fn convert_stats(
189 column_descr: &Arc<ColumnDescriptor>,
190 thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192 use crate::file::statistics::Statistics as FStatistics;
193 Ok(match thrift_stats {
194 Some(stats) => {
195 let null_count = stats
197 .null_count
198 .map(|null_count| {
199 if null_count < 0 {
200 return Err(general_err!(
201 "Statistics null count is negative {}",
202 null_count
203 ));
204 }
205 Ok(null_count as u64)
206 })
207 .transpose()?;
208 let distinct_count = stats.distinct_count.map(|value| value as u64);
210 let old_format = stats.min_value.is_none() && stats.max_value.is_none();
212 let min = if old_format {
214 stats.min
215 } else {
216 stats.min_value
217 };
218 let max = if old_format {
220 stats.max
221 } else {
222 stats.max_value
223 };
224
225 fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
226 if let Some(min) = min {
227 if min.len() < len {
228 return Err(general_err!("Insufficient bytes to parse min statistic",));
229 }
230 }
231 if let Some(max) = max {
232 if max.len() < len {
233 return Err(general_err!("Insufficient bytes to parse max statistic",));
234 }
235 }
236 Ok(())
237 }
238
239 let physical_type = column_descr.physical_type();
240 match physical_type {
241 Type::BOOLEAN => check_len(&min, &max, 1),
242 Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
243 Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
244 Type::INT96 => check_len(&min, &max, 12),
245 _ => Ok(()),
246 }?;
247
248 let res = match physical_type {
253 Type::BOOLEAN => FStatistics::boolean(
254 min.map(|data| data[0] != 0),
255 max.map(|data| data[0] != 0),
256 distinct_count,
257 null_count,
258 old_format,
259 ),
260 Type::INT32 => FStatistics::int32(
261 min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
262 max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263 distinct_count,
264 null_count,
265 old_format,
266 ),
267 Type::INT64 => FStatistics::int64(
268 min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
269 max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270 distinct_count,
271 null_count,
272 old_format,
273 ),
274 Type::INT96 => {
275 let min = if let Some(data) = min {
277 assert_eq!(data.len(), 12);
278 Some(Int96::try_from_le_slice(data)?)
279 } else {
280 None
281 };
282 let max = if let Some(data) = max {
283 assert_eq!(data.len(), 12);
284 Some(Int96::try_from_le_slice(data)?)
285 } else {
286 None
287 };
288 FStatistics::int96(min, max, distinct_count, null_count, old_format)
289 }
290 Type::FLOAT => FStatistics::float(
291 min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
292 max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293 distinct_count,
294 null_count,
295 old_format,
296 ),
297 Type::DOUBLE => FStatistics::double(
298 min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
299 max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300 distinct_count,
301 null_count,
302 old_format,
303 ),
304 Type::BYTE_ARRAY => FStatistics::ByteArray(
305 ValueStatistics::new(
306 min.map(ByteArray::from),
307 max.map(ByteArray::from),
308 distinct_count,
309 null_count,
310 old_format,
311 )
312 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
313 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
314 ),
315 Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
316 ValueStatistics::new(
317 min.map(ByteArray::from).map(FixedLenByteArray::from),
318 max.map(ByteArray::from).map(FixedLenByteArray::from),
319 distinct_count,
320 null_count,
321 old_format,
322 )
323 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
324 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
325 ),
326 };
327
328 Some(res)
329 }
330 None => None,
331 })
332}
333
// Bit flags recording which required fields of the thrift `ColumnMetaData`
// struct were observed while decoding; each bit position matches the thrift
// field id of the corresponding field.
const COL_META_TYPE: u16 = 1 << 1;
const COL_META_ENCODINGS: u16 = 1 << 2;
const COL_META_CODEC: u16 = 1 << 4;
const COL_META_NUM_VALUES: u16 = 1 << 5;
const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;

// Mask with every required-field bit set; used as a fast "all present" check
// in `validate_column_metadata`.
const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;
351
352fn validate_column_metadata(mask: u16) -> Result<()> {
355 if mask != COL_META_ALL_REQUIRED {
356 if mask & COL_META_ENCODINGS == 0 {
357 return Err(general_err!("Required field encodings is missing"));
358 }
359
360 if mask & COL_META_CODEC == 0 {
361 return Err(general_err!("Required field codec is missing"));
362 }
363 if mask & COL_META_NUM_VALUES == 0 {
364 return Err(general_err!("Required field num_values is missing"));
365 }
366 if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
367 return Err(general_err!(
368 "Required field total_uncompressed_size is missing"
369 ));
370 }
371 if mask & COL_META_TOTAL_COMP_SZ == 0 {
372 return Err(general_err!(
373 "Required field total_compressed_size is missing"
374 ));
375 }
376 if mask & COL_META_DATA_PAGE_OFFSET == 0 {
377 return Err(general_err!("Required field data_page_offset is missing"));
378 }
379 }
380
381 Ok(())
382}
383
384fn read_encoding_stats_as_mask<'a>(
385 prot: &mut ThriftSliceInputProtocol<'a>,
386) -> Result<EncodingMask> {
387 let mut mask = 0i32;
389 let list_ident = prot.read_list_begin()?;
390 for _ in 0..list_ident.size {
391 let pes = PageEncodingStats::read_thrift(prot)?;
392 match pes.page_type {
393 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
394 _ => {}
395 }
396 }
397 EncodingMask::try_new(mask)
398}
399
/// Decodes the thrift `ColumnMetaData` struct directly into `column`, honoring
/// any per-column skip flags from `options`.
///
/// Returns a bitmask of `COL_META_*` bits recording which required fields were
/// seen; the caller validates it (see `validate_column_metadata`).
fn read_column_metadata<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column: &mut ColumnChunkMetaData,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<u16> {
    let mut seen_mask = 0u16;

    // Decode options: which optional sections to skip for this column, and
    // whether page encoding stats are condensed into a mask or kept in full.
    let mut skip_pes = false;
    let mut pes_mask = true;
    let mut skip_col_stats = false;
    let mut skip_size_stats = false;

    if let Some(opts) = options {
        skip_pes = opts.skip_encoding_stats(col_index);
        pes_mask = opts.encoding_stats_as_mask();
        skip_col_stats = opts.skip_column_stats(col_index);
        skip_size_stats = opts.skip_size_stats(col_index);
    }

    let column_descr = &column.column_descr;

    let mut last_field_id = 0i16;
    loop {
        // The compact protocol encodes field ids as deltas from the previous
        // field, so the last id is threaded through each header read.
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                // The physical type is decoded but its value is discarded;
                // only its presence is recorded in the mask.
                Type::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TYPE;
            }
            2 => {
                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_ENCODINGS;
            }
            4 => {
                column.compression = Compression::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_CODEC;
            }
            5 => {
                column.num_values = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_NUM_VALUES;
            }
            6 => {
                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
            }
            7 => {
                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_COMP_SZ;
            }
            9 => {
                column.data_page_offset = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_DATA_PAGE_OFFSET;
            }
            10 => {
                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            11 => {
                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            // When a guard below is disabled by options, the field falls
            // through to the default arm and its bytes are skipped.
            12 if !skip_col_stats => {
                column.statistics =
                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
            }
            13 if !skip_pes => {
                // Page encoding stats: either condensed into a bitmask or
                // retained in full, per the options.
                if pes_mask {
                    let val = read_encoding_stats_as_mask(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
                } else {
                    let val =
                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
                }
            }
            14 => {
                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            15 => {
                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
            }
            16 if !skip_size_stats => {
                // Size statistics are unpacked into the individual fields of
                // the column metadata.
                let val = SizeStatistics::read_thrift(&mut *prot)?;
                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
                column.repetition_level_histogram =
                    val.repetition_level_histogram.map(LevelHistogram::from);
                column.definition_level_histogram =
                    val.definition_level_histogram.map(LevelHistogram::from);
            }
            17 => {
                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
                column.geo_statistics = convert_geo_stats(Some(val));
            }
            _ => {
                // Unknown or deliberately skipped field.
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    Ok(seen_mask)
}
530
/// Decodes one thrift `ColumnChunk` struct into a [`ColumnChunkMetaData`].
///
/// `file_offset` is the only field validated at this level; the nested
/// `ColumnMetaData` (field 3) is validated via `validate_column_metadata`,
/// except when the column metadata is encrypted (in which case the plaintext
/// metadata may legitimately be absent).
fn read_column_chunk<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column_descr: &Arc<ColumnDescriptor>,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
    // Start from a default-built chunk and fill in fields as they are decoded.
    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;

    let mut has_file_offset = false;

    // Bitmask of required ColumnMetaData fields seen (empty if field 3 absent).
    let mut col_meta_mask = 0u16;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                col.file_path = Some(String::read_thrift(&mut *prot)?);
            }
            2 => {
                col.file_offset = i64::read_thrift(&mut *prot)?;
                has_file_offset = true;
            }
            3 => {
                // Nested ColumnMetaData is decoded directly into `col`.
                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
            }
            4 => {
                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            5 => {
                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            6 => {
                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
                col.column_crypto_metadata = Some(Box::new(val));
            }
            #[cfg(feature = "encryption")]
            9 => {
                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
            }
            _ => {
                // Unknown field (or crypto fields when encryption is disabled).
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if !has_file_offset {
        return Err(general_err!("Required field file_offset is missing"));
    };

    // With encrypted column metadata present, skip plaintext-metadata
    // validation — the real metadata lives in the encrypted blob.
    #[cfg(feature = "encryption")]
    if col.encrypted_column_metadata.is_some() {
        return Ok(col);
    }

    validate_column_metadata(col_meta_mask)?;

    Ok(col)
}
620
/// Decodes one thrift `RowGroup` struct, verifying that the number of column
/// chunks matches the schema and that all required fields are present.
fn read_row_group(
    prot: &mut ThriftSliceInputProtocol,
    schema_descr: &Arc<SchemaDescriptor>,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<RowGroupMetaData> {
    // `build_unchecked` defers validation; required fields are tracked with
    // the bitmask below instead.
    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();

    // Bits track the required RowGroup fields; positions match thrift ids.
    const RG_COLUMNS: u8 = 1 << 1;
    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
    const RG_NUM_ROWS: u8 = 1 << 3;
    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;

    let mut mask = 0u8;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                let list_ident = prot.read_list_begin()?;
                // Each row group must have exactly one chunk per schema column.
                if schema_descr.num_columns() != list_ident.size as usize {
                    return Err(general_err!(
                        "Column count mismatch. Schema has {} columns while Row Group has {}",
                        schema_descr.num_columns(),
                        list_ident.size
                    ));
                }
                for i in 0..list_ident.size as usize {
                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
                    row_group.columns.push(col);
                }
                mask |= RG_COLUMNS;
            }
            2 => {
                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
                mask |= RG_TOT_BYTE_SIZE;
            }
            3 => {
                row_group.num_rows = i64::read_thrift(&mut *prot)?;
                mask |= RG_NUM_ROWS;
            }
            4 => {
                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
                row_group.sorting_columns = Some(val);
            }
            5 => {
                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
            }
            _ => {
                // Unknown fields (including id 6) are skipped.
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    // Report the first missing required field, if any.
    if mask != RG_ALL_REQUIRED {
        if mask & RG_COLUMNS == 0 {
            return Err(general_err!("Required field columns is missing"));
        }
        if mask & RG_TOT_BYTE_SIZE == 0 {
            return Err(general_err!("Required field total_byte_size is missing"));
        }
        if mask & RG_NUM_ROWS == 0 {
            return Err(general_err!("Required field num_rows is missing"));
        }
    }

    Ok(row_group)
}
708
/// Parses a thrift `FileMetaData` payload, returning only the schema (field
/// id 2) and ignoring every other field.
///
/// # Errors
/// Returns an error if the buffer is malformed or contains no schema field.
pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
    let mut prot = ThriftSliceInputProtocol::new(buf);

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            2 => {
                // The schema is a flattened list of SchemaElements; rebuild
                // the tree and return immediately — later fields aren't needed.
                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
                let val = parquet_schema_from_array(val)?;
                return Ok(SchemaDescriptor::new(val));
            }
            _ => prot.skip(field_ident.field_type)?,
        }
        last_field_id = field_ident.id;
    }
    Err(general_err!("Input does not contain a schema"))
}
733
/// Decodes a full thrift `FileMetaData` payload into [`ParquetMetaData`].
///
/// If `options` supplies a pre-parsed schema, the embedded schema field is
/// skipped entirely; otherwise the schema must appear (field id 2) before the
/// row groups (field id 4) can be decoded.
///
/// # Errors
/// Returns an error on malformed thrift input, missing required fields,
/// column-order length mismatch, or inconsistent row-group ordinals.
pub(crate) fn parquet_metadata_from_bytes(
    buf: &[u8],
    options: Option<&ParquetMetaDataOptions>,
) -> Result<ParquetMetaData> {
    let mut prot = ThriftSliceInputProtocol::new(buf);

    let mut version: Option<i32> = None;
    let mut num_rows: Option<i64> = None;
    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
    let mut created_by: Option<&str> = None;
    let mut column_orders: Option<Vec<ColumnOrder>> = None;
    #[cfg(feature = "encryption")]
    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
    #[cfg(feature = "encryption")]
    let mut footer_signing_key_metadata: Option<&[u8]> = None;

    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;

    // A caller-provided schema takes precedence over the embedded one.
    if let Some(options) = options {
        schema_descr = options.schema().cloned();
    }

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                version = Some(i32::read_thrift(&mut prot)?);
            }
            2 => {
                if schema_descr.is_some() {
                    // Schema already supplied via options; skip the bytes.
                    prot.skip(field_ident.field_type)?;
                } else {
                    let val =
                        read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
                    let val = parquet_schema_from_array(val)?;
                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
                }
            }
            3 => {
                num_rows = Some(i64::read_thrift(&mut prot)?);
            }
            4 => {
                // Row groups need the schema; the thrift encoder is expected
                // to have emitted field 2 first (or options provided one).
                if schema_descr.is_none() {
                    return Err(general_err!("Required field schema is missing"));
                }
                let schema_descr = schema_descr.as_ref().unwrap();
                let list_ident = prot.read_list_begin()?;
                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);

                // Assigns positional ordinals when the file omits them, and
                // rejects a mix of present/absent ordinals.
                let mut assigner = OrdinalAssigner::new();
                for ordinal in 0..list_ident.size {
                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
                        ParquetError::General(format!(
                            "Row group ordinal {ordinal} exceeds i16 max value",
                        ))
                    })?;
                    let rg = read_row_group(&mut prot, schema_descr, options)?;
                    rg_vec.push(assigner.ensure(ordinal, rg)?);
                }
                row_groups = Some(rg_vec);
            }
            5 => {
                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
                key_value_metadata = Some(val);
            }
            6 => {
                created_by = Some(<&str>::read_thrift(&mut prot)?);
            }
            7 => {
                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
                column_orders = Some(val);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
                encryption_algorithm = Some(val);
            }
            #[cfg(feature = "encryption")]
            9 => {
                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }
    // Validate presence of required FileMetaData fields.
    let Some(version) = version else {
        return Err(general_err!("Required field version is missing"));
    };
    let Some(num_rows) = num_rows else {
        return Err(general_err!("Required field num_rows is missing"));
    };
    let Some(row_groups) = row_groups else {
        return Err(general_err!("Required field row_groups is missing"));
    };

    let created_by = created_by.map(|c| c.to_owned());

    // Safe: a missing schema would have errored in the field-4 arm above, and
    // files without row groups error on the check just above.
    let schema_descr = schema_descr.unwrap();

    if column_orders
        .as_ref()
        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
    {
        return Err(general_err!("Column order length mismatch"));
    }
    // Resolve each TYPE_DEFINED_ORDER to the concrete sort order implied by
    // the column's logical/converted/physical type.
    let column_orders = column_orders.map(|mut cos| {
        for (i, column) in schema_descr.columns().iter().enumerate() {
            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
                let sort_order = ColumnOrder::sort_order_for_type(
                    column.logical_type_ref(),
                    column.converted_type(),
                    column.physical_type(),
                );
                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
            }
        }
        cos
    });

    #[cfg(not(feature = "encryption"))]
    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    );
    #[cfg(feature = "encryption")]
    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    )
    .with_encryption_algorithm(encryption_algorithm)
    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));

    Ok(ParquetMetaData::new(fmd, row_groups))
}
906
/// Assigns positional ordinals to row groups when the file omits them, while
/// enforcing that either every row group carries an explicit ordinal or none
/// do (a mix is rejected).
#[derive(Debug, Default)]
pub(crate) struct OrdinalAssigner {
    // Whether the first row group seen had an explicit ordinal; `None` until
    // the first row group is processed.
    first_has_ordinal: Option<bool>,
}
912
913impl OrdinalAssigner {
914 fn new() -> Self {
915 Default::default()
916 }
917
918 fn ensure(
931 &mut self,
932 actual_ordinal: i16,
933 mut rg: RowGroupMetaData,
934 ) -> Result<RowGroupMetaData> {
935 let rg_has_ordinal = rg.ordinal.is_some();
936
937 if self.first_has_ordinal.is_none() {
939 self.first_has_ordinal = Some(rg_has_ordinal);
940 }
941
942 let first_has_ordinal = self.first_has_ordinal.unwrap();
944 if !first_has_ordinal && !rg_has_ordinal {
945 rg.ordinal = Some(actual_ordinal);
946 } else if first_has_ordinal != rg_has_ordinal {
947 return Err(general_err!(
948 "Inconsistent ordinal assignment: first_has_ordinal is set to \
949 {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
950 first_has_ordinal,
951 actual_ordinal,
952 rg_has_ordinal
953 ));
954 }
955 Ok(rg)
956 }
957}
958
// Thrift `IndexPageHeader`: currently an empty placeholder struct in the
// Parquet format.
thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);
962
// Thrift `DictionaryPageHeader`: number of dictionary entries, their encoding,
// and an optional sortedness flag.
thrift_struct!(
pub(crate) struct DictionaryPageHeader {
    1: required i32 num_values;
    2: required Encoding encoding
    3: optional bool is_sorted;
}
);
975
// Thrift page-level `Statistics`, mirroring the column chunk `Statistics`
// struct but with owned binary values (no borrowed lifetime).
thrift_struct!(
pub(crate) struct PageStatistics {
    1: optional binary max;
    2: optional binary min;
    3: optional i64 null_count;
    4: optional i64 distinct_count;
    5: optional binary max_value;
    6: optional binary min_value;
    7: optional bool is_max_value_exact;
    8: optional bool is_min_value_exact;
}
);
995
// Thrift `DataPageHeader` (V1 data pages): value count, value encoding, and
// the encodings used for the definition/repetition level data.
thrift_struct!(
pub(crate) struct DataPageHeader {
    1: required i32 num_values
    2: required Encoding encoding
    3: required Encoding definition_level_encoding;
    4: required Encoding repetition_level_encoding;
    5: optional PageStatistics statistics;
}
);
1005
impl DataPageHeader {
    /// Decodes a `DataPageHeader`, deliberately skipping the optional
    /// `statistics` field (id 5): it falls through to the default skip arm and
    /// the returned struct always has `statistics: None`.
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_level_encoding: Option<Encoding> = None;
        let mut repetition_level_encoding: Option<Encoding> = None;
        // Intentionally never populated — statistics are skipped.
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                3 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    definition_level_encoding = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    repetition_level_encoding = Some(val);
                }
                _ => {
                    // Includes field 5 (statistics), skipped by design.
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        // Enforce the required fields of the thrift definition.
        let Some(num_values) = num_values else {
            return Err(general_err!("Required field num_values is missing"));
        };
        let Some(encoding) = encoding else {
            return Err(general_err!("Required field encoding is missing"));
        };
        let Some(definition_level_encoding) = definition_level_encoding else {
            return Err(general_err!(
                "Required field definition_level_encoding is missing"
            ));
        };
        let Some(repetition_level_encoding) = repetition_level_encoding else {
            return Err(general_err!(
                "Required field repetition_level_encoding is missing"
            ));
        };
        Ok(Self {
            num_values,
            encoding,
            definition_level_encoding,
            repetition_level_encoding,
            statistics,
        })
    }
}
1071
// Thrift `DataPageHeaderV2`: V2 data pages carry explicit null/row counts and
// the byte lengths of the (always RLE-encoded in V2) level data, so levels can
// be located without decoding. `is_compressed` defaults to true.
thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
    1: required i32 num_values
    2: required i32 num_nulls
    3: required i32 num_rows
    4: required Encoding encoding
    5: required i32 definition_levels_byte_length;
    6: required i32 repetition_levels_byte_length;
    7: optional bool is_compressed = true;
    8: optional PageStatistics statistics;
}
);
1084
impl DataPageHeaderV2 {
    /// Decodes a `DataPageHeaderV2`, deliberately skipping the optional
    /// `statistics` field (id 8): it falls through to the default skip arm and
    /// the returned struct always has `statistics: None`.
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut num_nulls: Option<i32> = None;
        let mut num_rows: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_levels_byte_length: Option<i32> = None;
        let mut repetition_levels_byte_length: Option<i32> = None;
        let mut is_compressed: Option<bool> = None;
        // Intentionally never populated — statistics are skipped.
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_nulls = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_rows = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                5 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    definition_levels_byte_length = Some(val);
                }
                6 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    repetition_levels_byte_length = Some(val);
                }
                7 => {
                    // The compact protocol encodes bools in the field header
                    // itself, so the value comes from `bool_val`.
                    if field_ident.bool_val.is_none() {
                        return Err(general_err!(
                            "Expected bool field but got thrift type {:?}",
                            field_ident.field_type
                        ));
                    }
                    is_compressed = field_ident.bool_val;
                }
                _ => {
                    // Includes field 8 (statistics), skipped by design.
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        // Enforce the required fields of the thrift definition.
        let Some(num_values) = num_values else {
            return Err(general_err!("Required field num_values is missing"));
        };
        let Some(num_nulls) = num_nulls else {
            return Err(general_err!("Required field num_nulls is missing"));
        };
        let Some(num_rows) = num_rows else {
            return Err(general_err!("Required field num_rows is missing"));
        };
        let Some(encoding) = encoding else {
            return Err(general_err!("Required field encoding is missing"));
        };
        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
            return Err(general_err!(
                "Required field definition_levels_byte_length is missing"
            ));
        };
        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
            return Err(general_err!(
                "Required field repetition_levels_byte_length is missing"
            ));
        };
        Ok(Self {
            num_values,
            num_nulls,
            num_rows,
            encoding,
            definition_levels_byte_length,
            repetition_levels_byte_length,
            is_compressed,
            statistics,
        })
    }
}
1179
// Thrift `PageHeader`: common page framing (type, sizes, optional CRC) plus
// exactly one of the per-page-type headers, selected by `type`.
thrift_struct!(
pub(crate) struct PageHeader {
    1: required PageType r#type
    2: required i32 uncompressed_page_size
    3: required i32 compressed_page_size
    4: optional i32 crc
    5: optional DataPageHeader data_page_header;
    6: optional IndexPageHeader index_page_header;
    7: optional DictionaryPageHeader dictionary_page_header;
    8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1201
impl PageHeader {
    /// Decodes a `PageHeader` while skipping any embedded page statistics: the
    /// nested data page headers (fields 5 and 8) are read with their
    /// `read_thrift_without_stats` variants.
    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut type_: Option<PageType> = None;
        let mut uncompressed_page_size: Option<i32> = None;
        let mut compressed_page_size: Option<i32> = None;
        let mut crc: Option<i32> = None;
        let mut data_page_header: Option<DataPageHeader> = None;
        let mut index_page_header: Option<IndexPageHeader> = None;
        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = PageType::read_thrift(&mut *prot)?;
                    type_ = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    uncompressed_page_size = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    compressed_page_size = Some(val);
                }
                4 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    crc = Some(val);
                }
                5 => {
                    // V1 data page header, with statistics skipped.
                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
                    data_page_header = Some(val);
                }
                6 => {
                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
                    index_page_header = Some(val);
                }
                7 => {
                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
                    dictionary_page_header = Some(val);
                }
                8 => {
                    // V2 data page header, with statistics skipped.
                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
                    data_page_header_v2 = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        // Enforce the required fields of the thrift definition.
        let Some(type_) = type_ else {
            return Err(general_err!("Required field type_ is missing"));
        };
        let Some(uncompressed_page_size) = uncompressed_page_size else {
            return Err(general_err!(
                "Required field uncompressed_page_size is missing"
            ));
        };
        let Some(compressed_page_size) = compressed_page_size else {
            return Err(general_err!(
                "Required field compressed_page_size is missing"
            ));
        };
        Ok(Self {
            r#type: type_,
            uncompressed_page_size,
            compressed_page_size,
            crc,
            data_page_header,
            index_page_header,
            dictionary_page_header,
            data_page_header_v2,
        })
    }
}
1288
// With encryption enabled, plaintext statistics are only written when the
// column has no encrypted metadata blob. NOTE(review): presumably the
// statistics are instead carried inside the encrypted metadata — confirm
// against the writer that serializes `encrypted_column_metadata`.
#[cfg(feature = "encryption")]
fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
    column_chunk.encrypted_column_metadata.is_none()
}
1299
/// Returns `true` when column statistics should be written to the footer.
///
/// Without the `encryption` feature no column metadata can be encrypted, so
/// statistics are always written.
#[cfg(not(feature = "encryption"))]
fn should_write_column_stats(_column_chunk: &ColumnChunkMetaData) -> bool {
    true
}
1304
/// Writes the thrift `ColumnMetaData` struct for `column_chunk` to `w`.
///
/// Required fields (ids 1-7 and 9) are always written; optional fields are
/// written only when present. Statistics-related fields (12-17) are suppressed
/// when the column metadata is encrypted — see [`should_write_column_stats`].
pub(super) fn serialize_column_meta_data<W: Write>(
    column_chunk: &ColumnChunkMetaData,
    w: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
    use crate::file::statistics::page_stats_to_thrift;

    // Required fields 1-7. The third argument of `write_thrift_field` is the
    // previously written field id, which the compact protocol uses to
    // delta-encode field headers.
    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
    column_chunk
        .encodings()
        .collect::<Vec<_>>()
        .write_thrift_field(w, 2, 1)?;
    let path = column_chunk.column_descr.path().parts();
    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
    path.write_thrift_field(w, 3, 2)?;
    column_chunk.compression.write_thrift_field(w, 4, 3)?;
    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
    column_chunk
        .total_uncompressed_size
        .write_thrift_field(w, 6, 5)?;
    column_chunk
        .total_compressed_size
        .write_thrift_field(w, 7, 6)?;
    // Field 8 is not written here; data_page_offset is field 9.
    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
    if let Some(index_page_offset) = column_chunk.index_page_offset {
        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
    }
    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
    }

    // Statistics and related fields (12-17) are omitted entirely for
    // encrypted column metadata.
    if should_write_column_stats(column_chunk) {
        let stats = page_stats_to_thrift(column_chunk.statistics());
        if let Some(stats) = stats {
            last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
        }
        if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
            last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
        }
        if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
            last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
        }
        if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
            last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
        }

        // Only build a SizeStatistics struct (field 16) when at least one of
        // its components is present.
        let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
            || column_chunk.repetition_level_histogram.is_some()
            || column_chunk.definition_level_histogram.is_some()
        {
            let repetition_level_histogram = column_chunk
                .repetition_level_histogram()
                .map(|hist| hist.clone().into_inner());

            let definition_level_histogram = column_chunk
                .definition_level_histogram()
                .map(|hist| hist.clone().into_inner());

            Some(SizeStatistics {
                unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
                repetition_level_histogram,
                definition_level_histogram,
            })
        } else {
            None
        };
        if let Some(size_stats) = size_stats {
            last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
        }

        // Field 17: geospatial statistics, when present.
        if let Some(geo_stats) = column_chunk.geo_statistics() {
            geo_stats.write_thrift_field(w, 17, last_field_id)?;
        }
    }

    w.write_struct_end()
}
1404
/// Borrowed view of the pieces needed to serialize a parquet file footer's
/// `FileMetaData` thrift struct: the file-level metadata plus its row groups.
pub(super) struct FileMeta<'a> {
    /// File-level metadata (version, schema, created-by, key/value pairs, ...).
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    /// Per-row-group metadata, serialized as thrift field 4.
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
}
1410
impl<'a> WriteThrift for FileMeta<'a> {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    // `last_field_id` may be left unread when the trailing optional fields are
    // absent (or the `encryption` feature is disabled).
    #[allow(unused_assignments)]
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        // Field 1: version.
        self.file_metadata
            .version
            .write_thrift_field(writer, 1, 0)?;

        // Field 2: the schema, emitted as a thrift list of SchemaElement
        // structs. The field/list headers are written manually here because
        // the elements are produced by `write_schema` rather than a Vec.
        let root = self.file_metadata.schema_descr().root_schema_ptr();
        let schema_len = num_nodes(&root)?;
        writer.write_field_begin(FieldType::List, 2, 1)?;
        writer.write_list_begin(ElementType::Struct, schema_len)?;
        write_schema(&root, writer)?;

        // Field 3: total row count.
        self.file_metadata
            .num_rows
            .write_thrift_field(writer, 3, 2)?;

        // Field 4: row groups.
        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;

        // Optional fields 5-7.
        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(created_by) = self.file_metadata.created_by() {
            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(column_orders) = self.file_metadata.column_orders() {
            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
        }
        // Optional encryption fields 8-9 (only with the `encryption` feature).
        #[cfg(feature = "encryption")]
        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
            key.as_slice()
                .write_thrift_field(writer, 9, last_field_id)?;
        }

        writer.write_struct_end()
    }
}
1470
1471fn write_schema<W: Write>(
1472 schema: &TypePtr,
1473 writer: &mut ThriftCompactOutputProtocol<W>,
1474) -> Result<()> {
1475 if !schema.is_group() {
1476 return Err(general_err!("Root schema must be Group type"));
1477 }
1478 write_schema_helper(schema, writer)
1479}
1480
1481fn write_schema_helper<W: Write>(
1482 node: &TypePtr,
1483 writer: &mut ThriftCompactOutputProtocol<W>,
1484) -> Result<()> {
1485 match node.as_ref() {
1486 crate::schema::types::Type::PrimitiveType {
1487 basic_info,
1488 physical_type,
1489 type_length,
1490 scale,
1491 precision,
1492 } => {
1493 let element = SchemaElement {
1494 r#type: Some(*physical_type),
1495 type_length: if *type_length >= 0 {
1496 Some(*type_length)
1497 } else {
1498 None
1499 },
1500 repetition_type: Some(basic_info.repetition()),
1501 name: basic_info.name(),
1502 num_children: None,
1503 converted_type: match basic_info.converted_type() {
1504 ConvertedType::NONE => None,
1505 other => Some(other),
1506 },
1507 scale: if *scale >= 0 { Some(*scale) } else { None },
1508 precision: if *precision >= 0 {
1509 Some(*precision)
1510 } else {
1511 None
1512 },
1513 field_id: if basic_info.has_id() {
1514 Some(basic_info.id())
1515 } else {
1516 None
1517 },
1518 logical_type: basic_info.logical_type_ref().cloned(),
1519 };
1520 element.write_thrift(writer)
1521 }
1522 crate::schema::types::Type::GroupType { basic_info, fields } => {
1523 let repetition = if basic_info.has_repetition() {
1524 Some(basic_info.repetition())
1525 } else {
1526 None
1527 };
1528
1529 let element = SchemaElement {
1530 r#type: None,
1531 type_length: None,
1532 repetition_type: repetition,
1533 name: basic_info.name(),
1534 num_children: Some(fields.len().try_into()?),
1535 converted_type: match basic_info.converted_type() {
1536 ConvertedType::NONE => None,
1537 other => Some(other),
1538 },
1539 scale: None,
1540 precision: None,
1541 field_id: if basic_info.has_id() {
1542 Some(basic_info.id())
1543 } else {
1544 None
1545 },
1546 logical_type: basic_info.logical_type_ref().cloned(),
1547 };
1548
1549 element.write_thrift(writer)?;
1550
1551 for field in fields {
1553 write_schema_helper(field, writer)?;
1554 }
1555 Ok(())
1556 }
1557 }
1558}
1559
impl WriteThrift for RowGroupMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    // Serializes this row group as a thrift struct: required fields 1-3 and 6,
    // optional fields 4, 5, and 7. The previously written field id is threaded
    // through for the compact protocol's field-id delta encoding.
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.columns.write_thrift_field(writer, 1, 0)?;
        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
        if let Some(sorting_columns) = self.sorting_columns() {
            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(file_offset) = self.file_offset() {
            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
        }
        // Field 6: compressed size, always written.
        last_field_id = self
            .compressed_size()
            .write_thrift_field(writer, 6, last_field_id)?;
        if let Some(ordinal) = self.ordinal() {
            ordinal.write_thrift_field(writer, 7, last_field_id)?;
        }
        writer.write_struct_end()
    }
}
1593
impl WriteThrift for ColumnChunkMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    // `last_field_id` may be left unread after the final optional field.
    #[allow(unused_assignments)]
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let mut last_field_id = 0i16;
        // Field 1: optional file path.
        if let Some(file_path) = self.file_path() {
            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
        }
        // Field 2: file offset, always written.
        last_field_id = self
            .file_offset()
            .write_thrift_field(writer, 2, last_field_id)?;

        // Field 3: embedded ColumnMetaData. With encryption enabled, the
        // plaintext metadata is only emitted when there is no encrypted
        // column metadata or the footer is in plaintext mode.
        #[cfg(feature = "encryption")]
        let write_meta_data =
            self.encrypted_column_metadata.is_none() || self.plaintext_footer_mode;
        #[cfg(not(feature = "encryption"))]
        let write_meta_data = true;

        if write_meta_data {
            // The nested struct body is written directly after its field
            // header by `serialize_column_meta_data`.
            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
            serialize_column_meta_data(self, writer)?;
            last_field_id = 3;
        }

        // Optional page-index locations, fields 4-7.
        if let Some(offset_idx_off) = self.offset_index_offset() {
            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(offset_idx_len) = self.offset_index_length() {
            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(column_idx_off) = self.column_index_offset() {
            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(column_idx_len) = self.column_index_length() {
            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
        }
        // Optional encryption fields 8-9.
        #[cfg(feature = "encryption")]
        {
            if let Some(crypto_metadata) = self.crypto_metadata() {
                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
            }
            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
                encrypted_meta
                    .as_slice()
                    .write_thrift_field(writer, 9, last_field_id)?;
            }
        }

        writer.write_struct_end()
    }
}
1662
1663impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1668 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1669
1670 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1671 let mut last_field_id = 0i16;
1672 if let Some(bbox) = self.bounding_box() {
1673 last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1674 }
1675 if let Some(geo_types) = self.geospatial_types() {
1676 geo_types.write_thrift_field(writer, 2, last_field_id)?;
1677 }
1678
1679 writer.write_struct_end()
1680 }
1681}
1682
use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
// Generate the `WriteThriftField` impl for geospatial statistics: the field
// header is written with a Struct type, the body via its `WriteThrift` impl.
write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1686
1687impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1698 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1699
1700 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1701 self.get_xmin().write_thrift_field(writer, 1, 0)?;
1702 self.get_xmax().write_thrift_field(writer, 2, 1)?;
1703 self.get_ymin().write_thrift_field(writer, 3, 2)?;
1704 let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1705
1706 if let Some(zmin) = self.get_zmin() {
1707 last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1708 }
1709 if let Some(zmax) = self.get_zmax() {
1710 last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1711 }
1712 if let Some(mmin) = self.get_mmin() {
1713 last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1714 }
1715 if let Some(mmax) = self.get_mmax() {
1716 mmax.write_thrift_field(writer, 8, last_field_id)?;
1717 }
1718
1719 writer.write_struct_end()
1720 }
1721}
1722
use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
// Generate the `WriteThriftField` impl for bounding boxes: the field header
// is written with a Struct type, the body via its `WriteThrift` impl.
write_thrift_field!(RustBoundingBox, FieldType::Struct);
1726
#[cfg(test)]
pub(crate) mod tests {
    use crate::basic::{Encoding, PageType, Type as PhysicalType};
    use crate::errors::Result;
    use crate::file::metadata::thrift::{
        BoundingBox, DataPageHeaderV2, DictionaryPageHeader, PageHeader, SchemaElement,
        write_schema,
    };
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
        read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, ColumnPath, SchemaDescriptor, TypePtr, num_nodes,
        parquet_schema_from_array,
    };
    use std::sync::Arc;

    /// Decodes a serialized row group from `buf` using `schema_descr`, with
    /// no read options.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
    }

    /// Decodes a serialized column chunk from `buf` with default options.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        read_column_chunk_with_options(buf, column_descr, None)
    }

    /// Decodes a serialized column chunk from `buf`, honoring the supplied
    /// metadata read options.
    pub(crate) fn read_column_chunk_with_options(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ColumnChunkMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, options)
    }

    /// Serializes `schema` and parses it back, returning the reconstructed
    /// schema tree.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let num_nodes = num_nodes(&schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        // The schema is serialized as a thrift list of SchemaElement structs.
        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(&schema, &mut writer)?;

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(se)
    }

    /// Serializes `schema` into a thrift-encoded byte buffer.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let num_nodes = num_nodes(schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(schema, &mut writer)?;
        Ok(buf)
    }

    /// Parses a buffer produced by [`schema_to_buf`] back into a list of
    /// `SchemaElement`s borrowing from `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut prot)
    }

    /// Serializes any `WriteThrift` value to bytes (test-only; panics on
    /// serialization failure).
    fn thrift_bytes<T: WriteThrift>(value: &T) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
        value.write_thrift(&mut writer).unwrap();
        buf
    }

    /// Corrupts the last BOOL_FALSE field-header byte (0x12) in `buf` by
    /// rewriting it as an i32 header (0x15), simulating malformed input.
    fn change_false_bool_field_to_i32(buf: &mut [u8]) {
        let pos = buf
            .iter()
            .rposition(|byte| *byte == 0x12)
            .expect("expected BOOL_FALSE field header byte");
        buf[pos] = 0x15;
    }

    /// Asserts that `err` is the error reported for a malformed bool field.
    fn assert_malformed_bool_error(err: crate::errors::ParquetError) {
        let msg = err.to_string();
        assert!(
            msg.contains("Expected bool field"),
            "unexpected error message: {msg}"
        );
    }

    // Round-trips bounding boxes with no optional fields, z-only, and both
    // z and m extents set.
    #[test]
    fn test_bounding_box_roundtrip() {
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });
    }

    // A missing null_count must decode as None, while an explicit zero must
    // decode as Some(0) — the two are semantically different.
    #[test]
    fn test_convert_stats_preserves_missing_null_count() {
        let primitive =
            crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT32)
                .build()
                .unwrap();
        let column_descr = Arc::new(ColumnDescriptor::new(
            Arc::new(primitive),
            0,
            0,
            ColumnPath::new(vec![]),
        ));

        let none_null_count = super::Statistics {
            max: None,
            min: None,
            null_count: None,
            distinct_count: None,
            max_value: None,
            min_value: None,
            is_max_value_exact: None,
            is_min_value_exact: None,
        };
        let decoded_none = super::convert_stats(&column_descr, Some(none_null_count))
            .unwrap()
            .unwrap();
        assert_eq!(decoded_none.null_count_opt(), None);

        let zero_null_count = super::Statistics {
            max: None,
            min: None,
            null_count: Some(0),
            distinct_count: None,
            max_value: None,
            min_value: None,
            is_max_value_exact: None,
            is_min_value_exact: None,
        };
        let decoded_zero = super::convert_stats(&column_descr, Some(zero_null_count))
            .unwrap()
            .unwrap();
        assert_eq!(decoded_zero.null_count_opt(), Some(0));
    }

    // A bool field header rewritten to claim an i32 type must produce a
    // decode error, not a panic.
    #[test]
    fn malformed_bool_field_returns_error_not_panic() {
        let page_header = PageHeader {
            r#type: PageType::DICTIONARY_PAGE,
            uncompressed_page_size: 1,
            compressed_page_size: 1,
            crc: None,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: Some(DictionaryPageHeader {
                num_values: 1,
                encoding: Encoding::PLAIN,
                is_sorted: Some(false),
            }),
            data_page_header_v2: None,
        };

        let mut buf = thrift_bytes(&page_header);
        change_false_bool_field_to_i32(&mut buf);

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let err = PageHeader::read_thrift_without_stats(&mut prot)
            .expect_err("malformed bool field should return an error");
        assert_malformed_bool_error(err);
    }

    // Same corruption as above, but on the `is_compressed` bool of a
    // DataPageHeaderV2.
    #[test]
    fn malformed_data_page_v2_bool_field_returns_error_not_panic() {
        let data_page_header_v2 = DataPageHeaderV2 {
            num_values: 1,
            num_nulls: 0,
            num_rows: 1,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 0,
            repetition_levels_byte_length: 0,
            is_compressed: Some(false),
            statistics: None,
        };

        let mut buf = thrift_bytes(&data_page_header_v2);
        change_false_bool_field_to_i32(&mut buf);

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let err = DataPageHeaderV2::read_thrift_without_stats(&mut prot)
            .expect_err("malformed bool field should return an error");
        assert_malformed_bool_error(err);
    }
}