1use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34 column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37 basic::{
38 ColumnOrder, CompressionCodec, ConvertedType, Encoding, EncodingMask, LogicalType,
39 PageType, Repetition, Type,
40 },
41 data_type::{ByteArray, FixedLenByteArray, Int96},
42 errors::{ParquetError, Result},
43 file::{
44 metadata::{
45 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46 PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47 RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48 },
49 statistics::ValueStatistics,
50 },
51 parquet_thrift::{
52 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53 ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54 read_thrift_vec, validate_list_type,
55 },
56 schema::types::{
57 ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58 },
59 thrift_struct,
60 util::bit_util::FromBytes,
61 write_thrift_field,
62};
63
// One element of the schema list stored in the file footer. A `Vec` of these is turned into a
// schema tree by `parquet_schema_from_array` (see `parquet_schema_from_bytes` and
// `parquet_metadata_from_bytes`). The `'a` lifetime ties `name` to the decoded input buffer.
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
    // Physical type of the element, when present.
    1: optional Type r#type;
    2: optional i32 type_length;
    3: optional Repetition repetition_type;
    // The only required field of the element.
    4: required string<'a> name;
    5: optional i32 num_children;
    6: optional ConvertedType converted_type;
    7: optional i32 scale
    8: optional i32 precision
    9: optional i32 field_id;
    10: optional LogicalType logical_type
}
);
105
// Column-chunk statistics as stored in thrift `ColumnMetaData` field 12. `binary<'a>` values are
// tied to the input buffer's lifetime. `convert_stats` uses the deprecated `min`/`max` fields
// only when neither `min_value` nor `max_value` is present.
thrift_struct!(
struct Statistics<'a> {
    1: optional binary<'a> max;
    2: optional binary<'a> min;
    3: optional i64 null_count;
    4: optional i64 distinct_count;
    5: optional binary<'a> max_value;
    6: optional binary<'a> min_value;
    // Exactness flags; `convert_stats` only carries them over for byte-array columns.
    7: optional bool is_max_value_exact;
    8: optional bool is_min_value_exact;
}
);
118
// Thrift bounding box for geospatial statistics. X/Y bounds are required. `convert_bounding_box`
// applies the Z (resp. M) range only when both its min and max are present.
thrift_struct!(
struct BoundingBox {
    1: required double xmin;
    2: required double xmax;
    3: required double ymin;
    4: required double ymax;
    5: optional double zmin;
    6: optional double zmax;
    7: optional double mmin;
    8: optional double mmax;
}
);
131
// Geospatial statistics read from thrift `ColumnMetaData` field 17 and converted by
// `convert_geo_stats`. That function treats an empty `geospatial_types` list as absent.
thrift_struct!(
struct GeospatialStatistics {
    1: optional BoundingBox bbox;
    2: optional list<i32> geospatial_types;
}
);
138
// Size statistics read from thrift `ColumnMetaData` field 16 in `read_column_metadata`. The two
// histograms are converted to `LevelHistogram`s there.
thrift_struct!(
struct SizeStatistics {
    1: optional i64 unencoded_byte_array_data_bytes;
    2: optional list<i64> repetition_level_histogram;
    3: optional list<i64> definition_level_histogram;
}
);
146
147fn convert_geo_stats(
148 stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150 stats.map(|st| {
151 let bbox = convert_bounding_box(st.bbox);
152 let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153 Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154 bbox,
155 geospatial_types,
156 ))
157 })
158}
159
160fn convert_bounding_box(
161 bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163 bbox.map(|bb| {
164 let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165 bb.xmin.into(),
166 bb.xmax.into(),
167 bb.ymin.into(),
168 bb.ymax.into(),
169 );
170
171 newbb = match (bb.zmin, bb.zmax) {
172 (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173 _ => newbb,
175 };
176
177 newbb = match (bb.mmin, bb.mmax) {
178 (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179 _ => newbb,
181 };
182
183 newbb
184 })
185}
186
187fn convert_stats(
189 column_descr: &Arc<ColumnDescriptor>,
190 thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192 use crate::file::statistics::Statistics as FStatistics;
193 Ok(match thrift_stats {
194 Some(stats) => {
195 let null_count = stats
197 .null_count
198 .map(|null_count| {
199 if null_count < 0 {
200 return Err(general_err!(
201 "Statistics null count is negative {}",
202 null_count
203 ));
204 }
205 Ok(null_count as u64)
206 })
207 .transpose()?;
208 let distinct_count = stats.distinct_count.map(|value| value as u64);
210 let old_format = stats.min_value.is_none() && stats.max_value.is_none();
212 let min = if old_format {
214 stats.min
215 } else {
216 stats.min_value
217 };
218 let max = if old_format {
220 stats.max
221 } else {
222 stats.max_value
223 };
224
225 fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
226 if let Some(min) = min {
227 if min.len() < len {
228 return Err(general_err!("Insufficient bytes to parse min statistic",));
229 }
230 }
231 if let Some(max) = max {
232 if max.len() < len {
233 return Err(general_err!("Insufficient bytes to parse max statistic",));
234 }
235 }
236 Ok(())
237 }
238
239 let physical_type = column_descr.physical_type();
240 match physical_type {
241 Type::BOOLEAN => check_len(&min, &max, 1),
242 Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
243 Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
244 Type::INT96 => check_len(&min, &max, 12),
245 _ => Ok(()),
246 }?;
247
248 let res = match physical_type {
253 Type::BOOLEAN => FStatistics::boolean(
254 min.map(|data| data[0] != 0),
255 max.map(|data| data[0] != 0),
256 distinct_count,
257 null_count,
258 old_format,
259 ),
260 Type::INT32 => FStatistics::int32(
261 min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
262 max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263 distinct_count,
264 null_count,
265 old_format,
266 ),
267 Type::INT64 => FStatistics::int64(
268 min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
269 max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270 distinct_count,
271 null_count,
272 old_format,
273 ),
274 Type::INT96 => {
275 let min = if let Some(data) = min {
277 if data.len() != 12 {
278 return Err(general_err!("Incorrect Int96 min statistics"));
279 }
280 Some(Int96::try_from_le_slice(data)?)
281 } else {
282 None
283 };
284 let max = if let Some(data) = max {
285 if data.len() != 12 {
286 return Err(general_err!("Incorrect Int96 max statistics"));
287 }
288 Some(Int96::try_from_le_slice(data)?)
289 } else {
290 None
291 };
292 FStatistics::int96(min, max, distinct_count, null_count, old_format)
293 }
294 Type::FLOAT => FStatistics::float(
295 min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
296 max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
297 distinct_count,
298 null_count,
299 old_format,
300 ),
301 Type::DOUBLE => FStatistics::double(
302 min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
303 max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
304 distinct_count,
305 null_count,
306 old_format,
307 ),
308 Type::BYTE_ARRAY => FStatistics::ByteArray(
309 ValueStatistics::new(
310 min.map(ByteArray::from),
311 max.map(ByteArray::from),
312 distinct_count,
313 null_count,
314 old_format,
315 )
316 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
317 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
318 ),
319 Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
320 ValueStatistics::new(
321 min.map(ByteArray::from).map(FixedLenByteArray::from),
322 max.map(ByteArray::from).map(FixedLenByteArray::from),
323 distinct_count,
324 null_count,
325 old_format,
326 )
327 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
328 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
329 ),
330 };
331
332 Some(res)
333 }
334 None => None,
335 })
336}
337
// Bits recording which required `ColumnMetaData` fields were seen while decoding. Each bit is
// `1 << <thrift field id>`, as set in `read_column_metadata`. Field 3 (`path_in_schema`) is not
// tracked; the reader skips it.
const COL_META_TYPE: u16 = 1 << 1;
const COL_META_ENCODINGS: u16 = 1 << 2;
const COL_META_CODEC: u16 = 1 << 4;
const COL_META_NUM_VALUES: u16 = 1 << 5;
const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;

// Mask with the bit of every tracked required field set; compared against the seen mask in
// `validate_column_metadata`.
const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;
355
356fn validate_column_metadata(mask: u16) -> Result<()> {
359 if mask != COL_META_ALL_REQUIRED {
360 if mask & COL_META_ENCODINGS == 0 {
361 return Err(general_err!("Required field encodings is missing"));
362 }
363
364 if mask & COL_META_CODEC == 0 {
365 return Err(general_err!("Required field codec is missing"));
366 }
367 if mask & COL_META_NUM_VALUES == 0 {
368 return Err(general_err!("Required field num_values is missing"));
369 }
370 if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
371 return Err(general_err!(
372 "Required field total_uncompressed_size is missing"
373 ));
374 }
375 if mask & COL_META_TOTAL_COMP_SZ == 0 {
376 return Err(general_err!(
377 "Required field total_compressed_size is missing"
378 ));
379 }
380 if mask & COL_META_DATA_PAGE_OFFSET == 0 {
381 return Err(general_err!("Required field data_page_offset is missing"));
382 }
383 }
384
385 Ok(())
386}
387
388fn read_encoding_stats_as_mask<'a>(
389 prot: &mut ThriftSliceInputProtocol<'a>,
390) -> Result<EncodingMask> {
391 let mut mask = 0i32;
393 let list_ident = prot.read_list_begin()?;
394 validate_list_type(ElementType::Struct, &list_ident)?;
396
397 for _ in 0..list_ident.size {
398 let pes = PageEncodingStats::read_thrift(prot)?;
399 match pes.page_type {
400 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
401 _ => {}
402 }
403 }
404 EncodingMask::try_new(mask)
405}
406
/// Decodes a thrift `ColumnMetaData` struct from `prot` directly into `column`.
///
/// `col_index` and `options` decide which optional fields are skipped instead of decoded:
/// - column statistics (field 12);
/// - page encoding stats (field 13);
/// - size statistics (field 16).
///
/// Encoding stats are decoded either as an [`EncodingMask`] or as the full list, depending
/// on `options`. With no `options`, nothing is skipped and the mask form is used.
///
/// Returns the bitmask of `COL_META_*` required fields that were seen. The caller checks it
/// with `validate_column_metadata`.
fn read_column_metadata<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column: &mut ColumnChunkMetaData,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<u16> {
    let mut seen_mask = 0u16;

    // Defaults used when no options are supplied.
    let mut skip_pes = false;
    let mut pes_mask = true;
    let mut skip_col_stats = false;
    let mut skip_size_stats = false;

    if let Some(opts) = options {
        skip_pes = opts.skip_encoding_stats(col_index);
        pes_mask = opts.encoding_stats_as_mask();
        skip_col_stats = opts.skip_column_stats(col_index);
        skip_size_stats = opts.skip_size_stats(col_index);
    }

    let column_descr = &column.column_descr;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                // The decoded physical type is discarded; only its presence is recorded.
                Type::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TYPE;
            }
            2 => {
                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_ENCODINGS;
            }
            // Field 3 (`path_in_schema`) has no arm and is skipped by the catch-all below.
            4 => {
                column.compression = CompressionCodec::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_CODEC;
            }
            5 => {
                column.num_values = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_NUM_VALUES;
            }
            6 => {
                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
            }
            7 => {
                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_COMP_SZ;
            }
            9 => {
                column.data_page_offset = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_DATA_PAGE_OFFSET;
            }
            10 => {
                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            11 => {
                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            // When skipped, fields 12, 13 and 16 fall through to the catch-all `skip` arm.
            12 if !skip_col_stats => {
                column.statistics =
                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
            }
            13 if !skip_pes => {
                if pes_mask {
                    let val = read_encoding_stats_as_mask(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
                } else {
                    let val =
                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
                }
            }
            14 => {
                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            15 => {
                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
            }
            16 if !skip_size_stats => {
                let val = SizeStatistics::read_thrift(&mut *prot)?;
                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
                column.repetition_level_histogram =
                    val.repetition_level_histogram.map(LevelHistogram::from);
                column.definition_level_histogram =
                    val.definition_level_histogram.map(LevelHistogram::from);
            }
            17 => {
                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
                column.geo_statistics = convert_geo_stats(Some(val));
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    Ok(seen_mask)
}
537
/// Decodes a thrift `ColumnChunk` struct from `prot` into [`ColumnChunkMetaData`] for the
/// column described by `column_descr`.
///
/// The nested `ColumnMetaData` (field 3) is decoded with `read_column_metadata`, which
/// receives `col_index` and `options`.
///
/// # Errors
///
/// Returns an error if:
/// - the input is malformed;
/// - `file_offset` (field 2) is missing;
/// - the nested column metadata lacks a required field.
///
/// With the `encryption` feature, the nested-metadata check is skipped when the chunk carries
/// encrypted column metadata (field 9).
fn read_column_chunk<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column_descr: &Arc<ColumnDescriptor>,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;

    let mut has_file_offset = false;

    // Stays 0 if field 3 never appears, so validation below reports a missing field.
    let mut col_meta_mask = 0u16;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                col.file_path = Some(String::read_thrift(&mut *prot)?);
            }
            2 => {
                col.file_offset = i64::read_thrift(&mut *prot)?;
                has_file_offset = true;
            }
            3 => {
                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
            }
            4 => {
                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            5 => {
                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            6 => {
                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
                col.column_crypto_metadata = Some(Box::new(val));
            }
            #[cfg(feature = "encryption")]
            9 => {
                // Copied out of the input buffer into an owned Vec.
                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if !has_file_offset {
        return Err(general_err!("Required field file_offset is missing"));
    };

    // With encrypted column metadata present, the required-field mask is not checked.
    #[cfg(feature = "encryption")]
    if col.encrypted_column_metadata.is_some() {
        return Ok(col);
    }

    validate_column_metadata(col_meta_mask)?;

    Ok(col)
}
627
/// Decodes a thrift `RowGroup` struct from `prot` into [`RowGroupMetaData`].
///
/// Column chunks are decoded in order with `read_column_chunk`, pairing the i-th chunk with
/// the i-th column of `schema_descr` and forwarding `options`.
///
/// # Errors
///
/// Returns an error if:
/// - the input is malformed;
/// - the number of column chunks differs from `schema_descr.num_columns()`;
/// - a required field (`columns`, `total_byte_size`, `num_rows`) is missing.
fn read_row_group(
    prot: &mut ThriftSliceInputProtocol,
    schema_descr: &Arc<SchemaDescriptor>,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<RowGroupMetaData> {
    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();

    // Required-field bits, indexed by thrift field id.
    const RG_COLUMNS: u8 = 1 << 1;
    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
    const RG_NUM_ROWS: u8 = 1 << 3;
    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;

    let mut mask = 0u8;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                let list_ident = prot.read_list_begin()?;
                validate_list_type(ElementType::Struct, &list_ident)?;
                // Checked before decoding so that `columns()[i]` below cannot go out of bounds.
                if schema_descr.num_columns() != list_ident.size as usize {
                    return Err(general_err!(
                        "Column count mismatch. Schema has {} columns while Row Group has {}",
                        schema_descr.num_columns(),
                        list_ident.size
                    ));
                }
                for i in 0..list_ident.size as usize {
                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
                    row_group.columns.push(col);
                }
                mask |= RG_COLUMNS;
            }
            2 => {
                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
                mask |= RG_TOT_BYTE_SIZE;
            }
            3 => {
                row_group.num_rows = i64::read_thrift(&mut *prot)?;
                mask |= RG_NUM_ROWS;
            }
            4 => {
                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
                row_group.sorting_columns = Some(val);
            }
            5 => {
                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                // Explicit ordinal, when the writer provided one.
                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if mask != RG_ALL_REQUIRED {
        if mask & RG_COLUMNS == 0 {
            return Err(general_err!("Required field columns is missing"));
        }
        if mask & RG_TOT_BYTE_SIZE == 0 {
            return Err(general_err!("Required field total_byte_size is missing"));
        }
        if mask & RG_NUM_ROWS == 0 {
            return Err(general_err!("Required field num_rows is missing"));
        }
    }

    Ok(row_group)
}
717
718pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
721 let mut prot = ThriftSliceInputProtocol::new(buf);
722
723 let mut last_field_id = 0i16;
724 loop {
725 let field_ident = prot.read_field_begin(last_field_id)?;
726 if field_ident.field_type == FieldType::Stop {
727 break;
728 }
729 match field_ident.id {
730 2 => {
731 let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
733 let val = parquet_schema_from_array(val)?;
734 return Ok(SchemaDescriptor::new(val));
735 }
736 _ => prot.skip(field_ident.field_type)?,
737 }
738 last_field_id = field_ident.id;
739 }
740 Err(general_err!("Input does not contain a schema"))
741}
742
743pub(crate) fn parquet_metadata_from_bytes(
746 buf: &[u8],
747 options: Option<&ParquetMetaDataOptions>,
748) -> Result<ParquetMetaData> {
749 let mut prot = ThriftSliceInputProtocol::new(buf);
750
751 let mut version: Option<i32> = None;
753 let mut num_rows: Option<i64> = None;
754 let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
755 let mut key_value_metadata: Option<Vec<KeyValue>> = None;
756 let mut created_by: Option<&str> = None;
757 let mut column_orders: Option<Vec<ColumnOrder>> = None;
758 #[cfg(feature = "encryption")]
759 let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
760 #[cfg(feature = "encryption")]
761 let mut footer_signing_key_metadata: Option<&[u8]> = None;
762
763 let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
765
766 if let Some(options) = options {
768 schema_descr = options.schema().cloned();
769 }
770
771 let mut last_field_id = 0i16;
783 loop {
784 let field_ident = prot.read_field_begin(last_field_id)?;
785 if field_ident.field_type == FieldType::Stop {
786 break;
787 }
788 match field_ident.id {
789 1 => {
790 version = Some(i32::read_thrift(&mut prot)?);
791 }
792 2 => {
793 if schema_descr.is_some() {
795 prot.skip(field_ident.field_type)?;
796 } else {
797 let val =
799 read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
800 let val = parquet_schema_from_array(val)?;
801 schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
802 }
803 }
804 3 => {
805 num_rows = Some(i64::read_thrift(&mut prot)?);
806 }
807 4 => {
808 if schema_descr.is_none() {
809 return Err(general_err!("Required field schema is missing"));
810 }
811 let schema_descr = schema_descr.as_ref().unwrap();
812 let list_ident = prot.read_list_begin()?;
813 validate_list_type(ElementType::Struct, &list_ident)?;
815 let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
816
817 let mut assigner = OrdinalAssigner::new();
819 for ordinal in 0..list_ident.size {
820 let ordinal: i16 = ordinal.try_into().map_err(|_| {
821 ParquetError::General(format!(
822 "Row group ordinal {ordinal} exceeds i16 max value",
823 ))
824 })?;
825 let rg = read_row_group(&mut prot, schema_descr, options)?;
826 rg_vec.push(assigner.ensure(ordinal, rg)?);
827 }
828 row_groups = Some(rg_vec);
829 }
830 5 => {
831 let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
832 key_value_metadata = Some(val);
833 }
834 6 => {
835 created_by = Some(<&str>::read_thrift(&mut prot)?);
836 }
837 7 => {
838 let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
839 column_orders = Some(val);
840 }
841 #[cfg(feature = "encryption")]
842 8 => {
843 let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
844 encryption_algorithm = Some(val);
845 }
846 #[cfg(feature = "encryption")]
847 9 => {
848 footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
849 }
850 _ => {
851 prot.skip(field_ident.field_type)?;
852 }
853 };
854 last_field_id = field_ident.id;
855 }
856 let Some(version) = version else {
857 return Err(general_err!("Required field version is missing"));
858 };
859 let Some(num_rows) = num_rows else {
860 return Err(general_err!("Required field num_rows is missing"));
861 };
862 let Some(row_groups) = row_groups else {
863 return Err(general_err!("Required field row_groups is missing"));
864 };
865
866 let created_by = created_by.map(|c| c.to_owned());
867
868 let schema_descr = schema_descr.unwrap();
870
871 if column_orders
873 .as_ref()
874 .is_some_and(|cos| cos.len() != schema_descr.num_columns())
875 {
876 return Err(general_err!("Column order length mismatch"));
877 }
878 let column_orders = column_orders.map(|mut cos| {
881 for (i, column) in schema_descr.columns().iter().enumerate() {
882 if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
883 let sort_order = ColumnOrder::sort_order_for_type(
884 column.logical_type_ref(),
885 column.converted_type(),
886 column.physical_type(),
887 );
888 cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
889 }
890 }
891 cos
892 });
893
894 #[cfg(not(feature = "encryption"))]
895 let fmd = crate::file::metadata::FileMetaData::new(
896 version,
897 num_rows,
898 created_by,
899 key_value_metadata,
900 schema_descr,
901 column_orders,
902 );
903 #[cfg(feature = "encryption")]
904 let fmd = crate::file::metadata::FileMetaData::new(
905 version,
906 num_rows,
907 created_by,
908 key_value_metadata,
909 schema_descr,
910 column_orders,
911 )
912 .with_encryption_algorithm(encryption_algorithm)
913 .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
914
915 Ok(ParquetMetaData::new(fmd, row_groups))
916}
917
918#[derive(Debug, Default)]
920pub(crate) struct OrdinalAssigner {
921 first_has_ordinal: Option<bool>,
922}
923
924impl OrdinalAssigner {
925 fn new() -> Self {
926 Default::default()
927 }
928
929 fn ensure(
942 &mut self,
943 actual_ordinal: i16,
944 mut rg: RowGroupMetaData,
945 ) -> Result<RowGroupMetaData> {
946 let rg_has_ordinal = rg.ordinal.is_some();
947
948 if self.first_has_ordinal.is_none() {
950 self.first_has_ordinal = Some(rg_has_ordinal);
951 }
952
953 let first_has_ordinal = self.first_has_ordinal.unwrap();
955 if !first_has_ordinal && !rg_has_ordinal {
956 rg.ordinal = Some(actual_ordinal);
957 } else if first_has_ordinal != rg_has_ordinal {
958 return Err(general_err!(
959 "Inconsistent ordinal assignment: first_has_ordinal is set to \
960 {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
961 first_has_ordinal,
962 actual_ordinal,
963 rg_has_ordinal
964 ));
965 }
966 Ok(rg)
967 }
968}
969
// Header for an index page (`PageHeader` field 6). This definition declares no fields, so
// decoding it just consumes the struct.
thrift_struct!(
  pub(crate) struct IndexPageHeader {}
);
973
// Header for a dictionary page (`PageHeader` field 7).
thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  1: required i32 num_values;

  2: required Encoding encoding

  3: optional bool is_sorted;
}
);
986
// Statistics embedded in a data page header (`DataPageHeader` field 5, `DataPageHeaderV2`
// field 8). Same field layout as `Statistics`. Unlike there, `binary` here has no lifetime
// parameter; presumably the values are owned rather than borrowed (confirm against
// `thrift_struct!`).
thrift_struct!(
pub(crate) struct PageStatistics {
   1: optional binary max;
   2: optional binary min;
   3: optional i64 null_count;
   4: optional i64 distinct_count;
   5: optional binary max_value;
   6: optional binary min_value;
   7: optional bool is_max_value_exact;
   8: optional bool is_min_value_exact;
}
);
1006
// Header of a v1 data page (`PageHeader` field 5). `DataPageHeader::read_thrift_without_stats`
// decodes it while skipping field 5 (`statistics`).
thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);
1016
1017impl DataPageHeader {
1018 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1020 where
1021 R: ThriftCompactInputProtocol<'a>,
1022 {
1023 let mut num_values: Option<i32> = None;
1024 let mut encoding: Option<Encoding> = None;
1025 let mut definition_level_encoding: Option<Encoding> = None;
1026 let mut repetition_level_encoding: Option<Encoding> = None;
1027 let statistics: Option<PageStatistics> = None;
1028 let mut last_field_id = 0i16;
1029 loop {
1030 let field_ident = prot.read_field_begin(last_field_id)?;
1031 if field_ident.field_type == FieldType::Stop {
1032 break;
1033 }
1034 match field_ident.id {
1035 1 => {
1036 let val = i32::read_thrift(&mut *prot)?;
1037 num_values = Some(val);
1038 }
1039 2 => {
1040 let val = Encoding::read_thrift(&mut *prot)?;
1041 encoding = Some(val);
1042 }
1043 3 => {
1044 let val = Encoding::read_thrift(&mut *prot)?;
1045 definition_level_encoding = Some(val);
1046 }
1047 4 => {
1048 let val = Encoding::read_thrift(&mut *prot)?;
1049 repetition_level_encoding = Some(val);
1050 }
1051 _ => {
1052 prot.skip(field_ident.field_type)?;
1053 }
1054 };
1055 last_field_id = field_ident.id;
1056 }
1057 let Some(num_values) = num_values else {
1058 return Err(general_err!("Required field num_values is missing"));
1059 };
1060 let Some(encoding) = encoding else {
1061 return Err(general_err!("Required field encoding is missing"));
1062 };
1063 let Some(definition_level_encoding) = definition_level_encoding else {
1064 return Err(general_err!(
1065 "Required field definition_level_encoding is missing"
1066 ));
1067 };
1068 let Some(repetition_level_encoding) = repetition_level_encoding else {
1069 return Err(general_err!(
1070 "Required field repetition_level_encoding is missing"
1071 ));
1072 };
1073 Ok(Self {
1074 num_values,
1075 encoding,
1076 definition_level_encoding,
1077 repetition_level_encoding,
1078 statistics,
1079 })
1080 }
1081}
1082
// Header of a v2 data page (`PageHeader` field 8). `is_compressed` declares a thrift default
// of `true`. `DataPageHeaderV2::read_thrift_without_stats` leaves it `None` when the field is
// absent; presumably consumers apply the default (confirm).
thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
1095
1096impl DataPageHeaderV2 {
1097 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1099 where
1100 R: ThriftCompactInputProtocol<'a>,
1101 {
1102 let mut num_values: Option<i32> = None;
1103 let mut num_nulls: Option<i32> = None;
1104 let mut num_rows: Option<i32> = None;
1105 let mut encoding: Option<Encoding> = None;
1106 let mut definition_levels_byte_length: Option<i32> = None;
1107 let mut repetition_levels_byte_length: Option<i32> = None;
1108 let mut is_compressed: Option<bool> = None;
1109 let statistics: Option<PageStatistics> = None;
1110 let mut last_field_id = 0i16;
1111 loop {
1112 let field_ident = prot.read_field_begin(last_field_id)?;
1113 if field_ident.field_type == FieldType::Stop {
1114 break;
1115 }
1116 match field_ident.id {
1117 1 => {
1118 let val = i32::read_thrift(&mut *prot)?;
1119 num_values = Some(val);
1120 }
1121 2 => {
1122 let val = i32::read_thrift(&mut *prot)?;
1123 num_nulls = Some(val);
1124 }
1125 3 => {
1126 let val = i32::read_thrift(&mut *prot)?;
1127 num_rows = Some(val);
1128 }
1129 4 => {
1130 let val = Encoding::read_thrift(&mut *prot)?;
1131 encoding = Some(val);
1132 }
1133 5 => {
1134 let val = i32::read_thrift(&mut *prot)?;
1135 definition_levels_byte_length = Some(val);
1136 }
1137 6 => {
1138 let val = i32::read_thrift(&mut *prot)?;
1139 repetition_levels_byte_length = Some(val);
1140 }
1141 7 => {
1142 is_compressed = Some(field_ident.bool_val()?);
1143 }
1144 _ => {
1145 prot.skip(field_ident.field_type)?;
1146 }
1147 };
1148 last_field_id = field_ident.id;
1149 }
1150 let Some(num_values) = num_values else {
1151 return Err(general_err!("Required field num_values is missing"));
1152 };
1153 let Some(num_nulls) = num_nulls else {
1154 return Err(general_err!("Required field num_nulls is missing"));
1155 };
1156 let Some(num_rows) = num_rows else {
1157 return Err(general_err!("Required field num_rows is missing"));
1158 };
1159 let Some(encoding) = encoding else {
1160 return Err(general_err!("Required field encoding is missing"));
1161 };
1162 let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1163 return Err(general_err!(
1164 "Required field definition_levels_byte_length is missing"
1165 ));
1166 };
1167 let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1168 return Err(general_err!(
1169 "Required field repetition_levels_byte_length is missing"
1170 ));
1171 };
1172 Ok(Self {
1173 num_values,
1174 num_nulls,
1175 num_rows,
1176 encoding,
1177 definition_levels_byte_length,
1178 repetition_levels_byte_length,
1179 is_compressed,
1180 statistics,
1181 })
1182 }
1183}
1184
// Thrift page header. Fields 5-8 hold the type-specific sub-header. Presumably which one is
// set depends on `type` (confirm against the Parquet spec).
// `PageHeader::read_thrift_without_stats` decodes this while skipping the page statistics
// nested in fields 5 and 8.
thrift_struct!(
pub(crate) struct PageHeader {
  1: required PageType r#type

  2: required i32 uncompressed_page_size

  3: required i32 compressed_page_size

  4: optional i32 crc

  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1206
1207impl PageHeader {
1208 pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1212 where
1213 R: ThriftCompactInputProtocol<'a>,
1214 {
1215 let mut type_: Option<PageType> = None;
1216 let mut uncompressed_page_size: Option<i32> = None;
1217 let mut compressed_page_size: Option<i32> = None;
1218 let mut crc: Option<i32> = None;
1219 let mut data_page_header: Option<DataPageHeader> = None;
1220 let mut index_page_header: Option<IndexPageHeader> = None;
1221 let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1222 let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1223 let mut last_field_id = 0i16;
1224 loop {
1225 let field_ident = prot.read_field_begin(last_field_id)?;
1226 if field_ident.field_type == FieldType::Stop {
1227 break;
1228 }
1229 match field_ident.id {
1230 1 => {
1231 let val = PageType::read_thrift(&mut *prot)?;
1232 type_ = Some(val);
1233 }
1234 2 => {
1235 let val = i32::read_thrift(&mut *prot)?;
1236 uncompressed_page_size = Some(val);
1237 }
1238 3 => {
1239 let val = i32::read_thrift(&mut *prot)?;
1240 compressed_page_size = Some(val);
1241 }
1242 4 => {
1243 let val = i32::read_thrift(&mut *prot)?;
1244 crc = Some(val);
1245 }
1246 5 => {
1247 let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1248 data_page_header = Some(val);
1249 }
1250 6 => {
1251 let val = IndexPageHeader::read_thrift(&mut *prot)?;
1252 index_page_header = Some(val);
1253 }
1254 7 => {
1255 let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1256 dictionary_page_header = Some(val);
1257 }
1258 8 => {
1259 let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1260 data_page_header_v2 = Some(val);
1261 }
1262 _ => {
1263 prot.skip(field_ident.field_type)?;
1264 }
1265 };
1266 last_field_id = field_ident.id;
1267 }
1268 let Some(type_) = type_ else {
1269 return Err(general_err!("Required field type_ is missing"));
1270 };
1271 let Some(uncompressed_page_size) = uncompressed_page_size else {
1272 return Err(general_err!(
1273 "Required field uncompressed_page_size is missing"
1274 ));
1275 };
1276 let Some(compressed_page_size) = compressed_page_size else {
1277 return Err(general_err!(
1278 "Required field compressed_page_size is missing"
1279 ));
1280 };
1281 Ok(Self {
1282 r#type: type_,
1283 uncompressed_page_size,
1284 compressed_page_size,
1285 crc,
1286 data_page_header,
1287 index_page_header,
1288 dictionary_page_header,
1289 data_page_header_v2,
1290 })
1291 }
1292}
1293
1294#[cfg(feature = "encryption")]
1298fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
1299 column_chunk.encrypted_column_metadata.is_none()
1303}
1304
1305#[cfg(not(feature = "encryption"))]
1306fn should_write_column_stats(_column_chunk: &ColumnChunkMetaData) -> bool {
1307 true
1308}
1309
1310pub(super) fn serialize_column_meta_data<W: Write>(
1331 column_chunk: &ColumnChunkMetaData,
1332 w: &mut ThriftCompactOutputProtocol<W>,
1333) -> Result<()> {
1334 use crate::file::statistics::page_stats_to_thrift;
1335
1336 column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1337 column_chunk
1338 .encodings()
1339 .collect::<Vec<_>>()
1340 .write_thrift_field(w, 2, 1)?;
1341 if w.write_path_in_schema() {
1342 let path = column_chunk.column_descr.path().parts();
1343 let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1344 path.write_thrift_field(w, 3, 2)?;
1345 column_chunk.compression.write_thrift_field(w, 4, 3)?;
1346 } else {
1347 column_chunk.compression.write_thrift_field(w, 4, 2)?;
1348 }
1349
1350 column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1351 column_chunk
1352 .total_uncompressed_size
1353 .write_thrift_field(w, 6, 5)?;
1354 column_chunk
1355 .total_compressed_size
1356 .write_thrift_field(w, 7, 6)?;
1357 let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1359 if let Some(index_page_offset) = column_chunk.index_page_offset {
1360 last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1361 }
1362 if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1363 last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1364 }
1365
1366 if should_write_column_stats(column_chunk) {
1367 let stats = page_stats_to_thrift(column_chunk.statistics());
1369 if let Some(stats) = stats {
1370 last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1371 }
1372 if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1373 last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1374 }
1375 if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1376 last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1377 }
1378 if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1379 last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1380 }
1381
1382 let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1384 || column_chunk.repetition_level_histogram.is_some()
1385 || column_chunk.definition_level_histogram.is_some()
1386 {
1387 let repetition_level_histogram = column_chunk
1388 .repetition_level_histogram()
1389 .map(|hist| hist.clone().into_inner());
1390
1391 let definition_level_histogram = column_chunk
1392 .definition_level_histogram()
1393 .map(|hist| hist.clone().into_inner());
1394
1395 Some(SizeStatistics {
1396 unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1397 repetition_level_histogram,
1398 definition_level_histogram,
1399 })
1400 } else {
1401 None
1402 };
1403 if let Some(size_stats) = size_stats {
1404 last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1405 }
1406
1407 if let Some(geo_stats) = column_chunk.geo_statistics() {
1408 geo_stats.write_thrift_field(w, 17, last_field_id)?;
1409 }
1410 }
1411
1412 w.write_struct_end()
1413}
1414
/// Borrowed inputs needed to serialize the thrift `FileMetaData` footer struct.
pub(super) struct FileMeta<'a> {
    /// File-level metadata: version, schema, row count, key/value metadata,
    /// `created_by`, column orders (and encryption fields when that feature is on).
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    /// Row groups, written in order as `FileMetaData` field 4.
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
    /// Installed on the output protocol before writing. When `false`, every column's
    /// `path_in_schema` (`ColumnMetaData` field 3) is left out of the output.
    pub(super) write_path_in_schema: bool,
}
1422
1423impl<'a> WriteThrift for FileMeta<'a> {
1435 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1436
1437 #[allow(unused_assignments)]
1439 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1440 writer.set_write_path_in_schema(self.write_path_in_schema);
1441
1442 self.file_metadata
1443 .version
1444 .write_thrift_field(writer, 1, 0)?;
1445
1446 let root = self.file_metadata.schema_descr().root_schema_ptr();
1449 let schema_len = num_nodes(&root)?;
1450 writer.write_field_begin(FieldType::List, 2, 1)?;
1451 writer.write_list_begin(ElementType::Struct, schema_len)?;
1452 write_schema(&root, writer)?;
1454
1455 self.file_metadata
1456 .num_rows
1457 .write_thrift_field(writer, 3, 2)?;
1458
1459 let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1461
1462 if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1463 last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1464 }
1465 if let Some(created_by) = self.file_metadata.created_by() {
1466 last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1467 }
1468 if let Some(column_orders) = self.file_metadata.column_orders() {
1469 last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1470 }
1471 #[cfg(feature = "encryption")]
1472 if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1473 last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1474 }
1475 #[cfg(feature = "encryption")]
1476 if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1477 key.as_slice()
1478 .write_thrift_field(writer, 9, last_field_id)?;
1479 }
1480
1481 writer.write_struct_end()
1482 }
1483}
1484
1485fn write_schema<W: Write>(
1486 schema: &TypePtr,
1487 writer: &mut ThriftCompactOutputProtocol<W>,
1488) -> Result<()> {
1489 if !schema.is_group() {
1490 return Err(general_err!("Root schema must be Group type"));
1491 }
1492 write_schema_helper(schema, writer)
1493}
1494
1495fn write_schema_helper<W: Write>(
1496 node: &TypePtr,
1497 writer: &mut ThriftCompactOutputProtocol<W>,
1498) -> Result<()> {
1499 match node.as_ref() {
1500 crate::schema::types::Type::PrimitiveType {
1501 basic_info,
1502 physical_type,
1503 type_length,
1504 scale,
1505 precision,
1506 } => {
1507 let element = SchemaElement {
1508 r#type: Some(*physical_type),
1509 type_length: if *type_length >= 0 {
1510 Some(*type_length)
1511 } else {
1512 None
1513 },
1514 repetition_type: Some(basic_info.repetition()),
1515 name: basic_info.name(),
1516 num_children: None,
1517 converted_type: match basic_info.converted_type() {
1518 ConvertedType::NONE => None,
1519 other => Some(other),
1520 },
1521 scale: if *scale >= 0 { Some(*scale) } else { None },
1522 precision: if *precision >= 0 {
1523 Some(*precision)
1524 } else {
1525 None
1526 },
1527 field_id: if basic_info.has_id() {
1528 Some(basic_info.id())
1529 } else {
1530 None
1531 },
1532 logical_type: basic_info.logical_type_ref().cloned(),
1533 };
1534 element.write_thrift(writer)
1535 }
1536 crate::schema::types::Type::GroupType { basic_info, fields } => {
1537 let repetition = if basic_info.has_repetition() {
1538 Some(basic_info.repetition())
1539 } else {
1540 None
1541 };
1542
1543 let element = SchemaElement {
1544 r#type: None,
1545 type_length: None,
1546 repetition_type: repetition,
1547 name: basic_info.name(),
1548 num_children: Some(fields.len().try_into()?),
1549 converted_type: match basic_info.converted_type() {
1550 ConvertedType::NONE => None,
1551 other => Some(other),
1552 },
1553 scale: None,
1554 precision: None,
1555 field_id: if basic_info.has_id() {
1556 Some(basic_info.id())
1557 } else {
1558 None
1559 },
1560 logical_type: basic_info.logical_type_ref().cloned(),
1561 };
1562
1563 element.write_thrift(writer)?;
1564
1565 for field in fields {
1567 write_schema_helper(field, writer)?;
1568 }
1569 Ok(())
1570 }
1571 }
1572}
1573
1574impl WriteThrift for RowGroupMetaData {
1584 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1585
1586 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1587 self.columns.write_thrift_field(writer, 1, 0)?;
1589 self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1590 let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1591 if let Some(sorting_columns) = self.sorting_columns() {
1592 last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1593 }
1594 if let Some(file_offset) = self.file_offset() {
1595 last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1596 }
1597 last_field_id = self
1599 .compressed_size()
1600 .write_thrift_field(writer, 6, last_field_id)?;
1601 if let Some(ordinal) = self.ordinal() {
1602 ordinal.write_thrift_field(writer, 7, last_field_id)?;
1603 }
1604 writer.write_struct_end()
1605 }
1606}
1607
1608impl WriteThrift for ColumnChunkMetaData {
1620 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1621
1622 #[allow(unused_assignments)]
1623 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1624 let mut last_field_id = 0i16;
1625 if let Some(file_path) = self.file_path() {
1626 last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1627 }
1628 last_field_id = self
1629 .file_offset()
1630 .write_thrift_field(writer, 2, last_field_id)?;
1631
1632 #[cfg(feature = "encryption")]
1633 let write_meta_data =
1634 self.encrypted_column_metadata.is_none() || self.plaintext_footer_mode;
1635 #[cfg(not(feature = "encryption"))]
1636 let write_meta_data = true;
1637
1638 if write_meta_data {
1644 writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1645 serialize_column_meta_data(self, writer)?;
1646 last_field_id = 3;
1647 }
1648
1649 if let Some(offset_idx_off) = self.offset_index_offset() {
1650 last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1651 }
1652 if let Some(offset_idx_len) = self.offset_index_length() {
1653 last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1654 }
1655 if let Some(column_idx_off) = self.column_index_offset() {
1656 last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1657 }
1658 if let Some(column_idx_len) = self.column_index_length() {
1659 last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1660 }
1661 #[cfg(feature = "encryption")]
1662 {
1663 if let Some(crypto_metadata) = self.crypto_metadata() {
1664 last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1665 }
1666 if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1667 encrypted_meta
1668 .as_slice()
1669 .write_thrift_field(writer, 9, last_field_id)?;
1670 }
1671 }
1672
1673 writer.write_struct_end()
1674 }
1675}
1676
1677impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1682 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1683
1684 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1685 let mut last_field_id = 0i16;
1686 if let Some(bbox) = self.bounding_box() {
1687 last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1688 }
1689 if let Some(geo_types) = self.geospatial_types() {
1690 geo_types.write_thrift_field(writer, 2, last_field_id)?;
1691 }
1692
1693 writer.write_struct_end()
1694 }
1695}
1696
// Registers `GeospatialStatistics` as a struct-typed thrift field. This presumably
// generates the `write_thrift_field` support used for `ColumnMetaData` field 17 in
// `serialize_column_meta_data`.
use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1700
1701impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1712 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1713
1714 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1715 self.get_xmin().write_thrift_field(writer, 1, 0)?;
1716 self.get_xmax().write_thrift_field(writer, 2, 1)?;
1717 self.get_ymin().write_thrift_field(writer, 3, 2)?;
1718 let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1719
1720 if let Some(zmin) = self.get_zmin() {
1721 last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1722 }
1723 if let Some(zmax) = self.get_zmax() {
1724 last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1725 }
1726 if let Some(mmin) = self.get_mmin() {
1727 last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1728 }
1729 if let Some(mmax) = self.get_mmax() {
1730 mmax.write_thrift_field(writer, 8, last_field_id)?;
1731 }
1732
1733 writer.write_struct_end()
1734 }
1735}
1736
// Registers `BoundingBox` as a struct-typed thrift field. This presumably generates the
// `write_thrift_field` support used for the bbox (field 1) of `GeospatialStatistics`.
use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
write_thrift_field!(RustBoundingBox, FieldType::Struct);
1740
// Tests for the thrift encode/decode paths in this module. The `pub(crate)` helpers
// below are also available to other test modules for driving the readers and the
// schema writer directly.
#[cfg(test)]
pub(crate) mod tests {
    use crate::basic::{Encoding, PageType, Type as PhysicalType};
    use crate::errors::Result;
    use crate::file::metadata::thrift::{
        BoundingBox, DataPageHeaderV2, DictionaryPageHeader, PageHeader, SchemaElement,
        write_schema,
    };
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
        read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, ColumnPath, SchemaDescriptor, TypePtr, num_nodes,
        parquet_schema_from_array,
    };
    use std::sync::Arc;

    /// Decodes a compact-thrift `RowGroup` from `buf` against `schema_descr`.
    ///
    /// NOTE(review): the trailing `None` is forwarded unchanged to the module-level
    /// `read_row_group`; presumably "no extra options/decryption" — confirm.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
    }

    /// Decodes a compact-thrift `ColumnChunk` for `column_descr` with no options.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        read_column_chunk_with_options(buf, column_descr, None)
    }

    /// Decodes a compact-thrift `ColumnChunk` for `column_descr`, forwarding `options`.
    ///
    /// NOTE(review): the literal `0` is presumably the column's index within the
    /// row group — confirm against `thrift::read_column_chunk`.
    pub(crate) fn read_column_chunk_with_options(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ColumnChunkMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, options)
    }

    /// Serializes `schema` as a thrift list of `SchemaElement`s and rebuilds a schema
    /// tree from it. This exercises `write_schema` and `parquet_schema_from_array`
    /// together.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let num_nodes = num_nodes(&schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        // `write_schema` emits only the elements, so the list header comes first.
        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(&schema, &mut writer)?;

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(se)
    }

    /// Returns the compact-thrift encoding of `schema` as a list of `SchemaElement`s.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let num_nodes = num_nodes(schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        // `write_schema` emits only the elements, so the list header comes first.
        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(schema, &mut writer)?;
        Ok(buf)
    }

    /// Parses a buffer like the one produced by `schema_to_buf` into `SchemaElement`s
    /// whose lifetime is tied to `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut prot)
    }

    /// Serializes `value` with the compact protocol, panicking if writing fails.
    fn thrift_bytes<T: WriteThrift>(value: &T) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
        value.write_thrift(&mut writer).unwrap();
        buf
    }

    /// Rewrites the last `0x12` byte in `buf` to `0x15`, so that a `false` bool field
    /// header now claims to be an i32 field.
    ///
    /// In the thrift compact protocol a short field header carries the field-id delta in
    /// its high nibble and the wire type in its low nibble. `0x12` is delta 1 +
    /// BOOLEAN_FALSE and `0x15` is delta 1 + I32. The caller must ensure the targeted
    /// bool is the last `0x12` byte in the buffer.
    fn change_false_bool_field_to_i32(buf: &mut [u8]) {
        let pos = buf
            .iter()
            .rposition(|byte| *byte == 0x12)
            .expect("expected BOOL_FALSE field header byte");
        buf[pos] = 0x15;
    }

    /// Asserts that `err` is the decoder's "Unexpected struct field type" error.
    fn assert_malformed_bool_error(err: crate::errors::ParquetError) {
        let msg = err.to_string();
        assert!(
            msg.contains("Unexpected struct field type"),
            "unexpected error message: {msg}"
        );
    }

    // Round-trips bounding boxes with only x/y, with x/y/z, and with all of x/y/z/m.
    #[test]
    fn test_bounding_box_roundtrip() {
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });
    }

    // An absent thrift `null_count` must decode to `None`, not `Some(0)`, while an
    // explicit zero must be preserved as `Some(0)`.
    #[test]
    fn test_convert_stats_preserves_missing_null_count() {
        let primitive =
            crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT32)
                .build()
                .unwrap();
        let column_descr = Arc::new(ColumnDescriptor::new(
            Arc::new(primitive),
            0,
            0,
            ColumnPath::new(vec![]),
        ));

        let none_null_count = super::Statistics {
            max: None,
            min: None,
            null_count: None,
            distinct_count: None,
            max_value: None,
            min_value: None,
            is_max_value_exact: None,
            is_min_value_exact: None,
        };
        let decoded_none = super::convert_stats(&column_descr, Some(none_null_count))
            .unwrap()
            .unwrap();
        assert_eq!(decoded_none.null_count_opt(), None);

        let zero_null_count = super::Statistics {
            max: None,
            min: None,
            null_count: Some(0),
            distinct_count: None,
            max_value: None,
            min_value: None,
            is_max_value_exact: None,
            is_min_value_exact: None,
        };
        let decoded_zero = super::convert_stats(&column_descr, Some(zero_null_count))
            .unwrap()
            .unwrap();
        assert_eq!(decoded_zero.null_count_opt(), Some(0));
    }

    // INT96 values are 12 bytes. A 13-byte legacy min or max must be reported as an
    // error naming the offending bound.
    #[test]
    fn test_convert_stats_returns_error_for_overlong_int96_statistics() {
        let primitive =
            crate::schema::types::Type::primitive_type_builder("col", PhysicalType::INT96)
                .build()
                .unwrap();
        let column_descr = Arc::new(ColumnDescriptor::new(
            Arc::new(primitive),
            0,
            0,
            ColumnPath::new(vec![]),
        ));
        // One byte longer than an INT96 value.
        let invalid = (0..13).collect::<Vec<_>>();

        let make_stats = |min, max| super::Statistics {
            max,
            min,
            null_count: Some(0),
            distinct_count: None,
            max_value: None,
            min_value: None,
            is_max_value_exact: None,
            is_min_value_exact: None,
        };

        let err = super::convert_stats(&column_descr, Some(make_stats(Some(&invalid), None)))
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Incorrect Int96 min statistics"
        );

        let err = super::convert_stats(&column_descr, Some(make_stats(None, Some(&invalid))))
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Incorrect Int96 max statistics"
        );
    }

    // A dictionary page header whose `is_sorted` bool header has been rewritten as an
    // i32 must fail to decode with an error instead of panicking.
    #[test]
    fn malformed_bool_field_returns_error_not_panic() {
        let page_header = PageHeader {
            r#type: PageType::DICTIONARY_PAGE,
            uncompressed_page_size: 1,
            compressed_page_size: 1,
            crc: None,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: Some(DictionaryPageHeader {
                num_values: 1,
                encoding: Encoding::PLAIN,
                is_sorted: Some(false),
            }),
            data_page_header_v2: None,
        };

        let mut buf = thrift_bytes(&page_header);
        change_false_bool_field_to_i32(&mut buf);

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let err = PageHeader::read_thrift_without_stats(&mut prot)
            .expect_err("malformed bool field should return an error");
        assert_malformed_bool_error(err);
    }

    // Same check as above for the `is_compressed` bool of a v2 data page header.
    #[test]
    fn malformed_data_page_v2_bool_field_returns_error_not_panic() {
        let data_page_header_v2 = DataPageHeaderV2 {
            num_values: 1,
            num_nulls: 0,
            num_rows: 1,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 0,
            repetition_levels_byte_length: 0,
            is_compressed: Some(false),
            statistics: None,
        };

        let mut buf = thrift_bytes(&data_page_header_v2);
        change_false_bool_field_to_i32(&mut buf);

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let err = DataPageHeaderV2::read_thrift_without_stats(&mut prot)
            .expect_err("malformed bool field should return an error");
        assert_malformed_bool_error(err);
    }
}