use std::io::Write;
use std::sync::Arc;

#[cfg(feature = "encryption")]
pub(crate) mod encryption;

#[cfg(feature = "encryption")]
use crate::file::{
    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
};
use crate::{
    basic::{
        ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
        Repetition, Type,
    },
    data_type::{ByteArray, FixedLenByteArray, Int96},
    errors::{ParquetError, Result},
    file::{
        metadata::{
            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
            PageEncodingStats, ParquetMetaData, RowGroupMetaData, RowGroupMetaDataBuilder,
            SortingColumn,
        },
        statistics::ValueStatistics,
    },
    parquet_thrift::{
        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
        read_thrift_vec,
    },
    schema::types::{
        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
    },
    thrift_struct,
    util::bit_util::FromBytes,
};
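
// Mirror of the Thrift `SchemaElement` struct from parquet.thrift; the field ids below follow the
// Parquet specification. The `string<'a>` name borrows from the underlying footer buffer.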
thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  1: optional Type r#type;
  2: optional i32 type_length;
  3: optional Repetition repetition_type;
  4: required string<'a> name;
  5: optional i32 num_children;
  6: optional ConvertedType converted_type;
  7: optional i32 scale
  8: optional i32 precision
  9: optional i32 field_id;
  10: optional LogicalType logical_type
}
);

thrift_struct!(
struct Statistics<'a> {
  1: optional binary<'a> max;
  2: optional binary<'a> min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary<'a> max_value;
  6: optional binary<'a> min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);

thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);

thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);

thrift_struct!(
struct SizeStatistics {
  1: optional i64 unencoded_byte_array_data_bytes;
  2: optional list<i64> repetition_level_histogram;
  3: optional list<i64> definition_level_histogram;
}
);
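
/// Converts decoded Thrift geospatial statistics into the crate's
/// `crate::geospatial::statistics::GeospatialStatistics`, dropping an empty `geospatial_types` list.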
fn convert_geo_stats(
    stats: Option<GeospatialStatistics>,
) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
    stats.map(|st| {
        let bbox = convert_bounding_box(st.bbox);
        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
            bbox,
            geospatial_types,
        ))
    })
}
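
/// Builds a `crate::geospatial::bounding_box::BoundingBox` from the Thrift bounding box,
/// attaching the Z and M ranges only when both ends of a range are present.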
fn convert_bounding_box(
    bbox: Option<BoundingBox>,
) -> Option<crate::geospatial::bounding_box::BoundingBox> {
    bbox.map(|bb| {
        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
            bb.xmin.into(),
            bb.xmax.into(),
            bb.ymin.into(),
            bb.ymax.into(),
        );

        newbb = match (bb.zmin, bb.zmax) {
            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
            _ => newbb,
        };

        newbb = match (bb.mmin, bb.mmax) {
            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
            _ => newbb,
        };

        newbb
    })
}
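
/// Converts Thrift column statistics into `crate::file::statistics::Statistics`, validating the
/// null count and the length of the encoded min/max values against the column's physical type.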
fn convert_stats(
    column_descr: &Arc<ColumnDescriptor>,
    thrift_stats: Option<Statistics>,
) -> Result<Option<crate::file::statistics::Statistics>> {
    use crate::file::statistics::Statistics as FStatistics;
    Ok(match thrift_stats {
        Some(stats) => {
            let null_count = stats.null_count.unwrap_or(0);

            if null_count < 0 {
                return Err(general_err!(
                    "Statistics null count is negative {}",
                    null_count
                ));
            }

            let null_count = Some(null_count as u64);
            let distinct_count = stats.distinct_count.map(|value| value as u64);
            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
            let min = if old_format {
                stats.min
            } else {
                stats.min_value
            };
            let max = if old_format {
                stats.max
            } else {
                stats.max_value
            };

            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
                if let Some(min) = min {
                    if min.len() < len {
                        return Err(general_err!("Insufficient bytes to parse min statistic",));
                    }
                }
                if let Some(max) = max {
                    if max.len() < len {
                        return Err(general_err!("Insufficient bytes to parse max statistic",));
                    }
                }
                Ok(())
            }

            let physical_type = column_descr.physical_type();
            match physical_type {
                Type::BOOLEAN => check_len(&min, &max, 1),
                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
                Type::INT96 => check_len(&min, &max, 12),
                _ => Ok(()),
            }?;

            let res = match physical_type {
                Type::BOOLEAN => FStatistics::boolean(
                    min.map(|data| data[0] != 0),
                    max.map(|data| data[0] != 0),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT32 => FStatistics::int32(
                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT64 => FStatistics::int64(
                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT96 => {
                    let min = if let Some(data) = min {
                        assert_eq!(data.len(), 12);
                        Some(Int96::try_from_le_slice(data)?)
                    } else {
                        None
                    };
                    let max = if let Some(data) = max {
                        assert_eq!(data.len(), 12);
                        Some(Int96::try_from_le_slice(data)?)
                    } else {
                        None
                    };
                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
                }
                Type::FLOAT => FStatistics::float(
                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::DOUBLE => FStatistics::double(
                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::BYTE_ARRAY => FStatistics::ByteArray(
                    ValueStatistics::new(
                        min.map(ByteArray::from),
                        max.map(ByteArray::from),
                        distinct_count,
                        null_count,
                        old_format,
                    )
                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
                ),
                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
                    ValueStatistics::new(
                        min.map(ByteArray::from).map(FixedLenByteArray::from),
                        max.map(ByteArray::from).map(FixedLenByteArray::from),
                        distinct_count,
                        null_count,
                        old_format,
                    )
                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
                ),
            };

            Some(res)
        }
        None => None,
    })
}
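
// Bit flags used to track which required fields of the Thrift `ColumnMetaData` struct have been
// seen while decoding; each flag is `1 << field_id`.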
const COL_META_TYPE: u16 = 1 << 1;
const COL_META_ENCODINGS: u16 = 1 << 2;
const COL_META_CODEC: u16 = 1 << 4;
const COL_META_NUM_VALUES: u16 = 1 << 5;
const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;

const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;
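
/// Validates the bitmask returned by `read_column_metadata`, returning an error that names the
/// first missing required field.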
fn validate_column_metadata(mask: u16) -> Result<()> {
    if mask != COL_META_ALL_REQUIRED {
        if mask & COL_META_ENCODINGS == 0 {
            return Err(general_err!("Required field encodings is missing"));
        }

        if mask & COL_META_CODEC == 0 {
            return Err(general_err!("Required field codec is missing"));
        }
        if mask & COL_META_NUM_VALUES == 0 {
            return Err(general_err!("Required field num_values is missing"));
        }
        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
            return Err(general_err!(
                "Required field total_uncompressed_size is missing"
            ));
        }
        if mask & COL_META_TOTAL_COMP_SZ == 0 {
            return Err(general_err!(
                "Required field total_compressed_size is missing"
            ));
        }
        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
            return Err(general_err!("Required field data_page_offset is missing"));
        }
    }

    Ok(())
}
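
/// Decodes the Thrift `ColumnMetaData` struct directly into the supplied `ColumnChunkMetaData`,
/// returning a bitmask of the required fields that were encountered.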
fn read_column_metadata<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column: &mut ColumnChunkMetaData,
) -> Result<u16> {
    let mut seen_mask = 0u16;

    let column_descr = &column.column_descr;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                Type::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TYPE;
            }
            2 => {
                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_ENCODINGS;
            }
            4 => {
                column.compression = Compression::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_CODEC;
            }
            5 => {
                column.num_values = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_NUM_VALUES;
            }
            6 => {
                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
            }
            7 => {
                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_COMP_SZ;
            }
            9 => {
                column.data_page_offset = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_DATA_PAGE_OFFSET;
            }
            10 => {
                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            11 => {
                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            12 => {
                column.statistics =
                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
            }
            13 => {
                let val =
                    read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
                column.encoding_stats = Some(val);
            }
            14 => {
                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            15 => {
                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
            }
            16 => {
                let val = SizeStatistics::read_thrift(&mut *prot)?;
                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
                column.repetition_level_histogram =
                    val.repetition_level_histogram.map(LevelHistogram::from);
                column.definition_level_histogram =
                    val.definition_level_histogram.map(LevelHistogram::from);
            }
            17 => {
                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
                column.geo_statistics = convert_geo_stats(Some(val));
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    Ok(seen_mask)
}
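
/// Decodes a Thrift `ColumnChunk` struct. The embedded `ColumnMetaData` is validated unless the
/// chunk carries encrypted column metadata, in which case validation is skipped here.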
fn read_column_chunk<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column_descr: &Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;

    let mut has_file_offset = false;

    let mut col_meta_mask = 0u16;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                col.file_path = Some(String::read_thrift(&mut *prot)?);
            }
            2 => {
                col.file_offset = i64::read_thrift(&mut *prot)?;
                has_file_offset = true;
            }
            3 => {
                col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
            }
            4 => {
                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            5 => {
                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            6 => {
                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
                col.column_crypto_metadata = Some(Box::new(val));
            }
            #[cfg(feature = "encryption")]
            9 => {
                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if !has_file_offset {
        return Err(general_err!("Required field file_offset is missing"));
    };

    #[cfg(feature = "encryption")]
    if col.encrypted_column_metadata.is_some() {
        return Ok(col);
    }

    validate_column_metadata(col_meta_mask)?;

    Ok(col)
}
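
/// Decodes a Thrift `RowGroup` struct, checking that the number of column chunks matches the
/// schema and that all required fields are present.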
fn read_row_group(
    prot: &mut ThriftSliceInputProtocol,
    schema_descr: &Arc<SchemaDescriptor>,
) -> Result<RowGroupMetaData> {
    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();

    const RG_COLUMNS: u8 = 1 << 1;
    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
    const RG_NUM_ROWS: u8 = 1 << 3;
    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;

    let mut mask = 0u8;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                let list_ident = prot.read_list_begin()?;
                if schema_descr.num_columns() != list_ident.size as usize {
                    return Err(general_err!(
                        "Column count mismatch. Schema has {} columns while Row Group has {}",
                        schema_descr.num_columns(),
                        list_ident.size
                    ));
                }
                for i in 0..list_ident.size as usize {
                    let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
                    row_group.columns.push(col);
                }
                mask |= RG_COLUMNS;
            }
            2 => {
                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
                mask |= RG_TOT_BYTE_SIZE;
            }
            3 => {
                row_group.num_rows = i64::read_thrift(&mut *prot)?;
                mask |= RG_NUM_ROWS;
            }
            4 => {
                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
                row_group.sorting_columns = Some(val);
            }
            5 => {
                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if mask != RG_ALL_REQUIRED {
        if mask & RG_COLUMNS == 0 {
            return Err(general_err!("Required field columns is missing"));
        }
        if mask & RG_TOT_BYTE_SIZE == 0 {
            return Err(general_err!("Required field total_byte_size is missing"));
        }
        if mask & RG_NUM_ROWS == 0 {
            return Err(general_err!("Required field num_rows is missing"));
        }
    }

    Ok(row_group)
}
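
/// Decodes a complete Thrift `FileMetaData` footer from `buf` into a `ParquetMetaData`,
/// resolving column orders against the decoded schema.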
pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData> {
    let mut prot = ThriftSliceInputProtocol::new(buf);

    let mut version: Option<i32> = None;
    let mut num_rows: Option<i64> = None;
    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
    let mut created_by: Option<&str> = None;
    let mut column_orders: Option<Vec<ColumnOrder>> = None;
    #[cfg(feature = "encryption")]
    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
    #[cfg(feature = "encryption")]
    let mut footer_signing_key_metadata: Option<&[u8]> = None;

    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                version = Some(i32::read_thrift(&mut prot)?);
            }
            2 => {
                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
                let val = parquet_schema_from_array(val)?;
                schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
            }
            3 => {
                num_rows = Some(i64::read_thrift(&mut prot)?);
            }
            4 => {
                if schema_descr.is_none() {
                    return Err(general_err!("Required field schema is missing"));
                }
                let schema_descr = schema_descr.as_ref().unwrap();
                let list_ident = prot.read_list_begin()?;
                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
                for _ in 0..list_ident.size {
                    rg_vec.push(read_row_group(&mut prot, schema_descr)?);
                }
                row_groups = Some(rg_vec);
            }
            5 => {
                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
                key_value_metadata = Some(val);
            }
            6 => {
                created_by = Some(<&str>::read_thrift(&mut prot)?);
            }
            7 => {
                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
                column_orders = Some(val);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
                encryption_algorithm = Some(val);
            }
            #[cfg(feature = "encryption")]
            9 => {
                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }
    let Some(version) = version else {
        return Err(general_err!("Required field version is missing"));
    };
    let Some(num_rows) = num_rows else {
        return Err(general_err!("Required field num_rows is missing"));
    };
    let Some(row_groups) = row_groups else {
        return Err(general_err!("Required field row_groups is missing"));
    };

    let created_by = created_by.map(|c| c.to_owned());

    let schema_descr = schema_descr.unwrap();

    if column_orders
        .as_ref()
        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
    {
        return Err(general_err!("Column order length mismatch"));
    }
    let column_orders = column_orders.map(|mut cos| {
        for (i, column) in schema_descr.columns().iter().enumerate() {
            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
                let sort_order = ColumnOrder::get_sort_order(
                    column.logical_type(),
                    column.converted_type(),
                    column.physical_type(),
                );
                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
            }
        }
        cos
    });

    #[cfg(not(feature = "encryption"))]
    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    );
    #[cfg(feature = "encryption")]
    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    )
    .with_encryption_algorithm(encryption_algorithm)
    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));

    Ok(ParquetMetaData::new(fmd, row_groups))
}

thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);

thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  1: required i32 num_values;

  2: required Encoding encoding

  3: optional bool is_sorted;
}
);

thrift_struct!(
pub(crate) struct PageStatistics {
  1: optional binary max;
  2: optional binary min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary max_value;
  6: optional binary min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);

thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);
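
/// Provides a decoder for `DataPageHeader` that skips the optional `statistics` field, leaving it
/// as `None`.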
impl DataPageHeader {
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_level_encoding: Option<Encoding> = None;
        let mut repetition_level_encoding: Option<Encoding> = None;
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                3 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    definition_level_encoding = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    repetition_level_encoding = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(num_values) = num_values else {
            return Err(general_err!("Required field num_values is missing"));
        };
        let Some(encoding) = encoding else {
            return Err(general_err!("Required field encoding is missing"));
        };
        let Some(definition_level_encoding) = definition_level_encoding else {
            return Err(general_err!(
                "Required field definition_level_encoding is missing"
            ));
        };
        let Some(repetition_level_encoding) = repetition_level_encoding else {
            return Err(general_err!(
                "Required field repetition_level_encoding is missing"
            ));
        };
        Ok(Self {
            num_values,
            encoding,
            definition_level_encoding,
            repetition_level_encoding,
            statistics,
        })
    }
}

thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);
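
/// Provides a decoder for `DataPageHeaderV2` that skips the optional `statistics` field, leaving
/// it as `None`.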
impl DataPageHeaderV2 {
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut num_nulls: Option<i32> = None;
        let mut num_rows: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_levels_byte_length: Option<i32> = None;
        let mut repetition_levels_byte_length: Option<i32> = None;
        let mut is_compressed: Option<bool> = None;
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_nulls = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_rows = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                5 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    definition_levels_byte_length = Some(val);
                }
                6 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    repetition_levels_byte_length = Some(val);
                }
                7 => {
                    let val = field_ident.bool_val.unwrap();
                    is_compressed = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(num_values) = num_values else {
            return Err(general_err!("Required field num_values is missing"));
        };
        let Some(num_nulls) = num_nulls else {
            return Err(general_err!("Required field num_nulls is missing"));
        };
        let Some(num_rows) = num_rows else {
            return Err(general_err!("Required field num_rows is missing"));
        };
        let Some(encoding) = encoding else {
            return Err(general_err!("Required field encoding is missing"));
        };
        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
            return Err(general_err!(
                "Required field definition_levels_byte_length is missing"
            ));
        };
        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
            return Err(general_err!(
                "Required field repetition_levels_byte_length is missing"
            ));
        };
        Ok(Self {
            num_values,
            num_nulls,
            num_rows,
            encoding,
            definition_levels_byte_length,
            repetition_levels_byte_length,
            is_compressed,
            statistics,
        })
    }
}

thrift_struct!(
pub(crate) struct PageHeader {
  1: required PageType r#type

  2: required i32 uncompressed_page_size

  3: required i32 compressed_page_size

  4: optional i32 crc

  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);
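
/// Provides `PageHeader::read_thrift_without_stats`, which decodes a page header but uses the
/// statistics-skipping decoders for the embedded data page headers.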
impl PageHeader {
    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut type_: Option<PageType> = None;
        let mut uncompressed_page_size: Option<i32> = None;
        let mut compressed_page_size: Option<i32> = None;
        let mut crc: Option<i32> = None;
        let mut data_page_header: Option<DataPageHeader> = None;
        let mut index_page_header: Option<IndexPageHeader> = None;
        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = PageType::read_thrift(&mut *prot)?;
                    type_ = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    uncompressed_page_size = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    compressed_page_size = Some(val);
                }
                4 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    crc = Some(val);
                }
                5 => {
                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
                    data_page_header = Some(val);
                }
                6 => {
                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
                    index_page_header = Some(val);
                }
                7 => {
                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
                    dictionary_page_header = Some(val);
                }
                8 => {
                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
                    data_page_header_v2 = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(type_) = type_ else {
            return Err(general_err!("Required field type_ is missing"));
        };
        let Some(uncompressed_page_size) = uncompressed_page_size else {
            return Err(general_err!(
                "Required field uncompressed_page_size is missing"
            ));
        };
        let Some(compressed_page_size) = compressed_page_size else {
            return Err(general_err!(
                "Required field compressed_page_size is missing"
            ));
        };
        Ok(Self {
            r#type: type_,
            uncompressed_page_size,
            compressed_page_size,
            crc,
            data_page_header,
            index_page_header,
            dictionary_page_header,
            data_page_header_v2,
        })
    }
}
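
/// Serializes a Thrift `ColumnMetaData` struct for `column_chunk` to `w`, writing fields in
/// field-id order and omitting optional fields that are not set.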
pub(super) fn serialize_column_meta_data<W: Write>(
    column_chunk: &ColumnChunkMetaData,
    w: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
    use crate::file::statistics::page_stats_to_thrift;

    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
    column_chunk
        .encodings()
        .collect::<Vec<_>>()
        .write_thrift_field(w, 2, 1)?;
    let path = column_chunk.column_descr.path().parts();
    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
    path.write_thrift_field(w, 3, 2)?;
    column_chunk.compression.write_thrift_field(w, 4, 3)?;
    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
    column_chunk
        .total_uncompressed_size
        .write_thrift_field(w, 6, 5)?;
    column_chunk
        .total_compressed_size
        .write_thrift_field(w, 7, 6)?;
    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
    if let Some(index_page_offset) = column_chunk.index_page_offset {
        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
    }
    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
    }
    let stats = page_stats_to_thrift(column_chunk.statistics());
    if let Some(stats) = stats {
        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
    }
    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
    }
    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
    }
    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
    }

    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
        || column_chunk.repetition_level_histogram.is_some()
        || column_chunk.definition_level_histogram.is_some()
    {
        let repetition_level_histogram = column_chunk
            .repetition_level_histogram()
            .map(|hist| hist.clone().into_inner());

        let definition_level_histogram = column_chunk
            .definition_level_histogram()
            .map(|hist| hist.clone().into_inner());

        Some(SizeStatistics {
            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
            repetition_level_histogram,
            definition_level_histogram,
        })
    } else {
        None
    };
    if let Some(size_stats) = size_stats {
        last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
    }

    if let Some(geo_stats) = column_chunk.geo_statistics() {
        geo_stats.write_thrift_field(w, 17, last_field_id)?;
    }

    w.write_struct_end()
}
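
/// Borrowed view of the pieces needed to serialize a Thrift `FileMetaData` struct: the file-level
/// metadata plus its row groups.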
pub(super) struct FileMeta<'a> {
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
}

impl<'a> WriteThrift for FileMeta<'a> {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    #[allow(unused_assignments)]
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.file_metadata
            .version
            .write_thrift_field(writer, 1, 0)?;

        let root = self.file_metadata.schema_descr().root_schema_ptr();
        let schema_len = num_nodes(&root)?;
        writer.write_field_begin(FieldType::List, 2, 1)?;
        writer.write_list_begin(ElementType::Struct, schema_len)?;
        write_schema(&root, writer)?;

        self.file_metadata
            .num_rows
            .write_thrift_field(writer, 3, 2)?;

        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;

        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(created_by) = self.file_metadata.created_by() {
            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(column_orders) = self.file_metadata.column_orders() {
            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
            key.as_slice()
                .write_thrift_field(writer, 9, last_field_id)?;
        }

        writer.write_struct_end()
    }
}
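
/// Writes the schema as a flattened list of `SchemaElement`s, requiring the root node to be a
/// group.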
fn write_schema<W: Write>(
    schema: &TypePtr,
    writer: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
    if !schema.is_group() {
        return Err(general_err!("Root schema must be Group type"));
    }
    write_schema_helper(schema, writer)
}

fn write_schema_helper<W: Write>(
    node: &TypePtr,
    writer: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
    match node.as_ref() {
        crate::schema::types::Type::PrimitiveType {
            basic_info,
            physical_type,
            type_length,
            scale,
            precision,
        } => {
            let element = SchemaElement {
                r#type: Some(*physical_type),
                type_length: if *type_length >= 0 {
                    Some(*type_length)
                } else {
                    None
                },
                repetition_type: Some(basic_info.repetition()),
                name: basic_info.name(),
                num_children: None,
                converted_type: match basic_info.converted_type() {
                    ConvertedType::NONE => None,
                    other => Some(other),
                },
                scale: if *scale >= 0 { Some(*scale) } else { None },
                precision: if *precision >= 0 {
                    Some(*precision)
                } else {
                    None
                },
                field_id: if basic_info.has_id() {
                    Some(basic_info.id())
                } else {
                    None
                },
                logical_type: basic_info.logical_type(),
            };
            element.write_thrift(writer)
        }
        crate::schema::types::Type::GroupType { basic_info, fields } => {
            let repetition = if basic_info.has_repetition() {
                Some(basic_info.repetition())
            } else {
                None
            };

            let element = SchemaElement {
                r#type: None,
                type_length: None,
                repetition_type: repetition,
                name: basic_info.name(),
                num_children: Some(fields.len().try_into()?),
                converted_type: match basic_info.converted_type() {
                    ConvertedType::NONE => None,
                    other => Some(other),
                },
                scale: None,
                precision: None,
                field_id: if basic_info.has_id() {
                    Some(basic_info.id())
                } else {
                    None
                },
                logical_type: basic_info.logical_type(),
            };

            element.write_thrift(writer)?;

            for field in fields {
                write_schema_helper(field, writer)?;
            }
            Ok(())
        }
    }
}
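
/// Serializes this row group in the layout of the Thrift `RowGroup` struct, writing optional
/// fields only when present.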
impl WriteThrift for RowGroupMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.columns.write_thrift_field(writer, 1, 0)?;
        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
        if let Some(sorting_columns) = self.sorting_columns() {
            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(file_offset) = self.file_offset() {
            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
        }
        last_field_id = self
            .compressed_size()
            .write_thrift_field(writer, 6, last_field_id)?;
        if let Some(ordinal) = self.ordinal() {
            ordinal.write_thrift_field(writer, 7, last_field_id)?;
        }
        writer.write_struct_end()
    }
}
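
/// Serializes this column chunk in the layout of the Thrift `ColumnChunk` struct. When encryption
/// is enabled and encrypted column metadata is present, the plaintext `meta_data` field (id 3) is
/// not written.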
impl WriteThrift for ColumnChunkMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    #[allow(unused_assignments)]
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let mut last_field_id = 0i16;
        if let Some(file_path) = self.file_path() {
            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
        }
        last_field_id = self
            .file_offset()
            .write_thrift_field(writer, 2, last_field_id)?;

        #[cfg(feature = "encryption")]
        {
            if self.encrypted_column_metadata.is_none() {
                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
                serialize_column_meta_data(self, writer)?;
                last_field_id = 3;
            }
        }
        #[cfg(not(feature = "encryption"))]
        {
            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
            serialize_column_meta_data(self, writer)?;
            last_field_id = 3;
        }

        if let Some(offset_idx_off) = self.offset_index_offset() {
            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(offset_idx_len) = self.offset_index_length() {
            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(column_idx_off) = self.column_index_offset() {
            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(column_idx_len) = self.column_index_length() {
            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        {
            if let Some(crypto_metadata) = self.crypto_metadata() {
                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
            }
            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
                encrypted_meta
                    .as_slice()
                    .write_thrift_field(writer, 9, last_field_id)?;
            }
        }

        writer.write_struct_end()
    }
}

impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let mut last_field_id = 0i16;
        if let Some(bbox) = self.bounding_box() {
            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
        }
        if let Some(geo_types) = self.geospatial_types() {
            geo_types.write_thrift_field(writer, 2, last_field_id)?;
        }

        writer.write_struct_end()
    }
}

impl WriteThriftField for crate::geospatial::statistics::GeospatialStatistics {
    fn write_thrift_field<W: Write>(
        &self,
        writer: &mut ThriftCompactOutputProtocol<W>,
        field_id: i16,
        last_field_id: i16,
    ) -> Result<i16> {
        writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
        self.write_thrift(writer)?;
        Ok(field_id)
    }
}

impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.get_xmin().write_thrift_field(writer, 1, 0)?;
        self.get_xmax().write_thrift_field(writer, 2, 1)?;
        self.get_ymin().write_thrift_field(writer, 3, 2)?;
        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;

        if let Some(zmin) = self.get_zmin() {
            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(zmax) = self.get_zmax() {
            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(mmin) = self.get_mmin() {
            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
        }
        if let Some(mmax) = self.get_mmax() {
            mmax.write_thrift_field(writer, 8, last_field_id)?;
        }

        writer.write_struct_end()
    }
}

impl WriteThriftField for crate::geospatial::bounding_box::BoundingBox {
    fn write_thrift_field<W: Write>(
        &self,
        writer: &mut ThriftCompactOutputProtocol<W>,
        field_id: i16,
        last_field_id: i16,
    ) -> Result<i16> {
        writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?;
        self.write_thrift(writer)?;
        Ok(field_id)
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use crate::errors::Result;
    use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
    use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
    };
    use std::sync::Arc;

    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
    }

    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr)
    }

    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let num_nodes = num_nodes(&schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(&schema, &mut writer)?;

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(se)
    }

    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let num_nodes = num_nodes(schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(schema, &mut writer)?;
        Ok(buf)
    }

    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut prot)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });
    }
}
1686}