parquet/arrow/async_reader/
metadata.rs1use crate::arrow::async_reader::AsyncFileReader;
19use crate::errors::{ParquetError, Result};
20use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
21use crate::file::page_index::index::Index;
22use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
23use crate::file::FOOTER_SIZE;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use futures::FutureExt;
27use std::future::Future;
28use std::ops::Range;
29
/// A source of file bytes used by [`MetadataLoader`] and
/// `fetch_parquet_metadata` to read the Parquet footer, metadata and page index.
///
/// This module implements it for `&mut T` where `T: AsyncFileReader`
/// (forwarding to `AsyncFileReader::get_bytes`) and for closure-backed
/// adapters that take a `Range<usize>`.
pub trait MetadataFetch {
    /// Return a future resolving to the bytes of the file in `range`, where
    /// `range` is a half-open range of byte offsets from the start of the file.
    ///
    /// NOTE(review): callers in this module expect exactly
    /// `range.end - range.start` bytes back; implementations should not
    /// return short reads.
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
}
72
73impl<T: AsyncFileReader> MetadataFetch for &mut T {
74 fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
75 self.get_bytes(range)
76 }
77}
78
/// A [`MetadataFetch`] that can additionally fetch a suffix of the data.
///
/// NOTE(review): nothing in this section uses this trait; its exact contract
/// is defined by its implementors and callers elsewhere — confirm there.
pub trait MetadataSuffixFetch: MetadataFetch {
    /// Return a future resolving to a suffix of the data, `suffix` bytes long.
    ///
    /// NOTE(review): presumably the last `suffix` bytes of the file, useful when
    /// the total file size is not known up front — verify against implementors.
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
}
88
/// Asynchronously loads [`ParquetMetaData`] through a [`MetadataFetch`], and
/// can optionally load the page index afterwards.
///
/// Its `load`, `new` and `load_page_index` constructors/methods are deprecated
/// in favour of `ParquetMetaDataReader`.
pub struct MetadataLoader<F> {
    /// Source used to fetch byte ranges of the file
    fetch: F,
    /// The decoded metadata; page indexes are attached to it by `load_page_index`
    metadata: ParquetMetaData,
    /// Bytes prefetched before the metadata by `load`, paired with the file
    /// offset of their first byte; `load_page_index` may slice the page index
    /// out of them instead of issuing another fetch
    remainder: Option<(usize, Bytes)>,
}
98
99impl<F: MetadataFetch> MetadataLoader<F> {
100 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
104 pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
105 if file_size < FOOTER_SIZE {
106 return Err(ParquetError::EOF(format!(
107 "file size of {file_size} is less than footer"
108 )));
109 }
110
111 let footer_start = if let Some(size_hint) = prefetch {
114 let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
116 file_size.saturating_sub(size_hint)
117 } else {
118 file_size - FOOTER_SIZE
119 };
120
121 let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?;
122 let suffix_len = suffix.len();
123
124 let mut footer = [0; FOOTER_SIZE];
125 footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
126
127 let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
128 let length = footer.metadata_length();
129
130 if file_size < length + FOOTER_SIZE {
131 return Err(ParquetError::EOF(format!(
132 "file size of {} is less than footer + metadata {}",
133 file_size,
134 length + FOOTER_SIZE
135 )));
136 }
137
138 let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
140 let metadata_start = file_size - length - FOOTER_SIZE;
141 let meta = fetch
142 .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64)
143 .await?;
144 (ParquetMetaDataReader::decode_metadata(&meta)?, None)
145 } else {
146 let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
147
148 let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
149 (
150 ParquetMetaDataReader::decode_metadata(slice)?,
151 Some((footer_start, suffix.slice(..metadata_start))),
152 )
153 };
154
155 Ok(Self {
156 fetch,
157 metadata,
158 remainder,
159 })
160 }
161
162 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
164 pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
165 Self {
166 fetch,
167 metadata,
168 remainder: None,
169 }
170 }
171
172 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
177 pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
178 if !column_index && !offset_index {
179 return Ok(());
180 }
181
182 let mut range = None;
183 for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
184 range = acc_range(range, c.column_index_range());
185 range = acc_range(range, c.offset_index_range());
186 }
187 let range = match range {
188 None => return Ok(()),
189 Some(range) => range,
190 };
191
192 let data = match &self.remainder {
193 Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
194 let remainder_start = *remainder_start as u64;
195 let range_start = usize::try_from(range.start - remainder_start)?;
196 let range_end = usize::try_from(range.end - remainder_start)?;
197 remainder.slice(range_start..range_end)
198 }
199 _ => self.fetch.fetch(range.start..range.end).await?,
201 };
202
203 assert_eq!(data.len(), (range.end - range.start) as usize);
205 let offset = range.start;
206
207 if column_index {
208 let index = self
209 .metadata
210 .row_groups()
211 .iter()
212 .map(|x| {
213 x.columns()
214 .iter()
215 .map(|c| match c.column_index_range() {
216 Some(r) => {
217 let r_start = usize::try_from(r.start - offset)?;
218 let r_end = usize::try_from(r.end - offset)?;
219 decode_column_index(&data[r_start..r_end], c.column_type())
220 }
221 None => Ok(Index::NONE),
222 })
223 .collect::<Result<Vec<_>>>()
224 })
225 .collect::<Result<Vec<_>>>()?;
226
227 self.metadata.set_column_index(Some(index));
228 }
229
230 if offset_index {
231 let index = self
232 .metadata
233 .row_groups()
234 .iter()
235 .map(|x| {
236 x.columns()
237 .iter()
238 .map(|c| match c.offset_index_range() {
239 Some(r) => {
240 let r_start = usize::try_from(r.start - offset)?;
241 let r_end = usize::try_from(r.end - offset)?;
242 decode_offset_index(&data[r_start..r_end])
243 }
244 None => Err(general_err!("missing offset index")),
245 })
246 .collect::<Result<Vec<_>>>()
247 })
248 .collect::<Result<Vec<_>>>()?;
249
250 self.metadata.set_offset_index(Some(index));
251 }
252
253 Ok(())
254 }
255
256 pub fn finish(self) -> ParquetMetaData {
258 self.metadata
259 }
260}
261
262struct MetadataFetchFn<F>(F);
263
264impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
265where
266 F: FnMut(Range<usize>) -> Fut + Send,
267 Fut: Future<Output = Result<Bytes>> + Send,
268{
269 fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
270 async move { self.0(range.start.try_into()?..range.end.try_into()?).await }.boxed()
271 }
272}
273
274#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
291pub async fn fetch_parquet_metadata<F, Fut>(
292 fetch: F,
293 file_size: usize,
294 prefetch: Option<usize>,
295) -> Result<ParquetMetaData>
296where
297 F: FnMut(Range<usize>) -> Fut + Send,
298 Fut: Future<Output = Result<Bytes>> + Send,
299{
300 let file_size = u64::try_from(file_size)?;
301 let fetch = MetadataFetchFn(fetch);
302 ParquetMetaDataReader::new()
303 .with_prefetch_hint(prefetch)
304 .load_and_finish(fetch, file_size)
305 .await
306}
307
#[allow(deprecated)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, Length, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Read the bytes of `file` covered by `range` into a [`Bytes`] buffer.
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as u64))?;
        let byte_count = range.end - range.start;
        let mut buf = Vec::with_capacity(byte_count);
        file.take(byte_count as u64).read_to_end(&mut buf)?;
        Ok(Bytes::from(buf))
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let expected = reader.metadata().file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, number of fetches needed to decode the metadata)
        let cases: [(Option<usize>, usize); 5] = [
            (None, 2),
            (Some(7), 2),
            (Some(10), 2),
            (Some(500), 1),
            (Some(428), 1),
        ];
        for (prefetch, expected_fetches) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let actual = fetch_parquet_metadata(&mut fetch, len, prefetch)
                .await
                .unwrap();
            assert_eq!(actual.file_metadata().schema(), expected);
            assert_eq!(fetch_count.load(Ordering::SeqCst), expected_fetches);
        }

        let too_small = fetch_parquet_metadata(&mut fetch, 4, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(too_small, "EOF: file size of 4 is less than footer");

        let corrupt = fetch_parquet_metadata(&mut fetch, 20, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(corrupt, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, total fetches after `load`, total fetches after
        // `load_page_index`). A hint of 130650 covers the page index, so
        // `load_page_index` reuses the prefetched bytes; 130649 falls one
        // byte short and needs an extra fetch.
        let cases: [(Option<usize>, usize, usize); 4] = [
            (None, 2, 3),
            (Some(1729), 1, 2),
            (Some(130649), 1, 2),
            (Some(130650), 1, 1),
        ];
        for (prefetch, after_load, after_page_index) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let source = MetadataFetchFn(&mut fetch);
            let mut loader = MetadataLoader::load(source, len, prefetch).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_load);
            loader.load_page_index(true, true).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_page_index);
            let metadata = loader.finish();
            assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
        }
    }
}