parquet/arrow/async_reader/store.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{ops::Range, sync::Arc};

use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
use object_store::{GetOptions, GetRange};
use object_store::{ObjectStore, path::Path};
use tokio::runtime::Handle;

/// Reads Parquet files in object storage using [`ObjectStore`].
///
/// ```no_run
/// # use std::io::stdout;
/// # use std::sync::Arc;
/// # use object_store::azure::MicrosoftAzureBuilder;
/// # use object_store::ObjectStore;
/// # use object_store::path::Path;
/// # use parquet::arrow::async_reader::ParquetObjectReader;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # use parquet::schema::printer::print_parquet_metadata;
/// # async fn run() {
/// // Populate configuration from environment
/// let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
/// let location = Path::from("path/to/blob.parquet");
/// let meta = storage_container.head(&location).await.unwrap();
/// println!("Found Blob with {}B at {}", meta.size, meta.location);
///
/// // Show Parquet metadata
/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
/// print_parquet_metadata(&mut stdout(), builder.metadata());
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
    store: Arc<dyn ObjectStore>,
    path: Path,
    file_size: Option<u64>,
    metadata_size_hint: Option<usize>,
    preload_column_index: bool,
    preload_offset_index: bool,
    runtime: Option<Handle>,
}

impl ParquetObjectReader {
    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`].
    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
        Self {
            store,
            path,
            file_size: None,
            metadata_size_hint: None,
            preload_column_index: false,
            preload_offset_index: false,
            runtime: None,
        }
    }

    /// Provide a hint as to the size of the parquet file's footer;
    /// see [`ParquetMetaDataReader::with_prefetch_hint`]
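    ///
    /// A minimal sketch of supplying a hint; the 64 KiB value below is an
    /// illustrative assumption, not a recommendation:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::{ObjectStore, memory::InMemory};
    /// # use object_store::path::Path;
    /// # use parquet::arrow::async_reader::ParquetObjectReader;
    /// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    /// // Prefetch the final 64 KiB when reading the footer, hopefully
    /// // covering the entire metadata in a single request
    /// let reader = ParquetObjectReader::new(store, Path::from("data.parquet"))
    ///     .with_footer_size_hint(64 * 1024);
    /// ```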
    pub fn with_footer_size_hint(self, hint: usize) -> Self {
        Self {
            metadata_size_hint: Some(hint),
            ..self
        }
    }

    /// Provide the byte size of this file.
    ///
    /// If provided, the reader will only issue bounded range requests. If the file
    /// size is not provided, the reader will use suffix range requests to fetch the
    /// metadata.
    ///
    /// Providing this size up front is an important optimization to avoid extra calls
    /// when the underlying store does not support suffix range requests.
    ///
    /// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
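    ///
    /// A minimal sketch, using [`ObjectStore::head`] to discover the size
    /// (store and path are placeholders):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::{ObjectStore, memory::InMemory};
    /// # use object_store::path::Path;
    /// # use parquet::arrow::async_reader::ParquetObjectReader;
    /// # async fn run() {
    /// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    /// let location = Path::from("data.parquet");
    /// // A single HEAD request up front avoids suffix range requests later
    /// let meta = store.head(&location).await.unwrap();
    /// let reader = ParquetObjectReader::new(store, location).with_file_size(meta.size);
    /// # }
    /// ```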
    pub fn with_file_size(self, file_size: u64) -> Self {
        Self {
            file_size: Some(file_size),
            ..self
        }
    }

    /// Load the Column Index as part of [`Self::get_metadata`]
    pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
        Self {
            preload_column_index,
            ..self
        }
    }

    /// Load the Offset Index as part of [`Self::get_metadata`]
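    ///
    /// A minimal sketch enabling both page indexes up front (see also
    /// [`Self::with_preload_column_index`]):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::{ObjectStore, memory::InMemory};
    /// # use object_store::path::Path;
    /// # use parquet::arrow::async_reader::ParquetObjectReader;
    /// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    /// let reader = ParquetObjectReader::new(store, Path::from("data.parquet"))
    ///     .with_preload_column_index(true)
    ///     .with_preload_offset_index(true);
    /// ```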
    pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
        Self {
            preload_offset_index,
            ..self
        }
    }

    /// Perform IO on the provided tokio runtime
    ///
    /// Tokio is a cooperative scheduler that relies on tasks yielding in a timely
    /// manner to service IO. Running IO and CPU-bound tasks, such as parquet decoding,
    /// on the same tokio runtime can therefore lead to degraded throughput, dropped
    /// connections and other issues. For more information see [here].
    ///
    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
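    ///
    /// A minimal sketch of dedicating a separate runtime to object store IO;
    /// the runtime configuration is illustrative only:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::{ObjectStore, memory::InMemory};
    /// # use object_store::path::Path;
    /// # use parquet::arrow::async_reader::ParquetObjectReader;
    /// // Keep object store IO off the runtime doing CPU-bound decode work
    /// let io_runtime = tokio::runtime::Builder::new_multi_thread()
    ///     .worker_threads(2)
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    /// let reader = ParquetObjectReader::new(store, Path::from("data.parquet"))
    ///     .with_runtime(io_runtime.handle().clone());
    /// ```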
    pub fn with_runtime(self, handle: Handle) -> Self {
        Self {
            runtime: Some(handle),
            ..self
        }
    }

    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
    where
        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
            + Send
            + 'static,
        O: Send + 'static,
        E: Into<ParquetError> + Send + 'static,
    {
        match &self.runtime {
            Some(handle) => {
                let path = self.path.clone();
                let store = Arc::clone(&self.store);
                handle
                    .spawn(async move { f(&store, &path).await })
                    .map_ok_or_else(
                        |e| match e.try_into_panic() {
                            Err(e) => Err(ParquetError::External(Box::new(e))),
                            Ok(p) => std::panic::resume_unwind(p),
                        },
                        |res| res.map_err(|e| e.into()),
                    )
                    .boxed()
            }
            None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
        }
    }
}

impl MetadataSuffixFetch for &mut ParquetObjectReader {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        let options = GetOptions {
            range: Some(GetRange::Suffix(suffix as u64)),
            ..Default::default()
        };
        self.spawn(|store, path| {
            async move {
                let resp = store.get_opts(path, options).await?;
                Ok::<_, ParquetError>(resp.bytes().await?)
            }
            .boxed()
        })
    }
}

impl AsyncFileReader for ParquetObjectReader {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.spawn(|store, path| store.get_range(path, range))
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
    }

    // This method doesn't directly call `self.spawn` because all of the IO done down the
    // line due to this method call goes through `self.get_bytes` and/or `self.get_byte_ranges`.
    // When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it is treated as
    // an `impl MetadataFetch`, and those methods are called to get data from it. Due to `Self`'s
    // impl of `AsyncFileReader`, the calls to `MetadataFetch::fetch` are simply delegated to
    // `Self::get_bytes`.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let mut metadata = ParquetMetaDataReader::new()
                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
                .with_prefetch_hint(self.metadata_size_hint);

            #[cfg(feature = "encryption")]
            if let Some(options) = options {
                metadata = metadata.with_decryption_properties(
                    options.file_decryption_properties.as_ref().map(Arc::clone),
                );
            }

            let metadata = if let Some(file_size) = self.file_size {
                metadata.load_and_finish(self, file_size).await?
            } else {
                metadata.load_via_suffix_and_finish(self).await?
            };

            Ok(Arc::new(metadata))
        })
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use futures::TryStreamExt;

    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store
            .head(&Path::from("alltypes_plain.parquet"))
            .await
            .unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("I don't exist.parquet not found:"), "{err}");
            }
        }
    }

    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        // Same assertions as in `test_simple` above
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

        // Runtimes have to be dropped in blocking contexts, so we need to move this one to a new
        // blocking thread to drop it.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Unit test that `ParquetObjectReader::spawn` spawns on the provided runtime
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        assert!(err.contains("was cancelled"));
    }
}