parquet/arrow/async_reader/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{ops::Range, sync::Arc};
19
20use crate::arrow::arrow_reader::ArrowReaderOptions;
21use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
22use crate::errors::{ParquetError, Result};
23use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
24use bytes::Bytes;
25use futures::{FutureExt, TryFutureExt, future::BoxFuture};
26use object_store::{GetOptions, GetRange};
27use object_store::{ObjectStore, path::Path};
28use tokio::runtime::Handle;
29
30/// Reads Parquet files in object storage using [`ObjectStore`].
31///
32/// ```no_run
33/// # use std::io::stdout;
34/// # use std::sync::Arc;
35/// # use object_store::azure::MicrosoftAzureBuilder;
36/// # use object_store::ObjectStore;
37/// # use object_store::path::Path;
38/// # use parquet::arrow::async_reader::ParquetObjectReader;
39/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
40/// # use parquet::schema::printer::print_parquet_metadata;
41/// # async fn run() {
42/// // Populate configuration from environment
43/// let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
44/// let location = Path::from("path/to/blob.parquet");
45/// let meta = storage_container.head(&location).await.unwrap();
46/// println!("Found Blob with {}B at {}", meta.size, meta.location);
47///
48/// // Show Parquet metadata
49/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
50/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
51/// print_parquet_metadata(&mut stdout(), builder.metadata());
52/// # }
53/// ```
54#[derive(Clone, Debug)]
55pub struct ParquetObjectReader {
56    store: Arc<dyn ObjectStore>,
57    path: Path,
58    file_size: Option<u64>,
59    metadata_size_hint: Option<usize>,
60    preload_column_index: bool,
61    preload_offset_index: bool,
62    runtime: Option<Handle>,
63}
64
65impl ParquetObjectReader {
66    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`].
67    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
68        Self {
69            store,
70            path,
71            file_size: None,
72            metadata_size_hint: None,
73            preload_column_index: false,
74            preload_offset_index: false,
75            runtime: None,
76        }
77    }
78
79    /// Provide a hint as to the size of the parquet file's footer,
80    /// see [`ParquetMetaDataReader::with_prefetch_hint`]
81    pub fn with_footer_size_hint(self, hint: usize) -> Self {
82        Self {
83            metadata_size_hint: Some(hint),
84            ..self
85        }
86    }
87
88    /// Provide the byte size of this file.
89    ///
90    /// If provided, the file size will ensure that only bounded range requests are used. If file
91    /// size is not provided, the reader will use suffix range requests to fetch the metadata.
92    ///
93    /// Providing this size up front is an important optimization to avoid extra calls when the
94    /// underlying store does not support suffix range requests.
95    ///
96    /// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
97    pub fn with_file_size(self, file_size: u64) -> Self {
98        Self {
99            file_size: Some(file_size),
100            ..self
101        }
102    }
103
104    /// Whether to load the Column Index as part of [`Self::get_metadata`]
105    ///
106    /// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`.
107    /// If `page_index_policy` is `Optional` or `Required`, it will take precedence
108    /// over this preload flag. When it is `Skip` (default), this flag is used.
109    pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
110        Self {
111            preload_column_index,
112            ..self
113        }
114    }
115
116    /// Whether to load the Offset Index as part of [`Self::get_metadata`]
117    ///
118    /// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`.
119    /// If `page_index_policy` is `Optional` or `Required`, it will take precedence
120    /// over this preload flag. When it is `Skip` (default), this flag is used.
121    pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
122        Self {
123            preload_offset_index,
124            ..self
125        }
126    }
127
128    /// Perform IO on the provided tokio runtime
129    ///
130    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
131    /// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding,
132    /// on the same tokio runtime can lead to degraded throughput, dropped connections and
133    /// other issues. For more information see [here].
134    ///
135    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
136    pub fn with_runtime(self, handle: Handle) -> Self {
137        Self {
138            runtime: Some(handle),
139            ..self
140        }
141    }
142
143    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
144    where
145        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
146            + Send
147            + 'static,
148        O: Send + 'static,
149        E: Into<ParquetError> + Send + 'static,
150    {
151        match &self.runtime {
152            Some(handle) => {
153                let path = self.path.clone();
154                let store = Arc::clone(&self.store);
155                handle
156                    .spawn(async move { f(&store, &path).await })
157                    .map_ok_or_else(
158                        |e| match e.try_into_panic() {
159                            Err(e) => Err(ParquetError::External(Box::new(e))),
160                            Ok(p) => std::panic::resume_unwind(p),
161                        },
162                        |res| res.map_err(|e| e.into()),
163                    )
164                    .boxed()
165            }
166            None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
167        }
168    }
169}
170
171impl MetadataSuffixFetch for &mut ParquetObjectReader {
172    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
173        let options = GetOptions {
174            range: Some(GetRange::Suffix(suffix as u64)),
175            ..Default::default()
176        };
177        self.spawn(|store, path| {
178            async move {
179                let resp = store.get_opts(path, options).await?;
180                Ok::<_, ParquetError>(resp.bytes().await?)
181            }
182            .boxed()
183        })
184    }
185}
186
187impl AsyncFileReader for ParquetObjectReader {
188    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
189        self.spawn(|store, path| store.get_range(path, range))
190    }
191
192    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
193    where
194        Self: Send,
195    {
196        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
197    }
198
199    // This method doesn't directly call `self.spawn` because all of the IO that is done down the
200    // line due to this method call is done through `self.get_bytes` and/or `self.get_byte_ranges`.
201    // When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it treats it as
202    // an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of
203    // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to
204    // `Self::get_bytes`.
205    fn get_metadata<'a>(
206        &'a mut self,
207        options: Option<&'a ArrowReaderOptions>,
208    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
209        Box::pin(async move {
210            let metadata_opts = options.map(|o| o.metadata_options().clone());
211            let mut metadata = ParquetMetaDataReader::new()
212                .with_metadata_options(metadata_opts)
213                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
214                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
215                .with_prefetch_hint(self.metadata_size_hint);
216
217            #[cfg(feature = "encryption")]
218            if let Some(options) = options {
219                metadata = metadata.with_decryption_properties(
220                    options.file_decryption_properties.as_ref().map(Arc::clone),
221                );
222            }
223
224            // Override page index policies from ArrowReaderOptions if specified and not Skip.
225            // When page_index_policy is Skip (default), use the reader's preload flags.
226            // When page_index_policy is Optional or Required, override the preload flags
227            // to ensure the specified policy takes precedence.
228            if let Some(options) = options {
229                if options.page_index_policy != PageIndexPolicy::Skip {
230                    metadata = metadata.with_page_index_policy(options.page_index_policy);
231                }
232            }
233
234            let metadata = if let Some(file_size) = self.file_size {
235                metadata.load_and_finish(self, file_size).await?
236            } else {
237                metadata.load_via_suffix_and_finish(self).await?
238            };
239
240            Ok(Arc::new(metadata))
241        })
242    }
243}
244
#[cfg(test)]
mod tests {
    use crate::arrow::async_reader::ArrowReaderOptions;
    use crate::file::metadata::PageIndexPolicy;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use futures::TryStreamExt;

    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    /// Opens the parquet test data directory as a local object store and returns the
    /// object metadata of `file_name` together with the store.
    async fn get_meta_store_for(file_name: &str) -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store.head(&Path::from(file_name)).await.unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    /// Store and metadata for `alltypes_plain.parquet`.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        get_meta_store_for("alltypes_plain.parquet").await
    }

    /// Store and metadata for `alltypes_tiny_pages_plain.parquet`, the file the page index
    /// tests below rely on for its page indexes.
    async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        get_meta_store_for("alltypes_tiny_pages_plain.parquet").await
    }

    /// Reads the whole file when the file size is supplied up front.
    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// Reads the whole file when no file size is supplied.
    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// A missing object surfaces as an error that names the missing path.
    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("I don't exist.parquet not found:"), "{err}",);
            }
        }
    }

    /// Reading through a reader configured with a runtime causes activity on that runtime.
    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        // Count every park/unpark of the dedicated runtime's threads as an "action".
        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        // Just copied these assert_eqs from the `test_simple` above
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        // The counter only ever grows, so any increase means the runtime did some work.
        assert!(num_actions.load(Ordering::Relaxed) > initial_actions);

        // Runtimes have to be dropped in blocking contexts, so we need to move this one to a new
        // blocking thread to drop it.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Unit test that `ParquetObjectReader::spawn` spawns on the provided runtime
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        // As above: the runtime must be dropped from a blocking context.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// IO issued after the configured runtime has been shut down reports a cancellation.
    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        // Print the actual message on failure so a wording change is easy to diagnose.
        assert!(err.contains("was cancelled"), "{err}");
    }

    #[tokio::test]
    async fn test_page_index_policy_skip_uses_preload_true() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Create reader with preload flags set to true
        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        // Create options with page_index_policy set to Skip (default)
        let mut options = ArrowReaderOptions::new();
        options.page_index_policy = PageIndexPolicy::Skip;

        // Get metadata - Skip means use reader's preload flags (true)
        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        // With preload=true, indexes should be loaded since the test file has them
        assert!(metadata.column_index().is_some());
    }

    #[tokio::test]
    async fn test_page_index_policy_optional_overrides_preload_false() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Create reader with preload flags set to false
        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        // Create options with page_index_policy set to Optional
        let mut options = ArrowReaderOptions::new();
        options.page_index_policy = PageIndexPolicy::Optional;

        // Get metadata - Optional overrides preload flags and attempts to load indexes
        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        // With Optional policy, it will TRY to load indexes but won't fail if they don't exist
        // The test file has page indexes, so they will be some
        assert!(metadata.column_index().is_some());
    }

    #[tokio::test]
    async fn test_page_index_policy_optional_vs_skip() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Test 1: preload=false + Skip policy -> uses preload flags (false)
        let mut reader1 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let mut options1 = ArrowReaderOptions::new();
        options1.page_index_policy = PageIndexPolicy::Skip;
        let metadata1 = reader1.get_metadata(Some(&options1)).await.unwrap();

        // Test 2: preload=false + Optional policy -> overrides to try loading
        let mut reader2 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let mut options2 = ArrowReaderOptions::new();
        options2.page_index_policy = PageIndexPolicy::Optional;
        let metadata2 = reader2.get_metadata(Some(&options2)).await.unwrap();

        // Both should succeed (no panic/error)
        // metadata1 (Skip) uses preload=false -> Skip policy
        // metadata2 (Optional) overrides preload=false -> Optional policy
        assert!(metadata1.column_index().is_none());
        assert!(metadata2.column_index().is_some());
    }

    #[tokio::test]
    async fn test_page_index_policy_no_options_uses_preload() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Create reader with preload flags set to true
        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        // Get metadata without options - should use reader's preload flags
        let metadata = reader.get_metadata(None).await.unwrap();

        // With no options provided, preload flags (true) should be respected
        // and converted to Optional policy internally (preload=true -> Optional)
        // The test file has page indexes, so both indexes will be some. Each index is
        // asserted separately so that a failure identifies which one is missing.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
}