Skip to main content

arrow_avro/reader/async_reader/
store.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::errors::AvroError;
19use crate::reader::async_reader::AsyncFileReader;
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::{FutureExt, TryFutureExt};
23use object_store::ObjectStore;
24use object_store::ObjectStoreExt;
25use object_store::path::Path;
26use std::error::Error;
27use std::ops::Range;
28use std::sync::Arc;
29use tokio::runtime::Handle;
30
31/// An implementation of an AsyncFileReader using the [`ObjectStore`] API.
32pub struct AvroObjectReader {
33    store: Arc<dyn ObjectStore>,
34    path: Path,
35    runtime: Option<Handle>,
36}
37
38impl AvroObjectReader {
39    /// Creates a new [`Self`] from a store implementation and file location.
40    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
41        Self {
42            store,
43            path,
44            runtime: None,
45        }
46    }
47
48    /// Perform IO on the provided tokio runtime
49    ///
50    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
51    /// to service IO. Therefore, running IO and CPU-bound tasks, such as avro decoding,
52    /// on the same tokio runtime can lead to degraded throughput, dropped connections and
53    /// other issues. For more information see [here].
54    ///
55    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
56    pub fn with_runtime(self, handle: Handle) -> Self {
57        Self {
58            runtime: Some(handle),
59            ..self
60        }
61    }
62
63    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O, AvroError>>
64    where
65        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
66            + Send
67            + 'static,
68        O: Send + 'static,
69        E: Error + Send + 'static,
70    {
71        match &self.runtime {
72            Some(handle) => {
73                let path = self.path.clone();
74                let store = Arc::clone(&self.store);
75                handle
76                    .spawn(async move { f(&store, &path).await })
77                    .map_ok_or_else(
78                        |e| match e.try_into_panic() {
79                            Err(e) => Err(AvroError::External(Box::new(e))),
80                            Ok(p) => std::panic::resume_unwind(p),
81                        },
82                        |res| res.map_err(|e| AvroError::General(e.to_string())),
83                    )
84                    .boxed()
85            }
86            None => f(&self.store, &self.path)
87                .map_err(|e| AvroError::General(e.to_string()))
88                .boxed(),
89        }
90    }
91}
92
93impl AsyncFileReader for AvroObjectReader {
94    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
95        self.spawn(|store, path| async move { store.get_range(path, range).await }.boxed())
96    }
97
98    fn get_byte_ranges(
99        &mut self,
100        ranges: Vec<Range<u64>>,
101    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>>
102    where
103        Self: Send,
104    {
105        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
106    }
107}