parquet/file/metadata/
parser.rs1use crate::errors::ParquetError;
24use crate::file::metadata::thrift::parquet_metadata_from_bytes;
25use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
26
27use crate::file::page_index::column_index::ColumnIndexMetaData;
28use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
29use crate::file::page_index::offset_index::OffsetIndexMetaData;
30use bytes::Bytes;
31
32pub(crate) use inner::MetadataParser;
40
#[cfg(feature = "encryption")]
mod inner {
    use std::sync::Arc;

    use super::*;
    use crate::encryption::decrypt::FileDecryptionProperties;
    use crate::errors::Result;

    /// Footer-metadata decoder used when the `encryption` feature is enabled.
    ///
    /// Optionally carries the [`FileDecryptionProperties`] that are handed to
    /// the encryption-aware decoder.
    #[derive(Debug, Default)]
    pub(crate) struct MetadataParser {
        /// Decryption properties supplied by the caller, if any.
        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
    }

    impl MetadataParser {
        /// Creates a parser with no decryption properties set.
        pub(crate) fn new() -> Self {
            Self::default()
        }

        /// Builder-style setter that replaces the stored decryption properties.
        pub(crate) fn with_file_decryption_properties(
            mut self,
            file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        ) -> Self {
            self.file_decryption_properties = file_decryption_properties;
            self
        }

        /// Decodes footer metadata from `buf`.
        ///
        /// The encryption-aware decoder is used when the footer is flagged as
        /// encrypted or when decryption properties have been configured;
        /// otherwise the plain [`decode_metadata`] function is used.
        pub(crate) fn decode_metadata(
            &self,
            buf: &[u8],
            encrypted_footer: bool,
        ) -> Result<ParquetMetaData> {
            let use_encryption_decoder =
                encrypted_footer || self.file_decryption_properties.is_some();
            if !use_encryption_decoder {
                return decode_metadata(buf);
            }

            crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption(
                self.file_decryption_properties.as_ref(),
                encrypted_footer,
                buf,
            )
        }
    }

    /// Decodes the column index of a single column chunk from `bytes`.
    ///
    /// When the column carries crypto metadata, `bytes` is first decrypted
    /// with a decryptor derived from the file decryptor stored in `metadata`;
    /// otherwise `bytes` is decoded as-is.
    ///
    /// # Errors
    ///
    /// Returns an error if the column has crypto metadata but `metadata` has
    /// no file decryptor, or if building the crypto context, computing the
    /// AAD, decrypting, or decoding fails.
    pub(super) fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<ColumnIndexMetaData> {
        use crate::encryption::decrypt::CryptoContext;

        // No crypto metadata on the column: decode the bytes directly.
        let crypto_metadata = match &column.column_crypto_metadata {
            Some(crypto_metadata) => crypto_metadata,
            None => return decode_column_index(bytes, column.column_type()),
        };

        let file_decryptor = match metadata.file_decryptor.as_ref() {
            Some(decryptor) => decryptor,
            None => {
                return Err(general_err!(
                    "Cannot decrypt column index, no file decryptor set"
                ));
            }
        };

        let context = CryptoContext::for_column(
            file_decryptor,
            crypto_metadata,
            row_group_index,
            col_index,
        )?;
        let decryptor = context.metadata_decryptor();
        let aad = context.create_column_index_aad()?;
        let decrypted = decryptor.decrypt(bytes, &aad)?;
        decode_column_index(&decrypted, column.column_type())
    }

    /// Decodes the offset index of a single column chunk from `bytes`.
    ///
    /// When the column carries crypto metadata, `bytes` is first decrypted
    /// with a decryptor derived from the file decryptor stored in `metadata`;
    /// otherwise `bytes` is decoded as-is.
    ///
    /// # Errors
    ///
    /// Returns an error if the column has crypto metadata but `metadata` has
    /// no file decryptor, or if building the crypto context, computing the
    /// AAD, decrypting, or decoding fails.
    pub(super) fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        use crate::encryption::decrypt::CryptoContext;

        // No crypto metadata on the column: decode the bytes directly.
        let crypto_metadata = match &column.column_crypto_metadata {
            Some(crypto_metadata) => crypto_metadata,
            None => return decode_offset_index(bytes),
        };

        let file_decryptor = match metadata.file_decryptor.as_ref() {
            Some(decryptor) => decryptor,
            None => {
                return Err(general_err!(
                    "Cannot decrypt offset index, no file decryptor set"
                ));
            }
        };

        let context = CryptoContext::for_column(
            file_decryptor,
            crypto_metadata,
            row_group_index,
            col_index,
        )?;
        let decryptor = context.metadata_decryptor();
        let aad = context.create_offset_index_aad()?;
        let decrypted = decryptor.decrypt(bytes, &aad)?;
        decode_offset_index(&decrypted)
    }
}
142
143#[cfg(not(feature = "encryption"))]
144mod inner {
145 use super::*;
146 use crate::errors::Result;
147 #[derive(Debug, Default)]
151 pub(crate) struct MetadataParser;
152
153 impl MetadataParser {
154 pub(crate) fn new() -> Self {
155 MetadataParser
156 }
157
158 pub(crate) fn decode_metadata(
159 &self,
160 buf: &[u8],
161 encrypted_footer: bool,
162 ) -> Result<ParquetMetaData> {
163 if encrypted_footer {
164 Err(general_err!(
165 "Parquet file has an encrypted footer but the encryption feature is disabled"
166 ))
167 } else {
168 decode_metadata(buf)
169 }
170 }
171 }
172
173 pub(super) fn parse_single_column_index(
174 bytes: &[u8],
175 _metadata: &ParquetMetaData,
176 column: &ColumnChunkMetaData,
177 _row_group_index: usize,
178 _col_index: usize,
179 ) -> crate::errors::Result<ColumnIndexMetaData> {
180 decode_column_index(bytes, column.column_type())
181 }
182
183 pub(super) fn parse_single_offset_index(
184 bytes: &[u8],
185 _metadata: &ParquetMetaData,
186 _column: &ColumnChunkMetaData,
187 _row_group_index: usize,
188 _col_index: usize,
189 ) -> crate::errors::Result<OffsetIndexMetaData> {
190 decode_offset_index(bytes)
191 }
192}
193
194pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
202 parquet_metadata_from_bytes(buf)
203}
204
205pub(crate) fn parse_column_index(
214 metadata: &mut ParquetMetaData,
215 column_index_policy: PageIndexPolicy,
216 bytes: &Bytes,
217 start_offset: u64,
218) -> crate::errors::Result<()> {
219 if column_index_policy == PageIndexPolicy::Skip {
220 return Ok(());
221 }
222 let index = metadata
223 .row_groups()
224 .iter()
225 .enumerate()
226 .map(|(rg_idx, x)| {
227 x.columns()
228 .iter()
229 .enumerate()
230 .map(|(col_idx, c)| match c.column_index_range() {
231 Some(r) => {
232 let r_start = usize::try_from(r.start - start_offset)?;
233 let r_end = usize::try_from(r.end - start_offset)?;
234 inner::parse_single_column_index(
235 &bytes[r_start..r_end],
236 metadata,
237 c,
238 rg_idx,
239 col_idx,
240 )
241 }
242 None => Ok(ColumnIndexMetaData::NONE),
243 })
244 .collect::<crate::errors::Result<Vec<_>>>()
245 })
246 .collect::<crate::errors::Result<Vec<_>>>()?;
247
248 metadata.set_column_index(Some(index));
249 Ok(())
250}
251
252pub(crate) fn parse_offset_index(
253 metadata: &mut ParquetMetaData,
254 offset_index_policy: PageIndexPolicy,
255 bytes: &Bytes,
256 start_offset: u64,
257) -> crate::errors::Result<()> {
258 if offset_index_policy == PageIndexPolicy::Skip {
259 return Ok(());
260 }
261 let row_groups = metadata.row_groups();
262 let mut all_indexes = Vec::with_capacity(row_groups.len());
263 for (rg_idx, x) in row_groups.iter().enumerate() {
264 let mut row_group_indexes = Vec::with_capacity(x.columns().len());
265 for (col_idx, c) in x.columns().iter().enumerate() {
266 let result = match c.offset_index_range() {
267 Some(r) => {
268 let r_start = usize::try_from(r.start - start_offset)?;
269 let r_end = usize::try_from(r.end - start_offset)?;
270 inner::parse_single_offset_index(
271 &bytes[r_start..r_end],
272 metadata,
273 c,
274 rg_idx,
275 col_idx,
276 )
277 }
278 None => Err(general_err!("missing offset index")),
279 };
280
281 match result {
282 Ok(index) => row_group_indexes.push(index),
283 Err(e) => {
284 if offset_index_policy == PageIndexPolicy::Required {
285 return Err(e);
286 } else {
287 metadata.set_column_index(None);
289 metadata.set_offset_index(None);
290 return Ok(());
291 }
292 }
293 }
294 }
295 all_indexes.push(row_group_indexes);
296 }
297 metadata.set_offset_index(Some(all_indexes));
298 Ok(())
299}