arrow_avro/reader/async_reader/
builder.rs1use crate::codec::{AvroFieldBuilder, Tz};
19use crate::errors::AvroError;
20use crate::reader::async_reader::ReaderState;
21use crate::reader::header::{Header, HeaderDecoder, HeaderInfo};
22use crate::reader::record::RecordDecoder;
23use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
24use crate::schema::{AvroSchema, FingerprintAlgorithm};
25use indexmap::IndexMap;
26use std::ops::Range;
27
28const DEFAULT_HEADER_SIZE_HINT: u64 = 16 * 1024; pub struct ReaderBuilder<R> {
32 reader: R,
33 file_size: u64,
34 batch_size: usize,
35 range: Option<Range<u64>>,
36 reader_schema: Option<AvroSchema>,
37 projection: Option<Vec<usize>>,
38 header_size_hint: Option<u64>,
39 utf8_view: bool,
40 strict_mode: bool,
41 tz: Tz,
42}
43
44impl<R> ReaderBuilder<R> {
45 pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
46 Self {
47 reader,
48 file_size,
49 batch_size,
50 range: None,
51 reader_schema: None,
52 projection: None,
53 header_size_hint: None,
54 utf8_view: false,
55 strict_mode: false,
56 tz: Default::default(),
57 }
58 }
59
60 pub fn with_range(self, range: Range<u64>) -> Self {
66 Self {
67 range: Some(range),
68 ..self
69 }
70 }
71
72 pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
76 Self {
77 reader_schema: Some(reader_schema),
78 ..self
79 }
80 }
81
82 pub fn with_projection(self, projection: Vec<usize>) -> Self {
85 Self {
86 projection: Some(projection),
87 ..self
88 }
89 }
90
91 pub fn with_header_size_hint(self, hint: u64) -> Self {
94 Self {
95 header_size_hint: Some(hint),
96 ..self
97 }
98 }
99
100 pub fn with_utf8_view(self, utf8_view: bool) -> Self {
102 Self { utf8_view, ..self }
103 }
104
105 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
107 Self {
108 strict_mode,
109 ..self
110 }
111 }
112
113 pub fn with_tz(mut self, tz: Tz) -> Self {
117 self.tz = tz;
118 self
119 }
120}
121
122pub async fn read_header_info<R>(
126 reader: &mut R,
127 file_size: u64,
128 header_size_hint: Option<u64>,
129) -> Result<HeaderInfo, AvroError>
130where
131 R: AsyncFileReader,
132{
133 read_header(reader, file_size, header_size_hint)
134 .await
135 .map(|(header, header_len)| HeaderInfo::new(header, header_len))
136}
137
138async fn read_header<R>(
139 reader: &mut R,
140 file_size: u64,
141 header_size_hint: Option<u64>,
142) -> Result<(Header, u64), AvroError>
143where
144 R: AsyncFileReader,
145{
146 let mut decoder = HeaderDecoder::default();
147 let mut position = 0;
148 loop {
149 let range_to_fetch = position
150 ..(position + header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT)).min(file_size);
151
152 if range_to_fetch.is_empty() {
154 break;
155 }
156
157 let current_data = reader
158 .get_bytes(range_to_fetch.clone())
159 .await
160 .map_err(|err| {
161 AvroError::General(format!(
162 "Error fetching Avro header from file reader: {err}"
163 ))
164 })?;
165 if current_data.is_empty() {
166 return Err(AvroError::EOF(
167 "Unexpected EOF while fetching header data".into(),
168 ));
169 }
170
171 let read = current_data.len();
172 let decoded = decoder.decode(¤t_data)?;
173 if decoded != read {
174 position += decoded as u64;
175 break;
176 }
177 position += read as u64;
178 }
179
180 decoder
181 .flush()
182 .map(|header| (header, position))
183 .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
184}
185
186impl<R: AsyncFileReader> ReaderBuilder<R> {
187 pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
190 if self.file_size == 0 {
191 return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
192 }
193
194 let header_info =
197 read_header_info(&mut self.reader, self.file_size, self.header_size_hint).await?;
198
199 self.build_with_header(header_info)
200 }
201
202 pub fn build_with_header(
210 self,
211 header_info: HeaderInfo,
212 ) -> Result<AsyncAvroFileReader<R>, AvroError> {
213 let writer_schema = header_info.writer_schema()?;
214
215 let projected_reader_schema = self
219 .projection
220 .as_deref()
221 .map(|projection| {
222 let base_schema = if let Some(reader_schema) = &self.reader_schema {
223 reader_schema
224 } else {
225 &writer_schema
226 };
227 base_schema.project(projection)
228 })
229 .transpose()?;
230
231 let effective_reader_schema = projected_reader_schema
234 .as_ref()
235 .or(self.reader_schema.as_ref())
236 .map(|s| s.schema())
237 .transpose()?;
238
239 let root = {
240 let writer_schema = writer_schema.schema()?;
241 let mut builder = AvroFieldBuilder::new(&writer_schema);
242 if let Some(reader_schema) = &effective_reader_schema {
243 builder = builder.with_reader_schema(reader_schema);
244 }
245 builder
246 .with_utf8view(self.utf8_view)
247 .with_strict_mode(self.strict_mode)
248 .with_tz(self.tz)
249 .build()
250 }?;
251
252 let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
253 let decoder = Decoder::from_parts(
254 self.batch_size,
255 record_decoder,
256 None,
257 IndexMap::new(),
258 FingerprintAlgorithm::Rabin,
259 );
260 let header_len = header_info.header_len();
261 let range = match self.range {
262 Some(r) => {
263 let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
268 let end = r.end.max(start).min(self.file_size); start..end
270 }
271 None => 0..self.file_size,
272 };
273
274 let reader_state = if range.start == range.end || header_len == range.end {
277 ReaderState::Finished
278 } else {
279 ReaderState::Idle {
280 reader: self.reader,
281 }
282 };
283
284 let codec = header_info.compression()?;
285 let sync_marker = header_info.sync();
286
287 Ok(AsyncAvroFileReader::new(
288 range,
289 self.file_size,
290 decoder,
291 codec,
292 sync_marker,
293 reader_state,
294 ))
295 }
296}