parquet/arrow/async_reader/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::async_reader::AsyncFileReader;
19use crate::errors::{ParquetError, Result};
20use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
21use crate::file::page_index::index::Index;
22use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
23use crate::file::FOOTER_SIZE;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use futures::FutureExt;
27use std::future::Future;
28use std::ops::Range;
29
30/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
31///
32/// Note that implementation is provided for [`AsyncFileReader`].
33///
34/// # Example `MetadataFetch` for a custom async data source
35///
36/// ```rust
37/// # use parquet::errors::Result;
38/// # use parquet::arrow::async_reader::MetadataFetch;
39/// # use bytes::Bytes;
40/// # use std::ops::Range;
41/// # use std::io::SeekFrom;
42/// # use futures::future::BoxFuture;
43/// # use futures::FutureExt;
44/// # use tokio::io::{AsyncReadExt, AsyncSeekExt};
45/// // Adapter that implements the API for reading bytes from an async source (in
46/// // this case a tokio::fs::File)
47/// struct TokioFileMetadata {
48///     file: tokio::fs::File,
49/// }
50/// impl MetadataFetch for TokioFileMetadata {
51///     fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
52///         // return a future that fetches data in range
53///         async move {
54///             let mut buf = vec![0; range.len()]; // target buffer
55///             // seek to the start of the range and read the data
56///             self.file.seek(SeekFrom::Start(range.start as u64)).await?;
57///             self.file.read_exact(&mut buf).await?;
58///             Ok(Bytes::from(buf)) // convert to Bytes
59///         }
60///             .boxed() // turn into BoxedFuture, using FutureExt::boxed
61///     }
62/// }
63///```
64pub trait MetadataFetch {
65    /// Return a future that fetches the specified range of bytes asynchronously
66    ///
67    /// Note the returned type is a boxed future, often created by
68    /// [FutureExt::boxed]. See the trait documentation for an example
69    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
70}
71
72impl<T: AsyncFileReader> MetadataFetch for &mut T {
73    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
74        self.get_bytes(range)
75    }
76}
77
78/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`] via suffix
79/// requests, without knowing the file size
80pub trait MetadataSuffixFetch: MetadataFetch {
81    /// Return a future that fetches the last `n` bytes asynchronously
82    ///
83    /// Note the returned type is a boxed future, often created by
84    /// [FutureExt::boxed]. See the trait documentation for an example
85    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
86}
87
88/// An asynchronous interface to load [`ParquetMetaData`] from an async source
89pub struct MetadataLoader<F> {
90    /// Function that fetches byte ranges asynchronously
91    fetch: F,
92    /// The in-progress metadata
93    metadata: ParquetMetaData,
94    /// The offset and bytes of remaining unparsed data
95    remainder: Option<(usize, Bytes)>,
96}
97
98impl<F: MetadataFetch> MetadataLoader<F> {
99    /// Create a new [`MetadataLoader`] by reading the footer information
100    ///
101    /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
102    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
103    pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
104        if file_size < FOOTER_SIZE {
105            return Err(ParquetError::EOF(format!(
106                "file size of {file_size} is less than footer"
107            )));
108        }
109
110        // If a size hint is provided, read more than the minimum size
111        // to try and avoid a second fetch.
112        let footer_start = if let Some(size_hint) = prefetch {
113            // check for hint smaller than footer
114            let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
115            file_size.saturating_sub(size_hint)
116        } else {
117            file_size - FOOTER_SIZE
118        };
119
120        let suffix = fetch.fetch(footer_start..file_size).await?;
121        let suffix_len = suffix.len();
122
123        let mut footer = [0; FOOTER_SIZE];
124        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
125
126        let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
127        let length = footer.metadata_length();
128
129        if file_size < length + FOOTER_SIZE {
130            return Err(ParquetError::EOF(format!(
131                "file size of {} is less than footer + metadata {}",
132                file_size,
133                length + FOOTER_SIZE
134            )));
135        }
136
137        // Did not fetch the entire file metadata in the initial read, need to make a second request
138        let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
139            let metadata_start = file_size - length - FOOTER_SIZE;
140            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
141            (ParquetMetaDataReader::decode_metadata(&meta)?, None)
142        } else {
143            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
144
145            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
146            (
147                ParquetMetaDataReader::decode_metadata(slice)?,
148                Some((footer_start, suffix.slice(..metadata_start))),
149            )
150        };
151
152        Ok(Self {
153            fetch,
154            metadata,
155            remainder,
156        })
157    }
158
159    /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
160    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
161    pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
162        Self {
163            fetch,
164            metadata,
165            remainder: None,
166        }
167    }
168
169    /// Loads the page index, if any
170    ///
171    /// * `column_index`: if true will load column index
172    /// * `offset_index`: if true will load offset index
173    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
174    pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
175        if !column_index && !offset_index {
176            return Ok(());
177        }
178
179        let mut range = None;
180        for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
181            range = acc_range(range, c.column_index_range());
182            range = acc_range(range, c.offset_index_range());
183        }
184        let range = match range {
185            None => return Ok(()),
186            Some(range) => range,
187        };
188
189        let data = match &self.remainder {
190            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
191                let offset = range.start - *remainder_start;
192                remainder.slice(offset..range.end - *remainder_start + offset)
193            }
194            // Note: this will potentially fetch data already in remainder, this keeps things simple
195            _ => self.fetch.fetch(range.start..range.end).await?,
196        };
197
198        // Sanity check
199        assert_eq!(data.len(), range.end - range.start);
200        let offset = range.start;
201
202        if column_index {
203            let index = self
204                .metadata
205                .row_groups()
206                .iter()
207                .map(|x| {
208                    x.columns()
209                        .iter()
210                        .map(|c| match c.column_index_range() {
211                            Some(r) => decode_column_index(
212                                &data[r.start - offset..r.end - offset],
213                                c.column_type(),
214                            ),
215                            None => Ok(Index::NONE),
216                        })
217                        .collect::<Result<Vec<_>>>()
218                })
219                .collect::<Result<Vec<_>>>()?;
220
221            self.metadata.set_column_index(Some(index));
222        }
223
224        if offset_index {
225            let index = self
226                .metadata
227                .row_groups()
228                .iter()
229                .map(|x| {
230                    x.columns()
231                        .iter()
232                        .map(|c| match c.offset_index_range() {
233                            Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
234                            None => Err(general_err!("missing offset index")),
235                        })
236                        .collect::<Result<Vec<_>>>()
237                })
238                .collect::<Result<Vec<_>>>()?;
239
240            self.metadata.set_offset_index(Some(index));
241        }
242
243        Ok(())
244    }
245
246    /// Returns the finished [`ParquetMetaData`]
247    pub fn finish(self) -> ParquetMetaData {
248        self.metadata
249    }
250}
251
252struct MetadataFetchFn<F>(F);
253
254impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
255where
256    F: FnMut(Range<usize>) -> Fut + Send,
257    Fut: Future<Output = Result<Bytes>> + Send,
258{
259    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
260        async move { self.0(range).await }.boxed()
261    }
262}
263
264/// Fetches parquet metadata
265///
266/// Parameters:
267/// * fetch: an async function that can fetch byte ranges
268/// * file_size: the total size of the parquet file
269/// * footer_size_hint: footer prefetch size (see comments below)
270///
271/// The length of the parquet footer, which contains file metadata, is not
272/// known up front. Therefore this function will first issue a request to read
273/// the last 8 bytes to determine the footer's precise length, before
274/// issuing a second request to fetch the metadata bytes
275///
276/// If `prefetch` is `Some`, this will read the specified number of bytes
277/// in the first request, instead of 8, and only issue further requests
278/// if additional bytes are needed. Providing a `prefetch` hint can therefore
279/// significantly reduce the number of `fetch` requests, and consequently latency
280#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
281pub async fn fetch_parquet_metadata<F, Fut>(
282    fetch: F,
283    file_size: usize,
284    prefetch: Option<usize>,
285) -> Result<ParquetMetaData>
286where
287    F: FnMut(Range<usize>) -> Fut + Send,
288    Fut: Future<Output = Result<Bytes>> + Send,
289{
290    let fetch = MetadataFetchFn(fetch);
291    ParquetMetaDataReader::new()
292        .with_prefetch_hint(prefetch)
293        .load_and_finish(fetch, file_size)
294        .await
295}
296
// these tests are all replicated in parquet::file::metadata::reader
#[allow(deprecated)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, Length, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Reads the bytes of `file` covered by `range`, stopping early at end of file
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as u64))?;
        let wanted = range.end - range.start;
        let mut bytes = Vec::with_capacity(wanted);
        file.take(wanted as u64).read_to_end(&mut bytes)?;
        Ok(Bytes::from(bytes))
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let expected = reader.metadata().file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, expected number of fetch calls)
        let cases = [
            // No hint
            (None, 2),
            // Metadata hint too small - below footer size
            (Some(7), 2),
            // Metadata hint too small
            (Some(10), 2),
            // Metadata hint too large
            (Some(500), 1),
            // Metadata hint exactly correct
            (Some(428), 1),
        ];
        for (prefetch, expected_fetches) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let actual = fetch_parquet_metadata(&mut fetch, len, prefetch)
                .await
                .unwrap();
            assert_eq!(actual.file_metadata().schema(), expected);
            assert_eq!(fetch_count.load(Ordering::SeqCst), expected_fetches);
        }

        let err = fetch_parquet_metadata(&mut fetch, 4, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let err = fetch_parquet_metadata(&mut fetch, 20, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // (prefetch hint, fetch calls after `load`, fetch calls after `load_page_index`)
        let cases = [
            // No prefetch: footer, then metadata, then page index
            (None, 2, 3),
            // Prefetch just footer exactly
            (Some(1729), 1, 2),
            // Prefetch more than footer but not enough
            (Some(130649), 1, 2),
            // Prefetch exactly enough
            (Some(130650), 1, 1),
        ];
        for (prefetch, after_load, after_page_index) in cases {
            fetch_count.store(0, Ordering::SeqCst);
            let mut loader = MetadataLoader::load(MetadataFetchFn(&mut fetch), len, prefetch)
                .await
                .unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_load);
            loader.load_page_index(true, true).await.unwrap();
            assert_eq!(fetch_count.load(Ordering::SeqCst), after_page_index);
            let metadata = loader.finish();
            assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
        }
    }
}