parquet/arrow/async_reader/store.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{ops::Range, sync::Arc};

use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use object_store::{path::Path, ObjectMeta, ObjectStore};
use tokio::runtime::Handle;

/// Reads Parquet files in object storage using [`ObjectStore`].
///
/// ```no_run
/// # use std::io::stdout;
/// # use std::sync::Arc;
/// # use object_store::azure::MicrosoftAzureBuilder;
/// # use object_store::ObjectStore;
/// # use object_store::path::Path;
/// # use parquet::arrow::async_reader::ParquetObjectReader;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # use parquet::schema::printer::print_parquet_metadata;
/// # async fn run() {
/// // Populate configuration from environment
/// let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
/// let location = Path::from("path/to/blob.parquet");
/// let meta = storage_container.head(&location).await.unwrap();
/// println!("Found Blob with {}B at {}", meta.size, meta.location);
///
/// // Show Parquet metadata
/// let reader = ParquetObjectReader::new(storage_container, meta);
/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
/// print_parquet_metadata(&mut stdout(), builder.metadata());
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
    store: Arc<dyn ObjectStore>,
    meta: ObjectMeta,
    metadata_size_hint: Option<usize>,
    preload_column_index: bool,
    preload_offset_index: bool,
    runtime: Option<Handle>,
}

impl ParquetObjectReader {
    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
    ///
    /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
    pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
        Self {
            store,
            meta,
            metadata_size_hint: None,
            preload_column_index: false,
            preload_offset_index: false,
            runtime: None,
        }
    }

    /// Provide a hint as to the size of the parquet file's footer,
    /// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata)
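    ///
    /// A minimal sketch, assuming the file's footer metadata fits in 64 KiB
    /// (`store` and `meta` obtained as in the type-level example):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use object_store::ObjectStore;
    /// # use parquet::arrow::async_reader::ParquetObjectReader;
    /// # async fn run() {
    /// # let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    /// # let meta = store.head(&Path::from("data.parquet")).await.unwrap();
    /// // Prefetch the last 64 KiB; if the footer turns out to be larger,
    /// // an additional fetch is performed
    /// let reader = ParquetObjectReader::new(store, meta).with_footer_size_hint(64 * 1024);
    /// # }
    /// ```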
    pub fn with_footer_size_hint(self, hint: usize) -> Self {
        Self {
            metadata_size_hint: Some(hint),
            ..self
        }
    }

    /// Load the Column Index as part of [`Self::get_metadata`]
    pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
        Self {
            preload_column_index,
            ..self
        }
    }

    /// Load the Offset Index as part of [`Self::get_metadata`]
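    ///
    /// A minimal sketch enabling both page indexes up front (`store` and
    /// `meta` obtained as in the type-level example):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use object_store::ObjectStore;
    /// # use parquet::arrow::async_reader::ParquetObjectReader;
    /// # async fn run() {
    /// # let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    /// # let meta = store.head(&Path::from("data.parquet")).await.unwrap();
    /// // Fetch the Column Index and Offset Index along with the file metadata
    /// let reader = ParquetObjectReader::new(store, meta)
    ///     .with_preload_column_index(true)
    ///     .with_preload_offset_index(true);
    /// # }
    /// ```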
    pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
        Self {
            preload_offset_index,
            ..self
        }
    }

    /// Perform IO on the provided tokio runtime
    ///
    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
    /// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding,
    /// on the same tokio runtime can lead to degraded throughput, dropped connections and
    /// other issues. For more information see [here].
    ///
    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
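    ///
    /// A minimal sketch: spawn IO on a dedicated runtime while parquet
    /// decoding stays on the caller's runtime (`store` and `meta` obtained
    /// as in the type-level example):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use object_store::ObjectStore;
    /// # use parquet::arrow::async_reader::ParquetObjectReader;
    /// # async fn run() {
    /// # let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    /// # let meta = store.head(&Path::from("data.parquet")).await.unwrap();
    /// // A runtime reserved for object store IO
    /// let io_runtime = tokio::runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// let reader =
    ///     ParquetObjectReader::new(store, meta).with_runtime(io_runtime.handle().clone());
    /// # }
    /// ```
    ///
    /// Note that a `Runtime` must not be dropped from within an async
    /// context; the tests in this module hand the runtime to a blocking
    /// thread to drop it.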
    pub fn with_runtime(self, handle: Handle) -> Self {
        Self {
            runtime: Some(handle),
            ..self
        }
    }

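    /// Runs `f`, a closure performing IO against the configured
    /// [`ObjectStore`], on the runtime provided via [`Self::with_runtime`]
    /// if any, otherwise polls it inline on the calling task.
    ///
    /// Errors are converted into [`ParquetError`]; a panic in the spawned
    /// task is propagated to the caller.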
    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
    where
        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
            + Send
            + 'static,
        O: Send + 'static,
        E: Into<ParquetError> + Send + 'static,
    {
        match &self.runtime {
            Some(handle) => {
                let path = self.meta.location.clone();
                let store = Arc::clone(&self.store);
                handle
                    .spawn(async move { f(&store, &path).await })
                    .map_ok_or_else(
                        |e| match e.try_into_panic() {
                            Err(e) => Err(ParquetError::External(Box::new(e))),
                            Ok(p) => std::panic::resume_unwind(p),
                        },
                        |res| res.map_err(|e| e.into()),
                    )
                    .boxed()
            }
            None => f(&self.store, &self.meta.location)
                .map_err(|e| e.into())
                .boxed(),
        }
    }
}

impl AsyncFileReader for ParquetObjectReader {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        self.spawn(|store, path| store.get_range(path, range))
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
    }

    // This method doesn't call `self.spawn` directly because all of the IO it triggers
    // ultimately goes through `self.get_bytes` and/or `self.get_byte_ranges`. When `self`
    // is passed into `ParquetMetaDataReader::load_and_finish`, that method treats it as an
    // `impl MetadataFetch` and fetches data through those methods. Because `Self` implements
    // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are simply delegated to
    // `Self::get_bytes`.
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let file_size = self.meta.size;
            let metadata = ParquetMetaDataReader::new()
                .with_column_indexes(self.preload_column_index)
                .with_offset_indexes(self.preload_offset_index)
                .with_prefetch_hint(self.metadata_size_hint)
                .load_and_finish(self, file_size)
                .await?;
            Ok(Arc::new(metadata))
        })
    }

    fn get_metadata_with_options<'a>(
        &'a mut self,
        options: &'a ArrowReaderOptions,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let file_size = self.meta.size;
            let metadata = ParquetMetaDataReader::new()
                .with_column_indexes(self.preload_column_index)
                .with_offset_indexes(self.preload_offset_index)
                .with_prefetch_hint(self.metadata_size_hint);

            #[cfg(feature = "encryption")]
            let metadata =
                metadata.with_decryption_properties(options.file_decryption_properties.as_ref());

            let metadata = metadata.load_and_finish(self, file_size).await?;

            Ok(Arc::new(metadata))
        })
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use futures::TryStreamExt;

    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store
            .head(&Path::from("alltypes_plain.parquet"))
            .await
            .unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader = ParquetObjectReader::new(store, meta);
        // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(
                    err.contains("not found: No such file or directory (os error 2)"),
                    "{err}",
                );
            }
        }
    }

    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        // Same assertions as in `test_simple` above
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

        // Runtimes have to be dropped in blocking contexts, so we need to move this one to a new
        // blocking thread to drop it.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Unit test that `ParquetObjectReader::spawn` spawns on the provided runtime
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        assert!(err.contains("was cancelled"));
    }
}