arrow_avro/reader/async_reader/store.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::AvroError;
19use crate::reader::async_reader::AsyncFileReader;
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::{FutureExt, TryFutureExt};
23use object_store::ObjectStore;
24use object_store::ObjectStoreExt;
25use object_store::path::Path;
26use std::error::Error;
27use std::ops::Range;
28use std::sync::Arc;
29use tokio::runtime::Handle;
30
31/// An implementation of an AsyncFileReader using the [`ObjectStore`] API.
32pub struct AvroObjectReader {
33 store: Arc<dyn ObjectStore>,
34 path: Path,
35 runtime: Option<Handle>,
36}
37
38impl AvroObjectReader {
39 /// Creates a new [`Self`] from a store implementation and file location.
40 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
41 Self {
42 store,
43 path,
44 runtime: None,
45 }
46 }
47
48 /// Perform IO on the provided tokio runtime
49 ///
50 /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
51 /// to service IO. Therefore, running IO and CPU-bound tasks, such as avro decoding,
52 /// on the same tokio runtime can lead to degraded throughput, dropped connections and
53 /// other issues. For more information see [here].
54 ///
55 /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
56 pub fn with_runtime(self, handle: Handle) -> Self {
57 Self {
58 runtime: Some(handle),
59 ..self
60 }
61 }
62
63 fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O, AvroError>>
64 where
65 F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
66 + Send
67 + 'static,
68 O: Send + 'static,
69 E: Error + Send + 'static,
70 {
71 match &self.runtime {
72 Some(handle) => {
73 let path = self.path.clone();
74 let store = Arc::clone(&self.store);
75 handle
76 .spawn(async move { f(&store, &path).await })
77 .map_ok_or_else(
78 |e| match e.try_into_panic() {
79 Err(e) => Err(AvroError::External(Box::new(e))),
80 Ok(p) => std::panic::resume_unwind(p),
81 },
82 |res| res.map_err(|e| AvroError::General(e.to_string())),
83 )
84 .boxed()
85 }
86 None => f(&self.store, &self.path)
87 .map_err(|e| AvroError::General(e.to_string()))
88 .boxed(),
89 }
90 }
91}
92
93impl AsyncFileReader for AvroObjectReader {
94 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
95 self.spawn(|store, path| async move { store.get_range(path, range).await }.boxed())
96 }
97
98 fn get_byte_ranges(
99 &mut self,
100 ranges: Vec<Range<u64>>,
101 ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>>
102 where
103 Self: Send,
104 {
105 self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
106 }
107}