use std::io::Write;
use std::sync::Arc;

#[cfg(feature = "encryption")]
pub(crate) mod encryption;

#[cfg(feature = "encryption")]
use crate::file::{
    column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
};
use crate::{
    basic::{
        ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
        Repetition, Type,
    },
    data_type::{ByteArray, FixedLenByteArray, Int96},
    errors::{ParquetError, Result},
    file::{
        metadata::{
            ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
            RowGroupMetaDataBuilder, SortingColumn,
        },
        statistics::ValueStatistics,
    },
    parquet_thrift::{
        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
        read_thrift_vec,
    },
    schema::types::{
        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
    },
    thrift_struct,
    util::bit_util::FromBytes,
    write_thrift_field,
};

thrift_struct!(
pub(crate) struct SchemaElement<'a> {
  1: optional Type r#type;
  2: optional i32 type_length;
  3: optional Repetition repetition_type;
  4: required string<'a> name;
  5: optional i32 num_children;
  6: optional ConvertedType converted_type;
  7: optional i32 scale
  8: optional i32 precision
  9: optional i32 field_id;
  10: optional LogicalType logical_type
}
);

thrift_struct!(
struct Statistics<'a> {
  1: optional binary<'a> max;
  2: optional binary<'a> min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary<'a> max_value;
  6: optional binary<'a> min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);

thrift_struct!(
struct BoundingBox {
  1: required double xmin;
  2: required double xmax;
  3: required double ymin;
  4: required double ymax;
  5: optional double zmin;
  6: optional double zmax;
  7: optional double mmin;
  8: optional double mmax;
}
);

thrift_struct!(
struct GeospatialStatistics {
  1: optional BoundingBox bbox;
  2: optional list<i32> geospatial_types;
}
);

thrift_struct!(
struct SizeStatistics {
  1: optional i64 unencoded_byte_array_data_bytes;
  2: optional list<i64> repetition_level_histogram;
  3: optional list<i64> definition_level_histogram;
}
);

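/// Converts decoded Thrift [`GeospatialStatistics`] into the crate's in-memory
/// geospatial statistics, dropping an empty `geospatial_types` list.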
fn convert_geo_stats(
    stats: Option<GeospatialStatistics>,
) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
    stats.map(|st| {
        let bbox = convert_bounding_box(st.bbox);
        let geospatial_types: Option<Vec<i32>> = st.geospatial_types.filter(|v| !v.is_empty());
        Box::new(crate::geospatial::statistics::GeospatialStatistics::new(
            bbox,
            geospatial_types,
        ))
    })
}

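/// Converts a decoded Thrift [`BoundingBox`] into the crate's bounding box type,
/// carrying the optional Z and M ranges only when both endpoints are present.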
fn convert_bounding_box(
    bbox: Option<BoundingBox>,
) -> Option<crate::geospatial::bounding_box::BoundingBox> {
    bbox.map(|bb| {
        let mut newbb = crate::geospatial::bounding_box::BoundingBox::new(
            bb.xmin.into(),
            bb.xmax.into(),
            bb.ymin.into(),
            bb.ymax.into(),
        );

        newbb = match (bb.zmin, bb.zmax) {
            (Some(zmin), Some(zmax)) => newbb.with_zrange(zmin.into(), zmax.into()),
            _ => newbb,
        };

        newbb = match (bb.mmin, bb.mmax) {
            (Some(mmin), Some(mmax)) => newbb.with_mrange(mmin.into(), mmax.into()),
            _ => newbb,
        };

        newbb
    })
}

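/// Converts decoded Thrift column [`Statistics`] into [`crate::file::statistics::Statistics`],
/// validating the null count and that the min/max byte slices are long enough for the
/// column's physical type. Falls back to the deprecated `min`/`max` fields when the
/// newer `min_value`/`max_value` fields are absent.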
fn convert_stats(
    column_descr: &Arc<ColumnDescriptor>,
    thrift_stats: Option<Statistics>,
) -> Result<Option<crate::file::statistics::Statistics>> {
    use crate::file::statistics::Statistics as FStatistics;
    Ok(match thrift_stats {
        Some(stats) => {
            let null_count = stats.null_count.unwrap_or(0);

            if null_count < 0 {
                return Err(general_err!(
                    "Statistics null count is negative {}",
                    null_count
                ));
            }

            let null_count = Some(null_count as u64);
            let distinct_count = stats.distinct_count.map(|value| value as u64);
            let old_format = stats.min_value.is_none() && stats.max_value.is_none();
            let min = if old_format {
                stats.min
            } else {
                stats.min_value
            };
            let max = if old_format {
                stats.max
            } else {
                stats.max_value
            };

            fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
                if let Some(min) = min {
                    if min.len() < len {
                        return Err(general_err!("Insufficient bytes to parse min statistic",));
                    }
                }
                if let Some(max) = max {
                    if max.len() < len {
                        return Err(general_err!("Insufficient bytes to parse max statistic",));
                    }
                }
                Ok(())
            }

            let physical_type = column_descr.physical_type();
            match physical_type {
                Type::BOOLEAN => check_len(&min, &max, 1),
                Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
                Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
                Type::INT96 => check_len(&min, &max, 12),
                _ => Ok(()),
            }?;

            let res = match physical_type {
                Type::BOOLEAN => FStatistics::boolean(
                    min.map(|data| data[0] != 0),
                    max.map(|data| data[0] != 0),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT32 => FStatistics::int32(
                    min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
                    max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT64 => FStatistics::int64(
                    min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
                    max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::INT96 => {
                    let min = if let Some(data) = min {
                        assert_eq!(data.len(), 12);
                        Some(Int96::try_from_le_slice(data)?)
                    } else {
                        None
                    };
                    let max = if let Some(data) = max {
                        assert_eq!(data.len(), 12);
                        Some(Int96::try_from_le_slice(data)?)
                    } else {
                        None
                    };
                    FStatistics::int96(min, max, distinct_count, null_count, old_format)
                }
                Type::FLOAT => FStatistics::float(
                    min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
                    max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::DOUBLE => FStatistics::double(
                    min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
                    max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())),
                    distinct_count,
                    null_count,
                    old_format,
                ),
                Type::BYTE_ARRAY => FStatistics::ByteArray(
                    ValueStatistics::new(
                        min.map(ByteArray::from),
                        max.map(ByteArray::from),
                        distinct_count,
                        null_count,
                        old_format,
                    )
                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
                ),
                Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray(
                    ValueStatistics::new(
                        min.map(ByteArray::from).map(FixedLenByteArray::from),
                        max.map(ByteArray::from).map(FixedLenByteArray::from),
                        distinct_count,
                        null_count,
                        old_format,
                    )
                    .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false))
                    .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)),
                ),
            };

            Some(res)
        }
        None => None,
    })
}

const COL_META_TYPE: u16 = 1 << 1;
const COL_META_ENCODINGS: u16 = 1 << 2;
const COL_META_CODEC: u16 = 1 << 4;
const COL_META_NUM_VALUES: u16 = 1 << 5;
const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;

const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
    | COL_META_ENCODINGS
    | COL_META_CODEC
    | COL_META_NUM_VALUES
    | COL_META_TOTAL_UNCOMP_SZ
    | COL_META_TOTAL_COMP_SZ
    | COL_META_DATA_PAGE_OFFSET;

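/// Checks the bitmask returned by [`read_column_metadata`], returning an error if a
/// required `ColumnMetaData` field was not seen while decoding.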
fn validate_column_metadata(mask: u16) -> Result<()> {
    if mask != COL_META_ALL_REQUIRED {
        if mask & COL_META_ENCODINGS == 0 {
            return Err(general_err!("Required field encodings is missing"));
        }

        if mask & COL_META_CODEC == 0 {
            return Err(general_err!("Required field codec is missing"));
        }
        if mask & COL_META_NUM_VALUES == 0 {
            return Err(general_err!("Required field num_values is missing"));
        }
        if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
            return Err(general_err!(
                "Required field total_uncompressed_size is missing"
            ));
        }
        if mask & COL_META_TOTAL_COMP_SZ == 0 {
            return Err(general_err!(
                "Required field total_compressed_size is missing"
            ));
        }
        if mask & COL_META_DATA_PAGE_OFFSET == 0 {
            return Err(general_err!("Required field data_page_offset is missing"));
        }
    }

    Ok(())
}

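/// Decodes the Thrift `ColumnMetaData` struct directly into the supplied
/// [`ColumnChunkMetaData`], returning a bitmask (the `COL_META_*` flags) of the
/// required fields that were present.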
fn read_column_metadata<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column: &mut ColumnChunkMetaData,
) -> Result<u16> {
    let mut seen_mask = 0u16;

    let column_descr = &column.column_descr;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                Type::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TYPE;
            }
            2 => {
                column.encodings = EncodingMask::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_ENCODINGS;
            }
            4 => {
                column.compression = Compression::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_CODEC;
            }
            5 => {
                column.num_values = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_NUM_VALUES;
            }
            6 => {
                column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
            }
            7 => {
                column.total_compressed_size = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_TOTAL_COMP_SZ;
            }
            9 => {
                column.data_page_offset = i64::read_thrift(&mut *prot)?;
                seen_mask |= COL_META_DATA_PAGE_OFFSET;
            }
            10 => {
                column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            11 => {
                column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            12 => {
                column.statistics =
                    convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
            }
            13 => {
                let val =
                    read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
                column.encoding_stats = Some(val);
            }
            14 => {
                column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            15 => {
                column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
            }
            16 => {
                let val = SizeStatistics::read_thrift(&mut *prot)?;
                column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
                column.repetition_level_histogram =
                    val.repetition_level_histogram.map(LevelHistogram::from);
                column.definition_level_histogram =
                    val.definition_level_histogram.map(LevelHistogram::from);
            }
            17 => {
                let val = GeospatialStatistics::read_thrift(&mut *prot)?;
                column.geo_statistics = convert_geo_stats(Some(val));
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    Ok(seen_mask)
}

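/// Decodes a Thrift `ColumnChunk` struct into a [`ColumnChunkMetaData`]. When the
/// `encryption` feature is enabled and the column carries encrypted metadata, the
/// plaintext `meta_data` field may be absent, so its required-field check is skipped.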
fn read_column_chunk<'a>(
    prot: &mut ThriftSliceInputProtocol<'a>,
    column_descr: &Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
    let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;

    let mut has_file_offset = false;

    let mut col_meta_mask = 0u16;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                col.file_path = Some(String::read_thrift(&mut *prot)?);
            }
            2 => {
                col.file_offset = i64::read_thrift(&mut *prot)?;
                has_file_offset = true;
            }
            3 => {
                col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
            }
            4 => {
                col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            5 => {
                col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            6 => {
                col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
                col.column_crypto_metadata = Some(Box::new(val));
            }
            #[cfg(feature = "encryption")]
            9 => {
                col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if !has_file_offset {
        return Err(general_err!("Required field file_offset is missing"));
    };

    #[cfg(feature = "encryption")]
    if col.encrypted_column_metadata.is_some() {
        return Ok(col);
    }

    validate_column_metadata(col_meta_mask)?;

    Ok(col)
}

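/// Decodes a Thrift `RowGroup` struct into a [`RowGroupMetaData`], checking that the
/// column count matches the schema and that all required fields are present.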
fn read_row_group(
    prot: &mut ThriftSliceInputProtocol,
    schema_descr: &Arc<SchemaDescriptor>,
) -> Result<RowGroupMetaData> {
    let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();

    const RG_COLUMNS: u8 = 1 << 1;
    const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
    const RG_NUM_ROWS: u8 = 1 << 3;
    const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;

    let mut mask = 0u8;

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                let list_ident = prot.read_list_begin()?;
                if schema_descr.num_columns() != list_ident.size as usize {
                    return Err(general_err!(
                        "Column count mismatch. Schema has {} columns while Row Group has {}",
                        schema_descr.num_columns(),
                        list_ident.size
                    ));
                }
                for i in 0..list_ident.size as usize {
                    let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
                    row_group.columns.push(col);
                }
                mask |= RG_COLUMNS;
            }
            2 => {
                row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
                mask |= RG_TOT_BYTE_SIZE;
            }
            3 => {
                row_group.num_rows = i64::read_thrift(&mut *prot)?;
                mask |= RG_NUM_ROWS;
            }
            4 => {
                let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
                row_group.sorting_columns = Some(val);
            }
            5 => {
                row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
            }
            7 => {
                row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }

    if mask != RG_ALL_REQUIRED {
        if mask & RG_COLUMNS == 0 {
            return Err(general_err!("Required field columns is missing"));
        }
        if mask & RG_TOT_BYTE_SIZE == 0 {
            return Err(general_err!("Required field total_byte_size is missing"));
        }
        if mask & RG_NUM_ROWS == 0 {
            return Err(general_err!("Required field num_rows is missing"));
        }
    }

    Ok(row_group)
}

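/// Decodes only the `schema` field of a serialized Thrift `FileMetaData` struct,
/// returning the reconstructed [`SchemaDescriptor`] or an error if no schema is found.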
pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
    let mut prot = ThriftSliceInputProtocol::new(buf);

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            2 => {
                let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
                let val = parquet_schema_from_array(val)?;
                return Ok(SchemaDescriptor::new(val));
            }
            _ => prot.skip(field_ident.field_type)?,
        }
        last_field_id = field_ident.id;
    }
    Err(general_err!("Input does not contain a schema"))
}

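/// Decodes a complete Thrift `FileMetaData` struct from `buf` into a
/// [`ParquetMetaData`]. If `options` supplies a pre-parsed schema, the schema field
/// in the footer is skipped and the supplied descriptor is used instead.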
pub(crate) fn parquet_metadata_from_bytes(
    buf: &[u8],
    options: Option<&ParquetMetaDataOptions>,
) -> Result<ParquetMetaData> {
    let mut prot = ThriftSliceInputProtocol::new(buf);

    let mut version: Option<i32> = None;
    let mut num_rows: Option<i64> = None;
    let mut row_groups: Option<Vec<RowGroupMetaData>> = None;
    let mut key_value_metadata: Option<Vec<KeyValue>> = None;
    let mut created_by: Option<&str> = None;
    let mut column_orders: Option<Vec<ColumnOrder>> = None;
    #[cfg(feature = "encryption")]
    let mut encryption_algorithm: Option<EncryptionAlgorithm> = None;
    #[cfg(feature = "encryption")]
    let mut footer_signing_key_metadata: Option<&[u8]> = None;

    let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;

    if let Some(options) = options {
        schema_descr = options.schema().cloned();
    }

    let mut last_field_id = 0i16;
    loop {
        let field_ident = prot.read_field_begin(last_field_id)?;
        if field_ident.field_type == FieldType::Stop {
            break;
        }
        match field_ident.id {
            1 => {
                version = Some(i32::read_thrift(&mut prot)?);
            }
            2 => {
                if schema_descr.is_some() {
                    prot.skip(field_ident.field_type)?;
                } else {
                    let val =
                        read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
                    let val = parquet_schema_from_array(val)?;
                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
                }
            }
            3 => {
                num_rows = Some(i64::read_thrift(&mut prot)?);
            }
            4 => {
                if schema_descr.is_none() {
                    return Err(general_err!("Required field schema is missing"));
                }
                let schema_descr = schema_descr.as_ref().unwrap();
                let list_ident = prot.read_list_begin()?;
                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);

                let mut assigner = OrdinalAssigner::new();
                for ordinal in 0..list_ident.size {
                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
                        ParquetError::General(format!(
                            "Row group ordinal {ordinal} exceeds i16 max value",
                        ))
                    })?;
                    let rg = read_row_group(&mut prot, schema_descr)?;
                    rg_vec.push(assigner.ensure(ordinal, rg)?);
                }
                row_groups = Some(rg_vec);
            }
            5 => {
                let val = read_thrift_vec::<KeyValue, ThriftSliceInputProtocol>(&mut prot)?;
                key_value_metadata = Some(val);
            }
            6 => {
                created_by = Some(<&str>::read_thrift(&mut prot)?);
            }
            7 => {
                let val = read_thrift_vec::<ColumnOrder, ThriftSliceInputProtocol>(&mut prot)?;
                column_orders = Some(val);
            }
            #[cfg(feature = "encryption")]
            8 => {
                let val = EncryptionAlgorithm::read_thrift(&mut prot)?;
                encryption_algorithm = Some(val);
            }
            #[cfg(feature = "encryption")]
            9 => {
                footer_signing_key_metadata = Some(<&[u8]>::read_thrift(&mut prot)?);
            }
            _ => {
                prot.skip(field_ident.field_type)?;
            }
        };
        last_field_id = field_ident.id;
    }
    let Some(version) = version else {
        return Err(general_err!("Required field version is missing"));
    };
    let Some(num_rows) = num_rows else {
        return Err(general_err!("Required field num_rows is missing"));
    };
    let Some(row_groups) = row_groups else {
        return Err(general_err!("Required field row_groups is missing"));
    };

    let created_by = created_by.map(|c| c.to_owned());

    let schema_descr = schema_descr.unwrap();

    if column_orders
        .as_ref()
        .is_some_and(|cos| cos.len() != schema_descr.num_columns())
    {
        return Err(general_err!("Column order length mismatch"));
    }
    let column_orders = column_orders.map(|mut cos| {
        for (i, column) in schema_descr.columns().iter().enumerate() {
            if let ColumnOrder::TYPE_DEFINED_ORDER(_) = cos[i] {
                let sort_order = ColumnOrder::sort_order_for_type(
                    column.logical_type_ref(),
                    column.converted_type(),
                    column.physical_type(),
                );
                cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order);
            }
        }
        cos
    });

    #[cfg(not(feature = "encryption"))]
    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    );
    #[cfg(feature = "encryption")]
    let fmd = crate::file::metadata::FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr,
        column_orders,
    )
    .with_encryption_algorithm(encryption_algorithm)
    .with_footer_signing_key_metadata(footer_signing_key_metadata.map(|v| v.to_vec()));

    Ok(ParquetMetaData::new(fmd, row_groups))
}

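/// Assigns row-group ordinals when the footer omits them. Row groups must be
/// consistent: either every row group carries an ordinal or none does.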
#[derive(Debug, Default)]
pub(crate) struct OrdinalAssigner {
    first_has_ordinal: Option<bool>,
}

impl OrdinalAssigner {
    fn new() -> Self {
        Default::default()
    }

    fn ensure(
        &mut self,
        actual_ordinal: i16,
        mut rg: RowGroupMetaData,
    ) -> Result<RowGroupMetaData> {
        let rg_has_ordinal = rg.ordinal.is_some();

        if self.first_has_ordinal.is_none() {
            self.first_has_ordinal = Some(rg_has_ordinal);
        }

        let first_has_ordinal = self.first_has_ordinal.unwrap();
        if !first_has_ordinal && !rg_has_ordinal {
            rg.ordinal = Some(actual_ordinal);
        } else if first_has_ordinal != rg_has_ordinal {
            return Err(general_err!(
                "Inconsistent ordinal assignment: first_has_ordinal is set to \
                 {} but row-group with actual ordinal {} has rg_has_ordinal set to {}",
                first_has_ordinal,
                actual_ordinal,
                rg_has_ordinal
            ));
        }
        Ok(rg)
    }
}

thrift_struct!(
    pub(crate) struct IndexPageHeader {}
);

thrift_struct!(
pub(crate) struct DictionaryPageHeader {
  1: required i32 num_values;

  2: required Encoding encoding

  3: optional bool is_sorted;
}
);

thrift_struct!(
pub(crate) struct PageStatistics {
  1: optional binary max;
  2: optional binary min;
  3: optional i64 null_count;
  4: optional i64 distinct_count;
  5: optional binary max_value;
  6: optional binary min_value;
  7: optional bool is_max_value_exact;
  8: optional bool is_min_value_exact;
}
);

thrift_struct!(
pub(crate) struct DataPageHeader {
  1: required i32 num_values
  2: required Encoding encoding
  3: required Encoding definition_level_encoding;
  4: required Encoding repetition_level_encoding;
  5: optional PageStatistics statistics;
}
);

impl DataPageHeader {
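    /// Decodes a `DataPageHeader` while skipping the optional `statistics` field
    /// (field 5), leaving it as `None`.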
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_level_encoding: Option<Encoding> = None;
        let mut repetition_level_encoding: Option<Encoding> = None;
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                3 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    definition_level_encoding = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    repetition_level_encoding = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(num_values) = num_values else {
            return Err(general_err!("Required field num_values is missing"));
        };
        let Some(encoding) = encoding else {
            return Err(general_err!("Required field encoding is missing"));
        };
        let Some(definition_level_encoding) = definition_level_encoding else {
            return Err(general_err!(
                "Required field definition_level_encoding is missing"
            ));
        };
        let Some(repetition_level_encoding) = repetition_level_encoding else {
            return Err(general_err!(
                "Required field repetition_level_encoding is missing"
            ));
        };
        Ok(Self {
            num_values,
            encoding,
            definition_level_encoding,
            repetition_level_encoding,
            statistics,
        })
    }
}

thrift_struct!(
pub(crate) struct DataPageHeaderV2 {
  1: required i32 num_values
  2: required i32 num_nulls
  3: required i32 num_rows
  4: required Encoding encoding
  5: required i32 definition_levels_byte_length;
  6: required i32 repetition_levels_byte_length;
  7: optional bool is_compressed = true;
  8: optional PageStatistics statistics;
}
);

impl DataPageHeaderV2 {
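    /// Decodes a `DataPageHeaderV2` while skipping the optional `statistics` field
    /// (field 8), leaving it as `None`.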
    fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut num_values: Option<i32> = None;
        let mut num_nulls: Option<i32> = None;
        let mut num_rows: Option<i32> = None;
        let mut encoding: Option<Encoding> = None;
        let mut definition_levels_byte_length: Option<i32> = None;
        let mut repetition_levels_byte_length: Option<i32> = None;
        let mut is_compressed: Option<bool> = None;
        let statistics: Option<PageStatistics> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_values = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_nulls = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    num_rows = Some(val);
                }
                4 => {
                    let val = Encoding::read_thrift(&mut *prot)?;
                    encoding = Some(val);
                }
                5 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    definition_levels_byte_length = Some(val);
                }
                6 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    repetition_levels_byte_length = Some(val);
                }
                7 => {
                    let val = field_ident.bool_val.unwrap();
                    is_compressed = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(num_values) = num_values else {
            return Err(general_err!("Required field num_values is missing"));
        };
        let Some(num_nulls) = num_nulls else {
            return Err(general_err!("Required field num_nulls is missing"));
        };
        let Some(num_rows) = num_rows else {
            return Err(general_err!("Required field num_rows is missing"));
        };
        let Some(encoding) = encoding else {
            return Err(general_err!("Required field encoding is missing"));
        };
        let Some(definition_levels_byte_length) = definition_levels_byte_length else {
            return Err(general_err!(
                "Required field definition_levels_byte_length is missing"
            ));
        };
        let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
            return Err(general_err!(
                "Required field repetition_levels_byte_length is missing"
            ));
        };
        Ok(Self {
            num_values,
            num_nulls,
            num_rows,
            encoding,
            definition_levels_byte_length,
            repetition_levels_byte_length,
            is_compressed,
            statistics,
        })
    }
}

thrift_struct!(
pub(crate) struct PageHeader {
  1: required PageType r#type

  2: required i32 uncompressed_page_size

  3: required i32 compressed_page_size

  4: optional i32 crc

  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}
);

impl PageHeader {
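    /// Decodes a `PageHeader`, delegating to the `read_thrift_without_stats` variants
    /// for the nested data page headers so that page statistics are never materialized.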
    pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
    where
        R: ThriftCompactInputProtocol<'a>,
    {
        let mut type_: Option<PageType> = None;
        let mut uncompressed_page_size: Option<i32> = None;
        let mut compressed_page_size: Option<i32> = None;
        let mut crc: Option<i32> = None;
        let mut data_page_header: Option<DataPageHeader> = None;
        let mut index_page_header: Option<IndexPageHeader> = None;
        let mut dictionary_page_header: Option<DictionaryPageHeader> = None;
        let mut data_page_header_v2: Option<DataPageHeaderV2> = None;
        let mut last_field_id = 0i16;
        loop {
            let field_ident = prot.read_field_begin(last_field_id)?;
            if field_ident.field_type == FieldType::Stop {
                break;
            }
            match field_ident.id {
                1 => {
                    let val = PageType::read_thrift(&mut *prot)?;
                    type_ = Some(val);
                }
                2 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    uncompressed_page_size = Some(val);
                }
                3 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    compressed_page_size = Some(val);
                }
                4 => {
                    let val = i32::read_thrift(&mut *prot)?;
                    crc = Some(val);
                }
                5 => {
                    let val = DataPageHeader::read_thrift_without_stats(&mut *prot)?;
                    data_page_header = Some(val);
                }
                6 => {
                    let val = IndexPageHeader::read_thrift(&mut *prot)?;
                    index_page_header = Some(val);
                }
                7 => {
                    let val = DictionaryPageHeader::read_thrift(&mut *prot)?;
                    dictionary_page_header = Some(val);
                }
                8 => {
                    let val = DataPageHeaderV2::read_thrift_without_stats(&mut *prot)?;
                    data_page_header_v2 = Some(val);
                }
                _ => {
                    prot.skip(field_ident.field_type)?;
                }
            };
            last_field_id = field_ident.id;
        }
        let Some(type_) = type_ else {
            return Err(general_err!("Required field type_ is missing"));
        };
        let Some(uncompressed_page_size) = uncompressed_page_size else {
            return Err(general_err!(
                "Required field uncompressed_page_size is missing"
            ));
        };
        let Some(compressed_page_size) = compressed_page_size else {
            return Err(general_err!(
                "Required field compressed_page_size is missing"
            ));
        };
        Ok(Self {
            r#type: type_,
            uncompressed_page_size,
            compressed_page_size,
            crc,
            data_page_header,
            index_page_header,
            dictionary_page_header,
            data_page_header_v2,
        })
    }
}

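/// Serializes the Thrift `ColumnMetaData` struct for `column_chunk` to `w`. The caller
/// is responsible for writing the enclosing field header when this is embedded in a
/// `ColumnChunk` struct.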
pub(super) fn serialize_column_meta_data<W: Write>(
    column_chunk: &ColumnChunkMetaData,
    w: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
    use crate::file::statistics::page_stats_to_thrift;

    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
    column_chunk
        .encodings()
        .collect::<Vec<_>>()
        .write_thrift_field(w, 2, 1)?;
    let path = column_chunk.column_descr.path().parts();
    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
    path.write_thrift_field(w, 3, 2)?;
    column_chunk.compression.write_thrift_field(w, 4, 3)?;
    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
    column_chunk
        .total_uncompressed_size
        .write_thrift_field(w, 6, 5)?;
    column_chunk
        .total_compressed_size
        .write_thrift_field(w, 7, 6)?;
    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
    if let Some(index_page_offset) = column_chunk.index_page_offset {
        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
    }
    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
    }
    let stats = page_stats_to_thrift(column_chunk.statistics());
    if let Some(stats) = stats {
        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
    }
    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
    }
    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
    }
    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
    }

    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
        || column_chunk.repetition_level_histogram.is_some()
        || column_chunk.definition_level_histogram.is_some()
    {
        let repetition_level_histogram = column_chunk
            .repetition_level_histogram()
            .map(|hist| hist.clone().into_inner());

        let definition_level_histogram = column_chunk
            .definition_level_histogram()
            .map(|hist| hist.clone().into_inner());

        Some(SizeStatistics {
            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
            repetition_level_histogram,
            definition_level_histogram,
        })
    } else {
        None
    };
    if let Some(size_stats) = size_stats {
        last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
    }

    if let Some(geo_stats) = column_chunk.geo_statistics() {
        geo_stats.write_thrift_field(w, 17, last_field_id)?;
    }

    w.write_struct_end()
}

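/// Borrowed view used to serialize a Thrift `FileMetaData` struct from a
/// [`crate::file::metadata::FileMetaData`] plus its row groups.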
pub(super) struct FileMeta<'a> {
    pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
    pub(super) row_groups: &'a Vec<RowGroupMetaData>,
}

impl<'a> WriteThrift for FileMeta<'a> {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    #[allow(unused_assignments)]
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.file_metadata
            .version
            .write_thrift_field(writer, 1, 0)?;

        let root = self.file_metadata.schema_descr().root_schema_ptr();
        let schema_len = num_nodes(&root)?;
        writer.write_field_begin(FieldType::List, 2, 1)?;
        writer.write_list_begin(ElementType::Struct, schema_len)?;
        write_schema(&root, writer)?;

        self.file_metadata
            .num_rows
            .write_thrift_field(writer, 3, 2)?;

        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;

        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(created_by) = self.file_metadata.created_by() {
            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(column_orders) = self.file_metadata.column_orders() {
            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        if let Some(algo) = self.file_metadata.encryption_algorithm.as_ref() {
            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        if let Some(key) = self.file_metadata.footer_signing_key_metadata.as_ref() {
            key.as_slice()
                .write_thrift_field(writer, 9, last_field_id)?;
        }

        writer.write_struct_end()
    }
}

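/// Writes the schema tree as a flattened, depth-first list of `SchemaElement`s; the
/// caller writes the surrounding Thrift list header. Errors if the root is not a group.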
fn write_schema<W: Write>(
    schema: &TypePtr,
    writer: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
    if !schema.is_group() {
        return Err(general_err!("Root schema must be Group type"));
    }
    write_schema_helper(schema, writer)
}

fn write_schema_helper<W: Write>(
    node: &TypePtr,
    writer: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
    match node.as_ref() {
        crate::schema::types::Type::PrimitiveType {
            basic_info,
            physical_type,
            type_length,
            scale,
            precision,
        } => {
            let element = SchemaElement {
                r#type: Some(*physical_type),
                type_length: if *type_length >= 0 {
                    Some(*type_length)
                } else {
                    None
                },
                repetition_type: Some(basic_info.repetition()),
                name: basic_info.name(),
                num_children: None,
                converted_type: match basic_info.converted_type() {
                    ConvertedType::NONE => None,
                    other => Some(other),
                },
                scale: if *scale >= 0 { Some(*scale) } else { None },
                precision: if *precision >= 0 {
                    Some(*precision)
                } else {
                    None
                },
                field_id: if basic_info.has_id() {
                    Some(basic_info.id())
                } else {
                    None
                },
                logical_type: basic_info.logical_type_ref().cloned(),
            };
            element.write_thrift(writer)
        }
        crate::schema::types::Type::GroupType { basic_info, fields } => {
            let repetition = if basic_info.has_repetition() {
                Some(basic_info.repetition())
            } else {
                None
            };

            let element = SchemaElement {
                r#type: None,
                type_length: None,
                repetition_type: repetition,
                name: basic_info.name(),
                num_children: Some(fields.len().try_into()?),
                converted_type: match basic_info.converted_type() {
                    ConvertedType::NONE => None,
                    other => Some(other),
                },
                scale: None,
                precision: None,
                field_id: if basic_info.has_id() {
                    Some(basic_info.id())
                } else {
                    None
                },
                logical_type: basic_info.logical_type_ref().cloned(),
            };

            element.write_thrift(writer)?;

            for field in fields {
                write_schema_helper(field, writer)?;
            }
            Ok(())
        }
    }
}

impl WriteThrift for RowGroupMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.columns.write_thrift_field(writer, 1, 0)?;
        self.total_byte_size.write_thrift_field(writer, 2, 1)?;
        let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
        if let Some(sorting_columns) = self.sorting_columns() {
            last_field_id = sorting_columns.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(file_offset) = self.file_offset() {
            last_field_id = file_offset.write_thrift_field(writer, 5, last_field_id)?;
        }
        last_field_id = self
            .compressed_size()
            .write_thrift_field(writer, 6, last_field_id)?;
        if let Some(ordinal) = self.ordinal() {
            ordinal.write_thrift_field(writer, 7, last_field_id)?;
        }
        writer.write_struct_end()
    }
}

impl WriteThrift for ColumnChunkMetaData {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    #[allow(unused_assignments)]
    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let mut last_field_id = 0i16;
        if let Some(file_path) = self.file_path() {
            last_field_id = file_path.write_thrift_field(writer, 1, last_field_id)?;
        }
        last_field_id = self
            .file_offset()
            .write_thrift_field(writer, 2, last_field_id)?;

        #[cfg(feature = "encryption")]
        {
            if self.encrypted_column_metadata.is_none() {
                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
                serialize_column_meta_data(self, writer)?;
                last_field_id = 3;
            }
        }
        #[cfg(not(feature = "encryption"))]
        {
            writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
            serialize_column_meta_data(self, writer)?;
            last_field_id = 3;
        }

        if let Some(offset_idx_off) = self.offset_index_offset() {
            last_field_id = offset_idx_off.write_thrift_field(writer, 4, last_field_id)?;
        }
        if let Some(offset_idx_len) = self.offset_index_length() {
            last_field_id = offset_idx_len.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(column_idx_off) = self.column_index_offset() {
            last_field_id = column_idx_off.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(column_idx_len) = self.column_index_length() {
            last_field_id = column_idx_len.write_thrift_field(writer, 7, last_field_id)?;
        }
        #[cfg(feature = "encryption")]
        {
            if let Some(crypto_metadata) = self.crypto_metadata() {
                last_field_id = crypto_metadata.write_thrift_field(writer, 8, last_field_id)?;
            }
            if let Some(encrypted_meta) = self.encrypted_column_metadata.as_ref() {
                encrypted_meta
                    .as_slice()
                    .write_thrift_field(writer, 9, last_field_id)?;
            }
        }

        writer.write_struct_end()
    }
}

impl WriteThrift for crate::geospatial::statistics::GeospatialStatistics {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        let mut last_field_id = 0i16;
        if let Some(bbox) = self.bounding_box() {
            last_field_id = bbox.write_thrift_field(writer, 1, last_field_id)?;
        }
        if let Some(geo_types) = self.geospatial_types() {
            geo_types.write_thrift_field(writer, 2, last_field_id)?;
        }

        writer.write_struct_end()
    }
}

use crate::geospatial::statistics::GeospatialStatistics as RustGeospatialStatistics;
write_thrift_field!(RustGeospatialStatistics, FieldType::Struct);

impl WriteThrift for crate::geospatial::bounding_box::BoundingBox {
    const ELEMENT_TYPE: ElementType = ElementType::Struct;

    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
        self.get_xmin().write_thrift_field(writer, 1, 0)?;
        self.get_xmax().write_thrift_field(writer, 2, 1)?;
        self.get_ymin().write_thrift_field(writer, 3, 2)?;
        let mut last_field_id = self.get_ymax().write_thrift_field(writer, 4, 3)?;

        if let Some(zmin) = self.get_zmin() {
            last_field_id = zmin.write_thrift_field(writer, 5, last_field_id)?;
        }
        if let Some(zmax) = self.get_zmax() {
            last_field_id = zmax.write_thrift_field(writer, 6, last_field_id)?;
        }
        if let Some(mmin) = self.get_mmin() {
            last_field_id = mmin.write_thrift_field(writer, 7, last_field_id)?;
        }
        if let Some(mmax) = self.get_mmax() {
            mmax.write_thrift_field(writer, 8, last_field_id)?;
        }

        writer.write_struct_end()
    }
}

use crate::geospatial::bounding_box::BoundingBox as RustBoundingBox;
write_thrift_field!(RustBoundingBox, FieldType::Struct);

#[cfg(test)]
pub(crate) mod tests {
    use crate::errors::Result;
    use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
    use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
    use crate::parquet_thrift::tests::test_roundtrip;
    use crate::parquet_thrift::{
        ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
    };
    use crate::schema::types::{
        ColumnDescriptor, SchemaDescriptor, TypePtr, num_nodes, parquet_schema_from_array,
    };
    use std::sync::Arc;

    pub(crate) fn read_row_group(
        buf: &mut [u8],
        schema_descr: Arc<SchemaDescriptor>,
    ) -> Result<RowGroupMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
    }

    pub(crate) fn read_column_chunk(
        buf: &mut [u8],
        column_descr: Arc<ColumnDescriptor>,
    ) -> Result<ColumnChunkMetaData> {
        let mut reader = ThriftSliceInputProtocol::new(buf);
        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr)
    }

    pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
        let num_nodes = num_nodes(&schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(&schema, &mut writer)?;

        let mut prot = ThriftSliceInputProtocol::new(&buf);
        let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
        parquet_schema_from_array(se)
    }

    pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
        let num_nodes = num_nodes(schema)?;
        let mut buf = Vec::new();
        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

        writer.write_list_begin(ElementType::Struct, num_nodes)?;

        write_schema(schema, &mut writer)?;
        Ok(buf)
    }

    pub(crate) fn buf_to_schema_list<'a>(buf: &'a mut Vec<u8>) -> Result<Vec<SchemaElement<'a>>> {
        let mut prot = ThriftSliceInputProtocol::new(buf.as_mut_slice());
        read_thrift_vec(&mut prot)
    }

    #[test]
    fn test_bounding_box_roundtrip() {
        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: None,
            zmax: None,
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: None,
            mmax: None,
        });

        test_roundtrip(BoundingBox {
            xmin: 0.1.into(),
            xmax: 10.3.into(),
            ymin: 0.001.into(),
            ymax: 128.5.into(),
            zmin: Some(11.0.into()),
            zmax: Some(1300.0.into()),
            mmin: Some(3.7.into()),
            mmax: Some(42.0.into()),
        });
    }
}