Skip to main content

arrow_avro/reader/async_reader/
async_file_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::AvroError;
19use bytes::Bytes;
20use futures::FutureExt;
21use futures::future::BoxFuture;
22use std::io::SeekFrom;
23use std::ops::Range;
24use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
25
26/// The asynchronous interface used by [`super::AsyncAvroFileReader`] to read avro files
27///
28/// Notes:
29///
30/// 1. There is a default implementation for types that implement [`AsyncRead`]
31///    and [`AsyncSeek`], for example [`tokio::fs::File`].
32///
33/// 2. [`super::AvroObjectReader`], available when the `object_store` crate feature
34///    is enabled, implements this interface for [`ObjectStore`].
35///
36/// [`ObjectStore`]: object_store::ObjectStore
37///
38/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
39pub trait AsyncFileReader: Send {
40    /// Retrieve the bytes in `range`
41    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>>;
42
43    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
44    fn get_byte_ranges(
45        &mut self,
46        ranges: Vec<Range<u64>>,
47    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
48        async move {
49            let mut result = Vec::with_capacity(ranges.len());
50
51            for range in ranges.into_iter() {
52                let data = self.get_bytes(range).await?;
53                result.push(data);
54            }
55
56            Ok(result)
57        }
58        .boxed()
59    }
60}
61
62/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
63impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
64    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
65        self.as_mut().get_bytes(range)
66    }
67
68    fn get_byte_ranges(
69        &mut self,
70        ranges: Vec<Range<u64>>,
71    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
72        self.as_mut().get_byte_ranges(ranges)
73    }
74}
75
76impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
77    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
78        async move {
79            self.seek(SeekFrom::Start(range.start)).await?;
80
81            let to_read = range.end - range.start;
82            let mut buffer = Vec::with_capacity(to_read as usize);
83            let read = self.take(to_read).read_to_end(&mut buffer).await?;
84            if read as u64 != to_read {
85                return Err(AvroError::EOF(format!(
86                    "expected to read {} bytes, got {}",
87                    to_read, read
88                )));
89            }
90
91            Ok(buffer.into())
92        }
93        .boxed()
94    }
95}