1mod memory;
95pub(crate) mod reader;
96mod writer;
97
98use std::ops::Range;
99use std::sync::Arc;
100
101use crate::format::{
102 BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
103 SizeStatistics, SortingColumn,
104};
105
106use crate::basic::{ColumnOrder, Compression, Encoding, Type};
107use crate::errors::{ParquetError, Result};
108pub(crate) use crate::file::metadata::memory::HeapSize;
109use crate::file::page_encoding_stats::{self, PageEncodingStats};
110use crate::file::page_index::index::Index;
111use crate::file::page_index::offset_index::OffsetIndexMetaData;
112use crate::file::statistics::{self, Statistics};
113use crate::schema::types::{
114 ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
115 Type as SchemaType,
116};
117pub use reader::ParquetMetaDataReader;
118pub use writer::ParquetMetaDataWriter;
119pub(crate) use writer::ThriftMetadataWriter;
120
/// Page-level statistics ("column index") for each column chunk of each row
/// group: the outer `Vec` is indexed by row group, the inner by column.
pub type ParquetColumnIndex = Vec<Vec<Index>>;

/// Page location information ("offset index") for each column chunk of each
/// row group: the outer `Vec` is indexed by row group, the inner by column.
pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
149
/// In-memory representation of a Parquet file footer: file level metadata,
/// one entry per row group, and optionally the page indexes.
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
    /// File level metadata (schema, version, key/value pairs, ...)
    file_metadata: FileMetaData,
    /// Row group metadata, in file order
    row_groups: Vec<RowGroupMetaData>,
    /// Page-level column index, if it was read from the file
    column_index: Option<ParquetColumnIndex>,
    /// Page offset index, if it was read from the file
    offset_index: Option<ParquetOffsetIndex>,
}
178
179impl ParquetMetaData {
180 pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
183 ParquetMetaData {
184 file_metadata,
185 row_groups,
186 column_index: None,
187 offset_index: None,
188 }
189 }
190
191 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataBuilder")]
194 pub fn new_with_page_index(
195 file_metadata: FileMetaData,
196 row_groups: Vec<RowGroupMetaData>,
197 column_index: Option<ParquetColumnIndex>,
198 offset_index: Option<ParquetOffsetIndex>,
199 ) -> Self {
200 ParquetMetaDataBuilder::new(file_metadata)
201 .set_row_groups(row_groups)
202 .set_column_index(column_index)
203 .set_offset_index(offset_index)
204 .build()
205 }
206
207 pub fn into_builder(self) -> ParquetMetaDataBuilder {
209 self.into()
210 }
211
212 pub fn file_metadata(&self) -> &FileMetaData {
214 &self.file_metadata
215 }
216
217 pub fn num_row_groups(&self) -> usize {
219 self.row_groups.len()
220 }
221
222 pub fn row_group(&self, i: usize) -> &RowGroupMetaData {
225 &self.row_groups[i]
226 }
227
228 pub fn row_groups(&self) -> &[RowGroupMetaData] {
230 &self.row_groups
231 }
232
233 pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
240 self.column_index.as_ref()
241 }
242
243 pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
250 self.offset_index.as_ref()
251 }
252
253 pub fn memory_size(&self) -> usize {
268 std::mem::size_of::<Self>()
269 + self.file_metadata.heap_size()
270 + self.row_groups.heap_size()
271 + self.column_index.heap_size()
272 + self.offset_index.heap_size()
273 }
274
275 pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
277 self.column_index = index;
278 }
279
280 pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
282 self.offset_index = index;
283 }
284}
285
286pub struct ParquetMetaDataBuilder(ParquetMetaData);
324
325impl ParquetMetaDataBuilder {
326 pub fn new(file_meta_data: FileMetaData) -> Self {
328 Self(ParquetMetaData::new(file_meta_data, vec![]))
329 }
330
331 pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
333 Self(metadata)
334 }
335
336 pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
338 self.0.row_groups.push(row_group);
339 self
340 }
341
342 pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
344 self.0.row_groups = row_groups;
345 self
346 }
347
348 pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
354 std::mem::take(&mut self.0.row_groups)
355 }
356
357 pub fn row_groups(&self) -> &[RowGroupMetaData] {
359 &self.0.row_groups
360 }
361
362 pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
364 self.0.column_index = column_index;
365 self
366 }
367
368 pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
370 std::mem::take(&mut self.0.column_index)
371 }
372
373 pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
375 self.0.column_index.as_ref()
376 }
377
378 pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
380 self.0.offset_index = offset_index;
381 self
382 }
383
384 pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
386 std::mem::take(&mut self.0.offset_index)
387 }
388
389 pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
391 self.0.offset_index.as_ref()
392 }
393
394 pub fn build(self) -> ParquetMetaData {
396 let Self(metadata) = self;
397 metadata
398 }
399}
400
impl From<ParquetMetaData> for ParquetMetaDataBuilder {
    /// Wraps existing metadata in a builder so it can be modified further.
    fn from(meta_data: ParquetMetaData) -> Self {
        Self(meta_data)
    }
}
406
/// Key/value metadata pair, re-exported from the generated thrift definitions.
pub type KeyValue = crate::format::KeyValue;

/// Reference counted pointer to [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
412
/// File level metadata from the Parquet footer.
#[derive(Debug, Clone, PartialEq)]
pub struct FileMetaData {
    /// Format version of the file
    version: i32,
    /// Total number of rows in the file
    num_rows: i64,
    /// Writer application string, if recorded
    created_by: Option<String>,
    /// Application-defined key/value metadata, if any
    key_value_metadata: Option<Vec<KeyValue>>,
    /// Descriptor for the file schema
    schema_descr: SchemaDescPtr,
    /// Per-column sort orders, if written
    column_orders: Option<Vec<ColumnOrder>>,
}
425
426impl FileMetaData {
427 pub fn new(
429 version: i32,
430 num_rows: i64,
431 created_by: Option<String>,
432 key_value_metadata: Option<Vec<KeyValue>>,
433 schema_descr: SchemaDescPtr,
434 column_orders: Option<Vec<ColumnOrder>>,
435 ) -> Self {
436 FileMetaData {
437 version,
438 num_rows,
439 created_by,
440 key_value_metadata,
441 schema_descr,
442 column_orders,
443 }
444 }
445
446 pub fn version(&self) -> i32 {
448 self.version
449 }
450
451 pub fn num_rows(&self) -> i64 {
453 self.num_rows
454 }
455
456 pub fn created_by(&self) -> Option<&str> {
465 self.created_by.as_deref()
466 }
467
468 pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
470 self.key_value_metadata.as_ref()
471 }
472
473 pub fn schema(&self) -> &SchemaType {
477 self.schema_descr.root_schema()
478 }
479
480 pub fn schema_descr(&self) -> &SchemaDescriptor {
482 &self.schema_descr
483 }
484
485 pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
487 self.schema_descr.clone()
488 }
489
490 pub fn column_orders(&self) -> Option<&Vec<ColumnOrder>> {
498 self.column_orders.as_ref()
499 }
500
501 pub fn column_order(&self, i: usize) -> ColumnOrder {
504 self.column_orders
505 .as_ref()
506 .map(|data| data[i])
507 .unwrap_or(ColumnOrder::UNDEFINED)
508 }
509}
510
511pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
513
/// Metadata for a single row group.
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupMetaData {
    /// Metadata for each column chunk, in schema order
    columns: Vec<ColumnChunkMetaData>,
    /// Number of rows in this row group
    num_rows: i64,
    /// Sort order of the rows, if recorded
    sorting_columns: Option<Vec<SortingColumn>>,
    /// Total byte size of all column data in this row group
    total_byte_size: i64,
    /// Descriptor of the schema this row group conforms to
    schema_descr: SchemaDescPtr,
    /// Byte offset of this row group in the file, if recorded
    file_offset: Option<i64>,
    /// Ordinal position of this row group in the file, if recorded
    ordinal: Option<i16>,
}
530
impl RowGroupMetaData {
    /// Returns a builder for row group metadata over the given schema.
    pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder::new(schema_descr)
    }

    /// Number of column chunks in this row group.
    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }

    /// Metadata for the `i`th column chunk; panics if `i` is out of range.
    pub fn column(&self, i: usize) -> &ColumnChunkMetaData {
        &self.columns[i]
    }

    /// Metadata for all column chunks, in schema order.
    pub fn columns(&self) -> &[ColumnChunkMetaData] {
        &self.columns
    }

    /// Mutable access to the column chunk metadata.
    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
        &mut self.columns
    }

    /// Number of rows in this row group.
    pub fn num_rows(&self) -> i64 {
        self.num_rows
    }

    /// Sort order of the rows, if one was recorded.
    pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
        self.sorting_columns.as_ref()
    }

    /// Total byte size of all column data in this row group.
    pub fn total_byte_size(&self) -> i64 {
        self.total_byte_size
    }

    /// Total compressed byte size, summed over all column chunks.
    pub fn compressed_size(&self) -> i64 {
        self.columns.iter().map(|c| c.total_compressed_size).sum()
    }

    /// Descriptor of the schema this row group conforms to.
    pub fn schema_descr(&self) -> &SchemaDescriptor {
        self.schema_descr.as_ref()
    }

    /// Reference counted pointer to the schema descriptor.
    pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
        self.schema_descr.clone()
    }

    /// Ordinal position of this row group in the file, if recorded.
    #[inline(always)]
    pub fn ordinal(&self) -> Option<i16> {
        self.ordinal
    }

    /// Byte offset of this row group in the file, if recorded.
    #[inline(always)]
    pub fn file_offset(&self) -> Option<i64> {
        self.file_offset
    }

    /// Builds row group metadata from its thrift representation.
    ///
    /// # Errors
    /// Returns an error when the thrift column count does not match the
    /// number of columns in `schema_descr`, or when any column chunk fails
    /// to convert.
    pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result<RowGroupMetaData> {
        if schema_descr.num_columns() != rg.columns.len() {
            return Err(general_err!(
                "Column count mismatch. Schema has {} columns while Row Group has {}",
                schema_descr.num_columns(),
                rg.columns.len()
            ));
        }
        let total_byte_size = rg.total_byte_size;
        let num_rows = rg.num_rows;
        let mut columns = vec![];
        // Pair each thrift column chunk with its descriptor, in schema order
        for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
            let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
            columns.push(cc);
        }
        let sorting_columns = rg.sorting_columns;
        Ok(RowGroupMetaData {
            columns,
            num_rows,
            sorting_columns,
            total_byte_size,
            schema_descr,
            file_offset: rg.file_offset,
            ordinal: rg.ordinal,
        })
    }

    /// Converts this metadata back to its thrift representation.
    pub fn to_thrift(&self) -> RowGroup {
        RowGroup {
            columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
            total_byte_size: self.total_byte_size,
            num_rows: self.num_rows,
            sorting_columns: self.sorting_columns().cloned(),
            file_offset: self.file_offset(),
            // Derived from the column chunks rather than stored
            total_compressed_size: Some(self.compressed_size()),
            ordinal: self.ordinal,
        }
    }

    /// Converts this metadata into a builder for further modification.
    pub fn into_builder(self) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder(self)
    }
}
648
649pub struct RowGroupMetaDataBuilder(RowGroupMetaData);
651
652impl RowGroupMetaDataBuilder {
653 fn new(schema_descr: SchemaDescPtr) -> Self {
655 Self(RowGroupMetaData {
656 columns: Vec::with_capacity(schema_descr.num_columns()),
657 schema_descr,
658 file_offset: None,
659 num_rows: 0,
660 sorting_columns: None,
661 total_byte_size: 0,
662 ordinal: None,
663 })
664 }
665
666 pub fn set_num_rows(mut self, value: i64) -> Self {
668 self.0.num_rows = value;
669 self
670 }
671
672 pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
674 self.0.sorting_columns = value;
675 self
676 }
677
678 pub fn set_total_byte_size(mut self, value: i64) -> Self {
680 self.0.total_byte_size = value;
681 self
682 }
683
684 pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
690 std::mem::take(&mut self.0.columns)
691 }
692
693 pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
695 self.0.columns = value;
696 self
697 }
698
699 pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
701 self.0.columns.push(value);
702 self
703 }
704
705 pub fn set_ordinal(mut self, value: i16) -> Self {
707 self.0.ordinal = Some(value);
708 self
709 }
710
711 pub fn set_file_offset(mut self, value: i64) -> Self {
713 self.0.file_offset = Some(value);
714 self
715 }
716
717 pub fn build(self) -> Result<RowGroupMetaData> {
719 if self.0.schema_descr.num_columns() != self.0.columns.len() {
720 return Err(general_err!(
721 "Column length mismatch: {} != {}",
722 self.0.schema_descr.num_columns(),
723 self.0.columns.len()
724 ));
725 }
726
727 Ok(self.0)
728 }
729}
730
/// Metadata for a single column chunk within a row group.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
    /// Descriptor of the column this chunk stores
    column_descr: ColumnDescPtr,
    /// All encodings used in this chunk
    encodings: Vec<Encoding>,
    /// File where the chunk lives, if different from the footer's file
    file_path: Option<String>,
    /// Thrift `ColumnChunk.file_offset` value
    file_offset: i64,
    /// Total number of values in this chunk
    num_values: i64,
    /// Compression codec
    compression: Compression,
    /// Total compressed data size, in bytes
    total_compressed_size: i64,
    /// Total uncompressed data size, in bytes
    total_uncompressed_size: i64,
    /// Byte offset of the first data page
    data_page_offset: i64,
    /// Byte offset of the index page, if present
    index_page_offset: Option<i64>,
    /// Byte offset of the dictionary page, if present
    dictionary_page_offset: Option<i64>,
    /// Min/max/null-count statistics, if written
    statistics: Option<Statistics>,
    /// Per-encoding page counts, if written
    encoding_stats: Option<Vec<PageEncodingStats>>,
    /// Bloom filter offset/length, if present
    bloom_filter_offset: Option<i64>,
    bloom_filter_length: Option<i32>,
    /// Offset index offset/length, if present
    offset_index_offset: Option<i64>,
    offset_index_length: Option<i32>,
    /// Column index offset/length, if present
    column_index_offset: Option<i64>,
    column_index_length: Option<i32>,
    /// Total unencoded BYTE_ARRAY data size, if recorded
    unencoded_byte_array_data_bytes: Option<i64>,
    /// Repetition/definition level histograms, if recorded
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
757
/// Histogram of repetition or definition levels.
///
/// Slot `i` holds the number of levels observed with value `i`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct LevelHistogram {
    inner: Vec<i64>,
}

impl LevelHistogram {
    /// Creates a zeroed histogram able to count levels `0..=max_level`.
    ///
    /// Returns `None` when `max_level` is not positive (no histogram is
    /// needed for a column without levels).
    pub fn try_new(max_level: i16) -> Option<Self> {
        (max_level > 0).then(|| Self {
            inner: vec![0; max_level as usize + 1],
        })
    }

    /// Borrows the raw counts.
    pub fn values(&self) -> &[i64] {
        self.inner.as_slice()
    }

    /// Consumes the histogram, returning the raw counts.
    pub fn into_inner(self) -> Vec<i64> {
        self.inner
    }

    /// Returns the count at `index`, or `None` when out of range.
    pub fn get(&self, index: usize) -> Option<i64> {
        self.inner.get(index).copied()
    }

    /// Adds the counts of `other` into `self`, slot by slot.
    ///
    /// # Panics
    /// Panics when the two histograms have different lengths.
    pub fn add(&mut self, other: &Self) {
        assert_eq!(self.len(), other.len());
        for (dst, src) in self.inner.iter_mut().zip(&other.inner) {
            *dst += *src;
        }
    }

    /// Number of slots (maximum level + 1).
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// True when the histogram has no slots at all.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Resets every count to zero, keeping the length.
    pub fn reset(&mut self) {
        self.inner.fill(0);
    }

    /// Tallies each level in `levels` into the histogram.
    ///
    /// # Panics
    /// Panics when a level exceeds the histogram's maximum level.
    pub fn update_from_levels(&mut self, levels: &[i16]) {
        for &level in levels {
            self.inner[level as usize] += 1;
        }
    }
}

impl From<Vec<i64>> for LevelHistogram {
    fn from(inner: Vec<i64>) -> Self {
        Self { inner }
    }
}

impl From<LevelHistogram> for Vec<i64> {
    fn from(value: LevelHistogram) -> Self {
        value.into_inner()
    }
}
857
impl HeapSize for LevelHistogram {
    /// Heap usage is just that of the backing `Vec`.
    fn heap_size(&self) -> usize {
        self.inner.heap_size()
    }
}
863
impl ColumnChunkMetaData {
    /// Returns a builder for column chunk metadata.
    pub fn builder(column_descr: ColumnDescPtr) -> ColumnChunkMetaDataBuilder {
        ColumnChunkMetaDataBuilder::new(column_descr)
    }

    /// File where the column chunk is stored, if not in the same file as
    /// the footer.
    pub fn file_path(&self) -> Option<&str> {
        self.file_path.as_deref()
    }

    /// Value of the thrift `ColumnChunk.file_offset` field.
    ///
    /// NOTE(review): the Parquet specification now requires this field to
    /// be 0 — see the deprecation note on
    /// `ColumnChunkMetaDataBuilder::set_file_offset`.
    pub fn file_offset(&self) -> i64 {
        self.file_offset
    }

    /// Physical type of this column.
    pub fn column_type(&self) -> Type {
        self.column_descr.physical_type()
    }

    /// Path of this column within the schema.
    pub fn column_path(&self) -> &ColumnPath {
        self.column_descr.path()
    }

    /// Descriptor of this column.
    pub fn column_descr(&self) -> &ColumnDescriptor {
        self.column_descr.as_ref()
    }

    /// Reference counted pointer to the column descriptor.
    pub fn column_descr_ptr(&self) -> ColumnDescPtr {
        self.column_descr.clone()
    }

    /// All encodings used for this column chunk.
    pub fn encodings(&self) -> &Vec<Encoding> {
        &self.encodings
    }

    /// Total number of values in this column chunk.
    pub fn num_values(&self) -> i64 {
        self.num_values
    }

    /// Compression codec for this column chunk.
    pub fn compression(&self) -> Compression {
        self.compression
    }

    /// Total compressed data size, in bytes.
    pub fn compressed_size(&self) -> i64 {
        self.total_compressed_size
    }

    /// Total uncompressed data size, in bytes.
    pub fn uncompressed_size(&self) -> i64 {
        self.total_uncompressed_size
    }

    /// Byte offset of the first data page.
    pub fn data_page_offset(&self) -> i64 {
        self.data_page_offset
    }

    /// Byte offset of the index page, if present.
    pub fn index_page_offset(&self) -> Option<i64> {
        self.index_page_offset
    }

    /// Byte offset of the dictionary page, if present.
    pub fn dictionary_page_offset(&self) -> Option<i64> {
        self.dictionary_page_offset
    }

    /// Returns `(start, length)` in bytes of this column chunk's data:
    /// the start is the dictionary page offset when one exists, otherwise
    /// the first data page offset.
    ///
    /// # Panics
    /// Panics when the start offset or compressed length is negative.
    pub fn byte_range(&self) -> (u64, u64) {
        let col_start = match self.dictionary_page_offset() {
            Some(dictionary_page_offset) => dictionary_page_offset,
            None => self.data_page_offset(),
        };
        let col_len = self.compressed_size();
        assert!(
            col_start >= 0 && col_len >= 0,
            "column start and length should not be negative"
        );
        (col_start as u64, col_len as u64)
    }

    /// Statistics for this column chunk, if written.
    pub fn statistics(&self) -> Option<&Statistics> {
        self.statistics.as_ref()
    }

    /// Per-encoding page counts, if written.
    pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
        self.encoding_stats.as_ref()
    }

    /// Byte offset of the bloom filter, if present.
    pub fn bloom_filter_offset(&self) -> Option<i64> {
        self.bloom_filter_offset
    }

    /// Length in bytes of the bloom filter, if present.
    pub fn bloom_filter_length(&self) -> Option<i32> {
        self.bloom_filter_length
    }

    /// Byte offset of the column index, if present.
    pub fn column_index_offset(&self) -> Option<i64> {
        self.column_index_offset
    }

    /// Length in bytes of the column index, if present.
    pub fn column_index_length(&self) -> Option<i32> {
        self.column_index_length
    }

    /// Byte range of the column index, or `None` when offset/length are
    /// missing or do not fit in `usize`.
    pub(crate) fn column_index_range(&self) -> Option<Range<usize>> {
        let offset = usize::try_from(self.column_index_offset?).ok()?;
        let length = usize::try_from(self.column_index_length?).ok()?;
        Some(offset..(offset + length))
    }

    /// Byte offset of the offset index, if present.
    pub fn offset_index_offset(&self) -> Option<i64> {
        self.offset_index_offset
    }

    /// Length in bytes of the offset index, if present.
    pub fn offset_index_length(&self) -> Option<i32> {
        self.offset_index_length
    }

    /// Byte range of the offset index, or `None` when offset/length are
    /// missing or do not fit in `usize`.
    pub(crate) fn offset_index_range(&self) -> Option<Range<usize>> {
        let offset = usize::try_from(self.offset_index_offset?).ok()?;
        let length = usize::try_from(self.offset_index_length?).ok()?;
        Some(offset..(offset + length))
    }

    /// Total unencoded BYTE_ARRAY data size, if recorded.
    pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
        self.unencoded_byte_array_data_bytes
    }

    /// Histogram of repetition levels, if recorded.
    pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
        self.repetition_level_histogram.as_ref()
    }

    /// Histogram of definition levels, if recorded.
    pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
        self.definition_level_histogram.as_ref()
    }

    /// Builds column chunk metadata from its thrift representation.
    ///
    /// # Errors
    /// Returns an error when `cc` carries no embedded `ColumnMetaData`, or
    /// when any enum value (physical type, encoding, codec), statistics or
    /// page encoding stats fail to convert.
    pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
        if cc.meta_data.is_none() {
            return Err(general_err!("Expected to have column metadata"));
        }
        // unwrap is safe: checked for None just above
        let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
        let column_type = Type::try_from(col_metadata.type_)?;
        let encodings = col_metadata
            .encodings
            .drain(0..)
            .map(Encoding::try_from)
            .collect::<Result<_>>()?;
        let compression = Compression::try_from(col_metadata.codec)?;
        let file_path = cc.file_path;
        let file_offset = cc.file_offset;
        let num_values = col_metadata.num_values;
        let total_compressed_size = col_metadata.total_compressed_size;
        let total_uncompressed_size = col_metadata.total_uncompressed_size;
        let data_page_offset = col_metadata.data_page_offset;
        let index_page_offset = col_metadata.index_page_offset;
        let dictionary_page_offset = col_metadata.dictionary_page_offset;
        let statistics = statistics::from_thrift(column_type, col_metadata.statistics)?;
        let encoding_stats = col_metadata
            .encoding_stats
            .as_ref()
            .map(|vec| {
                vec.iter()
                    .map(page_encoding_stats::try_from_thrift)
                    .collect::<Result<_>>()
            })
            .transpose()?;
        let bloom_filter_offset = col_metadata.bloom_filter_offset;
        let bloom_filter_length = col_metadata.bloom_filter_length;
        let offset_index_offset = cc.offset_index_offset;
        let offset_index_length = cc.offset_index_length;
        let column_index_offset = cc.column_index_offset;
        let column_index_length = cc.column_index_length;
        // Unpack the optional SizeStatistics into its three components
        let (
            unencoded_byte_array_data_bytes,
            repetition_level_histogram,
            definition_level_histogram,
        ) = if let Some(size_stats) = col_metadata.size_statistics {
            (
                size_stats.unencoded_byte_array_data_bytes,
                size_stats.repetition_level_histogram,
                size_stats.definition_level_histogram,
            )
        } else {
            (None, None, None)
        };

        let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
        let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);

        let result = ColumnChunkMetaData {
            column_descr,
            encodings,
            file_path,
            file_offset,
            num_values,
            compression,
            total_compressed_size,
            total_uncompressed_size,
            data_page_offset,
            index_page_offset,
            dictionary_page_offset,
            statistics,
            encoding_stats,
            bloom_filter_offset,
            bloom_filter_length,
            offset_index_offset,
            offset_index_length,
            column_index_offset,
            column_index_length,
            unencoded_byte_array_data_bytes,
            repetition_level_histogram,
            definition_level_histogram,
        };
        Ok(result)
    }

    /// Converts this metadata to its thrift `ColumnChunk` representation.
    ///
    /// Crypto metadata is not tracked here, so those fields are `None`.
    pub fn to_thrift(&self) -> ColumnChunk {
        let column_metadata = self.to_column_metadata_thrift();

        ColumnChunk {
            file_path: self.file_path().map(|s| s.to_owned()),
            file_offset: self.file_offset,
            meta_data: Some(column_metadata),
            offset_index_offset: self.offset_index_offset,
            offset_index_length: self.offset_index_length,
            column_index_offset: self.column_index_offset,
            column_index_length: self.column_index_length,
            crypto_metadata: None,
            encrypted_column_metadata: None,
        }
    }

    /// Converts this metadata to the embedded thrift `ColumnMetaData`.
    pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
        // Only emit a SizeStatistics struct when at least one of its
        // components is present
        let size_statistics = if self.unencoded_byte_array_data_bytes.is_some()
            || self.repetition_level_histogram.is_some()
            || self.definition_level_histogram.is_some()
        {
            let repetition_level_histogram = self
                .repetition_level_histogram
                .as_ref()
                .map(|hist| hist.clone().into_inner());

            let definition_level_histogram = self
                .definition_level_histogram
                .as_ref()
                .map(|hist| hist.clone().into_inner());

            Some(SizeStatistics {
                unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
                repetition_level_histogram,
                definition_level_histogram,
            })
        } else {
            None
        };

        ColumnMetaData {
            type_: self.column_type().into(),
            encodings: self.encodings().iter().map(|&v| v.into()).collect(),
            path_in_schema: self.column_path().as_ref().to_vec(),
            codec: self.compression.into(),
            num_values: self.num_values,
            total_uncompressed_size: self.total_uncompressed_size,
            total_compressed_size: self.total_compressed_size,
            key_value_metadata: None,
            data_page_offset: self.data_page_offset,
            index_page_offset: self.index_page_offset,
            dictionary_page_offset: self.dictionary_page_offset,
            statistics: statistics::to_thrift(self.statistics.as_ref()),
            encoding_stats: self
                .encoding_stats
                .as_ref()
                .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
            bloom_filter_offset: self.bloom_filter_offset,
            bloom_filter_length: self.bloom_filter_length,
            size_statistics,
        }
    }

    /// Converts this metadata into a builder for further modification.
    pub fn into_builder(self) -> ColumnChunkMetaDataBuilder {
        ColumnChunkMetaDataBuilder::from(self)
    }
}
1196
1197pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData);
1216
1217impl ColumnChunkMetaDataBuilder {
1218 fn new(column_descr: ColumnDescPtr) -> Self {
1222 Self(ColumnChunkMetaData {
1223 column_descr,
1224 encodings: Vec::new(),
1225 file_path: None,
1226 file_offset: 0,
1227 num_values: 0,
1228 compression: Compression::UNCOMPRESSED,
1229 total_compressed_size: 0,
1230 total_uncompressed_size: 0,
1231 data_page_offset: 0,
1232 index_page_offset: None,
1233 dictionary_page_offset: None,
1234 statistics: None,
1235 encoding_stats: None,
1236 bloom_filter_offset: None,
1237 bloom_filter_length: None,
1238 offset_index_offset: None,
1239 offset_index_length: None,
1240 column_index_offset: None,
1241 column_index_length: None,
1242 unencoded_byte_array_data_bytes: None,
1243 repetition_level_histogram: None,
1244 definition_level_histogram: None,
1245 })
1246 }
1247
1248 pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
1250 self.0.encodings = encodings;
1251 self
1252 }
1253
1254 pub fn set_file_path(mut self, value: String) -> Self {
1256 self.0.file_path = Some(value);
1257 self
1258 }
1259
1260 #[deprecated(
1266 since = "53.0.0",
1267 note = "The Parquet specification requires this field to be 0"
1268 )]
1269 pub fn set_file_offset(mut self, value: i64) -> Self {
1270 self.0.file_offset = value;
1271 self
1272 }
1273
1274 pub fn set_num_values(mut self, value: i64) -> Self {
1276 self.0.num_values = value;
1277 self
1278 }
1279
1280 pub fn set_compression(mut self, value: Compression) -> Self {
1282 self.0.compression = value;
1283 self
1284 }
1285
1286 pub fn set_total_compressed_size(mut self, value: i64) -> Self {
1288 self.0.total_compressed_size = value;
1289 self
1290 }
1291
1292 pub fn set_total_uncompressed_size(mut self, value: i64) -> Self {
1294 self.0.total_uncompressed_size = value;
1295 self
1296 }
1297
1298 pub fn set_data_page_offset(mut self, value: i64) -> Self {
1300 self.0.data_page_offset = value;
1301 self
1302 }
1303
1304 pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
1306 self.0.dictionary_page_offset = value;
1307 self
1308 }
1309
1310 pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
1312 self.0.index_page_offset = value;
1313 self
1314 }
1315
1316 pub fn set_statistics(mut self, value: Statistics) -> Self {
1318 self.0.statistics = Some(value);
1319 self
1320 }
1321
1322 pub fn clear_statistics(mut self) -> Self {
1324 self.0.statistics = None;
1325 self
1326 }
1327
1328 pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
1330 self.0.encoding_stats = Some(value);
1331 self
1332 }
1333
1334 pub fn clear_page_encoding_stats(mut self) -> Self {
1336 self.0.encoding_stats = None;
1337 self
1338 }
1339
1340 pub fn set_bloom_filter_offset(mut self, value: Option<i64>) -> Self {
1342 self.0.bloom_filter_offset = value;
1343 self
1344 }
1345
1346 pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
1348 self.0.bloom_filter_length = value;
1349 self
1350 }
1351
1352 pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
1354 self.0.offset_index_offset = value;
1355 self
1356 }
1357
1358 pub fn set_offset_index_length(mut self, value: Option<i32>) -> Self {
1360 self.0.offset_index_length = value;
1361 self
1362 }
1363
1364 pub fn set_column_index_offset(mut self, value: Option<i64>) -> Self {
1366 self.0.column_index_offset = value;
1367 self
1368 }
1369
1370 pub fn set_column_index_length(mut self, value: Option<i32>) -> Self {
1372 self.0.column_index_length = value;
1373 self
1374 }
1375
1376 pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
1378 self.0.unencoded_byte_array_data_bytes = value;
1379 self
1380 }
1381
1382 pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
1384 self.0.repetition_level_histogram = value;
1385 self
1386 }
1387
1388 pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
1390 self.0.definition_level_histogram = value;
1391 self
1392 }
1393
1394 pub fn build(self) -> Result<ColumnChunkMetaData> {
1396 Ok(self.0)
1397 }
1398}
1399
/// Incrementally builds a thrift [`ColumnIndex`] for one column chunk.
pub struct ColumnIndexBuilder {
    /// Whether each page contains only nulls
    null_pages: Vec<bool>,
    /// Encoded min value for each page
    min_values: Vec<Vec<u8>>,
    /// Encoded max value for each page
    max_values: Vec<Vec<u8>>,
    /// Number of nulls in each page
    null_counts: Vec<i64>,
    /// Ordering of the per-page min/max values
    boundary_order: BoundaryOrder,
    /// Concatenated per-page repetition level histograms, if collected
    repetition_level_histograms: Option<Vec<i64>>,
    /// Concatenated per-page definition level histograms, if collected
    definition_level_histograms: Option<Vec<i64>>,
    /// Set to `false` when the index under construction should be discarded
    valid: bool,
}
1422
impl Default for ColumnIndexBuilder {
    /// Equivalent to [`ColumnIndexBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
1428
1429impl ColumnIndexBuilder {
1430 pub fn new() -> Self {
1432 ColumnIndexBuilder {
1433 null_pages: Vec::new(),
1434 min_values: Vec::new(),
1435 max_values: Vec::new(),
1436 null_counts: Vec::new(),
1437 boundary_order: BoundaryOrder::UNORDERED,
1438 repetition_level_histograms: None,
1439 definition_level_histograms: None,
1440 valid: true,
1441 }
1442 }
1443
1444 pub fn append(
1446 &mut self,
1447 null_page: bool,
1448 min_value: Vec<u8>,
1449 max_value: Vec<u8>,
1450 null_count: i64,
1451 ) {
1452 self.null_pages.push(null_page);
1453 self.min_values.push(min_value);
1454 self.max_values.push(max_value);
1455 self.null_counts.push(null_count);
1456 }
1457
1458 pub fn append_histograms(
1461 &mut self,
1462 repetition_level_histogram: &Option<LevelHistogram>,
1463 definition_level_histogram: &Option<LevelHistogram>,
1464 ) {
1465 if !self.valid {
1466 return;
1467 }
1468 if let Some(ref rep_lvl_hist) = repetition_level_histogram {
1469 let hist = self.repetition_level_histograms.get_or_insert(Vec::new());
1470 hist.reserve(rep_lvl_hist.len());
1471 hist.extend(rep_lvl_hist.values());
1472 }
1473 if let Some(ref def_lvl_hist) = definition_level_histogram {
1474 let hist = self.definition_level_histograms.get_or_insert(Vec::new());
1475 hist.reserve(def_lvl_hist.len());
1476 hist.extend(def_lvl_hist.values());
1477 }
1478 }
1479
1480 pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
1482 self.boundary_order = boundary_order;
1483 }
1484
1485 pub fn to_invalid(&mut self) {
1487 self.valid = false;
1488 }
1489
1490 pub fn valid(&self) -> bool {
1492 self.valid
1493 }
1494
1495 pub fn build_to_thrift(self) -> ColumnIndex {
1499 ColumnIndex::new(
1500 self.null_pages,
1501 self.min_values,
1502 self.max_values,
1503 self.boundary_order,
1504 self.null_counts,
1505 self.repetition_level_histograms,
1506 self.definition_level_histograms,
1507 )
1508 }
1509}
1510
impl From<ColumnChunkMetaData> for ColumnChunkMetaDataBuilder {
    /// Wraps existing column chunk metadata in a builder for modification.
    fn from(value: ColumnChunkMetaData) -> Self {
        ColumnChunkMetaDataBuilder(value)
    }
}
1516
/// Incrementally builds a thrift [`OffsetIndex`] for one column chunk.
pub struct OffsetIndexBuilder {
    /// Byte offset of each page
    offset_array: Vec<i64>,
    /// Compressed size of each page
    compressed_page_size_array: Vec<i32>,
    /// Index of the first row of each page
    first_row_index_array: Vec<i64>,
    /// Unencoded BYTE_ARRAY size per page, if tracked
    unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
    /// Running row counter: first row index of the next page
    current_first_row_index: i64,
}
1527
impl Default for OffsetIndexBuilder {
    /// Equivalent to [`OffsetIndexBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
1533
1534impl OffsetIndexBuilder {
1535 pub fn new() -> Self {
1537 OffsetIndexBuilder {
1538 offset_array: Vec::new(),
1539 compressed_page_size_array: Vec::new(),
1540 first_row_index_array: Vec::new(),
1541 unencoded_byte_array_data_bytes_array: None,
1542 current_first_row_index: 0,
1543 }
1544 }
1545
1546 pub fn append_row_count(&mut self, row_count: i64) {
1548 let current_page_row_index = self.current_first_row_index;
1549 self.first_row_index_array.push(current_page_row_index);
1550 self.current_first_row_index += row_count;
1551 }
1552
1553 pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
1555 self.offset_array.push(offset);
1556 self.compressed_page_size_array.push(compressed_page_size);
1557 }
1558
1559 pub fn append_unencoded_byte_array_data_bytes(
1561 &mut self,
1562 unencoded_byte_array_data_bytes: Option<i64>,
1563 ) {
1564 if let Some(val) = unencoded_byte_array_data_bytes {
1565 self.unencoded_byte_array_data_bytes_array
1566 .get_or_insert(Vec::new())
1567 .push(val);
1568 }
1569 }
1570
1571 pub fn build_to_thrift(self) -> OffsetIndex {
1573 let locations = self
1574 .offset_array
1575 .iter()
1576 .zip(self.compressed_page_size_array.iter())
1577 .zip(self.first_row_index_array.iter())
1578 .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index))
1579 .collect::<Vec<_>>();
1580 OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
1581 }
1582}
1583
1584#[cfg(test)]
1585mod tests {
1586 use super::*;
1587 use crate::basic::{PageType, SortOrder};
1588 use crate::file::page_index::index::NativeIndex;
1589
    #[test]
    fn test_row_group_metadata_thrift_conversion() {
        // Round trip: metadata -> thrift -> metadata -> thrift; the two
        // thrift values must be identical.
        let schema_descr = get_test_schema_descr();

        // One default-built column chunk per schema column
        let mut columns = vec![];
        for ptr in schema_descr.columns() {
            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
            columns.push(column);
        }
        let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
            .set_num_rows(1000)
            .set_total_byte_size(2000)
            .set_column_metadata(columns)
            .set_ordinal(1)
            .build()
            .unwrap();

        let row_group_exp = row_group_meta.to_thrift();
        let row_group_res = RowGroupMetaData::from_thrift(schema_descr, row_group_exp.clone())
            .unwrap()
            .to_thrift();

        assert_eq!(row_group_res, row_group_exp);
    }
1614
    #[test]
    fn test_row_group_metadata_thrift_conversion_empty() {
        // Building a row group with no column chunks must fail, since the
        // test schema declares two columns.
        let schema_descr = get_test_schema_descr();

        let row_group_meta = RowGroupMetaData::builder(schema_descr).build();

        assert!(row_group_meta.is_err());
        if let Err(e) = row_group_meta {
            assert_eq!(
                format!("{e}"),
                "Parquet error: Column length mismatch: 2 != 0"
            );
        }
    }
1629
    #[test]
    fn test_row_group_metadata_thrift_corrupted() {
        // Decoding thrift row group metadata against a schema with a
        // different column count must fail with a count-mismatch error.
        let schema_descr_2cols = Arc::new(SchemaDescriptor::new(Arc::new(
            SchemaType::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        SchemaType::primitive_type_builder("a", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        SchemaType::primitive_type_builder("b", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        )));

        let schema_descr_3cols = Arc::new(SchemaDescriptor::new(Arc::new(
            SchemaType::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        SchemaType::primitive_type_builder("a", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        SchemaType::primitive_type_builder("b", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        SchemaType::primitive_type_builder("c", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        )));

        // Valid metadata for the two-column schema
        let row_group_meta_2cols = RowGroupMetaData::builder(schema_descr_2cols.clone())
            .set_num_rows(1000)
            .set_total_byte_size(2000)
            .set_column_metadata(vec![
                ColumnChunkMetaData::builder(schema_descr_2cols.column(0))
                    .build()
                    .unwrap(),
                ColumnChunkMetaData::builder(schema_descr_2cols.column(1))
                    .build()
                    .unwrap(),
            ])
            .set_ordinal(1)
            .build()
            .unwrap();

        // Re-decoding with a three-column schema must be rejected
        let err =
            RowGroupMetaData::from_thrift(schema_descr_3cols, row_group_meta_2cols.to_thrift())
                .unwrap_err()
                .to_string();
        assert_eq!(
            err,
            "Parquet error: Column count mismatch. Schema has 3 columns while Row Group has 2"
        );
    }
1698
1699 #[test]
1700 fn test_column_chunk_metadata_thrift_conversion() {
1701 let column_descr = get_test_schema_descr().column(0);
1702
1703 let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
1704 .set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
1705 .set_file_path("file_path".to_owned())
1706 .set_num_values(1000)
1707 .set_compression(Compression::SNAPPY)
1708 .set_total_compressed_size(2000)
1709 .set_total_uncompressed_size(3000)
1710 .set_data_page_offset(4000)
1711 .set_dictionary_page_offset(Some(5000))
1712 .set_page_encoding_stats(vec![
1713 PageEncodingStats {
1714 page_type: PageType::DATA_PAGE,
1715 encoding: Encoding::PLAIN,
1716 count: 3,
1717 },
1718 PageEncodingStats {
1719 page_type: PageType::DATA_PAGE,
1720 encoding: Encoding::RLE,
1721 count: 5,
1722 },
1723 ])
1724 .set_bloom_filter_offset(Some(6000))
1725 .set_bloom_filter_length(Some(25))
1726 .set_offset_index_offset(Some(7000))
1727 .set_offset_index_length(Some(25))
1728 .set_column_index_offset(Some(8000))
1729 .set_column_index_length(Some(25))
1730 .set_unencoded_byte_array_data_bytes(Some(2000))
1731 .set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
1732 .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
1733 .build()
1734 .unwrap();
1735
1736 let col_chunk_res =
1737 ColumnChunkMetaData::from_thrift(column_descr, col_metadata.to_thrift()).unwrap();
1738
1739 assert_eq!(col_chunk_res, col_metadata);
1740 }
1741
1742 #[test]
1743 fn test_column_chunk_metadata_thrift_conversion_empty() {
1744 let column_descr = get_test_schema_descr().column(0);
1745
1746 let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
1747 .build()
1748 .unwrap();
1749
1750 let col_chunk_exp = col_metadata.to_thrift();
1751 let col_chunk_res = ColumnChunkMetaData::from_thrift(column_descr, col_chunk_exp.clone())
1752 .unwrap()
1753 .to_thrift();
1754
1755 assert_eq!(col_chunk_res, col_chunk_exp);
1756 }
1757
1758 #[test]
1759 fn test_compressed_size() {
1760 let schema_descr = get_test_schema_descr();
1761
1762 let mut columns = vec![];
1763 for column_descr in schema_descr.columns() {
1764 let column = ColumnChunkMetaData::builder(column_descr.clone())
1765 .set_total_compressed_size(500)
1766 .set_total_uncompressed_size(700)
1767 .build()
1768 .unwrap();
1769 columns.push(column);
1770 }
1771 let row_group_meta = RowGroupMetaData::builder(schema_descr)
1772 .set_num_rows(1000)
1773 .set_column_metadata(columns)
1774 .build()
1775 .unwrap();
1776
1777 let compressed_size_res: i64 = row_group_meta.compressed_size();
1778 let compressed_size_exp: i64 = 1000;
1779
1780 assert_eq!(compressed_size_res, compressed_size_exp);
1781 }
1782
1783 #[test]
1784 fn test_memory_size() {
1785 let schema_descr = get_test_schema_descr();
1786
1787 let columns = schema_descr
1788 .columns()
1789 .iter()
1790 .map(|column_descr| {
1791 ColumnChunkMetaData::builder(column_descr.clone())
1792 .set_statistics(Statistics::new::<i32>(None, None, None, None, false))
1793 .build()
1794 })
1795 .collect::<Result<Vec<_>>>()
1796 .unwrap();
1797 let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
1798 .set_num_rows(1000)
1799 .set_column_metadata(columns)
1800 .build()
1801 .unwrap();
1802 let row_group_meta = vec![row_group_meta];
1803
1804 let version = 2;
1805 let num_rows = 1000;
1806 let created_by = Some(String::from("test harness"));
1807 let key_value_metadata = Some(vec![KeyValue::new(
1808 String::from("Foo"),
1809 Some(String::from("bar")),
1810 )]);
1811 let column_orders = Some(vec![
1812 ColumnOrder::UNDEFINED,
1813 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
1814 ]);
1815 let file_metadata = FileMetaData::new(
1816 version,
1817 num_rows,
1818 created_by,
1819 key_value_metadata,
1820 schema_descr.clone(),
1821 column_orders,
1822 );
1823
1824 let columns_with_stats = schema_descr
1826 .columns()
1827 .iter()
1828 .map(|column_descr| {
1829 ColumnChunkMetaData::builder(column_descr.clone())
1830 .set_statistics(Statistics::new::<i32>(
1831 Some(0),
1832 Some(100),
1833 None,
1834 None,
1835 false,
1836 ))
1837 .build()
1838 })
1839 .collect::<Result<Vec<_>>>()
1840 .unwrap();
1841
1842 let row_group_meta_with_stats = RowGroupMetaData::builder(schema_descr)
1843 .set_num_rows(1000)
1844 .set_column_metadata(columns_with_stats)
1845 .build()
1846 .unwrap();
1847 let row_group_meta_with_stats = vec![row_group_meta_with_stats];
1848
1849 let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
1850 .set_row_groups(row_group_meta_with_stats)
1851 .build();
1852 let base_expected_size = 2312;
1853
1854 assert_eq!(parquet_meta.memory_size(), base_expected_size);
1855
1856 let mut column_index = ColumnIndexBuilder::new();
1857 column_index.append(false, vec![1u8], vec![2u8, 3u8], 4);
1858 let column_index = column_index.build_to_thrift();
1859 let native_index = NativeIndex::<bool>::try_new(column_index).unwrap();
1860
1861 let mut offset_index = OffsetIndexBuilder::new();
1863 offset_index.append_row_count(1);
1864 offset_index.append_offset_and_size(2, 3);
1865 offset_index.append_unencoded_byte_array_data_bytes(Some(10));
1866 offset_index.append_row_count(1);
1867 offset_index.append_offset_and_size(2, 3);
1868 offset_index.append_unencoded_byte_array_data_bytes(Some(10));
1869 let offset_index = offset_index.build_to_thrift();
1870
1871 let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
1872 .set_row_groups(row_group_meta)
1873 .set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
1874 .set_offset_index(Some(vec![vec![
1875 OffsetIndexMetaData::try_new(offset_index).unwrap()
1876 ]]))
1877 .build();
1878
1879 let bigger_expected_size = 2816;
1880 assert!(bigger_expected_size > base_expected_size);
1882 assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
1883 }
1884
1885 fn get_test_schema_descr() -> SchemaDescPtr {
1887 let schema = SchemaType::group_type_builder("schema")
1888 .with_fields(vec![
1889 Arc::new(
1890 SchemaType::primitive_type_builder("a", Type::INT32)
1891 .build()
1892 .unwrap(),
1893 ),
1894 Arc::new(
1895 SchemaType::primitive_type_builder("b", Type::INT32)
1896 .build()
1897 .unwrap(),
1898 ),
1899 ])
1900 .build()
1901 .unwrap();
1902
1903 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1904 }
1905}