1use crate::errors::ParquetError;
24use crate::file::metadata::thrift::parquet_metadata_from_bytes;
25use crate::file::metadata::{
26 ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions,
27};
28
29use crate::file::page_index::column_index::ColumnIndexMetaData;
30use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
31use crate::file::page_index::offset_index::OffsetIndexMetaData;
32use bytes::Bytes;
33
34pub(crate) use inner::MetadataParser;
42
43#[cfg(feature = "encryption")]
44mod inner {
45 use std::sync::Arc;
46
47 use super::*;
48 use crate::encryption::decrypt::FileDecryptionProperties;
49 use crate::errors::Result;
50
51 #[derive(Debug, Default)]
53 pub(crate) struct MetadataParser {
54 file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
56 metadata_options: Option<Arc<ParquetMetaDataOptions>>,
58 }
59
60 impl MetadataParser {
61 pub(crate) fn new() -> Self {
62 MetadataParser::default()
63 }
64
65 pub(crate) fn with_file_decryption_properties(
66 mut self,
67 file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
68 ) -> Self {
69 self.file_decryption_properties = file_decryption_properties;
70 self
71 }
72
73 pub(crate) fn with_metadata_options(
74 self,
75 options: Option<Arc<ParquetMetaDataOptions>>,
76 ) -> Self {
77 Self {
78 metadata_options: options,
79 ..self
80 }
81 }
82
83 pub(crate) fn decode_metadata(
84 &self,
85 buf: &[u8],
86 encrypted_footer: bool,
87 ) -> Result<ParquetMetaData> {
88 if encrypted_footer || self.file_decryption_properties.is_some() {
89 crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption(
90 self.file_decryption_properties.as_ref(),
91 encrypted_footer,
92 buf,
93 self.metadata_options.as_deref(),
94 )
95 } else {
96 decode_metadata(buf, self.metadata_options.as_deref())
97 }
98 }
99 }
100
101 pub(super) fn parse_single_column_index(
102 bytes: &[u8],
103 metadata: &ParquetMetaData,
104 column: &ColumnChunkMetaData,
105 row_group_index: usize,
106 col_index: usize,
107 ) -> crate::errors::Result<ColumnIndexMetaData> {
108 use crate::encryption::decrypt::CryptoContext;
109 match &column.column_crypto_metadata {
110 Some(crypto_metadata) => {
111 let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
112 general_err!("Cannot decrypt column index, no file decryptor set")
113 })?;
114 let crypto_context = CryptoContext::for_column(
115 file_decryptor,
116 crypto_metadata,
117 row_group_index,
118 col_index,
119 )?;
120 let column_decryptor = crypto_context.metadata_decryptor();
121 let aad = crypto_context.create_column_index_aad()?;
122 let plaintext = column_decryptor.decrypt(bytes, &aad)?;
123 decode_column_index(&plaintext, column.column_type())
124 }
125 None => decode_column_index(bytes, column.column_type()),
126 }
127 }
128
129 pub(super) fn parse_single_offset_index(
130 bytes: &[u8],
131 metadata: &ParquetMetaData,
132 column: &ColumnChunkMetaData,
133 row_group_index: usize,
134 col_index: usize,
135 ) -> crate::errors::Result<OffsetIndexMetaData> {
136 use crate::encryption::decrypt::CryptoContext;
137 match &column.column_crypto_metadata {
138 Some(crypto_metadata) => {
139 let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
140 general_err!("Cannot decrypt offset index, no file decryptor set")
141 })?;
142 let crypto_context = CryptoContext::for_column(
143 file_decryptor,
144 crypto_metadata,
145 row_group_index,
146 col_index,
147 )?;
148 let column_decryptor = crypto_context.metadata_decryptor();
149 let aad = crypto_context.create_offset_index_aad()?;
150 let plaintext = column_decryptor.decrypt(bytes, &aad)?;
151 decode_offset_index(&plaintext)
152 }
153 None => decode_offset_index(bytes),
154 }
155 }
156}
157
158#[cfg(not(feature = "encryption"))]
159mod inner {
160 use super::*;
161 use crate::errors::Result;
162 use std::sync::Arc;
163 #[derive(Debug, Default)]
167 pub(crate) struct MetadataParser {
168 metadata_options: Option<Arc<ParquetMetaDataOptions>>,
170 }
171
172 impl MetadataParser {
173 pub(crate) fn new() -> Self {
174 MetadataParser::default()
175 }
176
177 pub(crate) fn with_metadata_options(
178 self,
179 options: Option<Arc<ParquetMetaDataOptions>>,
180 ) -> Self {
181 Self {
182 metadata_options: options,
183 }
184 }
185
186 pub(crate) fn decode_metadata(
187 &self,
188 buf: &[u8],
189 encrypted_footer: bool,
190 ) -> Result<ParquetMetaData> {
191 if encrypted_footer {
192 Err(general_err!(
193 "Parquet file has an encrypted footer but the encryption feature is disabled"
194 ))
195 } else {
196 decode_metadata(buf, self.metadata_options.as_deref())
197 }
198 }
199 }
200
201 pub(super) fn parse_single_column_index(
202 bytes: &[u8],
203 _metadata: &ParquetMetaData,
204 column: &ColumnChunkMetaData,
205 _row_group_index: usize,
206 _col_index: usize,
207 ) -> crate::errors::Result<ColumnIndexMetaData> {
208 decode_column_index(bytes, column.column_type())
209 }
210
211 pub(super) fn parse_single_offset_index(
212 bytes: &[u8],
213 _metadata: &ParquetMetaData,
214 _column: &ColumnChunkMetaData,
215 _row_group_index: usize,
216 _col_index: usize,
217 ) -> crate::errors::Result<OffsetIndexMetaData> {
218 decode_offset_index(bytes)
219 }
220}
221
222pub(crate) fn decode_metadata(
230 buf: &[u8],
231 options: Option<&ParquetMetaDataOptions>,
232) -> crate::errors::Result<ParquetMetaData> {
233 parquet_metadata_from_bytes(buf, options)
234}
235
236pub(crate) fn parse_column_index(
245 metadata: &mut ParquetMetaData,
246 column_index_policy: PageIndexPolicy,
247 bytes: &Bytes,
248 start_offset: u64,
249) -> crate::errors::Result<()> {
250 if column_index_policy == PageIndexPolicy::Skip {
251 return Ok(());
252 }
253 let index = metadata
254 .row_groups()
255 .iter()
256 .enumerate()
257 .map(|(rg_idx, x)| {
258 x.columns()
259 .iter()
260 .enumerate()
261 .map(|(col_idx, c)| match c.column_index_range() {
262 Some(r) => {
263 let r_start = usize::try_from(r.start - start_offset)?;
264 let r_end = usize::try_from(r.end - start_offset)?;
265 inner::parse_single_column_index(
266 &bytes[r_start..r_end],
267 metadata,
268 c,
269 rg_idx,
270 col_idx,
271 )
272 }
273 None => Ok(ColumnIndexMetaData::NONE),
274 })
275 .collect::<crate::errors::Result<Vec<_>>>()
276 })
277 .collect::<crate::errors::Result<Vec<_>>>()?;
278
279 metadata.set_column_index(Some(index));
280 Ok(())
281}
282
283pub(crate) fn parse_offset_index(
284 metadata: &mut ParquetMetaData,
285 offset_index_policy: PageIndexPolicy,
286 bytes: &Bytes,
287 start_offset: u64,
288) -> crate::errors::Result<()> {
289 if offset_index_policy == PageIndexPolicy::Skip {
290 return Ok(());
291 }
292 let row_groups = metadata.row_groups();
293 let mut all_indexes = Vec::with_capacity(row_groups.len());
294 for (rg_idx, x) in row_groups.iter().enumerate() {
295 let mut row_group_indexes = Vec::with_capacity(x.columns().len());
296 for (col_idx, c) in x.columns().iter().enumerate() {
297 let result = match c.offset_index_range() {
298 Some(r) => {
299 let r_start = usize::try_from(r.start - start_offset)?;
300 let r_end = usize::try_from(r.end - start_offset)?;
301 inner::parse_single_offset_index(
302 &bytes[r_start..r_end],
303 metadata,
304 c,
305 rg_idx,
306 col_idx,
307 )
308 }
309 None => Err(general_err!("missing offset index")),
310 };
311
312 match result {
313 Ok(index) => row_group_indexes.push(index),
314 Err(e) => {
315 if offset_index_policy == PageIndexPolicy::Required {
316 return Err(e);
317 } else {
318 metadata.set_column_index(None);
320 metadata.set_offset_index(None);
321 return Ok(());
322 }
323 }
324 }
325 }
326 all_indexes.push(row_group_indexes);
327 }
328 metadata.set_offset_index(Some(all_indexes));
329 Ok(())
330}