parquet/arrow/async_reader/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{ops::Range, sync::Arc};
19
20use crate::arrow::arrow_reader::ArrowReaderOptions;
21use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
22use crate::errors::{ParquetError, Result};
23use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
24use bytes::Bytes;
25use futures::{FutureExt, TryFutureExt, future::BoxFuture};
26use object_store::{GetOptions, GetRange};
27use object_store::{ObjectStore, path::Path};
28use tokio::runtime::Handle;
29
30/// Reads Parquet files in object storage using [`ObjectStore`].
31///
32/// ```no_run
33/// # use std::io::stdout;
34/// # use std::sync::Arc;
35/// # use object_store::azure::MicrosoftAzureBuilder;
36/// # use object_store::ObjectStore;
37/// # use object_store::path::Path;
38/// # use parquet::arrow::async_reader::ParquetObjectReader;
39/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
40/// # use parquet::schema::printer::print_parquet_metadata;
41/// # async fn run() {
42/// // Populate configuration from environment
43/// let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
44/// let location = Path::from("path/to/blob.parquet");
45/// let meta = storage_container.head(&location).await.unwrap();
46/// println!("Found Blob with {}B at {}", meta.size, meta.location);
47///
48/// // Show Parquet metadata
49/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
50/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
51/// print_parquet_metadata(&mut stdout(), builder.metadata());
52/// # }
53/// ```
54#[derive(Clone, Debug)]
55pub struct ParquetObjectReader {
56    store: Arc<dyn ObjectStore>,
57    path: Path,
58    file_size: Option<u64>,
59    metadata_size_hint: Option<usize>,
60    preload_column_index: bool,
61    preload_offset_index: bool,
62    runtime: Option<Handle>,
63}
64
65impl ParquetObjectReader {
66    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`].
67    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
68        Self {
69            store,
70            path,
71            file_size: None,
72            metadata_size_hint: None,
73            preload_column_index: false,
74            preload_offset_index: false,
75            runtime: None,
76        }
77    }
78
79    /// Provide a hint as to the size of the parquet file's footer,
80    /// see [`ParquetMetaDataReader::with_prefetch_hint`]
81    pub fn with_footer_size_hint(self, hint: usize) -> Self {
82        Self {
83            metadata_size_hint: Some(hint),
84            ..self
85        }
86    }
87
88    /// Provide the byte size of this file.
89    ///
90    /// If provided, the file size will ensure that only bounded range requests are used. If file
91    /// size is not provided, the reader will use suffix range requests to fetch the metadata.
92    ///
93    /// Providing this size up front is an important optimization to avoid extra calls when the
94    /// underlying store does not support suffix range requests.
95    ///
96    /// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
97    pub fn with_file_size(self, file_size: u64) -> Self {
98        Self {
99            file_size: Some(file_size),
100            ..self
101        }
102    }
103
104    /// Whether to load the Column Index as part of [`Self::get_metadata`]
105    ///
106    /// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`.
107    /// If `page_index_policy` is `Optional` or `Required`, it will take precedence
108    /// over this preload flag. When it is `Skip` (default), this flag is used.
109    pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
110        Self {
111            preload_column_index,
112            ..self
113        }
114    }
115
116    /// Whether to load the Offset Index as part of [`Self::get_metadata`]
117    ///
118    /// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`.
119    /// If `page_index_policy` is `Optional` or `Required`, it will take precedence
120    /// over this preload flag. When it is `Skip` (default), this flag is used.
121    pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
122        Self {
123            preload_offset_index,
124            ..self
125        }
126    }
127
128    /// Perform IO on the provided tokio runtime
129    ///
130    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
131    /// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding,
132    /// on the same tokio runtime can lead to degraded throughput, dropped connections and
133    /// other issues. For more information see [here].
134    ///
135    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
136    pub fn with_runtime(self, handle: Handle) -> Self {
137        Self {
138            runtime: Some(handle),
139            ..self
140        }
141    }
142
143    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
144    where
145        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
146            + Send
147            + 'static,
148        O: Send + 'static,
149        E: Into<ParquetError> + Send + 'static,
150    {
151        match &self.runtime {
152            Some(handle) => {
153                let path = self.path.clone();
154                let store = Arc::clone(&self.store);
155                handle
156                    .spawn(async move { f(&store, &path).await })
157                    .map_ok_or_else(
158                        |e| match e.try_into_panic() {
159                            Err(e) => Err(ParquetError::External(Box::new(e))),
160                            Ok(p) => std::panic::resume_unwind(p),
161                        },
162                        |res| res.map_err(|e| e.into()),
163                    )
164                    .boxed()
165            }
166            None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
167        }
168    }
169}
170
171impl MetadataSuffixFetch for &mut ParquetObjectReader {
172    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
173        let options = GetOptions {
174            range: Some(GetRange::Suffix(suffix as u64)),
175            ..Default::default()
176        };
177        self.spawn(|store, path| {
178            async move {
179                let resp = store.get_opts(path, options).await?;
180                Ok::<_, ParquetError>(resp.bytes().await?)
181            }
182            .boxed()
183        })
184    }
185}
186
187impl AsyncFileReader for ParquetObjectReader {
188    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
189        self.spawn(|store, path| store.get_range(path, range))
190    }
191
192    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
193    where
194        Self: Send,
195    {
196        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
197    }
198
199    // This method doesn't directly call `self.spawn` because all of the IO that is done down the
200    // line due to this method call is done through `self.get_bytes` and/or `self.get_byte_ranges`.
201    // When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it treats it as
202    // an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of
203    // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to
204    // `Self::get_bytes`.
205    fn get_metadata<'a>(
206        &'a mut self,
207        options: Option<&'a ArrowReaderOptions>,
208    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
209        Box::pin(async move {
210            let metadata_opts = options.map(|o| o.metadata_options().clone());
211            let mut metadata = ParquetMetaDataReader::new()
212                .with_metadata_options(metadata_opts)
213                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
214                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
215                .with_prefetch_hint(self.metadata_size_hint);
216
217            #[cfg(feature = "encryption")]
218            if let Some(options) = options {
219                metadata = metadata.with_decryption_properties(
220                    options.file_decryption_properties.as_ref().map(Arc::clone),
221                );
222            }
223
224            // Override page index policies from ArrowReaderOptions if specified and not Skip.
225            // When page_index_policy is Skip (default), use the reader's preload flags.
226            // When page_index_policy is Optional or Required, override the preload flags
227            // to ensure the specified policy takes precedence.
228            if let Some(options) = options {
229                if options.column_index_policy() != PageIndexPolicy::Skip
230                    || options.offset_index_policy() != PageIndexPolicy::Skip
231                {
232                    metadata = metadata
233                        .with_column_index_policy(options.column_index_policy())
234                        .with_offset_index_policy(options.offset_index_policy());
235                }
236            }
237
238            let metadata = if let Some(file_size) = self.file_size {
239                metadata.load_and_finish(self, file_size).await?
240            } else {
241                metadata.load_via_suffix_and_finish(self).await?
242            };
243
244            Ok(Arc::new(metadata))
245        })
246    }
247}
248
#[cfg(test)]
mod tests {
    use crate::arrow::async_reader::ArrowReaderOptions;
    use crate::file::metadata::PageIndexPolicy;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use futures::TryStreamExt;

    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    /// Opens the parquet test data directory as a local object store and returns the
    /// metadata of `file_name` within it, together with the store.
    async fn open_test_file(file_name: &str) -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store.head(&Path::from(file_name)).await.unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    /// Returns the metadata and store for `alltypes_plain.parquet`.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        open_test_file("alltypes_plain.parquet").await
    }

    /// Returns the metadata and store for `alltypes_tiny_pages_plain.parquet`, which the
    /// page index tests below rely on.
    async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        open_test_file("alltypes_tiny_pages_plain.parquet").await
    }

    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("I don't exist.parquet not found:"), "{err}",);
            }
        }
    }

    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        // Count every park/unpark of the dedicated runtime's threads; any change in the
        // counter while reading shows that the runtime did some work.
        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        // Same expectations as `test_simple` above
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        assert!(num_actions.load(Ordering::Relaxed) > initial_actions);

        // Runtimes have to be dropped in blocking contexts, so we need to move this one to a new
        // blocking thread to drop it.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Unit test that `ParquetObjectReader::spawn` spawns on the provided runtime
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        // Runtimes have to be dropped in blocking contexts.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        assert!(err.contains("was cancelled"), "{err}");
    }

    #[tokio::test]
    async fn test_page_index_policy_skip_uses_preload_true() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Create reader with preload flags set to true
        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        // Create options with page_index_policy set to Skip (default)
        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);

        // Get metadata - Skip means use reader's preload flags (true)
        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        // With preload=true, indexes should be loaded since the test file has them
        assert!(metadata.column_index().is_some());
    }

    #[tokio::test]
    async fn test_page_index_policy_optional_overrides_preload_false() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Create reader with preload flags set to false
        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        // Create options with page_index_policy set to Optional
        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Optional);

        // Get metadata - Optional overrides preload flags and attempts to load indexes
        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        // With Optional policy, it will TRY to load indexes but won't fail if they don't exist
        // The test file has page indexes, so they will be some
        assert!(metadata.column_index().is_some());
    }

    #[tokio::test]
    async fn test_page_index_policy_optional_vs_skip() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Test 1: preload=false + Skip policy -> uses preload flags (false)
        let mut reader1 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let options1 = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
        let metadata1 = reader1.get_metadata(Some(&options1)).await.unwrap();

        // Test 2: preload=false + Optional policy -> overrides to try loading
        let mut reader2 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let options2 = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Optional);
        let metadata2 = reader2.get_metadata(Some(&options2)).await.unwrap();

        // Both loads succeed, but only the Optional one yields a column index:
        // metadata1 (Skip) uses preload=false -> Skip policy
        // metadata2 (Optional) overrides preload=false -> Optional policy
        assert!(metadata1.column_index().is_none());
        assert!(metadata2.column_index().is_some());
    }

    #[tokio::test]
    async fn test_page_index_policy_no_options_uses_preload() {
        let (meta, store) = get_meta_store_with_page_index().await;

        // Create reader with preload flags set to true
        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        // Get metadata without options - should use reader's preload flags
        let metadata = reader.get_metadata(None).await.unwrap();

        // With no options provided, preload flags (true) should be respected
        // and converted to Optional policy internally (preload=true -> Optional)
        // The test file has page indexes, so both indexes will be some
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
}