parquet/arrow/async_reader/metadata.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::FOOTER_SIZE;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
use std::ops::Range;

/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
///
/// Note that an implementation is provided for [`AsyncFileReader`].
///
/// # Example `MetadataFetch` for a custom async data source
///
/// ```rust
/// # use parquet::errors::Result;
/// # use parquet::arrow::async_reader::MetadataFetch;
/// # use bytes::Bytes;
/// # use std::ops::Range;
/// # use std::io::SeekFrom;
/// # use futures::future::BoxFuture;
/// # use futures::FutureExt;
/// # use tokio::io::{AsyncReadExt, AsyncSeekExt};
/// // Adapter that implements the API for reading bytes from an async source (in
/// // this case a tokio::fs::File)
/// struct TokioFileMetadata {
///     file: tokio::fs::File,
/// }
/// impl MetadataFetch for TokioFileMetadata {
///     fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
///         // return a future that fetches data in range
///         async move {
///             let len = (range.end - range.start).try_into().unwrap();
///             let mut buf = vec![0; len]; // target buffer
///             // seek to the start of the range and read the data
///             self.file.seek(SeekFrom::Start(range.start)).await?;
///             self.file.read_exact(&mut buf).await?;
///             Ok(Bytes::from(buf)) // convert to Bytes
///         }
///             .boxed() // turn into BoxedFuture, using FutureExt::boxed
///     }
/// }
/// ```
pub trait MetadataFetch {
    /// Return a future that fetches the specified range of bytes asynchronously
    ///
    /// Note the returned type is a boxed future, often created by
    /// [FutureExt::boxed]. See the trait documentation for an example
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
}

impl<T: AsyncFileReader> MetadataFetch for &mut T {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.get_bytes(range)
    }
}

/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`] via suffix
/// requests, without knowing the file size
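///
/// # Example `MetadataSuffixFetch` for a custom async data source
///
/// A minimal sketch mirroring the [`MetadataFetch`] example above, reusing the same
/// illustrative `TokioFileMetadata` wrapper around a `tokio::fs::File` (the wrapper
/// struct and its name are assumptions made for the example, not part of this crate).
///
/// ```rust
/// # use parquet::errors::Result;
/// # use parquet::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
/// # use bytes::Bytes;
/// # use std::ops::Range;
/// # use std::io::SeekFrom;
/// # use futures::future::BoxFuture;
/// # use futures::FutureExt;
/// # use tokio::io::{AsyncReadExt, AsyncSeekExt};
/// struct TokioFileMetadata {
///     file: tokio::fs::File,
/// }
/// # impl MetadataFetch for TokioFileMetadata {
/// #     fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
/// #         unimplemented!("see the `MetadataFetch` example above")
/// #     }
/// # }
/// impl MetadataSuffixFetch for TokioFileMetadata {
///     fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
///         // return a future that fetches the last `suffix` bytes of the file
///         async move {
///             let mut buf = vec![0; suffix]; // target buffer
///             // seek to `suffix` bytes before the end of the file, then read to the end
///             self.file.seek(SeekFrom::End(-(suffix as i64))).await?;
///             self.file.read_exact(&mut buf).await?;
///             Ok(Bytes::from(buf)) // convert to Bytes
///         }
///             .boxed() // turn into BoxedFuture, using FutureExt::boxed
///     }
/// }
/// ```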
pub trait MetadataSuffixFetch: MetadataFetch {
    /// Return a future that fetches the last `suffix` bytes asynchronously
    ///
    /// Note the returned type is a boxed future, often created by
    /// [FutureExt::boxed]. See the trait documentation for an example
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
}

/// An asynchronous interface to load [`ParquetMetaData`] from an async source
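///
/// A minimal usage sketch (not a prescription): it assumes a `tokio::fs::File` opened
/// from a hypothetical `"data.parquet"`, and relies on the blanket [`MetadataFetch`]
/// impl for mutable references to an [`AsyncFileReader`]. Note this type is deprecated
/// in favor of [`ParquetMetaDataReader`].
///
/// ```no_run
/// # use parquet::errors::Result;
/// # use parquet::arrow::async_reader::MetadataLoader;
/// # #[allow(deprecated)]
/// # async fn example() -> Result<()> {
/// let mut file = tokio::fs::File::open("data.parquet").await?;
/// let file_size = file.metadata().await?.len() as usize;
/// // Read the footer, then the page indexes, and assemble the final metadata
/// let mut loader = MetadataLoader::load(&mut file, file_size, None).await?;
/// loader.load_page_index(true, true).await?;
/// let metadata = loader.finish();
/// println!("row groups: {}", metadata.num_row_groups());
/// # Ok(())
/// # }
/// ```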
pub struct MetadataLoader<F> {
    /// Function that fetches byte ranges asynchronously
    fetch: F,
    /// The in-progress metadata
    metadata: ParquetMetaData,
    /// The offset and bytes of remaining unparsed data
    remainder: Option<(usize, Bytes)>,
}

impl<F: MetadataFetch> MetadataLoader<F> {
    /// Create a new [`MetadataLoader`] by reading the footer information
    ///
    /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
    pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
        if file_size < FOOTER_SIZE {
            return Err(ParquetError::EOF(format!(
                "file size of {file_size} is less than footer"
            )));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        let footer_start = if let Some(size_hint) = prefetch {
            // check for hint smaller than footer
            let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
            file_size.saturating_sub(size_hint)
        } else {
            file_size - FOOTER_SIZE
        };

        let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?;
        let suffix_len = suffix.len();

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < length + FOOTER_SIZE {
            return Err(ParquetError::EOF(format!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            )));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - length - FOOTER_SIZE;
            let meta = fetch
                .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64)
                .await?;
            (ParquetMetaDataReader::decode_metadata(&meta)?, None)
        } else {
            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;

            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            (
                ParquetMetaDataReader::decode_metadata(slice)?,
                Some((footer_start, suffix.slice(..metadata_start))),
            )
        };

        Ok(Self {
            fetch,
            metadata,
            remainder,
        })
    }

    /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
    pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
        Self {
            fetch,
            metadata,
            remainder: None,
        }
    }

    /// Loads the page index, if any
    ///
    /// * `column_index`: if true will load column index
    /// * `offset_index`: if true will load offset index
    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
    pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
        if !column_index && !offset_index {
            return Ok(());
        }

        let mut range = None;
        for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
            range = acc_range(range, c.column_index_range());
            range = acc_range(range, c.offset_index_range());
        }
        let range = match range {
            None => return Ok(()),
            Some(range) => range,
        };

        let data = match &self.remainder {
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let range_start = usize::try_from(range.start - remainder_start)?;
                let range_end = usize::try_from(range.end - remainder_start)?;
                remainder.slice(range_start..range_end)
            }
            // Note: this may re-fetch data already in `remainder`, but it keeps things simple
            _ => self.fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(data.len(), (range.end - range.start) as usize);
        let offset = range.start;

        if column_index {
            let index = self
                .metadata
                .row_groups()
                .iter()
                .map(|x| {
                    x.columns()
                        .iter()
                        .map(|c| match c.column_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - offset)?;
                                let r_end = usize::try_from(r.end - offset)?;
                                decode_column_index(&data[r_start..r_end], c.column_type())
                            }
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            self.metadata.set_column_index(Some(index));
        }

        if offset_index {
            let index = self
                .metadata
                .row_groups()
                .iter()
                .map(|x| {
                    x.columns()
                        .iter()
                        .map(|c| match c.offset_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - offset)?;
                                let r_end = usize::try_from(r.end - offset)?;
                                decode_offset_index(&data[r_start..r_end])
                            }
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            self.metadata.set_offset_index(Some(index));
        }

        Ok(())
    }

    /// Returns the finished [`ParquetMetaData`]
    pub fn finish(self) -> ParquetMetaData {
        self.metadata
    }
}

/// Adapts an `FnMut(Range<usize>)` closure into a [`MetadataFetch`] implementation
struct MetadataFetchFn<F>(F);

impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
where
    F: FnMut(Range<usize>) -> Fut + Send,
    Fut: Future<Output = Result<Bytes>> + Send,
{
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move { self.0(range.start.try_into()?..range.end.try_into()?).await }.boxed()
    }
}

/// Fetches parquet metadata
///
/// Parameters:
/// * fetch: an async function that can fetch byte ranges
/// * file_size: the total size of the parquet file
/// * prefetch: footer prefetch size (see below)
///
/// The length of the parquet metadata is not known up front. Therefore this
/// function first issues a request for the last 8 bytes of the file (the
/// footer), which encode the metadata length, before issuing a second request
/// to fetch the metadata bytes
///
/// If `prefetch` is `Some`, this will read the specified number of bytes
/// in the first request, instead of 8, and only issue further requests
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
/// significantly reduce the number of `fetch` requests, and consequently latency
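///
/// # Example
///
/// A minimal, hedged sketch of a typical call, assuming the complete parquet file is
/// already in memory as a [`Bytes`] buffer so the closure can simply slice it. Note
/// this function is deprecated in favor of [`ParquetMetaDataReader`].
///
/// ```no_run
/// # use parquet::errors::Result;
/// # use parquet::arrow::async_reader::fetch_parquet_metadata;
/// # use bytes::Bytes;
/// # use std::ops::Range;
/// # #[allow(deprecated)]
/// # async fn example(data: Bytes) -> Result<()> {
/// // `data` stands in for a complete parquet file held in memory
/// let fetch = |range: Range<usize>| {
///     let data = data.clone();
///     async move { Ok(data.slice(range)) }
/// };
/// let metadata = fetch_parquet_metadata(fetch, data.len(), None).await?;
/// println!("row groups: {}", metadata.num_row_groups());
/// # Ok(())
/// # }
/// ```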
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn fetch_parquet_metadata<F, Fut>(
    fetch: F,
    file_size: usize,
    prefetch: Option<usize>,
) -> Result<ParquetMetaData>
where
    F: FnMut(Range<usize>) -> Fut + Send,
    Fut: Future<Output = Result<Bytes>> + Send,
{
    let file_size = u64::try_from(file_size)?;
    let fetch = MetadataFetchFn(fetch);
    ParquetMetaDataReader::new()
        .with_prefetch_hint(prefetch)
        .load_and_finish(fetch, file_size)
        .await
}

// these tests are all replicated in parquet::file::metadata::reader
#[allow(deprecated)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, Length, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::atomic::{AtomicUsize, Ordering};

    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len);
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let expected = reader.metadata().file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(7))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        let err = fetch_parquet_metadata(&mut fetch, 4, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let err = fetch_parquet_metadata(&mut fetch, 20, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}