parquet/arrow/async_reader/
metadata.rs1use crate::arrow::async_reader::AsyncFileReader;
19use crate::errors::{ParquetError, Result};
20use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
21use crate::file::page_index::index::Index;
22use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
23use crate::file::FOOTER_SIZE;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use futures::FutureExt;
27use std::future::Future;
28use std::ops::Range;
29
30pub trait MetadataFetch {
65 fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
70}
71
72impl<T: AsyncFileReader> MetadataFetch for &mut T {
73 fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
74 self.get_bytes(range)
75 }
76}
77
78pub trait MetadataSuffixFetch: MetadataFetch {
81 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
86}
87
88pub struct MetadataLoader<F> {
90 fetch: F,
92 metadata: ParquetMetaData,
94 remainder: Option<(usize, Bytes)>,
96}
97
98impl<F: MetadataFetch> MetadataLoader<F> {
99 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
103 pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
104 if file_size < FOOTER_SIZE {
105 return Err(ParquetError::EOF(format!(
106 "file size of {file_size} is less than footer"
107 )));
108 }
109
110 let footer_start = if let Some(size_hint) = prefetch {
113 let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
115 file_size.saturating_sub(size_hint)
116 } else {
117 file_size - FOOTER_SIZE
118 };
119
120 let suffix = fetch.fetch(footer_start..file_size).await?;
121 let suffix_len = suffix.len();
122
123 let mut footer = [0; FOOTER_SIZE];
124 footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
125
126 let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
127 let length = footer.metadata_length();
128
129 if file_size < length + FOOTER_SIZE {
130 return Err(ParquetError::EOF(format!(
131 "file size of {} is less than footer + metadata {}",
132 file_size,
133 length + FOOTER_SIZE
134 )));
135 }
136
137 let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
139 let metadata_start = file_size - length - FOOTER_SIZE;
140 let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
141 (ParquetMetaDataReader::decode_metadata(&meta)?, None)
142 } else {
143 let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
144
145 let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
146 (
147 ParquetMetaDataReader::decode_metadata(slice)?,
148 Some((footer_start, suffix.slice(..metadata_start))),
149 )
150 };
151
152 Ok(Self {
153 fetch,
154 metadata,
155 remainder,
156 })
157 }
158
159 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
161 pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
162 Self {
163 fetch,
164 metadata,
165 remainder: None,
166 }
167 }
168
169 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
174 pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
175 if !column_index && !offset_index {
176 return Ok(());
177 }
178
179 let mut range = None;
180 for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
181 range = acc_range(range, c.column_index_range());
182 range = acc_range(range, c.offset_index_range());
183 }
184 let range = match range {
185 None => return Ok(()),
186 Some(range) => range,
187 };
188
189 let data = match &self.remainder {
190 Some((remainder_start, remainder)) if *remainder_start <= range.start => {
191 let offset = range.start - *remainder_start;
192 remainder.slice(offset..range.end - *remainder_start + offset)
193 }
194 _ => self.fetch.fetch(range.start..range.end).await?,
196 };
197
198 assert_eq!(data.len(), range.end - range.start);
200 let offset = range.start;
201
202 if column_index {
203 let index = self
204 .metadata
205 .row_groups()
206 .iter()
207 .map(|x| {
208 x.columns()
209 .iter()
210 .map(|c| match c.column_index_range() {
211 Some(r) => decode_column_index(
212 &data[r.start - offset..r.end - offset],
213 c.column_type(),
214 ),
215 None => Ok(Index::NONE),
216 })
217 .collect::<Result<Vec<_>>>()
218 })
219 .collect::<Result<Vec<_>>>()?;
220
221 self.metadata.set_column_index(Some(index));
222 }
223
224 if offset_index {
225 let index = self
226 .metadata
227 .row_groups()
228 .iter()
229 .map(|x| {
230 x.columns()
231 .iter()
232 .map(|c| match c.offset_index_range() {
233 Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
234 None => Err(general_err!("missing offset index")),
235 })
236 .collect::<Result<Vec<_>>>()
237 })
238 .collect::<Result<Vec<_>>>()?;
239
240 self.metadata.set_offset_index(Some(index));
241 }
242
243 Ok(())
244 }
245
246 pub fn finish(self) -> ParquetMetaData {
248 self.metadata
249 }
250}
251
/// Adapter that lets a range-fetching closure be used as a [`MetadataFetch`].
struct MetadataFetchFn<F>(F);
253
254impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
255where
256 F: FnMut(Range<usize>) -> Fut + Send,
257 Fut: Future<Output = Result<Bytes>> + Send,
258{
259 fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
260 async move { self.0(range).await }.boxed()
261 }
262}
263
264#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
281pub async fn fetch_parquet_metadata<F, Fut>(
282 fetch: F,
283 file_size: usize,
284 prefetch: Option<usize>,
285) -> Result<ParquetMetaData>
286where
287 F: FnMut(Range<usize>) -> Fut + Send,
288 Fut: Future<Output = Result<Bytes>> + Send,
289{
290 let fetch = MetadataFetchFn(fetch);
291 ParquetMetaDataReader::new()
292 .with_prefetch_hint(prefetch)
293 .load_and_finish(fetch, file_size)
294 .await
295}
296
// These tests deliberately exercise the deprecated loader APIs.
#[allow(deprecated)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, Length, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Reads up to `range.end - range.start` bytes of `file`, starting at `range.start`.
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        let Range { start, end } = range;
        file.seek(SeekFrom::Start(start as _))?;
        let wanted = end - start;
        let mut contents = Vec::with_capacity(wanted);
        file.take(wanted as _).read_to_end(&mut contents)?;
        Ok(Bytes::from(contents))
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let expected = reader.metadata().file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, number of fetches expected to load the metadata)
        let cases = [
            (None, 2),
            (Some(7), 2),
            (Some(10), 2),
            (Some(500), 1),
            (Some(428), 1),
        ];
        for (prefetch, expected_fetches) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let actual = fetch_parquet_metadata(&mut fetch, len, prefetch)
                .await
                .unwrap();
            assert_eq!(actual.file_metadata().schema(), expected);
            assert_eq!(fetch_count.load(Ordering::SeqCst), expected_fetches);
        }

        // (claimed file size, expected error text)
        let failures = [
            (4, "EOF: file size of 4 is less than footer"),
            (20, "Parquet error: Invalid Parquet file. Corrupt footer"),
        ];
        for (file_size, message) in failures {
            let err = fetch_parquet_metadata(&mut fetch, file_size, None)
                .await
                .unwrap_err()
                .to_string();
            assert_eq!(err, message);
        }
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, fetches after `load`, fetches after `load_page_index`)
        let cases = [
            (None, 2, 3),
            (Some(1729), 1, 2),
            (Some(130649), 1, 2),
            (Some(130650), 1, 1),
        ];
        for (prefetch, after_load, after_index) in cases {
            fetch_count.store(0, Ordering::SeqCst);

            let source = MetadataFetchFn(&mut fetch);
            let mut loader = MetadataLoader::load(source, len, prefetch).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_load);

            loader.load_page_index(true, true).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_index);

            let metadata = loader.finish();
            assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
        }
    }
}