1use std::io::Write;
27use std::sync::Arc;
28
29#[cfg(feature = "encryption")]
30pub(crate) mod encryption;
31
32#[cfg(feature = "encryption")]
33use crate::file::{
34 column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
35};
36use crate::{
37 basic::{
38 ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
39 Repetition, Type,
40 },
41 data_type::{ByteArray, FixedLenByteArray, Int96},
42 errors::{ParquetError, Result},
43 file::{
44 metadata::{
45 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
46 PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
47 RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
48 },
49 statistics::ValueStatistics,
50 },
51 parquet_thrift::{
52 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
53 ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
54 read_thrift_vec,
55 },
56 schema::types::{
57 ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
58 },
59 thrift_struct,
60 util::bit_util::FromBytes,
61 write_thrift_field,
62};
63
// Thrift `SchemaElement`: one node of the flattened schema stored in the file
// footer. A decoded list of these is turned into a schema tree by
// `parquet_schema_from_array` (see `parquet_schema_from_bytes` and
// `parquet_metadata_from_bytes`).
// `name` is declared as `string<'a>`, which presumably borrows from the input
// buffer instead of copying — TODO confirm against the `thrift_struct!` macro.
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  1: optional Type r#type;
  2: optional i32 type_length;
  3: optional Repetition repetition_type;
  4: required string<'a> name;
  // Presumably only set on group (non-leaf) nodes; verify against the spec.
  5: optional i32 num_children;
  6: optional ConvertedType converted_type;
  7: optional i32 scale
  8: optional i32 precision
  9: optional i32 field_id;
  10: optional LogicalType logical_type
}
);
105
// Thrift column-chunk `Statistics` as read from `ColumnMetaData` field 12.
// The raw min/max bytes are interpreted per physical type by `convert_stats`.
// That function uses `max`/`min` (fields 1-2) only when neither `max_value`
// nor `min_value` (fields 5-6) is present.
thrift_struct!(
struct Statistics<'a> {
  1: optional binary<'a> max;
  2: optional binary<'a> min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary<'a> max_value;
  6: optional binary<'a> min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);
118
// Thrift geospatial `BoundingBox`. The X and Y ranges are required; the Z and
// M ranges are optional. `convert_bounding_box` only keeps a Z or M range
// when both of its bounds are present.
thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);
131
// Thrift `GeospatialStatistics` read from `ColumnMetaData` field 17 and
// converted to the crate type by `convert_geo_stats`.
thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);
138
// Thrift `SizeStatistics` read from `ColumnMetaData` field 16. Its values are
// copied onto `ColumnChunkMetaData` in `read_column_metadata`; the histograms
// are wrapped in `LevelHistogram`.
thrift_struct!(
struct SizeStatistics {
  1: optional i64 unencoded_byte_array_data_bytes;
  2: optional list<i64> repetition_level_histogram;
  3: optional list<i64> definition_level_histogram;
}
);
146
147fn convert_geo_stats(
148 stats: Option<GeospatialStatistics>,
149) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
150 stats.map(|st| {
151 let bbox = convert_bounding_box(st.bbox);
152 let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
153 Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
154 bbox,
155 geospatial_types,
156 ))
157 })
158}
159
160fn convert_bounding_box(
161 bbox: Option<BoundingBox>,
162) -> Option<crate::geospatial::bounding_box::BoundingBox> {
163 bbox.map(|bb| {
164 let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
165 bb.xmin.into(),
166 bb.xmax.into(),
167 bb.ymin.into(),
168 bb.ymax.into(),
169 );
170
171 newbb = match (bb.zmin, bb.zmax) {
172 (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
173 _ => newbb,
175 };
176
177 newbb = match (bb.mmin, bb.mmax) {
178 (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
179 _ => newbb,
181 };
182
183 newbb
184 })
185}
186
187fn convert_stats(
189 column_descr: &Arc<ColumnDescriptor>,
190 thrift_stats: Option<Statistics>,
191) -> Result<Option<crate::file::statistics::Statistics>> {
192 use crate::file::statistics::Statistics as FStatistics;
193 Ok(match thrift_stats {
194 Some(stats) => {
195 let null_count = stats.null_count.unwrap_or(0);
199
200 if null_count < 0 {
201 return Err(general_err!(
202 "Statistics null count is negative {}",
203 null_count
204 ));
205 }
206
207 let null_count = Some(null_count as u64);
209 let distinct_count = stats.distinct_count.map(|value| value as u64);
211 let old_format = stats.min_value.is_none() && stats.max_value.is_none();
213 let min = if old_format {
215 stats.min
216 } else {
217 stats.min_value
218 };
219 let max = if old_format {
221 stats.max
222 } else {
223 stats.max_value
224 };
225
226 fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
227 if let Some(min) = min {
228 if min.len() < len {
229 return Err(general_err!("Insufficient bytes to parse min statistic",));
230 }
231 }
232 if let Some(max) = max {
233 if max.len() < len {
234 return Err(general_err!("Insufficient bytes to parse max statistic",));
235 }
236 }
237 Ok(())
238 }
239
240 let physical_type = column_descr.physical_type();
241 match physical_type {
242 Type::BOOLEAN => check_len(&min, &max, 1),
243 Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
244 Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
245 Type::INT96 => check_len(&min, &max, 12),
246 _ => Ok(()),
247 }?;
248
249 let res = match physical_type {
254 Type::BOOLEAN => FStatistics::boolean(
255 min.map(|data| data[0] != 0),
256 max.map(|data| data[0] != 0),
257 distinct_count,
258 null_count,
259 old_format,
260 ),
261 Type::INT32 => FStatistics::int32(
262 min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
263 max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
264 distinct_count,
265 null_count,
266 old_format,
267 ),
268 Type::INT64 => FStatistics::int64(
269 min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
270 max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
271 distinct_count,
272 null_count,
273 old_format,
274 ),
275 Type::INT96 => {
276 let min = if let Some(data) = min {
278 assert_eq!(data.len(), 12);
279 Some(Int96::try_from_le_slice(data)?)
280 } else {
281 None
282 };
283 let max = if let Some(data) = max {
284 assert_eq!(data.len(), 12);
285 Some(Int96::try_from_le_slice(data)?)
286 } else {
287 None
288 };
289 FStatistics::int96(min, max, distinct_count, null_count, old_format)
290 }
291 Type::FLOAT => FStatistics::float(
292 min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
293 max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
294 distinct_count,
295 null_count,
296 old_format,
297 ),
298 Type::DOUBLE => FStatistics::double(
299 min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
300 max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
301 distinct_count,
302 null_count,
303 old_format,
304 ),
305 Type::BYTE_ARRAY => FStatistics::ByteArray(
306 ValueStatistics::new(
307 min.map(ByteArray::from),
308 max.map(ByteArray::from),
309 distinct_count,
310 null_count,
311 old_format,
312 )
313 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
314 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
315 ),
316 Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
317 ValueStatistics::new(
318 min.map(ByteArray::from).map(FixedLenByteArray::from),
319 max.map(ByteArray::from).map(FixedLenByteArray::from),
320 distinct_count,
321 null_count,
322 old_format,
323 )
324 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
325 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
326 ),
327 };
328
329 Some(res)
330 }
331 None => None,
332 })
333}
334
// Bit flags for the required fields of the thrift `ColumnMetaData` struct.
// Each flag is `1 << field_id`. `read_column_metadata` records which required
// fields it has seen, and `validate_column_metadata` checks the result.
const COL_META_TYPE: u16 = 1u16 << 1;
const COL_META_ENCODINGS: u16 = 1u16 << 2;
const COL_META_CODEC: u16 = 1u16 << 4;
const COL_META_NUM_VALUES: u16 = 1u16 << 5;
const COL_META_TOTAL_UNCOMP_SZ: u16 = 1u16 << 6;
const COL_META_TOTAL_COMP_SZ: u16 = 1u16 << 7;
const COL_META_DATA_PAGE_OFFSET: u16 = 1u16 << 9;

// Union of every required-field flag: a complete `ColumnMetaData` produces
// exactly this mask.
const COL_META_ALL_REQUIRED: u16 = COL_META_DATA_PAGE_OFFSET
    | COL_META_TOTAL_COMP_SZ
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_NUM_VALUES
    | COL_META_CODEC
    | COL_META_ENCODINGS
    | COL_META_TYPE;
352
353fn validate_column_metadata(mask: u16) -> Result<()> {
356 if mask != COL_META_ALL_REQUIRED {
357 if mask & COL_META_ENCODINGS == 0 {
358 return Err(general_err!("Required field encodings is missing"));
359 }
360
361 if mask & COL_META_CODEC == 0 {
362 return Err(general_err!("Required field codec is missing"));
363 }
364 if mask & COL_META_NUM_VALUES == 0 {
365 return Err(general_err!("Required field num_values is missing"));
366 }
367 if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
368 return Err(general_err!(
369 "Required field total_uncompressed_size is missing"
370 ));
371 }
372 if mask & COL_META_TOTAL_COMP_SZ == 0 {
373 return Err(general_err!(
374 "Required field total_compressed_size is missing"
375 ));
376 }
377 if mask & COL_META_DATA_PAGE_OFFSET == 0 {
378 return Err(general_err!("Required field data_page_offset is missing"));
379 }
380 }
381
382 Ok(())
383}
384
385fn read_encoding_stats_as_mask<'a>(
386 prot: &mut ThriftSliceInputProtocol<'a>,
387) -> Result<EncodingMask> {
388 let mut mask = 0i32;
390 let list_ident = prot.read_list_begin()?;
391 for _ in 0..list_ident.size {
392 let pes = PageEncodingStats::read_thrift(prot)?;
393 match pes.page_type {
394 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
395 _ => {}
396 }
397 }
398 EncodingMask::try_new(mask)
399}
400
/// Decodes a thrift `ColumnMetaData` struct, writing the decoded values
/// straight into `column`.
///
/// When `options` is given, it decides for column `col_index` whether these
/// fields are skipped:
/// - column statistics (field 12);
/// - page encoding stats (field 13);
/// - size statistics (field 16).
///
/// A skipped field falls through to the generic `prot.skip` arm. Page encoding
/// stats are stored as `ParquetPageEncodingStats::Mask` unless `options`
/// reports `encoding_stats_as_mask() == false`, in which case the full list is
/// kept.
///
/// Returns the `COL_META_*` flags of the required fields that were present.
/// This function does not validate them itself; `read_column_chunk` passes the
/// result to `validate_column_metadata`.
fn read_column_metadata<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column: &mut ColumnChunkMetaData,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<u16> {
    // Accumulates the COL_META_* flag of each required field as it is read.
    let mut seen_mask = 0u16;

    // Defaults used when no `options` are supplied: decode everything and
    // keep page encoding stats in mask form.
    let mut skip_pes = false;
    let mut pes_mask = true;
    let mut skip_col_stats = false;
    let mut skip_size_stats = false;

    if let Some(opts) = options {
        skip_pes = opts.skip_encoding_stats(col_index);
        pes_mask = opts.encoding_stats_as_mask();
        skip_col_stats = opts.skip_column_stats(col_index);
        skip_size_stats = opts.skip_size_stats(col_index);
    }

    let column_descr = &column.column_descr;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                // The physical type is read only to consume it and mark it as
                // present; the decoded value is discarded.
                Type::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TYPE;
            }
            2 => {
                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_ENCODINGS;
            }
            4 => {
                column.compression = Compression::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_CODEC;
            }
            5 => {
                column.num_values = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_NUM_VALUES;
            }
            6 => {
                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
            }
            7 => {
                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_COMP_SZ;
            }
            9 => {
                column.data_page_offset = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_DATA_PAGE_OFFSET;
            }
            10 => {
                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            11 => {
                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            // When skipped, field 12 falls through to the catch-all arm below.
            12 if !skip_col_stats => {
                column.statistics =
                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
            }
            13 if !skip_pes => {
                if pes_mask {
                    // Keep only the set of encodings used by data pages.
                    let val = read_encoding_stats_as_mask(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
                } else {
                    let val =
                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
                }
            }
            14 => {
                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            15 => {
                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
            }
            16 if !skip_size_stats => {
                let val = SizeStatistics::read_thrift(&mut *prot)?;
                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
                column.repetition_level_histogram =
                    val.repetition_level_histogram.map(LevelHistogram::from);
                column.definition_level_histogram =
                    val.definition_level_histogram.map(LevelHistogram::from);
            }
            17 => {
                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
                column.geo_statistics = convert_geo_stats(Some(val));
            }
            // Skipped: unhandled ids (such as field 3, the column path) and
            // fields disabled through `options`.
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    Ok(seen_mask)
}
531
/// Decodes one thrift `ColumnChunk` struct for the column described by
/// `column_descr`.
///
/// The nested `ColumnMetaData` (field 3) is decoded in place by
/// `read_column_metadata`. `col_index` and `options` are forwarded to it so
/// that per-column skip options apply.
///
/// # Errors
///
/// Returns an error on decode failure, if `file_offset` is missing, or if a
/// required `ColumnMetaData` field is missing. With the `encryption` feature,
/// the `ColumnMetaData` check is skipped when the chunk carries encrypted
/// column metadata.
fn read_column_chunk<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column_descr: &Arc<ColumnDescriptor>,
    col_index: usize,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;

    // `file_offset` is required, so its presence is tracked explicitly.
    let mut has_file_offset = false;

    // Stays 0 if field 3 never appears, which `validate_column_metadata`
    // rejects.
    let mut col_meta_mask = 0u16;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                col.file_path = Some(String::read_thrift(&mut *prot)?);
            }
            2 => {
                col.file_offset = i64::read_thrift(&mut *prot)?;
                has_file_offset = true;
            }
            3 => {
                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
            }
            4 => {
                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            5 => {
                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            6 => {
                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
                col.column_crypto_metadata = Some(Box::new(val));
            }
            #[cfg(feature = "encryption")]
            9 => {
                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
            }
            // Without the `encryption` feature, fields 8 and 9 also land here.
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if !has_file_offset {
        return Err(general_err!("Required field file_offset is missing"));
    };

    // Encrypted column metadata is not validated here; the plaintext
    // `ColumnMetaData` fields may be absent in that case.
    #[cfg(feature = "encryption")]
    if col.encrypted_column_metadata.is_some() {
        return Ok(col);
    }

    validate_column_metadata(col_meta_mask)?;

    Ok(col)
}
621
/// Decodes one thrift `RowGroup` struct, using `schema_descr` to describe its
/// columns.
///
/// The number of column chunks must equal `schema_descr.num_columns()`. Each
/// chunk is decoded with `read_column_chunk`, which receives its column index
/// and `options`.
///
/// # Errors
///
/// Returns an error on decode failure, on a column count mismatch, or when a
/// required field (`columns`, `total_byte_size`, `num_rows`) is missing.
fn read_row_group(
    prot: &mut ThriftSliceInputProtocol,
    schema_descr: &Arc<SchemaDescriptor>,
    options: Option<&ParquetMetaDataOptions>,
) -> Result<RowGroupMetaData> {
    // Start from an empty row group; its fields are filled in as they are
    // decoded below.
    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();

    // Bit flags for the required RowGroup fields; each bit position equals
    // the field id.
    const RG_COLUMNS: u8 = 1 << 1;
    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
    const RG_NUM_ROWS: u8 = 1 << 3;
    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;

    let mut mask = 0u8;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                let list_ident = prot.read_list_begin()?;
                if schema_descr.num_columns() != list_ident.size as usize {
                    return Err(general_err!(
                        "Column count mismatch. Schema has {} columns while Row Group has {}",
                        schema_descr.num_columns(),
                        list_ident.size
                    ));
                }
                // The count check above makes `columns()[i]` in-bounds.
                for i in 0..list_ident.size as usize {
                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
                    row_group.columns.push(col);
                }
                mask |= RG_COLUMNS;
            }
            2 => {
                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
                mask |= RG_TOT_BYTE_SIZE;
            }
            3 => {
                row_group.num_rows = i64::read_thrift(&mut *prot)?;
                mask |= RG_NUM_ROWS;
            }
            4 => {
                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
                row_group.sorting_columns = Some(val);
            }
            5 => {
                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                // Explicit ordinal; reconciled later by `OrdinalAssigner`.
                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    // Report the first missing required field, in field-id order.
    if mask != RG_ALL_REQUIRED {
        if mask & RG_COLUMNS == 0 {
            return Err(general_err!("Required field columns is missing"));
        }
        if mask & RG_TOT_BYTE_SIZE == 0 {
            return Err(general_err!("Required field total_byte_size is missing"));
        }
        if mask & RG_NUM_ROWS == 0 {
            return Err(general_err!("Required field num_rows is missing"));
        }
    }

    Ok(row_group)
}
709
710pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
713 let mut prot = ThriftSliceInputProtocol::new(buf);
714
715 let mut last_field_id = 0i16;
716 loop {
717 let field_ident = prot.read_field_begin(last_field_id)?;
718 if field_ident.field_type == FieldType::Stop {
719 break;
720 }
721 match field_ident.id {
722 2 => {
723 let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
725 let val = parquet_schema_from_array(val)?;
726 return Ok(SchemaDescriptor::new(val));
727 }
728 _ => prot.skip(field_ident.field_type)?,
729 }
730 last_field_id = field_ident.id;
731 }
732 Err(general_err!("Input does not contain a schema"))
733}
734
735pub(crate) fn parquet_metadata_from_bytes(
738 buf: &[u8],
739 options: Option<&ParquetMetaDataOptions>,
740) -> Result<ParquetMetaData> {
741 let mut prot = ThriftSliceInputProtocol::new(buf);
742
743 let mut version: Option<i32> = None;
745 let mut num_rows: Option<i64> = None;
746 let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
747 let mut key_value_metadata: Option<Vec<KeyValue>> = None;
748 let mut created_by: Option<&str> = None;
749 let mut column_orders: Option<Vec<ColumnOrder>> = None;
750 #[cfg(feature = "encryption")]
751 let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
752 #[cfg(feature = "encryption")]
753 let mut footer_signing_key_metadata: Option<&[u8]> = None;
754
755 let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
757
758 if let Some(options) = options {
760 schema_descr = options.schema().cloned();
761 }
762
763 let mut last_field_id = 0i16;
775 loop {
776 let field_ident = prot.read_field_begin(last_field_id)?;
777 if field_ident.field_type == FieldType::Stop {
778 break;
779 }
780 match field_ident.id {
781 1 => {
782 version = Some(i32::read_thrift(&mut prot)?);
783 }
784 2 => {
785 if schema_descr.is_some() {
787 prot.skip(field_ident.field_type)?;
788 } else {
789 let val =
791 read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
792 let val = parquet_schema_from_array(val)?;
793 schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
794 }
795 }
796 3 => {
797 num_rows = Some(i64::read_thrift(&mut prot)?);
798 }
799 4 => {
800 if schema_descr.is_none() {
801 return Err(general_err!("Required field schema is missing"));
802 }
803 let schema_descr = schema_descr.as_ref().unwrap();
804 let list_ident = prot.read_list_begin()?;
805 let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
806
807 let mut assigner = OrdinalAssigner::new();
809 for ordinal in 0..list_ident.size {
810 let ordinal: i16 = ordinal.try_into().map_err(|_| {
811 ParquetError::General(format!(
812 "Row group ordinal {ordinal} exceeds i16 max value",
813 ))
814 })?;
815 let rg = read_row_group(&mut prot, schema_descr, options)?;
816 rg_vec.push(assigner.ensure(ordinal, rg)?);
817 }
818 row_groups = Some(rg_vec);
819 }
820 5 => {
821 let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
822 key_value_metadata = Some(val);
823 }
824 6 => {
825 created_by = Some(<&str>::read_thrift(&mut prot)?);
826 }
827 7 => {
828 let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
829 column_orders = Some(val);
830 }
831 #[cfg(feature = "encryption")]
832 8 => {
833 let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
834 encryption_algorithm = Some(val);
835 }
836 #[cfg(feature = "encryption")]
837 9 => {
838 footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
839 }
840 _ => {
841 prot.skip(field_ident.field_type)?;
842 }
843 };
844 last_field_id = field_ident.id;
845 }
846 let Some(version) = version else {
847 return Err(general_err!("Required field version is missing"));
848 };
849 let Some(num_rows) = num_rows else {
850 return Err(general_err!("Required field num_rows is missing"));
851 };
852 let Some(row_groups) = row_groups else {
853 return Err(general_err!("Required field row_groups is missing"));
854 };
855
856 let created_by = created_by.map(|c| c.to_owned());
857
858 let schema_descr = schema_descr.unwrap();
860
861 if column_orders
863 .as_ref()
864 .is_some_and(|cos| cos.len() != schema_descr.num_columns())
865 {
866 return Err(general_err!("Column order length mismatch"));
867 }
868 let column_orders = column_orders.map(|mut cos| {
871 for (i, column) in schema_descr.columns().iter().enumerate() {
872 if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
873 let sort_order = ColumnOrder::sort_order_for_type(
874 column.logical_type_ref(),
875 column.converted_type(),
876 column.physical_type(),
877 );
878 cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
879 }
880 }
881 cos
882 });
883
884 #[cfg(not(feature = "encryption"))]
885 let fmd = crate::file::metadata::FileMetaData::new(
886 version,
887 num_rows,
888 created_by,
889 key_value_metadata,
890 schema_descr,
891 column_orders,
892 );
893 #[cfg(feature = "encryption")]
894 let fmd = crate::file::metadata::FileMetaData::new(
895 version,
896 num_rows,
897 created_by,
898 key_value_metadata,
899 schema_descr,
900 column_orders,
901 )
902 .with_encryption_algorithm(encryption_algorithm)
903 .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));
904
905 Ok(ParquetMetaData::new(fmd, row_groups))
906}
907
908#[derive(Debug, Default)]
910pub(crate) struct OrdinalAssigner {
911 first_has_ordinal: Option<bool>,
912}
913
914impl OrdinalAssigner {
915 fn new() -> Self {
916 Default::default()
917 }
918
919 fn ensure(
932 &mut self,
933 actual_ordinal: i16,
934 mut rg: RowGroupMetaData,
935 ) -> Result<RowGroupMetaData> {
936 let rg_has_ordinal = rg.ordinal.is_some();
937
938 if self.first_has_ordinal.is_none() {
940 self.first_has_ordinal = Some(rg_has_ordinal);
941 }
942
943 let first_has_ordinal = self.first_has_ordinal.unwrap();
945 if !first_has_ordinal && !rg_has_ordinal {
946 rg.ordinal = Some(actual_ordinal);
947 } else if first_has_ordinal != rg_has_ordinal {
948 return Err(general_err!(
949 "Inconsistent ordinal assignment: first_has_ordinal is set to \
950 {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
951 first_has_ordinal,
952 actual_ordinal,
953 rg_has_ordinal
954 ));
955 }
956 Ok(rg)
957 }
958}
959
// Thrift `IndexPageHeader`. It declares no fields, so decoding it only
// consumes the (empty) struct.
thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);
963
// Thrift `DictionaryPageHeader`, carried in `PageHeader` field 7.
thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  // Number of values in the dictionary page.
  1: required i32 num_values;

  2: required Encoding encoding

  3: optional bool is_sorted;
}
);
976
// Thrift page-level `Statistics`, embedded in data page headers (v1 and v2).
// It has the same field layout as the column-chunk `Statistics` above, but
// its binary fields are declared without a lifetime. The
// `read_thrift_without_stats` readers skip this struct entirely.
thrift_struct!(
pub(crate) struct PageStatistics {
  1: optional binary max;
  2: optional binary min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary max_value;
  6: optional binary min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);
996
// Thrift `DataPageHeader` (data page v1), carried in `PageHeader` field 5.
thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);
1006
1007impl DataPageHeader {
1008 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1010 where
1011 R: ThriftCompactInputProtocol<'a>,
1012 {
1013 let mut num_values: Option<i32> = None;
1014 let mut encoding: Option<Encoding> = None;
1015 let mut definition_level_encoding: Option<Encoding> = None;
1016 let mut repetition_level_encoding: Option<Encoding> = None;
1017 let statistics: Option<PageStatistics> = None;
1018 let mut last_field_id = 0i16;
1019 loop {
1020 let field_ident = prot.read_field_begin(last_field_id)?;
1021 if field_ident.field_type == FieldType::Stop {
1022 break;
1023 }
1024 match field_ident.id {
1025 1 => {
1026 let val = i32::read_thrift(&mut *prot)?;
1027 num_values = Some(val);
1028 }
1029 2 => {
1030 let val = Encoding::read_thrift(&mut *prot)?;
1031 encoding = Some(val);
1032 }
1033 3 => {
1034 let val = Encoding::read_thrift(&mut *prot)?;
1035 definition_level_encoding = Some(val);
1036 }
1037 4 => {
1038 let val = Encoding::read_thrift(&mut *prot)?;
1039 repetition_level_encoding = Some(val);
1040 }
1041 _ => {
1042 prot.skip(field_ident.field_type)?;
1043 }
1044 };
1045 last_field_id = field_ident.id;
1046 }
1047 let Some(num_values) = num_values else {
1048 return Err(general_err!("Required field num_values is missing"));
1049 };
1050 let Some(encoding) = encoding else {
1051 return Err(general_err!("Required field encoding is missing"));
1052 };
1053 let Some(definition_level_encoding) = definition_level_encoding else {
1054 return Err(general_err!(
1055 "Required field definition_level_encoding is missing"
1056 ));
1057 };
1058 let Some(repetition_level_encoding) = repetition_level_encoding else {
1059 return Err(general_err!(
1060 "Required field repetition_level_encoding is missing"
1061 ));
1062 };
1063 Ok(Self {
1064 num_values,
1065 encoding,
1066 definition_level_encoding,
1067 repetition_level_encoding,
1068 statistics,
1069 })
1070 }
1071}
1072
// Thrift `DataPageHeaderV2`, carried in `PageHeader` field 8.
// `is_compressed` declares a thrift default of `true`. Note that
// `read_thrift_without_stats` leaves it as `None` when the field is absent.
thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
1085
1086impl DataPageHeaderV2 {
1087 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1089 where
1090 R: ThriftCompactInputProtocol<'a>,
1091 {
1092 let mut num_values: Option<i32> = None;
1093 let mut num_nulls: Option<i32> = None;
1094 let mut num_rows: Option<i32> = None;
1095 let mut encoding: Option<Encoding> = None;
1096 let mut definition_levels_byte_length: Option<i32> = None;
1097 let mut repetition_levels_byte_length: Option<i32> = None;
1098 let mut is_compressed: Option<bool> = None;
1099 let statistics: Option<PageStatistics> = None;
1100 let mut last_field_id = 0i16;
1101 loop {
1102 let field_ident = prot.read_field_begin(last_field_id)?;
1103 if field_ident.field_type == FieldType::Stop {
1104 break;
1105 }
1106 match field_ident.id {
1107 1 => {
1108 let val = i32::read_thrift(&mut *prot)?;
1109 num_values = Some(val);
1110 }
1111 2 => {
1112 let val = i32::read_thrift(&mut *prot)?;
1113 num_nulls = Some(val);
1114 }
1115 3 => {
1116 let val = i32::read_thrift(&mut *prot)?;
1117 num_rows = Some(val);
1118 }
1119 4 => {
1120 let val = Encoding::read_thrift(&mut *prot)?;
1121 encoding = Some(val);
1122 }
1123 5 => {
1124 let val = i32::read_thrift(&mut *prot)?;
1125 definition_levels_byte_length = Some(val);
1126 }
1127 6 => {
1128 let val = i32::read_thrift(&mut *prot)?;
1129 repetition_levels_byte_length = Some(val);
1130 }
1131 7 => {
1132 let val = field_ident.bool_val.unwrap();
1133 is_compressed = Some(val);
1134 }
1135 _ => {
1136 prot.skip(field_ident.field_type)?;
1137 }
1138 };
1139 last_field_id = field_ident.id;
1140 }
1141 let Some(num_values) = num_values else {
1142 return Err(general_err!("Required field num_values is missing"));
1143 };
1144 let Some(num_nulls) = num_nulls else {
1145 return Err(general_err!("Required field num_nulls is missing"));
1146 };
1147 let Some(num_rows) = num_rows else {
1148 return Err(general_err!("Required field num_rows is missing"));
1149 };
1150 let Some(encoding) = encoding else {
1151 return Err(general_err!("Required field encoding is missing"));
1152 };
1153 let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1154 return Err(general_err!(
1155 "Required field definition_levels_byte_length is missing"
1156 ));
1157 };
1158 let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1159 return Err(general_err!(
1160 "Required field repetition_levels_byte_length is missing"
1161 ));
1162 };
1163 Ok(Self {
1164 num_values,
1165 num_nulls,
1166 num_rows,
1167 encoding,
1168 definition_levels_byte_length,
1169 repetition_levels_byte_length,
1170 is_compressed,
1171 statistics,
1172 })
1173 }
1174}
1175
// Thrift `PageHeader`, which precedes every page in a column chunk. Exactly
// which of the optional sub-headers (fields 5-8) is present presumably
// depends on `type` — TODO confirm against the Parquet spec.
thrift_struct!(
pub(crate) struct PageHeader {
  1: required PageType r#type

  2: required i32 uncompressed_page_size

  3: required i32 compressed_page_size

  4: optional i32 crc

  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1197
1198impl PageHeader {
1199 pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1203 where
1204 R: ThriftCompactInputProtocol<'a>,
1205 {
1206 let mut type_: Option<PageType> = None;
1207 let mut uncompressed_page_size: Option<i32> = None;
1208 let mut compressed_page_size: Option<i32> = None;
1209 let mut crc: Option<i32> = None;
1210 let mut data_page_header: Option<DataPageHeader> = None;
1211 let mut index_page_header: Option<IndexPageHeader> = None;
1212 let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1213 let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1214 let mut last_field_id = 0i16;
1215 loop {
1216 let field_ident = prot.read_field_begin(last_field_id)?;
1217 if field_ident.field_type == FieldType::Stop {
1218 break;
1219 }
1220 match field_ident.id {
1221 1 => {
1222 let val = PageType::read_thrift(&mut *prot)?;
1223 type_ = Some(val);
1224 }
1225 2 => {
1226 let val = i32::read_thrift(&mut *prot)?;
1227 uncompressed_page_size = Some(val);
1228 }
1229 3 => {
1230 let val = i32::read_thrift(&mut *prot)?;
1231 compressed_page_size = Some(val);
1232 }
1233 4 => {
1234 let val = i32::read_thrift(&mut *prot)?;
1235 crc = Some(val);
1236 }
1237 5 => {
1238 let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1239 data_page_header = Some(val);
1240 }
1241 6 => {
1242 let val = IndexPageHeader::read_thrift(&mut *prot)?;
1243 index_page_header = Some(val);
1244 }
1245 7 => {
1246 let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1247 dictionary_page_header = Some(val);
1248 }
1249 8 => {
1250 let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1251 data_page_header_v2 = Some(val);
1252 }
1253 _ => {
1254 prot.skip(field_ident.field_type)?;
1255 }
1256 };
1257 last_field_id = field_ident.id;
1258 }
1259 let Some(type_) = type_ else {
1260 return Err(general_err!("Required field type_ is missing"));
1261 };
1262 let Some(uncompressed_page_size) = uncompressed_page_size else {
1263 return Err(general_err!(
1264 "Required field uncompressed_page_size is missing"
1265 ));
1266 };
1267 let Some(compressed_page_size) = compressed_page_size else {
1268 return Err(general_err!(
1269 "Required field compressed_page_size is missing"
1270 ));
1271 };
1272 Ok(Self {
1273 r#type: type_,
1274 uncompressed_page_size,
1275 compressed_page_size,
1276 crc,
1277 data_page_header,
1278 index_page_header,
1279 dictionary_page_header,
1280 data_page_header_v2,
1281 })
1282 }
1283}
1284
1285pub(super) fn serialize_column_meta_data<W: Write>(
1309 column_chunk: &ColumnChunkMetaData,
1310 w: &mut ThriftCompactOutputProtocol<W>,
1311) -> Result<()> {
1312 use crate::file::statistics::page_stats_to_thrift;
1313
1314 column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1315 column_chunk
1316 .encodings()
1317 .collect::<Vec<_>>()
1318 .write_thrift_field(w, 2, 1)?;
1319 let path = column_chunk.column_descr.path().parts();
1320 let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1321 path.write_thrift_field(w, 3, 2)?;
1322 column_chunk.compression.write_thrift_field(w, 4, 3)?;
1323 column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1324 column_chunk
1325 .total_uncompressed_size
1326 .write_thrift_field(w, 6, 5)?;
1327 column_chunk
1328 .total_compressed_size
1329 .write_thrift_field(w, 7, 6)?;
1330 let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1332 if let Some(index_page_offset) = column_chunk.index_page_offset {
1333 last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1334 }
1335 if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1336 last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1337 }
1338 let stats = page_stats_to_thrift(column_chunk.statistics());
1340 if let Some(stats) = stats {
1341 last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1342 }
1343 if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1344 last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1345 }
1346 if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1347 last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1348 }
1349 if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1350 last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1351 }
1352
1353 let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1355 || column_chunk.repetition_level_histogram.is_some()
1356 || column_chunk.definition_level_histogram.is_some()
1357 {
1358 let repetition_level_histogram = column_chunk
1359 .repetition_level_histogram()
1360 .map(|hist| hist.clone().into_inner());
1361
1362 let definition_level_histogram = column_chunk
1363 .definition_level_histogram()
1364 .map(|hist| hist.clone().into_inner());
1365
1366 Some(SizeStatistics {
1367 unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1368 repetition_level_histogram,
1369 definition_level_histogram,
1370 })
1371 } else {
1372 None
1373 };
1374 if let Some(size_stats) = size_stats {
1375 last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1376 }
1377
1378 if let Some(geo_stats) = column_chunk.geo_statistics() {
1379 geo_stats.write_thrift_field(w, 17, last_field_id)?;
1380 }
1381
1382 w.write_struct_end()
1383}
1384
/// Borrowed inputs from which the `WriteThrift` impl for this type serializes the Thrift
/// `FileMetaData` struct.
pub(super) struct FileMeta<'a> {
    /// File-level metadata. It supplies the version (field 1), schema (field 2) and row count
    /// (field 3). It also supplies the optional key/value metadata, `created_by` and column
    /// orders (fields 5-7). With the `encryption` feature it additionally supplies the
    /// encryption algorithm and footer signing key metadata (fields 8-9).
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    /// Row groups, written as field 4.
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
}
1390
1391impl<'a> WriteThrift for FileMeta<'a> {
1403 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1404
1405 #[allow(unused_assignments)]
1407 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1408 self.file_metadata
1409 .version
1410 .write_thrift_field(writer, 1, 0)?;
1411
1412 let root = self.file_metadata.schema_descr().root_schema_ptr();
1415 let schema_len = num_nodes(&root)?;
1416 writer.write_field_begin(FieldType::List, 2, 1)?;
1417 writer.write_list_begin(ElementType::Struct, schema_len)?;
1418 write_schema(&root, writer)?;
1420
1421 self.file_metadata
1422 .num_rows
1423 .write_thrift_field(writer, 3, 2)?;
1424
1425 let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1427
1428 if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1429 last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1430 }
1431 if let Some(created_by) = self.file_metadata.created_by() {
1432 last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1433 }
1434 if let Some(column_orders) = self.file_metadata.column_orders() {
1435 last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1436 }
1437 #[cfg(feature = "encryption")]
1438 if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
1439 last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1440 }
1441 #[cfg(feature = "encryption")]
1442 if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
1443 key.as_slice()
1444 .write_thrift_field(writer, 9, last_field_id)?;
1445 }
1446
1447 writer.write_struct_end()
1448 }
1449}
1450
1451fn write_schema<W: Write>(
1452 schema: &TypePtr,
1453 writer: &mut ThriftCompactOutputProtocol<W>,
1454) -> Result<()> {
1455 if !schema.is_group() {
1456 return Err(general_err!("Root schema must be Group type"));
1457 }
1458 write_schema_helper(schema, writer)
1459}
1460
1461fn write_schema_helper<W: Write>(
1462 node: &TypePtr,
1463 writer: &mut ThriftCompactOutputProtocol<W>,
1464) -> Result<()> {
1465 match node.as_ref() {
1466 crate::schema::types::Type::PrimitiveType {
1467 basic_info,
1468 physical_type,
1469 type_length,
1470 scale,
1471 precision,
1472 } => {
1473 let element = SchemaElement {
1474 r#type: Some(*physical_type),
1475 type_length: if *type_length >= 0 {
1476 Some(*type_length)
1477 } else {
1478 None
1479 },
1480 repetition_type: Some(basic_info.repetition()),
1481 name: basic_info.name(),
1482 num_children: None,
1483 converted_type: match basic_info.converted_type() {
1484 ConvertedType::NONE => None,
1485 other => Some(other),
1486 },
1487 scale: if *scale >= 0 { Some(*scale) } else { None },
1488 precision: if *precision >= 0 {
1489 Some(*precision)
1490 } else {
1491 None
1492 },
1493 field_id: if basic_info.has_id() {
1494 Some(basic_info.id())
1495 } else {
1496 None
1497 },
1498 logical_type: basic_info.logical_type_ref().cloned(),
1499 };
1500 element.write_thrift(writer)
1501 }
1502 crate::schema::types::Type::GroupType { basic_info, fields } => {
1503 let repetition = if basic_info.has_repetition() {
1504 Some(basic_info.repetition())
1505 } else {
1506 None
1507 };
1508
1509 let element = SchemaElement {
1510 r#type: None,
1511 type_length: None,
1512 repetition_type: repetition,
1513 name: basic_info.name(),
1514 num_children: Some(fields.len().try_into()?),
1515 converted_type: match basic_info.converted_type() {
1516 ConvertedType::NONE => None,
1517 other => Some(other),
1518 },
1519 scale: None,
1520 precision: None,
1521 field_id: if basic_info.has_id() {
1522 Some(basic_info.id())
1523 } else {
1524 None
1525 },
1526 logical_type: basic_info.logical_type_ref().cloned(),
1527 };
1528
1529 element.write_thrift(writer)?;
1530
1531 for field in fields {
1533 write_schema_helper(field, writer)?;
1534 }
1535 Ok(())
1536 }
1537 }
1538}
1539
1540impl WriteThrift for RowGroupMetaData {
1550 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1551
1552 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1553 self.columns.write_thrift_field(writer, 1, 0)?;
1555 self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1556 let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1557 if let Some(sorting_columns) = self.sorting_columns() {
1558 last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1559 }
1560 if let Some(file_offset) = self.file_offset() {
1561 last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1562 }
1563 last_field_id = self
1565 .compressed_size()
1566 .write_thrift_field(writer, 6, last_field_id)?;
1567 if let Some(ordinal) = self.ordinal() {
1568 ordinal.write_thrift_field(writer, 7, last_field_id)?;
1569 }
1570 writer.write_struct_end()
1571 }
1572}
1573
1574impl WriteThrift for ColumnChunkMetaData {
1586 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1587
1588 #[allow(unused_assignments)]
1589 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1590 let mut last_field_id = 0i16;
1591 if let Some(file_path) = self.file_path() {
1592 last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1593 }
1594 last_field_id = self
1595 .file_offset()
1596 .write_thrift_field(writer, 2, last_field_id)?;
1597
1598 #[cfg(feature = "encryption")]
1599 {
1600 if self.encrypted_column_metadata.is_none() {
1602 writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1603 serialize_column_meta_data(self, writer)?;
1604 last_field_id = 3;
1605 }
1606 }
1607 #[cfg(not(feature = "encryption"))]
1608 {
1609 writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1611 serialize_column_meta_data(self, writer)?;
1612 last_field_id = 3;
1613 }
1614
1615 if let Some(offset_idx_off) = self.offset_index_offset() {
1616 last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1617 }
1618 if let Some(offset_idx_len) = self.offset_index_length() {
1619 last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1620 }
1621 if let Some(column_idx_off) = self.column_index_offset() {
1622 last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1623 }
1624 if let Some(column_idx_len) = self.column_index_length() {
1625 last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1626 }
1627 #[cfg(feature = "encryption")]
1628 {
1629 if let Some(crypto_metadata) = self.crypto_metadata() {
1630 last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1631 }
1632 if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1633 encrypted_meta
1634 .as_slice()
1635 .write_thrift_field(writer, 9, last_field_id)?;
1636 }
1637 }
1638
1639 writer.write_struct_end()
1640 }
1641}
1642
1643impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1648 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1649
1650 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1651 let mut last_field_id = 0i16;
1652 if let Some(bbox) = self.bounding_box() {
1653 last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1654 }
1655 if let Some(geo_types) = self.geospatial_types() {
1656 geo_types.write_thrift_field(writer, 2, last_field_id)?;
1657 }
1658
1659 writer.write_struct_end()
1660 }
1661}
1662
// Register the crate's `GeospatialStatistics` as writable as a Thrift struct-typed field.
// This is what `serialize_column_meta_data` relies on when writing field 17.
// NOTE(review): `write_thrift_field!` presumably generates the `WriteThriftField` impl from the
// `WriteThrift` impl above. The alias presumably avoids a clash with a Thrift-generated
// `GeospatialStatistics` in this module — confirm.
use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);
1666
1667impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1678 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1679
1680 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1681 self.get_xmin().write_thrift_field(writer, 1, 0)?;
1682 self.get_xmax().write_thrift_field(writer, 2, 1)?;
1683 self.get_ymin().write_thrift_field(writer, 3, 2)?;
1684 let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1685
1686 if let Some(zmin) = self.get_zmin() {
1687 last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1688 }
1689 if let Some(zmax) = self.get_zmax() {
1690 last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1691 }
1692 if let Some(mmin) = self.get_mmin() {
1693 last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1694 }
1695 if let Some(mmax) = self.get_mmax() {
1696 mmax.write_thrift_field(writer, 8, last_field_id)?;
1697 }
1698
1699 writer.write_struct_end()
1700 }
1701}
1702
// Register the crate's `BoundingBox` as writable as a Thrift struct-typed field.
// This lets it be written as field 1 of `GeospatialStatistics`.
// The alias is needed because this module also defines a Thrift-generated `BoundingBox`.
// NOTE(review): `write_thrift_field!` presumably generates the `WriteThriftField` impl from the
// `WriteThrift` impl above — confirm.
use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
write_thrift_field!(RustBoundingBox, FieldType::Struct);
1706
/// Crate-visible test helpers for decoding and encoding metadata through the Thrift compact
/// protocol, plus round-trip tests for structs in this module.
#[cfg(test)]
pub(crate) mod tests {
    use crate::errors::Result;
    use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
    };
    use std::sync::Arc;

    /// Decodes a Thrift-encoded row group from `buf` using `schema_descr`.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut protocol = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_row_group(&mut protocol, &schema_descr, None)
    }

    /// Decodes a Thrift-encoded column chunk from `buf` for `column_descr`, without options.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        read_column_chunk_with_options(buf, column_descr, None)
    }

    /// Like `read_column_chunk`, but forwards the optional metadata `options`.
    pub(crate) fn read_column_chunk_with_options(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ColumnChunkMetaData> {
        let mut protocol = ThriftSliceInputProtocol::new(buf);
        // NOTE(review): the `0` is presumably the column's position in the row group — confirm.
        crate::file::metadata::thrift::read_column_chunk(&mut protocol, &column_descr, 0, options)
    }

    /// Serializes `schema` as a Thrift list of `SchemaElement`s and rebuilds a schema tree
    /// from the decoded list.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let encoded = schema_to_buf(&schema)?;
        let mut protocol = ThriftSliceInputProtocol::new(&encoded);
        let elements: Vec<SchemaElement> = read_thrift_vec(&mut protocol)?;
        parquet_schema_from_array(elements)
    }

    /// Serializes `schema` as a Thrift list header followed by one `SchemaElement` per node
    /// (parents before children) and returns the encoded bytes.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let node_count = num_nodes(schema)?;
        let mut encoded = Vec::new();
        let mut protocol = ThriftCompactOutputProtocol::new(&mut encoded);
        protocol.write_list_begin(ElementType::Struct, node_count)?;
        write_schema(schema, &mut protocol)?;
        Ok(encoded)
    }

    /// Parses a Thrift list of `SchemaElement`s (such as the output of `schema_to_buf`). The
    /// returned elements borrow from `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut protocol = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut protocol)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        // Three cases: x/y only, then added z extents, then both z and m extents.
        let cases = [
            BoundingBox {
                xmin: 0.1.into(),
                xmax: 10.3.into(),
                ymin: 0.001.into(),
                ymax: 128.5.into(),
                zmin: None,
                zmax: None,
                mmin: None,
                mmax: None,
            },
            BoundingBox {
                xmin: 0.1.into(),
                xmax: 10.3.into(),
                ymin: 0.001.into(),
                ymax: 128.5.into(),
                zmin: Some(11.0.into()),
                zmax: Some(1300.0.into()),
                mmin: None,
                mmax: None,
            },
            BoundingBox {
                xmin: 0.1.into(),
                xmax: 10.3.into(),
                ymin: 0.001.into(),
                ymax: 128.5.into(),
                zmin: Some(11.0.into()),
                zmax: Some(1300.0.into()),
                mmin: Some(3.7.into()),
                mmax: Some(42.0.into()),
            },
        ];
        for bbox in cases {
            test_roundtrip(bbox);
        }
    }
}