arrow_avro/reader/async_reader/
builder.rs1use crate::codec::AvroFieldBuilder;
19use crate::errors::AvroError;
20use crate::reader::async_reader::ReaderState;
21use crate::reader::header::{Header, HeaderDecoder};
22use crate::reader::record::RecordDecoder;
23use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
24use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
25use indexmap::IndexMap;
26use std::ops::Range;
27
28const DEFAULT_HEADER_SIZE_HINT: u64 = 16 * 1024; pub struct ReaderBuilder<R> {
32 reader: R,
33 file_size: u64,
34 batch_size: usize,
35 range: Option<Range<u64>>,
36 reader_schema: Option<AvroSchema>,
37 projection: Option<Vec<usize>>,
38 header_size_hint: Option<u64>,
39 utf8_view: bool,
40 strict_mode: bool,
41}
42
43impl<R> ReaderBuilder<R> {
44 pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
45 Self {
46 reader,
47 file_size,
48 batch_size,
49 range: None,
50 reader_schema: None,
51 projection: None,
52 header_size_hint: None,
53 utf8_view: false,
54 strict_mode: false,
55 }
56 }
57
58 pub fn with_range(self, range: Range<u64>) -> Self {
64 Self {
65 range: Some(range),
66 ..self
67 }
68 }
69
70 pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
74 Self {
75 reader_schema: Some(reader_schema),
76 ..self
77 }
78 }
79
80 pub fn with_projection(self, projection: Vec<usize>) -> Self {
83 Self {
84 projection: Some(projection),
85 ..self
86 }
87 }
88
89 pub fn with_header_size_hint(self, hint: u64) -> Self {
92 Self {
93 header_size_hint: Some(hint),
94 ..self
95 }
96 }
97
98 pub fn with_utf8_view(self, utf8_view: bool) -> Self {
100 Self { utf8_view, ..self }
101 }
102
103 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
105 Self {
106 strict_mode,
107 ..self
108 }
109 }
110}
111
112impl<R: AsyncFileReader> ReaderBuilder<R> {
113 async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
114 let mut decoder = HeaderDecoder::default();
115 let mut position = 0;
116 loop {
117 let range_to_fetch = position
118 ..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
119 .min(self.file_size);
120
121 if range_to_fetch.is_empty() {
123 break;
124 }
125
126 let current_data = self
127 .reader
128 .get_bytes(range_to_fetch.clone())
129 .await
130 .map_err(|err| {
131 AvroError::General(format!(
132 "Error fetching Avro header from file reader: {err}"
133 ))
134 })?;
135 if current_data.is_empty() {
136 return Err(AvroError::EOF(
137 "Unexpected EOF while fetching header data".into(),
138 ));
139 }
140
141 let read = current_data.len();
142 let decoded = decoder.decode(¤t_data)?;
143 if decoded != read {
144 position += decoded as u64;
145 break;
146 }
147 position += read as u64;
148 }
149
150 decoder
151 .flush()
152 .map(|header| (header, position))
153 .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
154 }
155
156 pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
159 if self.file_size == 0 {
160 return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
161 }
162
163 let (header, header_len) = self.read_header().await?;
166 let writer_schema = {
167 let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
168 AvroError::ParseError("No Avro schema present in file header".to_string())
169 })?;
170 let json_string = std::str::from_utf8(raw)
171 .map_err(|e| {
172 AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
173 })?
174 .to_string();
175 AvroSchema::new(json_string)
176 };
177
178 let projected_reader_schema = self
182 .projection
183 .as_deref()
184 .map(|projection| {
185 let base_schema = if let Some(reader_schema) = &self.reader_schema {
186 reader_schema
187 } else {
188 &writer_schema
189 };
190 base_schema.project(projection)
191 })
192 .transpose()?;
193
194 let effective_reader_schema = projected_reader_schema
197 .as_ref()
198 .or(self.reader_schema.as_ref())
199 .map(|s| s.schema())
200 .transpose()?;
201
202 let root = {
203 let writer_schema = writer_schema.schema()?;
204 let mut builder = AvroFieldBuilder::new(&writer_schema);
205 if let Some(reader_schema) = &effective_reader_schema {
206 builder = builder.with_reader_schema(reader_schema);
207 }
208 builder
209 .with_utf8view(self.utf8_view)
210 .with_strict_mode(self.strict_mode)
211 .build()
212 }?;
213
214 let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
215 let decoder = Decoder::from_parts(
216 self.batch_size,
217 record_decoder,
218 None,
219 IndexMap::new(),
220 FingerprintAlgorithm::Rabin,
221 );
222 let range = match self.range {
223 Some(r) => {
224 let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
229 let end = r.end.max(start).min(self.file_size); start..end
231 }
232 None => 0..self.file_size,
233 };
234
235 let reader_state = if range.start == range.end || header_len == range.end {
238 ReaderState::Finished
239 } else {
240 ReaderState::Idle {
241 reader: self.reader,
242 }
243 };
244 let codec = header.compression()?;
245 let sync_marker = header.sync();
246
247 Ok(AsyncAvroFileReader::new(
248 range,
249 self.file_size,
250 decoder,
251 codec,
252 sync_marker,
253 reader_state,
254 ))
255 }
256}