// parquet/arrow/async_reader/metadata.rs
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::FOOTER_SIZE;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
use std::ops::Range;
/// A source of bytes from which Parquet metadata can be read asynchronously.
///
/// Implementations are asked for arbitrary byte ranges of the underlying file.
pub trait MetadataFetch {
/// Returns a future that resolves to the bytes covering `range`.
///
/// Callers in this module expect the returned buffer to be exactly
/// `range.end - range.start` bytes long.
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
}
/// Lets a mutable reference to any [`AsyncFileReader`] act as a [`MetadataFetch`].
impl<T: AsyncFileReader> MetadataFetch for &mut T {
    /// Forwards the request to the reader's `get_bytes`.
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        // `self` is `&mut &mut T`; dereference explicitly down to the reader itself.
        (**self).get_bytes(range)
    }
}
/// Incrementally loads Parquet metadata (footer, then optionally the page
/// index) through a [`MetadataFetch`] implementation.
pub struct MetadataLoader<F> {
// Source used to fetch byte ranges of the file.
fetch: F,
// Metadata decoded so far; returned by `finish`.
metadata: ParquetMetaData,
// Prefetched bytes that precede the metadata, stored as
// `(absolute file offset of the first byte, bytes)`. `None` when nothing
// beyond the metadata itself was prefetched.
remainder: Option<(usize, Bytes)>,
}
impl<F: MetadataFetch> MetadataLoader<F> {
    /// Creates a [`MetadataLoader`] by reading and decoding the file footer
    /// and the metadata it points at.
    ///
    /// * `fetch` - source of file bytes
    /// * `file_size` - total size of the file in bytes
    /// * `prefetch` - optional number of trailing bytes to request in the
    ///   first fetch; values smaller than `FOOTER_SIZE` are rounded up to it,
    ///   values larger than the file are clamped to the whole file
    ///
    /// When the prefetched suffix already contains the metadata no second
    /// fetch is issued, and any prefetched bytes that precede the metadata are
    /// retained so that [`Self::load_page_index`] can reuse them.
    ///
    /// # Errors
    ///
    /// Returns an EOF error if `file_size` is smaller than the footer, if the
    /// fetch yields fewer bytes than a footer, or if the footer declares more
    /// metadata than the file can hold. Fetch and decode errors are propagated.
    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
    pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
        if file_size < FOOTER_SIZE {
            return Err(ParquetError::EOF(format!(
                "file size of {file_size} is less than footer"
            )));
        }

        // Absolute offset at which the first fetch starts.
        let footer_start = if let Some(size_hint) = prefetch {
            let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
            file_size.saturating_sub(size_hint)
        } else {
            file_size - FOOTER_SIZE
        };

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();

        // A short read would otherwise underflow `suffix_len - FOOTER_SIZE`
        // below and panic; report it as an error instead.
        if suffix_len < FOOTER_SIZE {
            return Err(ParquetError::EOF(format!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE, suffix_len
            )));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        // Length in bytes of the encoded metadata that precedes the footer.
        let length = ParquetMetaDataReader::decode_footer(&footer)?;

        if file_size < length + FOOTER_SIZE {
            return Err(ParquetError::EOF(format!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            )));
        }

        let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
            // The prefetch did not cover the metadata: fetch exactly the metadata bytes.
            let metadata_start = file_size - length - FOOTER_SIZE;
            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
            (ParquetMetaDataReader::decode_metadata(&meta)?, None)
        } else {
            // The metadata is inside `suffix`; `metadata_start` is relative to `suffix`.
            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            (
                ParquetMetaDataReader::decode_metadata(slice)?,
                // Keep the bytes in front of the metadata, tagged with their absolute offset.
                Some((footer_start, suffix.slice(..metadata_start))),
            )
        };

        Ok(Self {
            fetch,
            metadata,
            remainder,
        })
    }

    /// Creates a [`MetadataLoader`] from already decoded `metadata`, with no
    /// prefetched bytes available for reuse.
    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
    pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
        Self {
            fetch,
            metadata,
            remainder: None,
        }
    }

    /// Loads the column index and/or offset index and stores them on the
    /// held metadata.
    ///
    /// * `column_index` - decode and set the column index
    /// * `offset_index` - decode and set the offset index
    ///
    /// Returns immediately when both flags are `false` or when no column
    /// chunk advertises an index range. Otherwise the index ranges of every
    /// column chunk are accumulated into a single byte range, which is served
    /// from the bytes prefetched by [`Self::load`] when they fully cover it
    /// and fetched otherwise.
    ///
    /// # Errors
    ///
    /// Propagates fetch and decode errors. When `offset_index` is requested,
    /// a column chunk without an offset index range yields a
    /// "missing offset index" error.
    ///
    /// # Panics
    ///
    /// Panics if the fetcher returns a buffer whose length differs from the
    /// requested range.
    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
    pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
        if !column_index && !offset_index {
            return Ok(());
        }

        // Accumulate one byte range spanning every column/offset index in the file.
        let mut range = None;
        for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
            range = acc_range(range, c.column_index_range());
            range = acc_range(range, c.offset_index_range());
        }
        let range = match range {
            None => return Ok(()),
            Some(range) => range,
        };

        let data = match &self.remainder {
            // Reuse the prefetched bytes only when they cover the whole range.
            Some((remainder_start, remainder))
                if *remainder_start <= range.start
                    && range.end - *remainder_start <= remainder.len() =>
            {
                // Both bounds are relative to the first byte of `remainder`.
                let start = range.start - *remainder_start;
                let end = range.end - *remainder_start;
                remainder.slice(start..end)
            }
            // Note: this may re-fetch bytes already held in `remainder`; that keeps things simple.
            _ => self.fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check: the index decoding below slices `data` by offsets relative to `range.start`.
        assert_eq!(data.len(), range.end - range.start);
        let offset = range.start;

        if column_index {
            let index = self
                .metadata
                .row_groups()
                .iter()
                .map(|x| {
                    x.columns()
                        .iter()
                        .map(|c| match c.column_index_range() {
                            Some(r) => decode_column_index(
                                &data[r.start - offset..r.end - offset],
                                c.column_type(),
                            ),
                            // A column chunk without a column index is recorded as `Index::NONE`.
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            self.metadata.set_column_index(Some(index));
        }

        if offset_index {
            let index = self
                .metadata
                .row_groups()
                .iter()
                .map(|x| {
                    x.columns()
                        .iter()
                        .map(|c| match c.offset_index_range() {
                            Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            self.metadata.set_offset_index(Some(index));
        }

        Ok(())
    }

    /// Consumes the loader and returns the loaded [`ParquetMetaData`].
    pub fn finish(self) -> ParquetMetaData {
        self.metadata
    }
}
/// Newtype wrapper around a callback `F` so that it can be given a
/// [`MetadataFetch`] implementation.
struct MetadataFetchFn<F>(F);
/// Adapts a `FnMut(Range<usize>) -> Future` callback to [`MetadataFetch`].
impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
where
    F: FnMut(Range<usize>) -> Fut + Send,
    Fut: Future<Output = Result<Bytes>> + Send,
{
    /// Returns a boxed future that, once polled, invokes the wrapped callback
    /// with `range` and awaits its result.
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        let callback = &mut self.0;
        // The callback is only invoked when the future is first polled.
        async move { callback(range).await }.boxed()
    }
}
/// Fetches Parquet metadata using the provided `fetch` callback.
///
/// * `fetch` - called with a byte range, returns a future yielding those bytes
/// * `file_size` - total size of the file in bytes
/// * `prefetch` - optional prefetch hint forwarded to [`ParquetMetaDataReader`]
///
/// This is a thin wrapper that delegates to
/// [`ParquetMetaDataReader::load_and_finish`].
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn fetch_parquet_metadata<F, Fut>(
    fetch: F,
    file_size: usize,
    prefetch: Option<usize>,
) -> Result<ParquetMetaData>
where
    F: FnMut(Range<usize>) -> Fut + Send,
    Fut: Future<Output = Result<Bytes>> + Send,
{
    let reader = ParquetMetaDataReader::new().with_prefetch_hint(prefetch);
    let source = MetadataFetchFn(fetch);
    reader.load_and_finish(source, file_size).await
}
#[allow(deprecated)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, Length, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Synchronously reads the bytes of `range` from `file`.
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let wanted = range.end - range.start;
        let mut bytes = Vec::with_capacity(wanted);
        file.take(wanted as _).read_to_end(&mut bytes)?;
        Ok(bytes.into())
    }

    /// Checks the decoded schema and the number of fetches issued by
    /// `fetch_parquet_metadata` for a series of prefetch hints, then the
    /// error messages for undersized files.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let expected = reader.metadata().file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, expected number of fetches)
        let cases: [(Option<usize>, usize); 5] = [
            (None, 2),
            (Some(7), 2),
            (Some(10), 2),
            (Some(500), 1),
            (Some(428), 1),
        ];
        for (prefetch, expected_fetches) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let actual = fetch_parquet_metadata(&mut fetch, len, prefetch)
                .await
                .unwrap();
            assert_eq!(actual.file_metadata().schema(), expected);
            assert_eq!(fetch_count.load(Ordering::SeqCst), expected_fetches);
        }

        // A file smaller than the footer is rejected up front.
        let too_small = fetch_parquet_metadata(&mut fetch, 4, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(too_small, "EOF: file size of 4 is less than footer");

        // A bogus file size makes the footer bytes fail validation.
        let corrupt = fetch_parquet_metadata(&mut fetch, 20, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(corrupt, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    /// Checks how many fetches `MetadataLoader` issues while loading the
    /// metadata and then the page index, for a series of prefetch hints.
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, total fetches after `load`, total fetches after `load_page_index`)
        let cases: [(Option<usize>, usize, usize); 4] = [
            (None, 2, 3),
            (Some(1729), 1, 2),
            (Some(130649), 1, 2),
            (Some(130650), 1, 1),
        ];
        for (prefetch, fetches_after_load, fetches_after_index) in cases {
            fetch_count.store(0, Ordering::SeqCst);

            let source = MetadataFetchFn(&mut fetch);
            let mut loader = MetadataLoader::load(source, len, prefetch).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), fetches_after_load);

            loader.load_page_index(true, true).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), fetches_after_index);

            let metadata = loader.finish();
            assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
        }
    }
}