1mod footer_tail;
91mod memory;
92mod parser;
93mod push_decoder;
94pub(crate) mod reader;
95pub(crate) mod thrift;
96mod writer;
97
98use crate::basic::{EncodingMask, PageType};
99#[cfg(feature = "encryption")]
100use crate::encryption::decrypt::FileDecryptor;
101#[cfg(feature = "encryption")]
102use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
103pub(crate) use crate::file::metadata::memory::HeapSize;
104#[cfg(feature = "encryption")]
105use crate::file::metadata::thrift::encryption::EncryptionAlgorithm;
106use crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex};
107use crate::file::page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation};
108use crate::file::statistics::Statistics;
109use crate::geospatial::statistics as geo_statistics;
110use crate::schema::types::{
111 ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
112 Type as SchemaType,
113};
114use crate::thrift_struct;
115use crate::{
116 basic::BoundaryOrder,
117 errors::{ParquetError, Result},
118};
119use crate::{
120 basic::{ColumnOrder, Compression, Encoding, Type},
121 parquet_thrift::{
122 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
123 ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
124 },
125};
126use crate::{
127 data_type::private::ParquetValueType, file::page_index::offset_index::OffsetIndexMetaData,
128};
129
130pub use footer_tail::FooterTail;
131pub use push_decoder::ParquetMetaDataPushDecoder;
132pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
133use std::io::Write;
134use std::ops::Range;
135use std::sync::Arc;
136pub use writer::ParquetMetaDataWriter;
137pub(crate) use writer::ThriftMetadataWriter;
138
/// Nested collection of [`ColumnIndexMetaData`] values: an outer `Vec` whose
/// elements are inner `Vec`s.
///
/// NOTE(review): presumably indexed as `[row_group][column]` — confirm
/// against the code that populates it.
pub type ParquetColumnIndex = Vec<Vec<ColumnIndexMetaData>>;
156
/// Nested collection of [`OffsetIndexMetaData`] values: an outer `Vec` whose
/// elements are inner `Vec`s.
///
/// NOTE(review): presumably indexed as `[row_group][column]` — confirm
/// against the code that populates it.
pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
169
170#[derive(Debug, Clone, PartialEq)]
188pub struct ParquetMetaData {
189 file_metadata: FileMetaData,
191 row_groups: Vec<RowGroupMetaData>,
193 column_index: Option<ParquetColumnIndex>,
195 offset_index: Option<ParquetOffsetIndex>,
197 #[cfg(feature = "encryption")]
199 file_decryptor: Option<Box<FileDecryptor>>,
200}
201
202impl ParquetMetaData {
203 pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
206 ParquetMetaData {
207 file_metadata,
208 row_groups,
209 column_index: None,
210 offset_index: None,
211 #[cfg(feature = "encryption")]
212 file_decryptor: None,
213 }
214 }
215
216 #[cfg(feature = "encryption")]
219 pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) {
220 self.file_decryptor = file_decryptor.map(Box::new);
221 }
222
223 pub fn into_builder(self) -> ParquetMetaDataBuilder {
225 self.into()
226 }
227
228 pub fn file_metadata(&self) -> &FileMetaData {
230 &self.file_metadata
231 }
232
233 #[cfg(feature = "encryption")]
235 pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> {
236 self.file_decryptor.as_deref()
237 }
238
239 pub fn num_row_groups(&self) -> usize {
241 self.row_groups.len()
242 }
243
244 pub fn row_group(&self, i: usize) -> &RowGroupMetaData {
247 &self.row_groups[i]
248 }
249
250 pub fn row_groups(&self) -> &[RowGroupMetaData] {
252 &self.row_groups
253 }
254
255 pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
262 self.column_index.as_ref()
263 }
264
265 pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
272 self.offset_index.as_ref()
273 }
274
275 pub fn memory_size(&self) -> usize {
290 std::mem::size_of::<Self>()
291 + self.file_metadata.heap_size()
292 + self.row_groups.heap_size()
293 + self.column_index.heap_size()
294 + self.offset_index.heap_size()
295 }
296
297 pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
299 self.column_index = index;
300 }
301
302 pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
304 self.offset_index = index;
305 }
306}
307
308pub struct ParquetMetaDataBuilder(ParquetMetaData);
346
347impl ParquetMetaDataBuilder {
348 pub fn new(file_meta_data: FileMetaData) -> Self {
350 Self(ParquetMetaData::new(file_meta_data, vec![]))
351 }
352
353 pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
355 Self(metadata)
356 }
357
358 pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
360 self.0.row_groups.push(row_group);
361 self
362 }
363
364 pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
366 self.0.row_groups = row_groups;
367 self
368 }
369
370 pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
376 std::mem::take(&mut self.0.row_groups)
377 }
378
379 pub fn row_groups(&self) -> &[RowGroupMetaData] {
381 &self.0.row_groups
382 }
383
384 pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
386 self.0.column_index = column_index;
387 self
388 }
389
390 pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
392 std::mem::take(&mut self.0.column_index)
393 }
394
395 pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
397 self.0.column_index.as_ref()
398 }
399
400 pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
402 self.0.offset_index = offset_index;
403 self
404 }
405
406 pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
408 std::mem::take(&mut self.0.offset_index)
409 }
410
411 pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
413 self.0.offset_index.as_ref()
414 }
415
416 #[cfg(feature = "encryption")]
418 pub(crate) fn set_file_decryptor(mut self, file_decryptor: Option<FileDecryptor>) -> Self {
419 self.0.with_file_decryptor(file_decryptor);
420 self
421 }
422
423 pub fn build(self) -> ParquetMetaData {
425 let Self(metadata) = self;
426 metadata
427 }
428}
429
430impl From<ParquetMetaData> for ParquetMetaDataBuilder {
431 fn from(meta_data: ParquetMetaData) -> Self {
432 Self(meta_data)
433 }
434}
435
// `KeyValue` is declared through the `thrift_struct!` macro: field 1 is a
// required string `key`, field 2 is an optional string `value`.
thrift_struct!(
pub struct KeyValue {
  1: required string key
  2: optional string value
}
);
443
444impl KeyValue {
445 pub fn new<F2>(key: String, value: F2) -> KeyValue
447 where
448 F2: Into<Option<String>>,
449 {
450 KeyValue {
451 key,
452 value: value.into(),
453 }
454 }
455}
456
// `PageEncodingStats` is declared through the `thrift_struct!` macro with
// three required fields: a `PageType`, an `Encoding`, and an `i32` count.
thrift_struct!(
pub struct PageEncodingStats {
  1: required PageType page_type;
  2: required Encoding encoding;
  3: required i32 count;
}
);
465
/// Reference-counted ([`Arc`]) pointer to [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
468
469#[derive(Debug, Clone, PartialEq)]
473pub struct FileMetaData {
474 version: i32,
475 num_rows: i64,
476 created_by: Option<String>,
477 key_value_metadata: Option<Vec<KeyValue>>,
478 schema_descr: SchemaDescPtr,
479 column_orders: Option<Vec<ColumnOrder>>,
480 #[cfg(feature = "encryption")]
481 encryption_algorithm: Option<Box<EncryptionAlgorithm>>,
482 #[cfg(feature = "encryption")]
483 footer_signing_key_metadata: Option<Vec<u8>>,
484}
485
486impl FileMetaData {
487 pub fn new(
489 version: i32,
490 num_rows: i64,
491 created_by: Option<String>,
492 key_value_metadata: Option<Vec<KeyValue>>,
493 schema_descr: SchemaDescPtr,
494 column_orders: Option<Vec<ColumnOrder>>,
495 ) -> Self {
496 FileMetaData {
497 version,
498 num_rows,
499 created_by,
500 key_value_metadata,
501 schema_descr,
502 column_orders,
503 #[cfg(feature = "encryption")]
504 encryption_algorithm: None,
505 #[cfg(feature = "encryption")]
506 footer_signing_key_metadata: None,
507 }
508 }
509
510 #[cfg(feature = "encryption")]
511 pub(crate) fn with_encryption_algorithm(
512 mut self,
513 encryption_algorithm: Option<EncryptionAlgorithm>,
514 ) -> Self {
515 self.encryption_algorithm = encryption_algorithm.map(Box::new);
516 self
517 }
518
519 #[cfg(feature = "encryption")]
520 pub(crate) fn with_footer_signing_key_metadata(
521 mut self,
522 footer_signing_key_metadata: Option<Vec<u8>>,
523 ) -> Self {
524 self.footer_signing_key_metadata = footer_signing_key_metadata;
525 self
526 }
527
528 pub fn version(&self) -> i32 {
530 self.version
531 }
532
533 pub fn num_rows(&self) -> i64 {
535 self.num_rows
536 }
537
538 pub fn created_by(&self) -> Option<&str> {
547 self.created_by.as_deref()
548 }
549
550 pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
552 self.key_value_metadata.as_ref()
553 }
554
555 pub fn schema(&self) -> &SchemaType {
559 self.schema_descr.root_schema()
560 }
561
562 pub fn schema_descr(&self) -> &SchemaDescriptor {
564 &self.schema_descr
565 }
566
567 pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
569 self.schema_descr.clone()
570 }
571
572 pub fn column_orders(&self) -> Option<&Vec<ColumnOrder>> {
580 self.column_orders.as_ref()
581 }
582
583 pub fn column_order(&self, i: usize) -> ColumnOrder {
586 self.column_orders
587 .as_ref()
588 .map(|data| data[i])
589 .unwrap_or(ColumnOrder::UNDEFINED)
590 }
591}
592
// `SortingColumn` is declared through the `thrift_struct!` macro with three
// required fields: an `i32` `column_idx` and the boolean flags `descending`
// and `nulls_first`.
thrift_struct!(
pub struct SortingColumn {
  1: required i32 column_idx

  2: required bool descending

  3: required bool nulls_first
}
);
607
/// Reference-counted ([`Arc`]) pointer to [`RowGroupMetaData`].
pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
610
611#[derive(Debug, Clone, PartialEq)]
616pub struct RowGroupMetaData {
617 columns: Vec<ColumnChunkMetaData>,
618 num_rows: i64,
619 sorting_columns: Option<Vec<SortingColumn>>,
620 total_byte_size: i64,
621 schema_descr: SchemaDescPtr,
622 file_offset: Option<i64>,
624 ordinal: Option<i16>,
626}
627
628impl RowGroupMetaData {
629 pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
631 RowGroupMetaDataBuilder::new(schema_descr)
632 }
633
634 pub fn num_columns(&self) -> usize {
636 self.columns.len()
637 }
638
639 pub fn column(&self, i: usize) -> &ColumnChunkMetaData {
641 &self.columns[i]
642 }
643
644 pub fn columns(&self) -> &[ColumnChunkMetaData] {
646 &self.columns
647 }
648
649 pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
651 &mut self.columns
652 }
653
654 pub fn num_rows(&self) -> i64 {
656 self.num_rows
657 }
658
659 pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
661 self.sorting_columns.as_ref()
662 }
663
664 pub fn total_byte_size(&self) -> i64 {
666 self.total_byte_size
667 }
668
669 pub fn compressed_size(&self) -> i64 {
671 self.columns.iter().map(|c| c.total_compressed_size).sum()
672 }
673
674 pub fn schema_descr(&self) -> &SchemaDescriptor {
676 self.schema_descr.as_ref()
677 }
678
679 pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
681 self.schema_descr.clone()
682 }
683
684 #[inline(always)]
689 pub fn ordinal(&self) -> Option<i16> {
690 self.ordinal
691 }
692
693 #[inline(always)]
695 pub fn file_offset(&self) -> Option<i64> {
696 self.file_offset
697 }
698
699 pub fn into_builder(self) -> RowGroupMetaDataBuilder {
701 RowGroupMetaDataBuilder(self)
702 }
703}
704
705pub struct RowGroupMetaDataBuilder(RowGroupMetaData);
707
708impl RowGroupMetaDataBuilder {
709 fn new(schema_descr: SchemaDescPtr) -> Self {
711 Self(RowGroupMetaData {
712 columns: Vec::with_capacity(schema_descr.num_columns()),
713 schema_descr,
714 file_offset: None,
715 num_rows: 0,
716 sorting_columns: None,
717 total_byte_size: 0,
718 ordinal: None,
719 })
720 }
721
722 pub fn set_num_rows(mut self, value: i64) -> Self {
724 self.0.num_rows = value;
725 self
726 }
727
728 pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
730 self.0.sorting_columns = value;
731 self
732 }
733
734 pub fn set_total_byte_size(mut self, value: i64) -> Self {
736 self.0.total_byte_size = value;
737 self
738 }
739
740 pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
746 std::mem::take(&mut self.0.columns)
747 }
748
749 pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
751 self.0.columns = value;
752 self
753 }
754
755 pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
757 self.0.columns.push(value);
758 self
759 }
760
761 pub fn set_ordinal(mut self, value: i16) -> Self {
763 self.0.ordinal = Some(value);
764 self
765 }
766
767 pub fn set_file_offset(mut self, value: i64) -> Self {
769 self.0.file_offset = Some(value);
770 self
771 }
772
773 pub fn build(self) -> Result<RowGroupMetaData> {
775 if self.0.schema_descr.num_columns() != self.0.columns.len() {
776 return Err(general_err!(
777 "Column length mismatch: {} != {}",
778 self.0.schema_descr.num_columns(),
779 self.0.columns.len()
780 ));
781 }
782
783 Ok(self.0)
784 }
785
786 pub(super) fn build_unchecked(self) -> RowGroupMetaData {
788 self.0
789 }
790}
791
/// Metadata for one column chunk.
///
/// All fields are private. Offsets, sizes and lengths are stored as signed
/// integers; optional pieces (statistics, page encoding stats, bloom filter /
/// column index / offset index locations, level histograms, ...) are `Option`s.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
    // Descriptor of the column this chunk belongs to.
    column_descr: ColumnDescPtr,
    // Set of encodings, stored as a mask.
    encodings: EncodingMask,
    file_path: Option<String>,
    file_offset: i64,
    num_values: i64,
    compression: Compression,
    total_compressed_size: i64,
    total_uncompressed_size: i64,
    data_page_offset: i64,
    index_page_offset: Option<i64>,
    dictionary_page_offset: Option<i64>,
    statistics: Option<Statistics>,
    // Boxed, so an absent value costs only a pointer-sized slot.
    geo_statistics: Option<Box<geo_statistics::GeospatialStatistics>>,
    encoding_stats: Option<Vec<PageEncodingStats>>,
    bloom_filter_offset: Option<i64>,
    bloom_filter_length: Option<i32>,
    offset_index_offset: Option<i64>,
    offset_index_length: Option<i32>,
    column_index_offset: Option<i64>,
    column_index_length: Option<i32>,
    unencoded_byte_array_data_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
    // The two fields below exist only with the `encryption` feature.
    #[cfg(feature = "encryption")]
    column_crypto_metadata: Option<Box<ColumnCryptoMetaData>>,
    #[cfg(feature = "encryption")]
    encrypted_column_metadata: Option<Vec<u8>>,
}
823
/// Histogram of level occurrences, stored as one `i64` counter per level.
///
/// The counter at index `k` is the number of times level `k` was recorded.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct LevelHistogram {
    inner: Vec<i64>,
}

impl LevelHistogram {
    /// Creates a zeroed histogram with `max_level + 1` counters.
    ///
    /// Returns `None` when `max_level` is zero or negative.
    pub fn try_new(max_level: i16) -> Option<Self> {
        if max_level <= 0 {
            return None;
        }
        let bucket_count = max_level as usize + 1;
        Some(Self {
            inner: vec![0; bucket_count],
        })
    }

    /// Returns the counters as a slice.
    pub fn values(&self) -> &[i64] {
        self.inner.as_slice()
    }

    /// Consumes the histogram and returns the underlying counters.
    pub fn into_inner(self) -> Vec<i64> {
        let Self { inner } = self;
        inner
    }

    /// Returns the counter at `index`, or `None` if `index` is out of range.
    pub fn get(&self, index: usize) -> Option<i64> {
        self.inner.get(index).copied()
    }

    /// Adds every counter of `other` to the matching counter of `self`.
    ///
    /// # Panics
    ///
    /// Panics if the two histograms have different lengths.
    pub fn add(&mut self, other: &Self) {
        assert_eq!(self.len(), other.len());
        self.inner
            .iter_mut()
            .zip(&other.inner)
            .for_each(|(total, &extra)| *total += extra);
    }

    /// Returns the number of counters.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if the histogram has no counters.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Sets every counter back to zero, keeping the length unchanged.
    pub fn reset(&mut self) {
        self.inner.fill(0);
    }

    /// Increments the counter of each level found in `levels`.
    ///
    /// # Panics
    ///
    /// Panics if a level, converted to `usize`, is not a valid counter index.
    pub fn update_from_levels(&mut self, levels: &[i16]) {
        levels
            .iter()
            .for_each(|&level| self.inner[level as usize] += 1);
    }
}
911
912impl From<Vec<i64>> for LevelHistogram {
913 fn from(inner: Vec<i64>) -> Self {
914 Self { inner }
915 }
916}
917
918impl From<LevelHistogram> for Vec<i64> {
919 fn from(value: LevelHistogram) -> Self {
920 value.into_inner()
921 }
922}
923
924impl HeapSize for LevelHistogram {
925 fn heap_size(&self) -> usize {
926 self.inner.heap_size()
927 }
928}
929
930impl ColumnChunkMetaData {
932 pub fn builder(column_descr: ColumnDescPtr) -> ColumnChunkMetaDataBuilder {
934 ColumnChunkMetaDataBuilder::new(column_descr)
935 }
936
937 pub fn file_path(&self) -> Option<&str> {
942 self.file_path.as_deref()
943 }
944
945 pub fn file_offset(&self) -> i64 {
952 self.file_offset
953 }
954
955 pub fn column_type(&self) -> Type {
957 self.column_descr.physical_type()
958 }
959
960 pub fn column_path(&self) -> &ColumnPath {
962 self.column_descr.path()
963 }
964
965 pub fn column_descr(&self) -> &ColumnDescriptor {
967 self.column_descr.as_ref()
968 }
969
970 pub fn column_descr_ptr(&self) -> ColumnDescPtr {
972 self.column_descr.clone()
973 }
974
975 pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
977 self.encodings.encodings()
978 }
979
980 pub fn encodings_mask(&self) -> &EncodingMask {
982 &self.encodings
983 }
984
985 pub fn num_values(&self) -> i64 {
987 self.num_values
988 }
989
990 pub fn compression(&self) -> Compression {
992 self.compression
993 }
994
995 pub fn compressed_size(&self) -> i64 {
997 self.total_compressed_size
998 }
999
1000 pub fn uncompressed_size(&self) -> i64 {
1002 self.total_uncompressed_size
1003 }
1004
1005 pub fn data_page_offset(&self) -> i64 {
1007 self.data_page_offset
1008 }
1009
1010 pub fn index_page_offset(&self) -> Option<i64> {
1012 self.index_page_offset
1013 }
1014
1015 pub fn dictionary_page_offset(&self) -> Option<i64> {
1017 self.dictionary_page_offset
1018 }
1019
1020 pub fn byte_range(&self) -> (u64, u64) {
1022 let col_start = match self.dictionary_page_offset() {
1023 Some(dictionary_page_offset) => dictionary_page_offset,
1024 None => self.data_page_offset(),
1025 };
1026 let col_len = self.compressed_size();
1027 assert!(
1028 col_start >= 0 && col_len >= 0,
1029 "column start and length should not be negative"
1030 );
1031 (col_start as u64, col_len as u64)
1032 }
1033
1034 pub fn statistics(&self) -> Option<&Statistics> {
1037 self.statistics.as_ref()
1038 }
1039
1040 pub fn geo_statistics(&self) -> Option<&geo_statistics::GeospatialStatistics> {
1043 self.geo_statistics.as_deref()
1044 }
1045
1046 pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
1049 self.encoding_stats.as_ref()
1050 }
1051
1052 pub fn bloom_filter_offset(&self) -> Option<i64> {
1054 self.bloom_filter_offset
1055 }
1056
1057 pub fn bloom_filter_length(&self) -> Option<i32> {
1059 self.bloom_filter_length
1060 }
1061
1062 pub fn column_index_offset(&self) -> Option<i64> {
1064 self.column_index_offset
1065 }
1066
1067 pub fn column_index_length(&self) -> Option<i32> {
1069 self.column_index_length
1070 }
1071
1072 pub(crate) fn column_index_range(&self) -> Option<Range<u64>> {
1074 let offset = u64::try_from(self.column_index_offset?).ok()?;
1075 let length = u64::try_from(self.column_index_length?).ok()?;
1076 Some(offset..(offset + length))
1077 }
1078
1079 pub fn offset_index_offset(&self) -> Option<i64> {
1081 self.offset_index_offset
1082 }
1083
1084 pub fn offset_index_length(&self) -> Option<i32> {
1086 self.offset_index_length
1087 }
1088
1089 pub(crate) fn offset_index_range(&self) -> Option<Range<u64>> {
1091 let offset = u64::try_from(self.offset_index_offset?).ok()?;
1092 let length = u64::try_from(self.offset_index_length?).ok()?;
1093 Some(offset..(offset + length))
1094 }
1095
1096 pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
1101 self.unencoded_byte_array_data_bytes
1102 }
1103
1104 pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
1110 self.repetition_level_histogram.as_ref()
1111 }
1112
1113 pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
1119 self.definition_level_histogram.as_ref()
1120 }
1121
1122 #[cfg(feature = "encryption")]
1124 pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
1125 self.column_crypto_metadata.as_deref()
1126 }
1127
1128 pub fn into_builder(self) -> ColumnChunkMetaDataBuilder {
1130 ColumnChunkMetaDataBuilder::from(self)
1131 }
1132}
1133
1134pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData);
1153
1154impl ColumnChunkMetaDataBuilder {
1155 fn new(column_descr: ColumnDescPtr) -> Self {
1159 Self(ColumnChunkMetaData {
1160 column_descr,
1161 encodings: Default::default(),
1162 file_path: None,
1163 file_offset: 0,
1164 num_values: 0,
1165 compression: Compression::UNCOMPRESSED,
1166 total_compressed_size: 0,
1167 total_uncompressed_size: 0,
1168 data_page_offset: 0,
1169 index_page_offset: None,
1170 dictionary_page_offset: None,
1171 statistics: None,
1172 geo_statistics: None,
1173 encoding_stats: None,
1174 bloom_filter_offset: None,
1175 bloom_filter_length: None,
1176 offset_index_offset: None,
1177 offset_index_length: None,
1178 column_index_offset: None,
1179 column_index_length: None,
1180 unencoded_byte_array_data_bytes: None,
1181 repetition_level_histogram: None,
1182 definition_level_histogram: None,
1183 #[cfg(feature = "encryption")]
1184 column_crypto_metadata: None,
1185 #[cfg(feature = "encryption")]
1186 encrypted_column_metadata: None,
1187 })
1188 }
1189
1190 pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
1192 self.0.encodings = EncodingMask::new_from_encodings(encodings.iter());
1193 self
1194 }
1195
1196 pub fn set_encodings_mask(mut self, encodings: EncodingMask) -> Self {
1198 self.0.encodings = encodings;
1199 self
1200 }
1201
1202 pub fn set_file_path(mut self, value: String) -> Self {
1204 self.0.file_path = Some(value);
1205 self
1206 }
1207
1208 pub fn set_num_values(mut self, value: i64) -> Self {
1210 self.0.num_values = value;
1211 self
1212 }
1213
1214 pub fn set_compression(mut self, value: Compression) -> Self {
1216 self.0.compression = value;
1217 self
1218 }
1219
1220 pub fn set_total_compressed_size(mut self, value: i64) -> Self {
1222 self.0.total_compressed_size = value;
1223 self
1224 }
1225
1226 pub fn set_total_uncompressed_size(mut self, value: i64) -> Self {
1228 self.0.total_uncompressed_size = value;
1229 self
1230 }
1231
1232 pub fn set_data_page_offset(mut self, value: i64) -> Self {
1234 self.0.data_page_offset = value;
1235 self
1236 }
1237
1238 pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
1240 self.0.dictionary_page_offset = value;
1241 self
1242 }
1243
1244 pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
1246 self.0.index_page_offset = value;
1247 self
1248 }
1249
1250 pub fn set_statistics(mut self, value: Statistics) -> Self {
1252 self.0.statistics = Some(value);
1253 self
1254 }
1255
1256 pub fn set_geo_statistics(mut self, value: Box<geo_statistics::GeospatialStatistics>) -> Self {
1258 self.0.geo_statistics = Some(value);
1259 self
1260 }
1261
1262 pub fn clear_statistics(mut self) -> Self {
1264 self.0.statistics = None;
1265 self
1266 }
1267
1268 pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
1270 self.0.encoding_stats = Some(value);
1271 self
1272 }
1273
1274 pub fn clear_page_encoding_stats(mut self) -> Self {
1276 self.0.encoding_stats = None;
1277 self
1278 }
1279
1280 pub fn set_bloom_filter_offset(mut self, value: Option<i64>) -> Self {
1282 self.0.bloom_filter_offset = value;
1283 self
1284 }
1285
1286 pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
1288 self.0.bloom_filter_length = value;
1289 self
1290 }
1291
1292 pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
1294 self.0.offset_index_offset = value;
1295 self
1296 }
1297
1298 pub fn set_offset_index_length(mut self, value: Option<i32>) -> Self {
1300 self.0.offset_index_length = value;
1301 self
1302 }
1303
1304 pub fn set_column_index_offset(mut self, value: Option<i64>) -> Self {
1306 self.0.column_index_offset = value;
1307 self
1308 }
1309
1310 pub fn set_column_index_length(mut self, value: Option<i32>) -> Self {
1312 self.0.column_index_length = value;
1313 self
1314 }
1315
1316 pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
1318 self.0.unencoded_byte_array_data_bytes = value;
1319 self
1320 }
1321
1322 pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
1324 self.0.repetition_level_histogram = value;
1325 self
1326 }
1327
1328 pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
1330 self.0.definition_level_histogram = value;
1331 self
1332 }
1333
1334 #[cfg(feature = "encryption")]
1335 pub fn set_column_crypto_metadata(mut self, value: Option<ColumnCryptoMetaData>) -> Self {
1337 self.0.column_crypto_metadata = value.map(Box::new);
1338 self
1339 }
1340
1341 #[cfg(feature = "encryption")]
1342 pub fn set_encrypted_column_metadata(mut self, value: Option<Vec<u8>>) -> Self {
1344 self.0.encrypted_column_metadata = value;
1345 self
1346 }
1347
1348 pub fn build(self) -> Result<ColumnChunkMetaData> {
1350 Ok(self.0)
1351 }
1352}
1353
1354pub struct ColumnIndexBuilder {
1359 column_type: Type,
1360 null_pages: Vec<bool>,
1361 min_values: Vec<Vec<u8>>,
1362 max_values: Vec<Vec<u8>>,
1363 null_counts: Vec<i64>,
1364 boundary_order: BoundaryOrder,
1365 repetition_level_histograms: Option<Vec<i64>>,
1367 definition_level_histograms: Option<Vec<i64>>,
1369 valid: bool,
1377}
1378
1379impl ColumnIndexBuilder {
1380 pub fn new(column_type: Type) -> Self {
1382 ColumnIndexBuilder {
1383 column_type,
1384 null_pages: Vec::new(),
1385 min_values: Vec::new(),
1386 max_values: Vec::new(),
1387 null_counts: Vec::new(),
1388 boundary_order: BoundaryOrder::UNORDERED,
1389 repetition_level_histograms: None,
1390 definition_level_histograms: None,
1391 valid: true,
1392 }
1393 }
1394
1395 pub fn append(
1397 &mut self,
1398 null_page: bool,
1399 min_value: Vec<u8>,
1400 max_value: Vec<u8>,
1401 null_count: i64,
1402 ) {
1403 self.null_pages.push(null_page);
1404 self.min_values.push(min_value);
1405 self.max_values.push(max_value);
1406 self.null_counts.push(null_count);
1407 }
1408
1409 pub fn append_histograms(
1414 &mut self,
1415 repetition_level_histogram: &Option<LevelHistogram>,
1416 definition_level_histogram: &Option<LevelHistogram>,
1417 ) {
1418 if !self.valid {
1419 return;
1420 }
1421 if let Some(rep_lvl_hist) = repetition_level_histogram {
1422 let hist = self.repetition_level_histograms.get_or_insert(Vec::new());
1423 hist.reserve(rep_lvl_hist.len());
1424 hist.extend(rep_lvl_hist.values());
1425 }
1426 if let Some(def_lvl_hist) = definition_level_histogram {
1427 let hist = self.definition_level_histograms.get_or_insert(Vec::new());
1428 hist.reserve(def_lvl_hist.len());
1429 hist.extend(def_lvl_hist.values());
1430 }
1431 }
1432
1433 pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
1435 self.boundary_order = boundary_order;
1436 }
1437
1438 pub fn to_invalid(&mut self) {
1440 self.valid = false;
1441 }
1442
1443 pub fn valid(&self) -> bool {
1445 self.valid
1446 }
1447
1448 pub fn build(self) -> Result<ColumnIndexMetaData> {
1452 Ok(match self.column_type {
1453 Type::BOOLEAN => {
1454 let index = self.build_page_index()?;
1455 ColumnIndexMetaData::BOOLEAN(index)
1456 }
1457 Type::INT32 => {
1458 let index = self.build_page_index()?;
1459 ColumnIndexMetaData::INT32(index)
1460 }
1461 Type::INT64 => {
1462 let index = self.build_page_index()?;
1463 ColumnIndexMetaData::INT64(index)
1464 }
1465 Type::INT96 => {
1466 let index = self.build_page_index()?;
1467 ColumnIndexMetaData::INT96(index)
1468 }
1469 Type::FLOAT => {
1470 let index = self.build_page_index()?;
1471 ColumnIndexMetaData::FLOAT(index)
1472 }
1473 Type::DOUBLE => {
1474 let index = self.build_page_index()?;
1475 ColumnIndexMetaData::DOUBLE(index)
1476 }
1477 Type::BYTE_ARRAY => {
1478 let index = self.build_byte_array_index()?;
1479 ColumnIndexMetaData::BYTE_ARRAY(index)
1480 }
1481 Type::FIXED_LEN_BYTE_ARRAY => {
1482 let index = self.build_byte_array_index()?;
1483 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
1484 }
1485 })
1486 }
1487
1488 fn build_page_index<T>(self) -> Result<PrimitiveColumnIndex<T>>
1489 where
1490 T: ParquetValueType,
1491 {
1492 let min_values: Vec<&[u8]> = self.min_values.iter().map(|v| v.as_slice()).collect();
1493 let max_values: Vec<&[u8]> = self.max_values.iter().map(|v| v.as_slice()).collect();
1494
1495 PrimitiveColumnIndex::try_new(
1496 self.null_pages,
1497 self.boundary_order,
1498 Some(self.null_counts),
1499 self.repetition_level_histograms,
1500 self.definition_level_histograms,
1501 min_values,
1502 max_values,
1503 )
1504 }
1505
1506 fn build_byte_array_index(self) -> Result<ByteArrayColumnIndex> {
1507 let min_values: Vec<&[u8]> = self.min_values.iter().map(|v| v.as_slice()).collect();
1508 let max_values: Vec<&[u8]> = self.max_values.iter().map(|v| v.as_slice()).collect();
1509
1510 ByteArrayColumnIndex::try_new(
1511 self.null_pages,
1512 self.boundary_order,
1513 Some(self.null_counts),
1514 self.repetition_level_histograms,
1515 self.definition_level_histograms,
1516 min_values,
1517 max_values,
1518 )
1519 }
1520}
1521
1522impl From<ColumnChunkMetaData> for ColumnChunkMetaDataBuilder {
1523 fn from(value: ColumnChunkMetaData) -> Self {
1524 ColumnChunkMetaDataBuilder(value)
1525 }
1526}
1527
1528pub struct OffsetIndexBuilder {
1532 offset_array: Vec<i64>,
1533 compressed_page_size_array: Vec<i32>,
1534 first_row_index_array: Vec<i64>,
1535 unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
1536 current_first_row_index: i64,
1537}
1538
1539impl Default for OffsetIndexBuilder {
1540 fn default() -> Self {
1541 Self::new()
1542 }
1543}
1544
1545impl OffsetIndexBuilder {
1546 pub fn new() -> Self {
1548 OffsetIndexBuilder {
1549 offset_array: Vec::new(),
1550 compressed_page_size_array: Vec::new(),
1551 first_row_index_array: Vec::new(),
1552 unencoded_byte_array_data_bytes_array: None,
1553 current_first_row_index: 0,
1554 }
1555 }
1556
1557 pub fn append_row_count(&mut self, row_count: i64) {
1559 let current_page_row_index = self.current_first_row_index;
1560 self.first_row_index_array.push(current_page_row_index);
1561 self.current_first_row_index += row_count;
1562 }
1563
1564 pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
1566 self.offset_array.push(offset);
1567 self.compressed_page_size_array.push(compressed_page_size);
1568 }
1569
1570 pub fn append_unencoded_byte_array_data_bytes(
1572 &mut self,
1573 unencoded_byte_array_data_bytes: Option<i64>,
1574 ) {
1575 if let Some(val) = unencoded_byte_array_data_bytes {
1576 self.unencoded_byte_array_data_bytes_array
1577 .get_or_insert(Vec::new())
1578 .push(val);
1579 }
1580 }
1581
1582 pub fn build(self) -> OffsetIndexMetaData {
1584 let locations = self
1585 .offset_array
1586 .iter()
1587 .zip(self.compressed_page_size_array.iter())
1588 .zip(self.first_row_index_array.iter())
1589 .map(|((offset, size), row_index)| PageLocation {
1590 offset: *offset,
1591 compressed_page_size: *size,
1592 first_row_index: *row_index,
1593 })
1594 .collect::<Vec<_>>();
1595 OffsetIndexMetaData {
1596 page_locations: locations,
1597 unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes_array,
1598 }
1599 }
1600}
1601
1602#[cfg(test)]
1603mod tests {
1604 use super::*;
1605 use crate::basic::{PageType, SortOrder};
1606 use crate::file::metadata::thrift::tests::{read_column_chunk, read_row_group};
1607
1608 #[test]
1609 fn test_row_group_metadata_thrift_conversion() {
1610 let schema_descr = get_test_schema_descr();
1611
1612 let mut columns = vec![];
1613 for ptr in schema_descr.columns() {
1614 let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
1615 columns.push(column);
1616 }
1617 let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
1618 .set_num_rows(1000)
1619 .set_total_byte_size(2000)
1620 .set_column_metadata(columns)
1621 .set_ordinal(1)
1622 .build()
1623 .unwrap();
1624
1625 let mut buf = Vec::new();
1626 let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
1627 row_group_meta.write_thrift(&mut writer).unwrap();
1628
1629 let row_group_res = read_row_group(&mut buf, schema_descr).unwrap();
1630
1631 assert_eq!(row_group_res, row_group_meta);
1632 }
1633
1634 #[test]
1635 fn test_row_group_metadata_thrift_conversion_empty() {
1636 let schema_descr = get_test_schema_descr();
1637
1638 let row_group_meta = RowGroupMetaData::builder(schema_descr).build();
1639
1640 assert!(row_group_meta.is_err());
1641 if let Err(e) = row_group_meta {
1642 assert_eq!(
1643 format!("{e}"),
1644 "Parquet error: Column length mismatch: 2 != 0"
1645 );
1646 }
1647 }
1648
#[test]
fn test_row_group_metadata_thrift_corrupted() {
    // Builds a flat schema rooted at "schema" with one INT32 leaf per name.
    fn int32_schema(names: &[&str]) -> SchemaDescPtr {
        let fields = names
            .iter()
            .map(|&name| {
                Arc::new(
                    SchemaType::primitive_type_builder(name, Type::INT32)
                        .build()
                        .unwrap(),
                )
            })
            .collect::<Vec<_>>();
        let root = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        Arc::new(SchemaDescriptor::new(Arc::new(root)))
    }

    let two_col_schema = int32_schema(&["a", "b"]);
    let three_col_schema = int32_schema(&["a", "b", "c"]);

    // Serialize a row group that carries exactly two column chunks.
    let two_chunks = (0..2)
        .map(|idx| ColumnChunkMetaData::builder(two_col_schema.column(idx)).build())
        .collect::<Result<Vec<_>>>()
        .unwrap();
    let two_col_row_group = RowGroupMetaData::builder(two_col_schema.clone())
        .set_num_rows(1000)
        .set_total_byte_size(2000)
        .set_column_metadata(two_chunks)
        .set_ordinal(1)
        .build()
        .unwrap();

    let mut encoded = Vec::new();
    {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut encoded);
        two_col_row_group.write_thrift(&mut protocol).unwrap();
    }

    // Decoding against a three-column schema must report the mismatch
    // instead of producing a row group.
    let message = read_row_group(&mut encoded, three_col_schema)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "Parquet error: Column count mismatch. Schema has 3 columns while Row Group has 2"
    );
}
1719
#[test]
fn test_column_chunk_metadata_thrift_conversion() {
    // Populate every optional field set below, write the chunk with the
    // Thrift compact protocol, read it back and require equality.
    let descr = get_test_schema_descr().column(0);

    let encodings = EncodingMask::new_from_encodings([Encoding::PLAIN, Encoding::RLE].iter());
    let page_stats = vec![
        PageEncodingStats {
            page_type: PageType::DATA_PAGE,
            encoding: Encoding::PLAIN,
            count: 3,
        },
        PageEncodingStats {
            page_type: PageType::DATA_PAGE,
            encoding: Encoding::RLE,
            count: 5,
        },
    ];
    let rep_levels = LevelHistogram::from(vec![100, 100]);
    let def_levels = LevelHistogram::from(vec![0, 200]);

    let expected = ColumnChunkMetaData::builder(descr.clone())
        .set_encodings_mask(encodings)
        .set_file_path("file_path".to_owned())
        .set_num_values(1000)
        .set_compression(Compression::SNAPPY)
        .set_total_compressed_size(2000)
        .set_total_uncompressed_size(3000)
        .set_data_page_offset(4000)
        .set_dictionary_page_offset(Some(5000))
        .set_page_encoding_stats(page_stats)
        .set_bloom_filter_offset(Some(6000))
        .set_bloom_filter_length(Some(25))
        .set_offset_index_offset(Some(7000))
        .set_offset_index_length(Some(25))
        .set_column_index_offset(Some(8000))
        .set_column_index_length(Some(25))
        .set_unencoded_byte_array_data_bytes(Some(2000))
        .set_repetition_level_histogram(Some(rep_levels))
        .set_definition_level_histogram(Some(def_levels))
        .build()
        .unwrap();

    let mut encoded = Vec::new();
    {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut encoded);
        expected.write_thrift(&mut protocol).unwrap();
    }
    let decoded = read_column_chunk(&mut encoded, descr).unwrap();

    assert_eq!(decoded, expected);
}
1765
#[test]
fn test_column_chunk_metadata_thrift_conversion_empty() {
    // A column chunk built with nothing but the builder defaults must also
    // survive a write/read cycle unchanged.
    let descr = get_test_schema_descr().column(0);
    let expected = ColumnChunkMetaData::builder(descr.clone())
        .build()
        .unwrap();

    let mut encoded = Vec::new();
    {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut encoded);
        expected.write_thrift(&mut protocol).unwrap();
    }
    let decoded = read_column_chunk(&mut encoded, descr).unwrap();

    assert_eq!(decoded, expected);
}
1781
#[test]
fn test_compressed_size() {
    let schema = get_test_schema_descr();

    // Every column chunk in the test schema reports 500 compressed bytes.
    let column_chunks = schema
        .columns()
        .iter()
        .map(|descr| {
            ColumnChunkMetaData::builder(descr.clone())
                .set_total_compressed_size(500)
                .set_total_uncompressed_size(700)
                .build()
        })
        .collect::<Result<Vec<_>>>()
        .unwrap();

    let row_group = RowGroupMetaData::builder(schema)
        .set_num_rows(1000)
        .set_column_metadata(column_chunks)
        .build()
        .unwrap();

    // Two columns at 500 bytes each.
    let expected: i64 = 1000;
    let actual: i64 = row_group.compressed_size();

    assert_eq!(actual, expected);
}
1806
// Pins the value reported by `ParquetMetaData::memory_size()` for two
// hand-built metadata objects. The expected numbers are hard-coded and differ
// depending on whether the `encryption` feature is enabled, so any change to
// how the values below are constructed may require updating them.
#[test]
fn test_memory_size() {
    let schema_descr = get_test_schema_descr();

    // Column chunks whose statistics are created with every optional
    // argument set to `None`.
    let columns = schema_descr
        .columns()
        .iter()
        .map(|column_descr| {
            ColumnChunkMetaData::builder(column_descr.clone())
                .set_statistics(Statistics::new::<i32>(None, None, None, None, false))
                .build()
        })
        .collect::<Result<Vec<_>>>()
        .unwrap();
    let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
        .set_num_rows(1000)
        .set_column_metadata(columns)
        .build()
        .unwrap();
    let row_group_meta = vec![row_group_meta];

    // File-level metadata shared by both `ParquetMetaData` instances below.
    let version = 2;
    let num_rows = 1000;
    let created_by = Some(String::from("test harness"));
    let key_value_metadata = Some(vec![KeyValue::new(
        String::from("Foo"),
        Some(String::from("bar")),
    )]);
    let column_orders = Some(vec![
        ColumnOrder::UNDEFINED,
        ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
    ]);
    let file_metadata = FileMetaData::new(
        version,
        num_rows,
        created_by,
        key_value_metadata,
        schema_descr.clone(),
        column_orders,
    );

    // Second set of column chunks: same schema, but the first two statistics
    // arguments are populated (presumably min and max -- confirm against
    // `Statistics::new`).
    let columns_with_stats = schema_descr
        .columns()
        .iter()
        .map(|column_descr| {
            ColumnChunkMetaData::builder(column_descr.clone())
                .set_statistics(Statistics::new::<i32>(
                    Some(0),
                    Some(100),
                    None,
                    None,
                    false,
                ))
                .build()
        })
        .collect::<Result<Vec<_>>>()
        .unwrap();

    let row_group_meta_with_stats = RowGroupMetaData::builder(schema_descr)
        .set_num_rows(1000)
        .set_column_metadata(columns_with_stats)
        .build()
        .unwrap();
    let row_group_meta_with_stats = vec![row_group_meta_with_stats];

    // First measurement: file metadata plus the row group with populated
    // statistics, and no page indexes.
    let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
        .set_row_groups(row_group_meta_with_stats)
        .build();

    // Expected size in bytes; the value depends on the `encryption` feature.
    #[cfg(not(feature = "encryption"))]
    let base_expected_size = 2248;
    #[cfg(feature = "encryption")]
    let base_expected_size = 2416;

    assert_eq!(parquet_meta.memory_size(), base_expected_size);

    // Build a single-page BOOLEAN column index and unwrap it to its native
    // representation so it can be re-wrapped when attached below.
    let mut column_index = ColumnIndexBuilder::new(Type::BOOLEAN);
    column_index.append(false, vec![1u8], vec![2u8, 3u8], 4);
    let column_index = column_index.build().unwrap();
    let native_index = match column_index {
        ColumnIndexMetaData::BOOLEAN(index) => index,
        _ => panic!("wrong type of column index"),
    };

    // Offset index with two identical entries (row count, offset/size and
    // unencoded byte-array byte count appended twice).
    let mut offset_index = OffsetIndexBuilder::new();
    offset_index.append_row_count(1);
    offset_index.append_offset_and_size(2, 3);
    offset_index.append_unencoded_byte_array_data_bytes(Some(10));
    offset_index.append_row_count(1);
    offset_index.append_offset_and_size(2, 3);
    offset_index.append_unencoded_byte_array_data_bytes(Some(10));
    let offset_index = offset_index.build();

    // Second measurement: the row group built with all-`None` statistics,
    // now accompanied by the column index and offset index.
    let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
        .set_row_groups(row_group_meta)
        .set_column_index(Some(vec![vec![ColumnIndexMetaData::BOOLEAN(native_index)]]))
        .set_offset_index(Some(vec![vec![offset_index]]))
        .build();

    // Expected size in bytes; the value depends on the `encryption` feature.
    #[cfg(not(feature = "encryption"))]
    let bigger_expected_size = 2674;
    #[cfg(feature = "encryption")]
    let bigger_expected_size = 2842;

    // Sanity check on the constants themselves: the metadata carrying page
    // indexes is expected to be the larger of the two.
    assert!(bigger_expected_size > base_expected_size);
    assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
}
1919
1920 fn get_test_schema_descr() -> SchemaDescPtr {
1922 let schema = SchemaType::group_type_builder("schema")
1923 .with_fields(vec![
1924 Arc::new(
1925 SchemaType::primitive_type_builder("a", Type::INT32)
1926 .build()
1927 .unwrap(),
1928 ),
1929 Arc::new(
1930 SchemaType::primitive_type_builder("b", Type::INT32)
1931 .build()
1932 .unwrap(),
1933 ),
1934 ])
1935 .build()
1936 .unwrap();
1937
1938 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1939 }
1940}