arrow_avro/reader/async_reader/async_file_reader.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::AvroError;
19use bytes::Bytes;
20use futures::FutureExt;
21use futures::future::BoxFuture;
22use std::io::SeekFrom;
23use std::ops::Range;
24use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
25
26/// The asynchronous interface used by [`super::AsyncAvroFileReader`] to read avro files
27///
28/// Notes:
29///
30/// 1. There is a default implementation for types that implement [`AsyncRead`]
31/// and [`AsyncSeek`], for example [`tokio::fs::File`].
32///
33/// 2. [`super::AvroObjectReader`], available when the `object_store` crate feature
34/// is enabled, implements this interface for [`ObjectStore`].
35///
36/// [`ObjectStore`]: object_store::ObjectStore
37///
38/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
39pub trait AsyncFileReader: Send {
40 /// Retrieve the bytes in `range`
41 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>>;
42
43 /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
44 fn get_byte_ranges(
45 &mut self,
46 ranges: Vec<Range<u64>>,
47 ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
48 async move {
49 let mut result = Vec::with_capacity(ranges.len());
50
51 for range in ranges.into_iter() {
52 let data = self.get_bytes(range).await?;
53 result.push(data);
54 }
55
56 Ok(result)
57 }
58 .boxed()
59 }
60}
61
62/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
63impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
64 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
65 self.as_mut().get_bytes(range)
66 }
67
68 fn get_byte_ranges(
69 &mut self,
70 ranges: Vec<Range<u64>>,
71 ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
72 self.as_mut().get_byte_ranges(ranges)
73 }
74}
75
76impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
77 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
78 async move {
79 self.seek(SeekFrom::Start(range.start)).await?;
80
81 let to_read = range.end - range.start;
82 let mut buffer = Vec::with_capacity(to_read as usize);
83 let read = self.take(to_read).read_to_end(&mut buffer).await?;
84 if read as u64 != to_read {
85 return Err(AvroError::EOF(format!(
86 "expected to read {} bytes, got {}",
87 to_read, read
88 )));
89 }
90
91 Ok(buffer.into())
92 }
93 .boxed()
94 }
95}