Skip to main content

arrow_avro/reader/async_reader/
builder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18use crate::codec::{AvroFieldBuilder, Tz};
19use crate::errors::AvroError;
20use crate::reader::async_reader::ReaderState;
21use crate::reader::header::{Header, HeaderDecoder, HeaderInfo};
22use crate::reader::record::RecordDecoder;
23use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
24use crate::schema::{AvroSchema, FingerprintAlgorithm};
25use indexmap::IndexMap;
26use std::ops::Range;
27
/// Number of bytes requested per fetch while decoding the file header when no
/// explicit header size hint is configured.
const DEFAULT_HEADER_SIZE_HINT: u64 = 16 << 10; // 16 KiB

30/// Builder for an asynchronous Avro file reader.
31pub struct ReaderBuilder<R> {
32    reader: R,
33    file_size: u64,
34    batch_size: usize,
35    range: Option<Range<u64>>,
36    reader_schema: Option<AvroSchema>,
37    projection: Option<Vec<usize>>,
38    header_size_hint: Option<u64>,
39    utf8_view: bool,
40    strict_mode: bool,
41    tz: Tz,
42}
43
44impl<R> ReaderBuilder<R> {
45    pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
46        Self {
47            reader,
48            file_size,
49            batch_size,
50            range: None,
51            reader_schema: None,
52            projection: None,
53            header_size_hint: None,
54            utf8_view: false,
55            strict_mode: false,
56            tz: Default::default(),
57        }
58    }
59
60    /// Specify a byte range to read from the Avro file.
61    /// If this is provided, the reader will read all the blocks within the specified range,
62    /// if the range ends mid-block, it will attempt to fetch the remaining bytes to complete the block,
63    /// but no further blocks will be read.
64    /// If this is omitted, the full file will be read.
65    pub fn with_range(self, range: Range<u64>) -> Self {
66        Self {
67            range: Some(range),
68            ..self
69        }
70    }
71
72    /// Specify a reader schema to use when reading the Avro file.
73    /// This can be useful to project specific columns or handle schema evolution.
74    /// If this is not provided, the schema will be derived from the Arrow schema provided.
75    pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
76        Self {
77            reader_schema: Some(reader_schema),
78            ..self
79        }
80    }
81
82    /// Specify a projection of column indices to read from the Avro file.
83    /// This can help optimize reading by only fetching the necessary columns.
84    pub fn with_projection(self, projection: Vec<usize>) -> Self {
85        Self {
86            projection: Some(projection),
87            ..self
88        }
89    }
90
91    /// Provide a hint for the expected size of the Avro header in bytes.
92    /// This can help optimize the initial read operation when fetching the header.
93    pub fn with_header_size_hint(self, hint: u64) -> Self {
94        Self {
95            header_size_hint: Some(hint),
96            ..self
97        }
98    }
99
100    /// Enable usage of Utf8View types when reading string data.
101    pub fn with_utf8_view(self, utf8_view: bool) -> Self {
102        Self { utf8_view, ..self }
103    }
104
105    /// Enable strict mode for schema validation and data reading.
106    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
107        Self {
108            strict_mode,
109            ..self
110        }
111    }
112
113    /// Sets the timezone representation for Avro timestamp fields.
114    ///
115    /// The default is `Tz::OffsetZero`, meaning the "+00:00" time zone ID.
116    pub fn with_tz(mut self, tz: Tz) -> Self {
117        self.tz = tz;
118        self
119    }
120}
121
122/// Reads the Avro file header (magic, metadata, sync marker) asynchronously from `reader`.
123///
124/// On success, returns the parsed [`HeaderInfo`] containing the header and its length in bytes.
125pub async fn read_header_info<R>(
126    reader: &mut R,
127    file_size: u64,
128    header_size_hint: Option<u64>,
129) -> Result<HeaderInfo, AvroError>
130where
131    R: AsyncFileReader,
132{
133    read_header(reader, file_size, header_size_hint)
134        .await
135        .map(|(header, header_len)| HeaderInfo::new(header, header_len))
136}
137
138async fn read_header<R>(
139    reader: &mut R,
140    file_size: u64,
141    header_size_hint: Option<u64>,
142) -> Result<(Header, u64), AvroError>
143where
144    R: AsyncFileReader,
145{
146    let mut decoder = HeaderDecoder::default();
147    let mut position = 0;
148    loop {
149        let range_to_fetch = position
150            ..(position + header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT)).min(file_size);
151
152        // Maybe EOF after the header, no actual data
153        if range_to_fetch.is_empty() {
154            break;
155        }
156
157        let current_data = reader
158            .get_bytes(range_to_fetch.clone())
159            .await
160            .map_err(|err| {
161                AvroError::General(format!(
162                    "Error fetching Avro header from file reader: {err}"
163                ))
164            })?;
165        if current_data.is_empty() {
166            return Err(AvroError::EOF(
167                "Unexpected EOF while fetching header data".into(),
168            ));
169        }
170
171        let read = current_data.len();
172        let decoded = decoder.decode(&current_data)?;
173        if decoded != read {
174            position += decoded as u64;
175            break;
176        }
177        position += read as u64;
178    }
179
180    decoder
181        .flush()
182        .map(|header| (header, position))
183        .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
184}
185
186impl<R: AsyncFileReader> ReaderBuilder<R> {
187    /// Build the asynchronous Avro reader with the provided parameters.
188    /// This reads the header first to initialize the reader state.
189    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
190        if self.file_size == 0 {
191            return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
192        }
193
194        // Start by reading the header from the beginning of the avro file
195        // take the writer schema from the header
196        let header_info =
197            read_header_info(&mut self.reader, self.file_size, self.header_size_hint).await?;
198
199        self.build_with_header(header_info)
200    }
201
202    /// Build the asynchronous Avro reader with the provided header.
203    ///
204    /// This allows initializing the reader with pre-parsed header information.
205    /// Note that this method is not async because it does not need to perform any I/O operations.
206    ///
207    /// Note: Any `header_size_hint` set via [`Self::with_header_size_hint`] is not used
208    /// when building with a pre-parsed header, since no header fetching occurs.
209    pub fn build_with_header(
210        self,
211        header_info: HeaderInfo,
212    ) -> Result<AsyncAvroFileReader<R>, AvroError> {
213        let writer_schema = header_info.writer_schema()?;
214
215        // If projection exists, project the reader schema,
216        // if no reader schema is provided, parse it from the header(get the raw writer schema), and project that
217        // this projected schema will be the schema used for reading.
218        let projected_reader_schema = self
219            .projection
220            .as_deref()
221            .map(|projection| {
222                let base_schema = if let Some(reader_schema) = &self.reader_schema {
223                    reader_schema
224                } else {
225                    &writer_schema
226                };
227                base_schema.project(projection)
228            })
229            .transpose()?;
230
231        // Use either the projected reader schema or the original reader schema(if no projection)
232        // (both optional, at worst no reader schema is provided, in which case we read with the writer schema)
233        let effective_reader_schema = projected_reader_schema
234            .as_ref()
235            .or(self.reader_schema.as_ref())
236            .map(|s| s.schema())
237            .transpose()?;
238
239        let root = {
240            let writer_schema = writer_schema.schema()?;
241            let mut builder = AvroFieldBuilder::new(&writer_schema);
242            if let Some(reader_schema) = &effective_reader_schema {
243                builder = builder.with_reader_schema(reader_schema);
244            }
245            builder
246                .with_utf8view(self.utf8_view)
247                .with_strict_mode(self.strict_mode)
248                .with_tz(self.tz)
249                .build()
250        }?;
251
252        let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
253        let decoder = Decoder::from_parts(
254            self.batch_size,
255            record_decoder,
256            None,
257            IndexMap::new(),
258            FingerprintAlgorithm::Rabin,
259        );
260        let header_len = header_info.header_len();
261        let range = match self.range {
262            Some(r) => {
263                // If this PartitionedFile's range starts at 0, we need to skip the header bytes.
264                // But then we need to seek back 16 bytes to include the sync marker for the first block,
265                // as the logic in this reader searches the data for the first sync marker(after which a block starts),
266                // then reads blocks from the count, size etc.
267                let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
268                let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty
269                start..end
270            }
271            None => 0..self.file_size,
272        };
273
274        // Determine if there is actually data to fetch, note that we subtract the header len from range.start,
275        // so we need to check if range.end == header_len to see if there's no data after the header
276        let reader_state = if range.start == range.end || header_len == range.end {
277            ReaderState::Finished
278        } else {
279            ReaderState::Idle {
280                reader: self.reader,
281            }
282        };
283
284        let codec = header_info.compression()?;
285        let sync_marker = header_info.sync();
286
287        Ok(AsyncAvroFileReader::new(
288            range,
289            self.file_size,
290            decoder,
291            codec,
292            sync_marker,
293            reader_state,
294        ))
295    }
296}