1use std::io::Write;
21use std::sync::Arc;
22
23use crate::{
24 basic::{
25 ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Repetition, Type,
26 },
27 data_type::{ByteArray, FixedLenByteArray, Int96},
28 errors::{ParquetError, Result},
29 file::{
30 metadata::{
31 ColumnChunkMetaData, KeyValue, LevelHistogram, PageEncodingStats, ParquetMetaData,
32 RowGroupMetaData, SortingColumn,
33 },
34 statistics::ValueStatistics,
35 },
36 parquet_thrift::{
37 read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
38 ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
39 },
40 schema::types::{
41 num_nodes, parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor, TypePtr,
42 },
43 thrift_struct, thrift_union,
44 util::bit_util::FromBytes,
45};
46#[cfg(feature = "encryption")]
47use crate::{
48 encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
49 file::column_crypto_metadata::ColumnCryptoMetaData,
50 parquet_thrift::ThriftSliceInputProtocol,
51 schema::types::SchemaDescPtr,
52};
53
// Thrift `SchemaElement`: one node of the flattened schema. `FileMetaData` (field 2) carries a
// list of these, which `parquet_schema_from_array` rebuilds into a schema tree.
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  // Physical type of the node; optional in the wire format
  // (NOTE(review): presumably unset for group nodes — confirm).
  1: optional Type r#type;
  2: optional i32 type_length;
  3: optional Repetition repetition_type;
  // Declared as `string<'a>`, so the decoded name is tied to the lifetime of the input.
  4: required string<'a> name;
  // NOTE(review): presumably the number of children of a group node — confirm.
  5: optional i32 num_children;
  6: optional ConvertedType converted_type;
  7: optional i32 scale
  8: optional i32 precision
  9: optional i32 field_id;
  10: optional LogicalType logical_type
}
);
95
// Parameters of the AES_GCM_V1 encryption algorithm (variant `AES_GCM_V1` of
// `EncryptionAlgorithm`).
thrift_struct!(
pub(crate) struct AesGcmV1 {
  // AAD prefix stored in the file. `get_file_decryptor` only falls back to this when the
  // file decryption properties do not provide their own prefix.
  1: optional binary aad_prefix

  // Unique file identifier used to build AADs; `get_file_decryptor` errors if it is unset.
  2: optional binary aad_file_unique

  // When true the AAD prefix is not stored in the file and must be supplied by the reader;
  // `parquet_metadata_with_encryption` rejects an encrypted footer if none was supplied.
  3: optional bool supply_aad_prefix
}
);
109
// Parameters of the AES_GCM_CTR_V1 encryption algorithm. Decoded for completeness, but
// `get_file_decryptor` currently rejects this algorithm as not yet supported.
thrift_struct!(
pub(crate) struct AesGcmCtrV1 {
  // AAD prefix stored in the file.
  1: optional binary aad_prefix

  // Unique file identifier used to build AADs.
  2: optional binary aad_file_unique

  // When true the AAD prefix is not stored in the file and must be supplied by the reader.
  3: optional bool supply_aad_prefix
}
);
123
// Thrift union naming the file's encryption algorithm. It is matched as a Rust enum with one
// variant per algorithm (see `get_file_decryptor`).
thrift_union!(
union EncryptionAlgorithm {
  1: (AesGcmV1) AES_GCM_V1
  2: (AesGcmCtrV1) AES_GCM_CTR_V1
}
);
130
// Crypto metadata read from the start of an encrypted footer; the encrypted `FileMetaData`
// follows it in the same buffer (see `parquet_metadata_with_encryption`).
#[cfg(feature = "encryption")]
thrift_struct!(
pub(crate) struct FileCryptoMetaData<'a> {
  // Algorithm and AAD parameters used to decrypt the footer.
  1: required EncryptionAlgorithm encryption_algorithm

  // Passed to `get_file_decryptor` as the footer key metadata.
  2: optional binary<'a> key_metadata
}
);
145
// Top-level thrift footer structure. Fields 8 and 9 (plaintext-footer encryption) are only
// consulted by `parquet_metadata_with_encryption`; the `ReadThrift` impl for
// `ParquetMetaData` ignores them.
thrift_struct!(
struct FileMetaData<'a> {
  1: required i32 version
  // Flattened schema, rebuilt into a tree by `parquet_schema_from_array`.
  2: required list<'a><SchemaElement> schema;
  3: required i64 num_rows
  4: required list<'a><RowGroup> row_groups
  5: optional list<KeyValue> key_value_metadata
  6: optional string<'a> created_by
  // When present, its length must equal the number of leaf columns (checked by the readers).
  7: optional list<ColumnOrder> column_orders;
  8: optional EncryptionAlgorithm encryption_algorithm
  9: optional binary<'a> footer_signing_key_metadata
}
);
160
// Thrift row group. Field id 6 is not declared here
// (NOTE(review): presumably skipped by the generated reader — confirm).
thrift_struct!(
struct RowGroup<'a> {
  // One entry per leaf column; the converters require the count to match the schema.
  1: required list<'a><ColumnChunk> columns
  2: required i64 total_byte_size
  3: required i64 num_rows
  4: optional list<SortingColumn> sorting_columns
  5: optional i64 file_offset
  // Row-group index. It is part of the AAD when decrypting column metadata.
  7: optional i16 ordinal
}
);
173
// Thrift column chunk, as declared when the `encryption` feature is enabled.
#[cfg(feature = "encryption")]
thrift_struct!(
struct ColumnChunk<'a> {
  1: optional string<'a> file_path
  // NOTE(review): declared with a `= 0` default; exact handling is up to the macro — confirm.
  2: required i64 file_offset = 0
  // May be absent when the metadata is stored encrypted in `encrypted_column_metadata`;
  // `row_group_from_encrypted_thrift` fills it in after decryption.
  3: optional ColumnMetaData<'a> meta_data
  4: optional i64 offset_index_offset
  5: optional i32 offset_index_length
  6: optional i64 column_index_offset
  7: optional i32 column_index_length
  // Selects the key (column key or footer key) used to decrypt `encrypted_column_metadata`.
  8: optional ColumnCryptoMetaData crypto_metadata
  // Encrypted, thrift-serialized `ColumnMetaData`.
  9: optional binary<'a> encrypted_column_metadata
}
);
// Thrift column chunk without the encryption fields (8 and 9), used when the `encryption`
// feature is disabled.
#[cfg(not(feature = "encryption"))]
thrift_struct!(
struct ColumnChunk<'a> {
  1: optional string<'a> file_path
  // NOTE(review): declared with a `= 0` default; exact handling is up to the macro — confirm.
  2: required i64 file_offset = 0
  // `convert_column` fails if this is absent.
  3: optional ColumnMetaData<'a> meta_data
  4: optional i64 offset_index_offset
  5: optional i32 offset_index_length
  6: optional i64 column_index_offset
  7: optional i32 column_index_length
}
);
200
// Alias so the thrift declaration below can name the codec type `CompressionCodec`, as in the
// Parquet thrift definition, while decoding into the crate's `Compression` enum.
type CompressionCodec = Compression;
// Thrift column metadata. Field ids 3 and 8 are not declared here
// (NOTE(review): presumably skipped by the generated reader — confirm); the writer
// `serialize_column_meta_data` still emits field 3 (the column path).
thrift_struct!(
struct ColumnMetaData<'a> {
  1: required Type r#type
  2: required list<Encoding> encodings
  4: required CompressionCodec codec
  5: required i64 num_values
  6: required i64 total_uncompressed_size
  7: required i64 total_compressed_size
  9: required i64 data_page_offset
  10: optional i64 index_page_offset
  11: optional i64 dictionary_page_offset
  // Converted into typed statistics by `convert_stats`.
  12: optional Statistics<'a> statistics
  13: optional list<PageEncodingStats> encoding_stats;
  14: optional i64 bloom_filter_offset;
  15: optional i32 bloom_filter_length;
  16: optional SizeStatistics size_statistics;
  17: optional GeospatialStatistics geospatial_statistics;
}
);
225
// Thrift geospatial bounding box. X and Y ranges are required. `convert_bounding_box` only
// keeps the Z and M ranges when both their min and max are present.
thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);
238
// Thrift geospatial statistics for a column chunk. `convert_geo_stats` treats an empty
// `geospatial_types` list the same as an absent one.
thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);
245
// Thrift size statistics. `convert_column` wraps the two histograms in `LevelHistogram`.
thrift_struct!(
struct SizeStatistics {
  1: optional i64 unencoded_byte_array_data_bytes;
  2: optional list<i64> repetition_level_histogram;
  3: optional list<i64> definition_level_histogram;
}
);
253
// Thrift column-chunk statistics with min/max borrowed from the input buffer.
// `convert_stats` uses the legacy `max`/`min` (ids 1-2) only when neither `max_value` nor
// `min_value` (ids 5-6) is set. The exactness flags are only applied to byte-array types.
thrift_struct!(
pub(crate) struct Statistics<'a> {
  1: optional binary<'a> max;
  2: optional binary<'a> min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary<'a> max_value;
  6: optional binary<'a> min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);
266
267fn convert_row_groups(
269 mut row_groups: Vec<RowGroup>,
270 schema_descr: Arc<SchemaDescriptor>,
271) -> Result<Vec<RowGroupMetaData>> {
272 let mut res: Vec<RowGroupMetaData> = Vec::with_capacity(row_groups.len());
273 for rg in row_groups.drain(0..) {
274 res.push(convert_row_group(rg, schema_descr.clone())?);
275 }
276
277 Ok(res)
278}
279
280fn convert_row_group(
281 row_group: RowGroup,
282 schema_descr: Arc<SchemaDescriptor>,
283) -> Result<RowGroupMetaData> {
284 if schema_descr.num_columns() != row_group.columns.len() {
285 return Err(general_err!(
286 "Column count mismatch. Schema has {} columns while Row Group has {}",
287 schema_descr.num_columns(),
288 row_group.columns.len()
289 ));
290 }
291
292 let num_rows = row_group.num_rows;
293 let sorting_columns = row_group.sorting_columns;
294 let total_byte_size = row_group.total_byte_size;
295 let file_offset = row_group.file_offset;
296 let ordinal = row_group.ordinal;
297
298 let columns = convert_columns(row_group.columns, schema_descr.clone())?;
299
300 Ok(RowGroupMetaData {
301 columns,
302 num_rows,
303 sorting_columns,
304 total_byte_size,
305 schema_descr,
306 file_offset,
307 ordinal,
308 })
309}
310
311fn convert_columns(
312 mut columns: Vec<ColumnChunk>,
313 schema_descr: Arc<SchemaDescriptor>,
314) -> Result<Vec<ColumnChunkMetaData>> {
315 let mut res: Vec<ColumnChunkMetaData> = Vec::with_capacity(columns.len());
316 for (c, d) in columns.drain(0..).zip(schema_descr.columns()) {
317 res.push(convert_column(c, d.clone())?);
318 }
319
320 Ok(res)
321}
322
323fn convert_column(
324 column: ColumnChunk,
325 column_descr: Arc<ColumnDescriptor>,
326) -> Result<ColumnChunkMetaData> {
327 if column.meta_data.is_none() {
328 return Err(general_err!("Expected to have column metadata"));
329 }
330 let col_metadata = column.meta_data.unwrap();
331 let column_type = col_metadata.r#type;
332 let encodings = col_metadata.encodings;
333 let compression = col_metadata.codec;
334 let file_path = column.file_path.map(|v| v.to_owned());
335 let file_offset = column.file_offset;
336 let num_values = col_metadata.num_values;
337 let total_compressed_size = col_metadata.total_compressed_size;
338 let total_uncompressed_size = col_metadata.total_uncompressed_size;
339 let data_page_offset = col_metadata.data_page_offset;
340 let index_page_offset = col_metadata.index_page_offset;
341 let dictionary_page_offset = col_metadata.dictionary_page_offset;
342 let statistics = convert_stats(column_type, col_metadata.statistics)?;
343 let encoding_stats = col_metadata.encoding_stats;
344 let bloom_filter_offset = col_metadata.bloom_filter_offset;
345 let bloom_filter_length = col_metadata.bloom_filter_length;
346 let offset_index_offset = column.offset_index_offset;
347 let offset_index_length = column.offset_index_length;
348 let column_index_offset = column.column_index_offset;
349 let column_index_length = column.column_index_length;
350 let (unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram) =
351 if let Some(size_stats) = col_metadata.size_statistics {
352 (
353 size_stats.unencoded_byte_array_data_bytes,
354 size_stats.repetition_level_histogram,
355 size_stats.definition_level_histogram,
356 )
357 } else {
358 (None, None, None)
359 };
360
361 let geo_statistics = convert_geo_stats(col_metadata.geospatial_statistics);
362
363 let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
364 let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
365
366 let result = ColumnChunkMetaData {
367 column_descr,
368 encodings,
369 file_path,
370 file_offset,
371 num_values,
372 compression,
373 total_compressed_size,
374 total_uncompressed_size,
375 data_page_offset,
376 index_page_offset,
377 dictionary_page_offset,
378 statistics,
379 geo_statistics,
380 encoding_stats,
381 bloom_filter_offset,
382 bloom_filter_length,
383 offset_index_offset,
384 offset_index_length,
385 column_index_offset,
386 column_index_length,
387 unencoded_byte_array_data_bytes,
388 repetition_level_histogram,
389 definition_level_histogram,
390 #[cfg(feature = "encryption")]
391 column_crypto_metadata: column.crypto_metadata,
392 #[cfg(feature = "encryption")]
393 encrypted_column_metadata: None,
394 };
395 Ok(result)
396}
397
398fn convert_geo_stats(
399 stats: Option<GeospatialStatistics>,
400) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
401 stats.map(|st| {
402 let bbox = convert_bounding_box(st.bbox);
403 let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
404 Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
405 bbox,
406 geospatial_types,
407 ))
408 })
409}
410
411fn convert_bounding_box(
412 bbox: Option<BoundingBox>,
413) -> Option<crate::geospatial::bounding_box::BoundingBox> {
414 bbox.map(|bb| {
415 let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
416 bb.xmin.into(),
417 bb.xmax.into(),
418 bb.ymin.into(),
419 bb.ymax.into(),
420 );
421
422 newbb = match (bb.zmin, bb.zmax) {
423 (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
424 _ => newbb,
426 };
427
428 newbb = match (bb.mmin, bb.mmax) {
429 (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
430 _ => newbb,
432 };
433
434 newbb
435 })
436}
437
438pub(crate) fn convert_stats(
439 physical_type: Type,
440 thrift_stats: Option<Statistics>,
441) -> Result<Option<crate::file::statistics::Statistics>> {
442 use crate::file::statistics::Statistics as FStatistics;
443 Ok(match thrift_stats {
444 Some(stats) => {
445 let null_count = stats.null_count.unwrap_or(0);
449
450 if null_count < 0 {
451 return Err(ParquetError::General(format!(
452 "Statistics null count is negative {null_count}",
453 )));
454 }
455
456 let null_count = Some(null_count as u64);
458 let distinct_count = stats.distinct_count.map(|value| value as u64);
460 let old_format = stats.min_value.is_none() && stats.max_value.is_none();
462 let min = if old_format {
464 stats.min
465 } else {
466 stats.min_value
467 };
468 let max = if old_format {
470 stats.max
471 } else {
472 stats.max_value
473 };
474
475 fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
476 if let Some(min) = min {
477 if min.len() < len {
478 return Err(ParquetError::General(
479 "Insufficient bytes to parse min statistic".to_string(),
480 ));
481 }
482 }
483 if let Some(max) = max {
484 if max.len() < len {
485 return Err(ParquetError::General(
486 "Insufficient bytes to parse max statistic".to_string(),
487 ));
488 }
489 }
490 Ok(())
491 }
492
493 match physical_type {
494 Type::BOOLEAN => check_len(&min, &max, 1),
495 Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
496 Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
497 Type::INT96 => check_len(&min, &max, 12),
498 _ => Ok(()),
499 }?;
500
501 let res = match physical_type {
506 Type::BOOLEAN => FStatistics::boolean(
507 min.map(|data| data[0] != 0),
508 max.map(|data| data[0] != 0),
509 distinct_count,
510 null_count,
511 old_format,
512 ),
513 Type::INT32 => FStatistics::int32(
514 min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
515 max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
516 distinct_count,
517 null_count,
518 old_format,
519 ),
520 Type::INT64 => FStatistics::int64(
521 min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
522 max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
523 distinct_count,
524 null_count,
525 old_format,
526 ),
527 Type::INT96 => {
528 let min = if let Some(data) = min {
530 assert_eq!(data.len(), 12);
531 Some(Int96::try_from_le_slice(data)?)
532 } else {
533 None
534 };
535 let max = if let Some(data) = max {
536 assert_eq!(data.len(), 12);
537 Some(Int96::try_from_le_slice(data)?)
538 } else {
539 None
540 };
541 FStatistics::int96(min, max, distinct_count, null_count, old_format)
542 }
543 Type::FLOAT => FStatistics::float(
544 min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
545 max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
546 distinct_count,
547 null_count,
548 old_format,
549 ),
550 Type::DOUBLE => FStatistics::double(
551 min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
552 max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
553 distinct_count,
554 null_count,
555 old_format,
556 ),
557 Type::BYTE_ARRAY => FStatistics::ByteArray(
558 ValueStatistics::new(
559 min.map(ByteArray::from),
560 max.map(ByteArray::from),
561 distinct_count,
562 null_count,
563 old_format,
564 )
565 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
566 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
567 ),
568 Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
569 ValueStatistics::new(
570 min.map(ByteArray::from).map(FixedLenByteArray::from),
571 max.map(ByteArray::from).map(FixedLenByteArray::from),
572 distinct_count,
573 null_count,
574 old_format,
575 )
576 .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
577 .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
578 ),
579 };
580
581 Some(res)
582 }
583 None => None,
584 })
585}
586
/// Converts a thrift `RowGroup` into `RowGroupMetaData`, decrypting column metadata where
/// needed.
///
/// A column chunk with `encrypted_column_metadata` set is handled as follows when a
/// `decryptor` is supplied:
/// - it is decrypted with the column key or the footer key, as selected by its
///   `crypto_metadata`;
/// - the plaintext is decoded as thrift `ColumnMetaData` and stored in the chunk before
///   conversion.
///
/// All other chunks are converted as-is.
///
/// # Errors
///
/// Returns an error if:
/// - the column count does not match the schema;
/// - an encrypted column has no `crypto_metadata`;
/// - a column must be decrypted but the row group's `ordinal` is missing or negative;
/// - decryption fails;
/// - any column fails to decode or convert.
#[cfg(feature = "encryption")]
fn row_group_from_encrypted_thrift(
    mut rg: RowGroup,
    schema_descr: SchemaDescPtr,
    decryptor: Option<&FileDecryptor>,
) -> Result<RowGroupMetaData> {
    if schema_descr.num_columns() != rg.columns.len() {
        return Err(general_err!(
            "Column count mismatch. Schema has {} columns while Row Group has {}",
            schema_descr.num_columns(),
            rg.columns.len()
        ));
    }
    let total_byte_size = rg.total_byte_size;
    let num_rows = rg.num_rows;
    let mut columns = Vec::with_capacity(rg.columns.len());

    for (i, (mut c, d)) in rg
        .columns
        .drain(0..)
        .zip(schema_descr.columns())
        .enumerate()
    {
        if let (Some(buf), Some(decryptor)) = (c.encrypted_column_metadata, decryptor) {
            // Column metadata is encrypted and we can decrypt it.
            let column_decryptor = match c.crypto_metadata.as_ref() {
                None => {
                    return Err(general_err!(
                        "No crypto_metadata is set for column '{}', which has encrypted metadata",
                        d.path().string()
                    ));
                }
                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
                    let column_name = crypto_metadata.path_in_schema.join(".");
                    decryptor.get_column_metadata_decryptor(
                        column_name.as_str(),
                        crypto_metadata.key_metadata.as_deref(),
                    )?
                }
                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                    decryptor.get_footer_decryptor()?
                }
            };

            // The row-group ordinal is part of the module AAD but optional in the thrift
            // schema. Report a missing or negative value as a malformed file instead of
            // panicking (missing) or wrapping to a huge index (negative).
            let ordinal = rg.ordinal.ok_or_else(|| {
                general_err!(
                    "Row group ordinal is required to decrypt metadata of column '{}'",
                    d.path().string()
                )
            })?;
            let ordinal = usize::try_from(ordinal).map_err(|_| {
                general_err!(
                    "Invalid negative row group ordinal {} for column '{}'",
                    ordinal,
                    d.path().string()
                )
            })?;

            let column_aad = crate::encryption::modules::create_module_aad(
                decryptor.file_aad(),
                crate::encryption::modules::ModuleType::ColumnMetaData,
                ordinal,
                i,
                None,
            )?;

            let decrypted_cc_buf = column_decryptor
                .decrypt(buf, column_aad.as_ref())
                .map_err(|_| {
                    general_err!(
                        "Unable to decrypt column '{}', perhaps the column key is wrong?",
                        d.path().string()
                    )
                })?;

            let mut prot = ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
            let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
            c.meta_data = Some(col_meta);
            // Convert inside this branch: `col_meta` was decoded from `decrypted_cc_buf`,
            // which is dropped at the end of the branch.
            columns.push(convert_column(c, d.clone())?);
        } else {
            columns.push(convert_column(c, d.clone())?);
        }
    }

    let sorting_columns = rg.sorting_columns;
    let file_offset = rg.file_offset;
    let ordinal = rg.ordinal;

    Ok(RowGroupMetaData {
        columns,
        num_rows,
        sorting_columns,
        total_byte_size,
        schema_descr,
        file_offset,
        ordinal,
    })
}
673
/// Decodes the serialized footer `buf` into `ParquetMetaData`, handling Parquet modular
/// encryption.
///
/// Two footer modes are supported:
/// - `encrypted_footer` is true: `buf` starts with a thrift `FileCryptoMetaData`, and the
///   remainder is the encrypted `FileMetaData`. It is decrypted with the footer decryptor
///   built from `file_decryption_properties`, which must be provided.
/// - `encrypted_footer` is false: `buf` is a plaintext `FileMetaData`. If it declares an
///   `encryption_algorithm` and `file_decryption_properties` is provided, a `FileDecryptor`
///   is built from it. When `check_plaintext_footer_integrity()` is enabled, the footer
///   signature over `buf` is also verified.
///
/// Encrypted column metadata is decrypted per row group by
/// `row_group_from_encrypted_thrift`, and the resulting decryptor (if any) is attached to the
/// returned metadata.
///
/// # Errors
///
/// Returns an error if any of the following fails: crypto metadata or footer parsing,
/// footer decryption, signature verification, schema reconstruction, row-group conversion,
/// or the column-order length check. Also returns an error if the footer is encrypted but no
/// decryption properties (or a required AAD prefix) were provided.
#[cfg(feature = "encryption")]
pub(crate) fn parquet_metadata_with_encryption(
    file_decryption_properties: Option<&FileDecryptionProperties>,
    encrypted_footer: bool,
    buf: &[u8],
) -> Result<ParquetMetaData> {
    let mut prot = ThriftSliceInputProtocol::new(buf);
    let mut file_decryptor = None;
    // Declared at function scope so the decrypted footer outlives `prot`, which is re-pointed
    // at it below.
    let decrypted_fmd_buf;

    if encrypted_footer {
        if let Some(file_decryption_properties) = file_decryption_properties {
            let t_file_crypto_metadata: FileCryptoMetaData =
                FileCryptoMetaData::read_thrift(&mut prot)
                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
            // Only AES_GCM_V1 is inspected here. Other algorithms count as "no prefix
            // required" (AES_GCM_CTR_V1 is rejected by `get_file_decryptor` anyway).
            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
                _ => Some(false),
            }
            .unwrap_or(false);
            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                return Err(general_err!(
                    "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                ));
            }
            let decryptor = get_file_decryptor(
                t_file_crypto_metadata.encryption_algorithm,
                t_file_crypto_metadata.key_metadata,
                file_decryption_properties,
            )?;
            let footer_decryptor = decryptor.get_footer_decryptor();
            let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;

            // The bytes following the crypto metadata are the encrypted `FileMetaData`.
            decrypted_fmd_buf = footer_decryptor?
                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                .map_err(|_| {
                    general_err!(
                        "Provided footer key and AAD were unable to decrypt parquet footer"
                    )
                })?;
            prot = ThriftSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

            file_decryptor = Some(decryptor);
        } else {
            return Err(general_err!(
                "Parquet file has an encrypted footer but decryption properties were not provided"
            ));
        }
    }

    let file_meta = FileMetaData::read_thrift(&mut prot)
        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;

    let version = file_meta.version;
    let num_rows = file_meta.num_rows;
    let created_by = file_meta.created_by.map(|c| c.to_owned());
    let key_value_metadata = file_meta.key_value_metadata;

    let val = parquet_schema_from_array(file_meta.schema)?;
    let schema_descr = Arc::new(SchemaDescriptor::new(val));

    // Plaintext-footer mode: the algorithm (and signing key metadata) live in `FileMetaData`.
    // NOTE(review): unlike the encrypted-footer path, `supply_aad_prefix` is not checked here,
    // so a missing reader-supplied prefix would surface later as a signature/decryption
    // failure — confirm whether that is intended.
    if let (Some(algo), Some(file_decryption_properties)) =
        (file_meta.encryption_algorithm, file_decryption_properties)
    {
        let file_decryptor_value = get_file_decryptor(
            algo,
            file_meta.footer_signing_key_metadata,
            file_decryption_properties,
        )?;
        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
        }
        file_decryptor = Some(file_decryptor_value);
    }

    let mut row_groups = Vec::with_capacity(file_meta.row_groups.len());
    for rg in file_meta.row_groups {
        let r = row_group_from_encrypted_thrift(rg, schema_descr.clone(), file_decryptor.as_ref())?;
        row_groups.push(r);
    }

    // When present, there must be exactly one column order per leaf column.
    if file_meta
        .column_orders
        .as_ref()
        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
    {
        return Err(general_err!("Column order length mismatch"));
    }

    // TYPE_DEFINED_ORDER is re-derived from each column's logical/converted/physical type;
    // any other order is kept as read.
    let column_orders = file_meta.column_orders.map(|cos| {
        let mut res = Vec::with_capacity(cos.len());
        for (i, column) in schema_descr.columns().iter().enumerate() {
            match cos[i] {
                ColumnOrder::TYPE_DEFINED_ORDER(_) => {
                    let sort_order = ColumnOrder::get_sort_order(
                        column.logical_type(),
                        column.converted_type(),
                        column.physical_type(),
                    );
                    res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                }
                _ => res.push(cos[i]),
            }
        }
        res
    });

    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    );
    let mut metadata = ParquetMetaData::new(fmd, row_groups);

    metadata.with_file_decryptor(file_decryptor);

    Ok(metadata)
}
808
/// Builds a `FileDecryptor` for the file's `encryption_algorithm`.
///
/// An AAD prefix supplied by `file_decryption_properties` takes precedence over the one
/// stored in the file. If neither exists, an empty prefix is used.
///
/// # Errors
///
/// Returns an error if:
/// - the algorithm carries no `aad_file_unique`;
/// - the algorithm is `AES_GCM_CTR_V1`, which is not yet supported;
/// - `FileDecryptor::new` fails.
#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    let algo = match encryption_algorithm {
        EncryptionAlgorithm::AES_GCM_V1(algo) => algo,
        EncryptionAlgorithm::AES_GCM_CTR_V1(_) => {
            return Err(nyi_err!(
                "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
            ));
        }
    };

    let Some(aad_file_unique) = algo.aad_file_unique else {
        return Err(general_err!("AAD unique file identifier is not set"));
    };
    let aad_prefix = match file_decryption_properties.aad_prefix() {
        Some(prefix) => prefix.clone(),
        None => algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default(),
    };

    FileDecryptor::new(
        file_decryption_properties,
        footer_key_metadata,
        aad_file_unique.to_vec(),
        aad_prefix,
    )
}
839
840impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ParquetMetaData {
843 fn read_thrift(prot: &mut R) -> Result<Self> {
844 let file_meta = FileMetaData::read_thrift(prot)?;
845
846 let version = file_meta.version;
847 let num_rows = file_meta.num_rows;
848 let row_groups = file_meta.row_groups;
849 let created_by = file_meta.created_by.map(|c| c.to_owned());
850 let key_value_metadata = file_meta.key_value_metadata;
851
852 let val = parquet_schema_from_array(file_meta.schema)?;
853 let schema_descr = Arc::new(SchemaDescriptor::new(val));
854
855 let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;
857
858 if file_meta
860 .column_orders
861 .as_ref()
862 .is_some_and(|cos| cos.len() != schema_descr.num_columns())
863 {
864 return Err(general_err!("Column order length mismatch"));
865 }
866
867 let column_orders = file_meta.column_orders.map(|cos| {
868 let mut res = Vec::with_capacity(cos.len());
869 for (i, column) in schema_descr.columns().iter().enumerate() {
870 match cos[i] {
871 ColumnOrder::TYPE_DEFINED_ORDER(_) => {
872 let sort_order = ColumnOrder::get_sort_order(
873 column.logical_type(),
874 column.converted_type(),
875 column.physical_type(),
876 );
877 res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
878 }
879 _ => res.push(cos[i]),
880 }
881 }
882 res
883 });
884
885 let fmd = crate::file::metadata::FileMetaData::new(
886 version,
887 num_rows,
888 created_by,
889 key_value_metadata,
890 schema_descr,
891 column_orders,
892 );
893
894 Ok(ParquetMetaData::new(fmd, row_groups))
895 }
896}
897
// Thrift index page header, declared with no fields
// (NOTE(review): any fields present on the wire are presumably skipped — confirm).
thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);
901
// Thrift header of a dictionary page.
thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  // Number of values in the dictionary page.
  1: required i32 num_values;

  // Encoding of the dictionary values.
  2: required Encoding encoding

  // NOTE(review): presumably whether the dictionary entries are sorted — confirm.
  3: optional bool is_sorted;
}
);
914
// Page-level statistics. This has the same field ids as `Statistics`, but `binary` is declared
// without a lifetime, so it does not borrow the input. The `read_thrift_without_stats` page
// header readers skip this structure entirely.
thrift_struct!(
pub(crate) struct PageStatistics {
  1: optional binary max;
  2: optional binary min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary max_value;
  6: optional binary min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);
934
// Thrift header of a V1 data page. `DataPageHeader::read_thrift_without_stats` decodes it
// while skipping field 5.
thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);
944
945impl DataPageHeader {
946 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
948 where
949 R: ThriftCompactInputProtocol<'a>,
950 {
951 let mut num_values: Option<i32> = None;
952 let mut encoding: Option<Encoding> = None;
953 let mut definition_level_encoding: Option<Encoding> = None;
954 let mut repetition_level_encoding: Option<Encoding> = None;
955 let statistics: Option<PageStatistics> = None;
956 let mut last_field_id = 0i16;
957 loop {
958 let field_ident = prot.read_field_begin(last_field_id)?;
959 if field_ident.field_type == FieldType::Stop {
960 break;
961 }
962 match field_ident.id {
963 1 => {
964 let val = i32::read_thrift(&mut *prot)?;
965 num_values = Some(val);
966 }
967 2 => {
968 let val = Encoding::read_thrift(&mut *prot)?;
969 encoding = Some(val);
970 }
971 3 => {
972 let val = Encoding::read_thrift(&mut *prot)?;
973 definition_level_encoding = Some(val);
974 }
975 4 => {
976 let val = Encoding::read_thrift(&mut *prot)?;
977 repetition_level_encoding = Some(val);
978 }
979 _ => {
980 prot.skip(field_ident.field_type)?;
981 }
982 };
983 last_field_id = field_ident.id;
984 }
985 let Some(num_values) = num_values else {
986 return Err(ParquetError::General(
987 "Required field num_values is missing".to_owned(),
988 ));
989 };
990 let Some(encoding) = encoding else {
991 return Err(ParquetError::General(
992 "Required field encoding is missing".to_owned(),
993 ));
994 };
995 let Some(definition_level_encoding) = definition_level_encoding else {
996 return Err(ParquetError::General(
997 "Required field definition_level_encoding is missing".to_owned(),
998 ));
999 };
1000 let Some(repetition_level_encoding) = repetition_level_encoding else {
1001 return Err(ParquetError::General(
1002 "Required field repetition_level_encoding is missing".to_owned(),
1003 ));
1004 };
1005 Ok(Self {
1006 num_values,
1007 encoding,
1008 definition_level_encoding,
1009 repetition_level_encoding,
1010 statistics,
1011 })
1012 }
1013}
1014
// Thrift header of a V2 data page. `DataPageHeaderV2::read_thrift_without_stats` decodes it
// while skipping field 8.
thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  // A `= true` default is declared here, but `read_thrift_without_stats` leaves
  // `is_compressed` as `None` when the field is absent
  // (NOTE(review): callers presumably apply the default — confirm).
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
1027
1028impl DataPageHeaderV2 {
1029 fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1031 where
1032 R: ThriftCompactInputProtocol<'a>,
1033 {
1034 let mut num_values: Option<i32> = None;
1035 let mut num_nulls: Option<i32> = None;
1036 let mut num_rows: Option<i32> = None;
1037 let mut encoding: Option<Encoding> = None;
1038 let mut definition_levels_byte_length: Option<i32> = None;
1039 let mut repetition_levels_byte_length: Option<i32> = None;
1040 let mut is_compressed: Option<bool> = None;
1041 let statistics: Option<PageStatistics> = None;
1042 let mut last_field_id = 0i16;
1043 loop {
1044 let field_ident = prot.read_field_begin(last_field_id)?;
1045 if field_ident.field_type == FieldType::Stop {
1046 break;
1047 }
1048 match field_ident.id {
1049 1 => {
1050 let val = i32::read_thrift(&mut *prot)?;
1051 num_values = Some(val);
1052 }
1053 2 => {
1054 let val = i32::read_thrift(&mut *prot)?;
1055 num_nulls = Some(val);
1056 }
1057 3 => {
1058 let val = i32::read_thrift(&mut *prot)?;
1059 num_rows = Some(val);
1060 }
1061 4 => {
1062 let val = Encoding::read_thrift(&mut *prot)?;
1063 encoding = Some(val);
1064 }
1065 5 => {
1066 let val = i32::read_thrift(&mut *prot)?;
1067 definition_levels_byte_length = Some(val);
1068 }
1069 6 => {
1070 let val = i32::read_thrift(&mut *prot)?;
1071 repetition_levels_byte_length = Some(val);
1072 }
1073 7 => {
1074 let val = field_ident.bool_val.unwrap();
1075 is_compressed = Some(val);
1076 }
1077 _ => {
1078 prot.skip(field_ident.field_type)?;
1079 }
1080 };
1081 last_field_id = field_ident.id;
1082 }
1083 let Some(num_values) = num_values else {
1084 return Err(ParquetError::General(
1085 "Required field num_values is missing".to_owned(),
1086 ));
1087 };
1088 let Some(num_nulls) = num_nulls else {
1089 return Err(ParquetError::General(
1090 "Required field num_nulls is missing".to_owned(),
1091 ));
1092 };
1093 let Some(num_rows) = num_rows else {
1094 return Err(ParquetError::General(
1095 "Required field num_rows is missing".to_owned(),
1096 ));
1097 };
1098 let Some(encoding) = encoding else {
1099 return Err(ParquetError::General(
1100 "Required field encoding is missing".to_owned(),
1101 ));
1102 };
1103 let Some(definition_levels_byte_length) = definition_levels_byte_length else {
1104 return Err(ParquetError::General(
1105 "Required field definition_levels_byte_length is missing".to_owned(),
1106 ));
1107 };
1108 let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
1109 return Err(ParquetError::General(
1110 "Required field repetition_levels_byte_length is missing".to_owned(),
1111 ));
1112 };
1113 Ok(Self {
1114 num_values,
1115 num_nulls,
1116 num_rows,
1117 encoding,
1118 definition_levels_byte_length,
1119 repetition_levels_byte_length,
1120 is_compressed,
1121 statistics,
1122 })
1123 }
1124}
1125
// Thrift page header. The nested headers (fields 5-8) correspond to the different page kinds
// (NOTE(review): presumably only the one matching `r#type` is set — confirm).
thrift_struct!(
pub(crate) struct PageHeader {
  1: required PageType r#type

  2: required i32 uncompressed_page_size

  3: required i32 compressed_page_size

  4: optional i32 crc

  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
1147
1148impl PageHeader {
1149 pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
1153 where
1154 R: ThriftCompactInputProtocol<'a>,
1155 {
1156 let mut type_: Option<PageType> = None;
1157 let mut uncompressed_page_size: Option<i32> = None;
1158 let mut compressed_page_size: Option<i32> = None;
1159 let mut crc: Option<i32> = None;
1160 let mut data_page_header: Option<DataPageHeader> = None;
1161 let mut index_page_header: Option<IndexPageHeader> = None;
1162 let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
1163 let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
1164 let mut last_field_id = 0i16;
1165 loop {
1166 let field_ident = prot.read_field_begin(last_field_id)?;
1167 if field_ident.field_type == FieldType::Stop {
1168 break;
1169 }
1170 match field_ident.id {
1171 1 => {
1172 let val = PageType::read_thrift(&mut *prot)?;
1173 type_ = Some(val);
1174 }
1175 2 => {
1176 let val = i32::read_thrift(&mut *prot)?;
1177 uncompressed_page_size = Some(val);
1178 }
1179 3 => {
1180 let val = i32::read_thrift(&mut *prot)?;
1181 compressed_page_size = Some(val);
1182 }
1183 4 => {
1184 let val = i32::read_thrift(&mut *prot)?;
1185 crc = Some(val);
1186 }
1187 5 => {
1188 let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
1189 data_page_header = Some(val);
1190 }
1191 6 => {
1192 let val = IndexPageHeader::read_thrift(&mut *prot)?;
1193 index_page_header = Some(val);
1194 }
1195 7 => {
1196 let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
1197 dictionary_page_header = Some(val);
1198 }
1199 8 => {
1200 let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
1201 data_page_header_v2 = Some(val);
1202 }
1203 _ => {
1204 prot.skip(field_ident.field_type)?;
1205 }
1206 };
1207 last_field_id = field_ident.id;
1208 }
1209 let Some(type_) = type_ else {
1210 return Err(ParquetError::General(
1211 "Required field type_ is missing".to_owned(),
1212 ));
1213 };
1214 let Some(uncompressed_page_size) = uncompressed_page_size else {
1215 return Err(ParquetError::General(
1216 "Required field uncompressed_page_size is missing".to_owned(),
1217 ));
1218 };
1219 let Some(compressed_page_size) = compressed_page_size else {
1220 return Err(ParquetError::General(
1221 "Required field compressed_page_size is missing".to_owned(),
1222 ));
1223 };
1224 Ok(Self {
1225 r#type: type_,
1226 uncompressed_page_size,
1227 compressed_page_size,
1228 crc,
1229 data_page_header,
1230 index_page_header,
1231 dictionary_page_header,
1232 data_page_header_v2,
1233 })
1234 }
1235}
1236
1237pub(crate) fn serialize_column_meta_data<W: Write>(
1261 column_chunk: &ColumnChunkMetaData,
1262 w: &mut ThriftCompactOutputProtocol<W>,
1263) -> Result<()> {
1264 use crate::file::statistics::page_stats_to_thrift;
1265
1266 column_chunk.column_type().write_thrift_field(w, 1, 0)?;
1267 column_chunk.encodings.write_thrift_field(w, 2, 1)?;
1268 let path = column_chunk.column_descr.path().parts();
1269 let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
1270 path.write_thrift_field(w, 3, 2)?;
1271 column_chunk.compression.write_thrift_field(w, 4, 3)?;
1272 column_chunk.num_values.write_thrift_field(w, 5, 4)?;
1273 column_chunk
1274 .total_uncompressed_size
1275 .write_thrift_field(w, 6, 5)?;
1276 column_chunk
1277 .total_compressed_size
1278 .write_thrift_field(w, 7, 6)?;
1279 let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
1281 if let Some(index_page_offset) = column_chunk.index_page_offset {
1282 last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
1283 }
1284 if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
1285 last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
1286 }
1287 let stats = page_stats_to_thrift(column_chunk.statistics());
1289 if let Some(stats) = stats {
1290 last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
1291 }
1292 if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
1293 last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
1294 }
1295 if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
1296 last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
1297 }
1298 if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
1299 last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
1300 }
1301
1302 let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
1304 || column_chunk.repetition_level_histogram.is_some()
1305 || column_chunk.definition_level_histogram.is_some()
1306 {
1307 let repetition_level_histogram = column_chunk
1308 .repetition_level_histogram()
1309 .map(|hist| hist.clone().into_inner());
1310
1311 let definition_level_histogram = column_chunk
1312 .definition_level_histogram()
1313 .map(|hist| hist.clone().into_inner());
1314
1315 Some(SizeStatistics {
1316 unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
1317 repetition_level_histogram,
1318 definition_level_histogram,
1319 })
1320 } else {
1321 None
1322 };
1323 if let Some(size_stats) = size_stats {
1324 last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
1325 }
1326
1327 if let Some(geo_stats) = column_chunk.geo_statistics() {
1328 geo_stats.write_thrift_field(w, 17, last_field_id)?;
1329 }
1330
1331 w.write_struct_end()
1332}
1333
/// Everything needed to serialize the thrift `FileMetaData` footer struct.
///
/// The file-level metadata and row groups are borrowed. The two optional
/// encryption-related values are owned.
pub(crate) struct FileMeta<'a> {
    /// Source of the version, schema, row count, key/value metadata,
    /// `created_by` and column orders (thrift fields 1-3 and 5-7).
    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
    /// Row groups, written as thrift field 4.
    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
    /// Written as thrift field 8 when present.
    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
    /// Written as thrift field 9 (binary) when present.
    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
}
1341
1342impl<'a> WriteThrift for FileMeta<'a> {
1343 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1344
1345 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1346 self.file_metadata
1347 .version
1348 .write_thrift_field(writer, 1, 0)?;
1349
1350 let root = self.file_metadata.schema_descr().root_schema_ptr();
1353 let schema_len = num_nodes(&root)?;
1354 writer.write_field_begin(FieldType::List, 2, 1)?;
1355 writer.write_list_begin(ElementType::Struct, schema_len)?;
1356 write_schema(&root, writer)?;
1358
1359 self.file_metadata
1360 .num_rows
1361 .write_thrift_field(writer, 3, 2)?;
1362
1363 let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
1365
1366 if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
1367 last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
1368 }
1369 if let Some(created_by) = self.file_metadata.created_by() {
1370 last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
1371 }
1372 if let Some(column_orders) = self.file_metadata.column_orders() {
1373 last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
1374 }
1375 if let Some(algo) = self.encryption_algorithm.as_ref() {
1376 last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
1377 }
1378 if let Some(key) = self.footer_signing_key_metadata.as_ref() {
1379 key.as_slice()
1380 .write_thrift_field(writer, 9, last_field_id)?;
1381 }
1382
1383 writer.write_struct_end()
1384 }
1385}
1386
1387fn write_schema<W: Write>(
1388 schema: &TypePtr,
1389 writer: &mut ThriftCompactOutputProtocol<W>,
1390) -> Result<()> {
1391 if !schema.is_group() {
1392 return Err(general_err!("Root schema must be Group type"));
1393 }
1394 write_schema_helper(schema, writer)
1395}
1396
1397fn write_schema_helper<W: Write>(
1398 node: &TypePtr,
1399 writer: &mut ThriftCompactOutputProtocol<W>,
1400) -> Result<()> {
1401 match node.as_ref() {
1402 crate::schema::types::Type::PrimitiveType {
1403 basic_info,
1404 physical_type,
1405 type_length,
1406 scale,
1407 precision,
1408 } => {
1409 let element = SchemaElement {
1410 r#type: Some(*physical_type),
1411 type_length: if *type_length >= 0 {
1412 Some(*type_length)
1413 } else {
1414 None
1415 },
1416 repetition_type: Some(basic_info.repetition()),
1417 name: basic_info.name(),
1418 num_children: None,
1419 converted_type: match basic_info.converted_type() {
1420 ConvertedType::NONE => None,
1421 other => Some(other),
1422 },
1423 scale: if *scale >= 0 { Some(*scale) } else { None },
1424 precision: if *precision >= 0 {
1425 Some(*precision)
1426 } else {
1427 None
1428 },
1429 field_id: if basic_info.has_id() {
1430 Some(basic_info.id())
1431 } else {
1432 None
1433 },
1434 logical_type: basic_info.logical_type(),
1435 };
1436 element.write_thrift(writer)
1437 }
1438 crate::schema::types::Type::GroupType { basic_info, fields } => {
1439 let repetition = if basic_info.has_repetition() {
1440 Some(basic_info.repetition())
1441 } else {
1442 None
1443 };
1444
1445 let element = SchemaElement {
1446 r#type: None,
1447 type_length: None,
1448 repetition_type: repetition,
1449 name: basic_info.name(),
1450 num_children: Some(fields.len().try_into()?),
1451 converted_type: match basic_info.converted_type() {
1452 ConvertedType::NONE => None,
1453 other => Some(other),
1454 },
1455 scale: None,
1456 precision: None,
1457 field_id: if basic_info.has_id() {
1458 Some(basic_info.id())
1459 } else {
1460 None
1461 },
1462 logical_type: basic_info.logical_type(),
1463 };
1464
1465 element.write_thrift(writer)?;
1466
1467 for field in fields {
1469 write_schema_helper(field, writer)?;
1470 }
1471 Ok(())
1472 }
1473 }
1474}
1475
1476impl WriteThrift for RowGroupMetaData {
1486 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1487
1488 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1489 self.columns.write_thrift_field(writer, 1, 0)?;
1491 self.total_byte_size.write_thrift_field(writer, 2, 1)?;
1492 let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
1493 if let Some(sorting_columns) = self.sorting_columns() {
1494 last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
1495 }
1496 if let Some(file_offset) = self.file_offset() {
1497 last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
1498 }
1499 last_field_id = self
1501 .compressed_size()
1502 .write_thrift_field(writer, 6, last_field_id)?;
1503 if let Some(ordinal) = self.ordinal() {
1504 ordinal.write_thrift_field(writer, 7, last_field_id)?;
1505 }
1506 writer.write_struct_end()
1507 }
1508}
1509
1510impl WriteThrift for ColumnChunkMetaData {
1522 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1523
1524 #[allow(unused_assignments)]
1525 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1526 let mut last_field_id = 0i16;
1527 if let Some(file_path) = self.file_path() {
1528 last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
1529 }
1530 last_field_id = self
1531 .file_offset()
1532 .write_thrift_field(writer, 2, last_field_id)?;
1533
1534 #[cfg(feature = "encryption")]
1535 {
1536 if self.encrypted_column_metadata.is_none() {
1538 writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1539 serialize_column_meta_data(self, writer)?;
1540 last_field_id = 3;
1541 }
1542 }
1543 #[cfg(not(feature = "encryption"))]
1544 {
1545 writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
1547 serialize_column_meta_data(self, writer)?;
1548 last_field_id = 3;
1549 }
1550
1551 if let Some(offset_idx_off) = self.offset_index_offset() {
1552 last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
1553 }
1554 if let Some(offset_idx_len) = self.offset_index_length() {
1555 last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
1556 }
1557 if let Some(column_idx_off) = self.column_index_offset() {
1558 last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
1559 }
1560 if let Some(column_idx_len) = self.column_index_length() {
1561 last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
1562 }
1563 #[cfg(feature = "encryption")]
1564 {
1565 if let Some(crypto_metadata) = self.crypto_metadata() {
1566 last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
1567 }
1568 if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
1569 encrypted_meta
1570 .as_slice()
1571 .write_thrift_field(writer, 9, last_field_id)?;
1572 }
1573 }
1574
1575 writer.write_struct_end()
1576 }
1577}
1578
1579impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
1584 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1585
1586 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1587 let mut last_field_id = 0i16;
1588 if let Some(bbox) = self.bounding_box() {
1589 last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
1590 }
1591 if let Some(geo_types) = self.geospatial_types() {
1592 geo_types.write_thrift_field(writer, 2, last_field_id)?;
1593 }
1594
1595 writer.write_struct_end()
1596 }
1597}
1598
1599impl WriteThriftField for crate::geospatial::statistics::GeospatialStatistics {
1600 fn write_thrift_field<W: Write>(
1601 &self,
1602 writer: &mut ThriftCompactOutputProtocol<W>,
1603 field_id: i16,
1604 last_field_id: i16,
1605 ) -> Result<i16> {
1606 writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
1607 self.write_thrift(writer)?;
1608 Ok(field_id)
1609 }
1610}
1611
1612impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
1623 const ELEMENT_TYPE: ElementType = ElementType::Struct;
1624
1625 fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
1626 self.get_xmin().write_thrift_field(writer, 1, 0)?;
1627 self.get_xmax().write_thrift_field(writer, 2, 1)?;
1628 self.get_ymin().write_thrift_field(writer, 3, 2)?;
1629 let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;
1630
1631 if let Some(zmin) = self.get_zmin() {
1632 last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
1633 }
1634 if let Some(zmax) = self.get_zmax() {
1635 last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
1636 }
1637 if let Some(mmin) = self.get_mmin() {
1638 last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
1639 }
1640 if let Some(mmax) = self.get_mmax() {
1641 mmax.write_thrift_field(writer, 8, last_field_id)?;
1642 }
1643
1644 writer.write_struct_end()
1645 }
1646}
1647
1648impl WriteThriftField for crate::geospatial::bounding_box::BoundingBox {
1649 fn write_thrift_field<W: Write>(
1650 &self,
1651 writer: &mut ThriftCompactOutputProtocol<W>,
1652 field_id: i16,
1653 last_field_id: i16,
1654 ) -> Result<i16> {
1655 writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
1656 self.write_thrift(writer)?;
1657 Ok(field_id)
1658 }
1659}
1660
#[cfg(test)]
pub(crate) mod tests {
    use crate::errors::Result;
    use crate::file::metadata::thrift_gen::{
        convert_column, convert_row_group, write_schema, BoundingBox, ColumnChunk, RowGroup,
        SchemaElement,
    };
    use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        read_thrift_vec, ElementType, ReadThrift, ThriftCompactOutputProtocol,
        ThriftSliceInputProtocol,
    };
    use crate::schema::types::{
        num_nodes, parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor, TypePtr,
    };
    use std::sync::Arc;

    /// Decodes a thrift `RowGroup` from `buf` and converts it to
    /// [`RowGroupMetaData`] using `schema_descr`.
    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        let rg = RowGroup::read_thrift(&mut reader)?;
        convert_row_group(rg, schema_descr)
    }

    /// Decodes a thrift `ColumnChunk` from `buf` and converts it to
    /// [`ColumnChunkMetaData`] using `column_descr`.
    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        let cc = ColumnChunk::read_thrift(&mut reader)?;
        convert_column(cc, column_descr)
    }

    /// Serializes `schema` via [`schema_to_buf`] and parses the bytes back into
    /// a schema tree.
    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        // Reuse the serializer instead of duplicating the list-header logic.
        let buf = schema_to_buf(&schema)?;
        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let elements: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(elements)
    }

    /// Serializes `schema` as a thrift list of `SchemaElement`s, including the
    /// list header.
    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let node_count = num_nodes(schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        writer.write_list_begin(ElementType::Struct, node_count)?;
        write_schema(schema, &mut writer)?;
        Ok(buf)
    }

    /// Decodes a thrift list of `SchemaElement`s that borrow from `buf`.
    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut prot)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        // x/y only.
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        });

        // x/y plus z.
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: None,
            mmax: None,
        });

        // All dimensions.
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });
    }
}