Skip to main content

arrow_avro/reader/async_reader/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::codec::AvroFieldBuilder;
19use crate::errors::AvroError;
20use crate::reader::async_reader::ReaderState;
21use crate::reader::header::{Header, HeaderDecoder};
22use crate::reader::record::RecordDecoder;
23use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
24use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
25use indexmap::IndexMap;
26use std::ops::Range;
27
/// Bytes requested per fetch while decoding the file header when no size hint is set (16 KiB).
const DEFAULT_HEADER_SIZE_HINT: u64 = 1 << 14;
29
30/// Builder for an asynchronous Avro file reader.
31pub struct ReaderBuilder<R> {
32    reader: R,
33    file_size: u64,
34    batch_size: usize,
35    range: Option<Range<u64>>,
36    reader_schema: Option<AvroSchema>,
37    projection: Option<Vec<usize>>,
38    header_size_hint: Option<u64>,
39    utf8_view: bool,
40    strict_mode: bool,
41}
42
43impl<R> ReaderBuilder<R> {
44    pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
45        Self {
46            reader,
47            file_size,
48            batch_size,
49            range: None,
50            reader_schema: None,
51            projection: None,
52            header_size_hint: None,
53            utf8_view: false,
54            strict_mode: false,
55        }
56    }
57
58    /// Specify a byte range to read from the Avro file.
59    /// If this is provided, the reader will read all the blocks within the specified range,
60    /// if the range ends mid-block, it will attempt to fetch the remaining bytes to complete the block,
61    /// but no further blocks will be read.
62    /// If this is omitted, the full file will be read.
63    pub fn with_range(self, range: Range<u64>) -> Self {
64        Self {
65            range: Some(range),
66            ..self
67        }
68    }
69
70    /// Specify a reader schema to use when reading the Avro file.
71    /// This can be useful to project specific columns or handle schema evolution.
72    /// If this is not provided, the schema will be derived from the Arrow schema provided.
73    pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
74        Self {
75            reader_schema: Some(reader_schema),
76            ..self
77        }
78    }
79
80    /// Specify a projection of column indices to read from the Avro file.
81    /// This can help optimize reading by only fetching the necessary columns.
82    pub fn with_projection(self, projection: Vec<usize>) -> Self {
83        Self {
84            projection: Some(projection),
85            ..self
86        }
87    }
88
89    /// Provide a hint for the expected size of the Avro header in bytes.
90    /// This can help optimize the initial read operation when fetching the header.
91    pub fn with_header_size_hint(self, hint: u64) -> Self {
92        Self {
93            header_size_hint: Some(hint),
94            ..self
95        }
96    }
97
98    /// Enable usage of Utf8View types when reading string data.
99    pub fn with_utf8_view(self, utf8_view: bool) -> Self {
100        Self { utf8_view, ..self }
101    }
102
103    /// Enable strict mode for schema validation and data reading.
104    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
105        Self {
106            strict_mode,
107            ..self
108        }
109    }
110}
111
112impl<R: AsyncFileReader> ReaderBuilder<R> {
113    async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
114        let mut decoder = HeaderDecoder::default();
115        let mut position = 0;
116        loop {
117            let range_to_fetch = position
118                ..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
119                    .min(self.file_size);
120
121            // Maybe EOF after the header, no actual data
122            if range_to_fetch.is_empty() {
123                break;
124            }
125
126            let current_data = self
127                .reader
128                .get_bytes(range_to_fetch.clone())
129                .await
130                .map_err(|err| {
131                    AvroError::General(format!(
132                        "Error fetching Avro header from file reader: {err}"
133                    ))
134                })?;
135            if current_data.is_empty() {
136                return Err(AvroError::EOF(
137                    "Unexpected EOF while fetching header data".into(),
138                ));
139            }
140
141            let read = current_data.len();
142            let decoded = decoder.decode(&current_data)?;
143            if decoded != read {
144                position += decoded as u64;
145                break;
146            }
147            position += read as u64;
148        }
149
150        decoder
151            .flush()
152            .map(|header| (header, position))
153            .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
154    }
155
156    /// Build the asynchronous Avro reader with the provided parameters.
157    /// This reads the header first to initialize the reader state.
158    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
159        if self.file_size == 0 {
160            return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
161        }
162
163        // Start by reading the header from the beginning of the avro file
164        // take the writer schema from the header
165        let (header, header_len) = self.read_header().await?;
166        let writer_schema = {
167            let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
168                AvroError::ParseError("No Avro schema present in file header".to_string())
169            })?;
170            let json_string = std::str::from_utf8(raw)
171                .map_err(|e| {
172                    AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
173                })?
174                .to_string();
175            AvroSchema::new(json_string)
176        };
177
178        // If projection exists, project the reader schema,
179        // if no reader schema is provided, parse it from the header(get the raw writer schema), and project that
180        // this projected schema will be the schema used for reading.
181        let projected_reader_schema = self
182            .projection
183            .as_deref()
184            .map(|projection| {
185                let base_schema = if let Some(reader_schema) = &self.reader_schema {
186                    reader_schema
187                } else {
188                    &writer_schema
189                };
190                base_schema.project(projection)
191            })
192            .transpose()?;
193
194        // Use either the projected reader schema or the original reader schema(if no projection)
195        // (both optional, at worst no reader schema is provided, in which case we read with the writer schema)
196        let effective_reader_schema = projected_reader_schema
197            .as_ref()
198            .or(self.reader_schema.as_ref())
199            .map(|s| s.schema())
200            .transpose()?;
201
202        let root = {
203            let writer_schema = writer_schema.schema()?;
204            let mut builder = AvroFieldBuilder::new(&writer_schema);
205            if let Some(reader_schema) = &effective_reader_schema {
206                builder = builder.with_reader_schema(reader_schema);
207            }
208            builder
209                .with_utf8view(self.utf8_view)
210                .with_strict_mode(self.strict_mode)
211                .build()
212        }?;
213
214        let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
215        let decoder = Decoder::from_parts(
216            self.batch_size,
217            record_decoder,
218            None,
219            IndexMap::new(),
220            FingerprintAlgorithm::Rabin,
221        );
222        let range = match self.range {
223            Some(r) => {
224                // If this PartitionedFile's range starts at 0, we need to skip the header bytes.
225                // But then we need to seek back 16 bytes to include the sync marker for the first block,
226                // as the logic in this reader searches the data for the first sync marker(after which a block starts),
227                // then reads blocks from the count, size etc.
228                let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
229                let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty
230                start..end
231            }
232            None => 0..self.file_size,
233        };
234
235        // Determine if there is actually data to fetch, note that we subtract the header len from range.start,
236        // so we need to check if range.end == header_len to see if there's no data after the header
237        let reader_state = if range.start == range.end || header_len == range.end {
238            ReaderState::Finished
239        } else {
240            ReaderState::Idle {
241                reader: self.reader,
242            }
243        };
244        let codec = header.compression()?;
245        let sync_marker = header.sync();
246
247        Ok(AsyncAvroFileReader::new(
248            range,
249            self.file_size,
250            decoder,
251            codec,
252            sync_marker,
253            reader_state,
254        ))
255    }
256}