parquet/arrow/async_reader/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::async_reader::AsyncFileReader;
19use crate::errors::{ParquetError, Result};
20use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
21use crate::file::page_index::index::Index;
22use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
23use crate::file::FOOTER_SIZE;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use futures::FutureExt;
27use std::future::Future;
28use std::ops::Range;
29
30/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
31///
32/// Note that implementation is provided for [`AsyncFileReader`].
33///
34/// # Example `MetadataFetch` for a custom async data source
35///
36/// ```rust
37/// # use parquet::errors::Result;
38/// # use parquet::arrow::async_reader::MetadataFetch;
39/// # use bytes::Bytes;
40/// # use std::ops::Range;
41/// # use std::io::SeekFrom;
42/// # use futures::future::BoxFuture;
43/// # use futures::FutureExt;
44/// # use tokio::io::{AsyncReadExt, AsyncSeekExt};
45/// // Adapter that implements the API for reading bytes from an async source (in
46/// // this case a tokio::fs::File)
47/// struct TokioFileMetadata {
48///     file: tokio::fs::File,
49/// }
50/// impl MetadataFetch for TokioFileMetadata {
51///     fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
52///         // return a future that fetches data in range
53///         async move {
54///             let mut buf = vec![0; range.len()]; // target buffer
55///             // seek to the start of the range and read the data
56///             self.file.seek(SeekFrom::Start(range.start as u64)).await?;
57///             self.file.read_exact(&mut buf).await?;
58///             Ok(Bytes::from(buf)) // convert to Bytes
59///         }
60///             .boxed() // turn into BoxedFuture, using FutureExt::boxed
61///     }
62/// }
63///```
64pub trait MetadataFetch {
65    /// Return a future that fetches the specified range of bytes asynchronously
66    ///
67    /// Note the returned type is a boxed future, often created by
68    /// [FutureExt::boxed]. See the trait documentation for an example
69    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
70}
71
72impl<T: AsyncFileReader> MetadataFetch for &mut T {
73    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
74        self.get_bytes(range)
75    }
76}
77
78/// An asynchronous interface to load [`ParquetMetaData`] from an async source
79pub struct MetadataLoader<F> {
80    /// Function that fetches byte ranges asynchronously
81    fetch: F,
82    /// The in-progress metadata
83    metadata: ParquetMetaData,
84    /// The offset and bytes of remaining unparsed data
85    remainder: Option<(usize, Bytes)>,
86}
87
88impl<F: MetadataFetch> MetadataLoader<F> {
89    /// Create a new [`MetadataLoader`] by reading the footer information
90    ///
91    /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
92    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
93    pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
94        if file_size < FOOTER_SIZE {
95            return Err(ParquetError::EOF(format!(
96                "file size of {file_size} is less than footer"
97            )));
98        }
99
100        // If a size hint is provided, read more than the minimum size
101        // to try and avoid a second fetch.
102        let footer_start = if let Some(size_hint) = prefetch {
103            // check for hint smaller than footer
104            let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
105            file_size.saturating_sub(size_hint)
106        } else {
107            file_size - FOOTER_SIZE
108        };
109
110        let suffix = fetch.fetch(footer_start..file_size).await?;
111        let suffix_len = suffix.len();
112
113        let mut footer = [0; FOOTER_SIZE];
114        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
115
116        let length = ParquetMetaDataReader::decode_footer(&footer)?;
117
118        if file_size < length + FOOTER_SIZE {
119            return Err(ParquetError::EOF(format!(
120                "file size of {} is less than footer + metadata {}",
121                file_size,
122                length + FOOTER_SIZE
123            )));
124        }
125
126        // Did not fetch the entire file metadata in the initial read, need to make a second request
127        let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
128            let metadata_start = file_size - length - FOOTER_SIZE;
129            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
130            (ParquetMetaDataReader::decode_metadata(&meta)?, None)
131        } else {
132            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
133
134            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
135            (
136                ParquetMetaDataReader::decode_metadata(slice)?,
137                Some((footer_start, suffix.slice(..metadata_start))),
138            )
139        };
140
141        Ok(Self {
142            fetch,
143            metadata,
144            remainder,
145        })
146    }
147
148    /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
149    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
150    pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
151        Self {
152            fetch,
153            metadata,
154            remainder: None,
155        }
156    }
157
158    /// Loads the page index, if any
159    ///
160    /// * `column_index`: if true will load column index
161    /// * `offset_index`: if true will load offset index
162    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
163    pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
164        if !column_index && !offset_index {
165            return Ok(());
166        }
167
168        let mut range = None;
169        for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
170            range = acc_range(range, c.column_index_range());
171            range = acc_range(range, c.offset_index_range());
172        }
173        let range = match range {
174            None => return Ok(()),
175            Some(range) => range,
176        };
177
178        let data = match &self.remainder {
179            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
180                let offset = range.start - *remainder_start;
181                remainder.slice(offset..range.end - *remainder_start + offset)
182            }
183            // Note: this will potentially fetch data already in remainder, this keeps things simple
184            _ => self.fetch.fetch(range.start..range.end).await?,
185        };
186
187        // Sanity check
188        assert_eq!(data.len(), range.end - range.start);
189        let offset = range.start;
190
191        if column_index {
192            let index = self
193                .metadata
194                .row_groups()
195                .iter()
196                .map(|x| {
197                    x.columns()
198                        .iter()
199                        .map(|c| match c.column_index_range() {
200                            Some(r) => decode_column_index(
201                                &data[r.start - offset..r.end - offset],
202                                c.column_type(),
203                            ),
204                            None => Ok(Index::NONE),
205                        })
206                        .collect::<Result<Vec<_>>>()
207                })
208                .collect::<Result<Vec<_>>>()?;
209
210            self.metadata.set_column_index(Some(index));
211        }
212
213        if offset_index {
214            let index = self
215                .metadata
216                .row_groups()
217                .iter()
218                .map(|x| {
219                    x.columns()
220                        .iter()
221                        .map(|c| match c.offset_index_range() {
222                            Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
223                            None => Err(general_err!("missing offset index")),
224                        })
225                        .collect::<Result<Vec<_>>>()
226                })
227                .collect::<Result<Vec<_>>>()?;
228
229            self.metadata.set_offset_index(Some(index));
230        }
231
232        Ok(())
233    }
234
235    /// Returns the finished [`ParquetMetaData`]
236    pub fn finish(self) -> ParquetMetaData {
237        self.metadata
238    }
239}
240
241struct MetadataFetchFn<F>(F);
242
243impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
244where
245    F: FnMut(Range<usize>) -> Fut + Send,
246    Fut: Future<Output = Result<Bytes>> + Send,
247{
248    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
249        async move { self.0(range).await }.boxed()
250    }
251}
252
253/// Fetches parquet metadata
254///
255/// Parameters:
256/// * fetch: an async function that can fetch byte ranges
257/// * file_size: the total size of the parquet file
258/// * footer_size_hint: footer prefetch size (see comments below)
259///
260/// The length of the parquet footer, which contains file metadata, is not
261/// known up front. Therefore this function will first issue a request to read
262/// the last 8 bytes to determine the footer's precise length, before
263/// issuing a second request to fetch the metadata bytes
264///
265/// If `prefetch` is `Some`, this will read the specified number of bytes
266/// in the first request, instead of 8, and only issue further requests
267/// if additional bytes are needed. Providing a `prefetch` hint can therefore
268/// significantly reduce the number of `fetch` requests, and consequently latency
269#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
270pub async fn fetch_parquet_metadata<F, Fut>(
271    fetch: F,
272    file_size: usize,
273    prefetch: Option<usize>,
274) -> Result<ParquetMetaData>
275where
276    F: FnMut(Range<usize>) -> Fut + Send,
277    Fut: Future<Output = Result<Bytes>> + Send,
278{
279    let fetch = MetadataFetchFn(fetch);
280    ParquetMetaDataReader::new()
281        .with_prefetch_hint(prefetch)
282        .load_and_finish(fetch, file_size)
283        .await
284}
285
// these tests are all replicated in parquet::file::metadata::reader
// `allow(deprecated)`: the tests exercise the deprecated `MetadataLoader` and
// `fetch_parquet_metadata` APIs.
#[allow(deprecated)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, Length, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Reads the bytes of `range` from `file`.
    ///
    /// Uses `take` + `read_to_end`, so fewer bytes are returned if the file
    /// ends before `range.end`.
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len);
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    /// Checks that `fetch_parquet_metadata` decodes the expected schema and issues
    /// the expected number of range requests for several prefetch hints, and
    /// reports errors for truncated / corrupt inputs.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        // Reference schema decoded with the synchronous reader
        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let expected = reader.metadata().file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        // Counts every range request so the assertions can check round trips
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: the test expects two requests (footer, then metadata)
        let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(7))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large: a single request suffices
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct: a single request suffices
        fetch_count.store(0, Ordering::SeqCst);
        let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A claimed file size smaller than the footer is rejected
        let err = fetch_parquet_metadata(&mut fetch, 4, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Treating only the first 20 bytes as the file yields an invalid footer
        let err = fetch_parquet_metadata(&mut fetch, 20, None)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    /// Checks `MetadataLoader::load` + `load_page_index` populate both indexes and
    /// issue the expected number of requests depending on how much of the file
    /// tail the prefetch hint covers.
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        // Counts every range request so the assertions can check round trips
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: footer + metadata take two requests, the page index a third
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough: the page index is served from the bytes
        // retained by `load`, so no further request is made
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        loader.load_page_index(true, true).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        let metadata = loader.finish();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}