parquet/arrow/async_reader/
metadata.rs1use crate::arrow::async_reader::AsyncFileReader;
19use crate::errors::{ParquetError, Result};
20use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
21use crate::file::page_index::index::Index;
22use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
23use crate::file::FOOTER_SIZE;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use futures::FutureExt;
27use std::future::Future;
28use std::ops::Range;
29
30pub trait MetadataFetch {
65 fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
70}
71
72impl<T: AsyncFileReader> MetadataFetch for &mut T {
73 fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
74 self.get_bytes(range)
75 }
76}
77
78pub struct MetadataLoader<F> {
80 fetch: F,
82 metadata: ParquetMetaData,
84 remainder: Option<(usize, Bytes)>,
86}
87
88impl<F: MetadataFetch> MetadataLoader<F> {
89 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
93 pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
94 if file_size < FOOTER_SIZE {
95 return Err(ParquetError::EOF(format!(
96 "file size of {file_size} is less than footer"
97 )));
98 }
99
100 let footer_start = if let Some(size_hint) = prefetch {
103 let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
105 file_size.saturating_sub(size_hint)
106 } else {
107 file_size - FOOTER_SIZE
108 };
109
110 let suffix = fetch.fetch(footer_start..file_size).await?;
111 let suffix_len = suffix.len();
112
113 let mut footer = [0; FOOTER_SIZE];
114 footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
115
116 let length = ParquetMetaDataReader::decode_footer(&footer)?;
117
118 if file_size < length + FOOTER_SIZE {
119 return Err(ParquetError::EOF(format!(
120 "file size of {} is less than footer + metadata {}",
121 file_size,
122 length + FOOTER_SIZE
123 )));
124 }
125
126 let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
128 let metadata_start = file_size - length - FOOTER_SIZE;
129 let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
130 (ParquetMetaDataReader::decode_metadata(&meta)?, None)
131 } else {
132 let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
133
134 let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
135 (
136 ParquetMetaDataReader::decode_metadata(slice)?,
137 Some((footer_start, suffix.slice(..metadata_start))),
138 )
139 };
140
141 Ok(Self {
142 fetch,
143 metadata,
144 remainder,
145 })
146 }
147
148 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
150 pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
151 Self {
152 fetch,
153 metadata,
154 remainder: None,
155 }
156 }
157
158 #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
163 pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
164 if !column_index && !offset_index {
165 return Ok(());
166 }
167
168 let mut range = None;
169 for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
170 range = acc_range(range, c.column_index_range());
171 range = acc_range(range, c.offset_index_range());
172 }
173 let range = match range {
174 None => return Ok(()),
175 Some(range) => range,
176 };
177
178 let data = match &self.remainder {
179 Some((remainder_start, remainder)) if *remainder_start <= range.start => {
180 let offset = range.start - *remainder_start;
181 remainder.slice(offset..range.end - *remainder_start + offset)
182 }
183 _ => self.fetch.fetch(range.start..range.end).await?,
185 };
186
187 assert_eq!(data.len(), range.end - range.start);
189 let offset = range.start;
190
191 if column_index {
192 let index = self
193 .metadata
194 .row_groups()
195 .iter()
196 .map(|x| {
197 x.columns()
198 .iter()
199 .map(|c| match c.column_index_range() {
200 Some(r) => decode_column_index(
201 &data[r.start - offset..r.end - offset],
202 c.column_type(),
203 ),
204 None => Ok(Index::NONE),
205 })
206 .collect::<Result<Vec<_>>>()
207 })
208 .collect::<Result<Vec<_>>>()?;
209
210 self.metadata.set_column_index(Some(index));
211 }
212
213 if offset_index {
214 let index = self
215 .metadata
216 .row_groups()
217 .iter()
218 .map(|x| {
219 x.columns()
220 .iter()
221 .map(|c| match c.offset_index_range() {
222 Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
223 None => Err(general_err!("missing offset index")),
224 })
225 .collect::<Result<Vec<_>>>()
226 })
227 .collect::<Result<Vec<_>>>()?;
228
229 self.metadata.set_offset_index(Some(index));
230 }
231
232 Ok(())
233 }
234
235 pub fn finish(self) -> ParquetMetaData {
237 self.metadata
238 }
239}
240
241struct MetadataFetchFn<F>(F);
242
243impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
244where
245 F: FnMut(Range<usize>) -> Fut + Send,
246 Fut: Future<Output = Result<Bytes>> + Send,
247{
248 fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
249 async move { self.0(range).await }.boxed()
250 }
251}
252
253#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
270pub async fn fetch_parquet_metadata<F, Fut>(
271 fetch: F,
272 file_size: usize,
273 prefetch: Option<usize>,
274) -> Result<ParquetMetaData>
275where
276 F: FnMut(Range<usize>) -> Fut + Send,
277 Fut: Future<Output = Result<Bytes>> + Send,
278{
279 let fetch = MetadataFetchFn(fetch);
280 ParquetMetaDataReader::new()
281 .with_prefetch_hint(prefetch)
282 .load_and_finish(fetch, file_size)
283 .await
284}
285
// The tests intentionally exercise the deprecated `MetadataLoader` and
// `fetch_parquet_metadata` APIs, which are still public and must keep working.
#[allow(deprecated)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, Length, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Read at most `range.end - range.start` bytes of `file` starting at `range.start`.
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        let len = range.end - range.start;
        file.seek(SeekFrom::Start(range.start as u64))?;
        let mut buf = Vec::with_capacity(len);
        file.take(len as u64).read_to_end(&mut buf)?;
        Ok(Bytes::from(buf))
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let expected = reader.metadata().file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, expected number of fetches)
        let cases = [
            // No hint, or hints too small to cover the metadata: a second fetch is needed
            (None, 2),
            (Some(7), 2),
            (Some(10), 2),
            // Hints large enough to cover footer + metadata: a single fetch suffices
            (Some(500), 1),
            (Some(428), 1),
        ];
        for (prefetch, expected_fetches) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let actual = fetch_parquet_metadata(&mut fetch, len, prefetch)
                .await
                .unwrap();
            assert_eq!(actual.file_metadata().schema(), expected);
            assert_eq!(fetch_count.load(Ordering::SeqCst), expected_fetches);
        }

        let err = fetch_parquet_metadata(&mut fetch, 4, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let err = fetch_parquet_metadata(&mut fetch, 20, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, fetches after `load`, total fetches after `load_page_index`)
        let cases = [
            (None, 2, 3),
            (Some(1729), 1, 2),
            // 130649 is one byte short of also covering the page index; 130650 covers it
            (Some(130649), 1, 2),
            (Some(130650), 1, 1),
        ];
        for (prefetch, after_load, after_page_index) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let f = MetadataFetchFn(&mut fetch);
            let mut loader = MetadataLoader::load(f, len, prefetch).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_load);
            loader.load_page_index(true, true).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_page_index);
            let metadata = loader.finish();
            assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
        }
    }
}