arrow_avro/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Read Avro data to Arrow
19
20use crate::reader::block::{Block, BlockDecoder};
21use crate::reader::header::{Header, HeaderDecoder};
22use arrow_schema::ArrowError;
23use std::io::BufRead;
24
25mod block;
26mod cursor;
27mod header;
28mod record;
29mod vlq;
30
/// Options controlling how Avro data is decoded into Arrow arrays
///
/// Use the builder-style setters to customize individual aspects of the
/// conversion; the [`Default`] implementation selects the standard behavior.
///
/// # Examples
///
/// ```
/// # use arrow_avro::reader::ReadOptions;
/// // Default: strings decode to the regular `StringArray`
/// let default_options = ReadOptions::default();
///
/// // Opt in to `StringViewArray` for string columns
/// let options = ReadOptions::default().with_utf8view(true);
/// ```
#[derive(Debug, Clone, Default)]
pub struct ReadOptions {
    // When true, Avro string data decodes to StringViewArray
    // instead of StringArray
    use_utf8view: bool,
}

impl ReadOptions {
    /// Create a `ReadOptions` populated with the default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Choose between `StringViewArray` (`true`) and the standard
    /// `StringArray` (`false`) for decoding Avro string data
    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
        self.use_utf8view = use_utf8view;
        self
    }

    /// Returns `true` if string data will be decoded to `StringViewArray`
    pub fn use_utf8view(&self) -> bool {
        self.use_utf8view
    }
}
73
74/// Read a [`Header`] from the provided [`BufRead`]
75fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
76    let mut decoder = HeaderDecoder::default();
77    loop {
78        let buf = reader.fill_buf()?;
79        if buf.is_empty() {
80            break;
81        }
82        let read = buf.len();
83        let decoded = decoder.decode(buf)?;
84        reader.consume(decoded);
85        if decoded != read {
86            break;
87        }
88    }
89
90    decoder
91        .flush()
92        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
93}
94
95/// Return an iterator of [`Block`] from the provided [`BufRead`]
96fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
97    let mut decoder = BlockDecoder::default();
98
99    let mut try_next = move || {
100        loop {
101            let buf = reader.fill_buf()?;
102            if buf.is_empty() {
103                break;
104            }
105            let read = buf.len();
106            let decoded = decoder.decode(buf)?;
107            reader.consume(decoded);
108            if decoded != read {
109                break;
110            }
111        }
112        Ok(decoder.flush())
113    };
114    std::iter::from_fn(move || try_next().transpose())
115}
116
#[cfg(test)]
mod test {
    use crate::codec::AvroField;
    use crate::compression::CompressionCodec;
    use crate::reader::record::RecordDecoder;
    use crate::reader::{read_blocks, read_header};
    use crate::test_util::arrow_test_data;
    use arrow_array::*;
    use arrow_schema::{DataType, Field};
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    /// Read an entire Avro file into a single [`RecordBatch`] using
    /// the default [`crate::ReadOptions`]
    fn read_file(file: &str, batch_size: usize) -> RecordBatch {
        read_file_with_options(file, batch_size, &crate::ReadOptions::default())
    }

    /// Read an entire Avro file into a single [`RecordBatch`], decoding at
    /// most `batch_size` records per call to [`RecordDecoder::decode`]
    fn read_file_with_options(
        file: &str,
        batch_size: usize,
        options: &crate::ReadOptions,
    ) -> RecordBatch {
        let file = File::open(file).unwrap();
        let mut reader = BufReader::new(file);
        let header = read_header(&mut reader).unwrap();
        let compression: Option<CompressionCodec> = header.compression().unwrap();
        let schema = header.schema().unwrap().unwrap();
        let root = AvroField::try_from(&schema).unwrap();

        let mut decoder =
            RecordDecoder::try_new_with_options(root.data_type(), options.clone()).unwrap();

        for result in read_blocks(reader) {
            let block = result.unwrap();
            assert_eq!(block.sync, header.sync());

            // Decompress when the file declares a codec, otherwise decode the
            // raw block bytes (previously uncompressed blocks were silently
            // skipped, leaving the decoder empty)
            let data = match compression {
                Some(c) => c.decompress(&block.data).unwrap(),
                None => block.data,
            };

            // Decode the block in chunks of at most `batch_size` records.
            // Was `remaining.max(batch_size)`, which ignored the batch size
            // and underflowed `remaining` whenever `batch_size > remaining`
            let mut offset = 0;
            let mut remaining = block.count;
            while remaining > 0 {
                let to_read = remaining.min(batch_size);
                offset += decoder.decode(&data[offset..], to_read).unwrap();
                remaining -= to_read;
            }
            assert_eq!(offset, data.len());
        }
        decoder.flush().unwrap()
    }

    #[test]
    fn test_utf8view_support() {
        let schema_json = r#"{
            "type": "record",
            "name": "test",
            "fields": [{
                "name": "str_field",
                "type": "string"
            }]
        }"#;

        let schema: crate::schema::Schema = serde_json::from_str(schema_json).unwrap();
        let avro_field = AvroField::try_from(&schema).unwrap();

        let data_type = avro_field.data_type();

        // Maps a `Utf8` field to its `Utf8View` equivalent, preserving
        // name, nullability and metadata
        struct TestHelper;
        impl TestHelper {
            fn with_utf8view(field: &Field) -> Field {
                match field.data_type() {
                    DataType::Utf8 => {
                        Field::new(field.name(), DataType::Utf8View, field.is_nullable())
                            .with_metadata(field.metadata().clone())
                    }
                    _ => field.clone(),
                }
            }
        }

        let field = TestHelper::with_utf8view(&Field::new("str_field", DataType::Utf8, false));

        assert_eq!(field.data_type(), &DataType::Utf8View);

        let array = StringViewArray::from(vec!["test1", "test2"]);
        let batch =
            RecordBatch::try_from_iter(vec![("str_field", Arc::new(array) as ArrayRef)]).unwrap();

        assert!(batch.column(0).as_any().is::<StringViewArray>());
    }

    #[test]
    fn test_alltypes() {
        // Same logical data encoded with no codec, snappy, and zstandard
        let files = [
            "avro/alltypes_plain.avro",
            "avro/alltypes_plain.snappy.avro",
            "avro/alltypes_plain.zstandard.avro",
        ];

        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "id",
                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
                true,
            ),
            (
                "bool_col",
                Arc::new(BooleanArray::from_iter((0..8).map(|x| Some(x % 2 == 0)))) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "int_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from_iter_values((0..8).map(|x| (x % 2) * 10))) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f32 * 1.1),
                )) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f64 * 10.1),
                )) as _,
                true,
            ),
            (
                // ASCII bytes of "MM/01/09" date strings
                "date_string_col",
                Arc::new(BinaryArray::from_iter_values([
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                ])) as _,
                true,
            ),
            (
                "string_col",
                Arc::new(BinaryArray::from_iter_values((0..8).map(|x| [48 + x % 2]))) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1235865600000000, // 2009-03-01T00:00:00.000
                        1235865660000000, // 2009-03-01T00:01:00.000
                        1238544000000000, // 2009-04-01T00:00:00.000
                        1238544060000000, // 2009-04-01T00:01:00.000
                        1233446400000000, // 2009-02-01T00:00:00.000
                        1233446460000000, // 2009-02-01T00:01:00.000
                        1230768000000000, // 2009-01-01T00:00:00.000
                        1230768060000000, // 2009-01-01T00:01:00.000
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();

        for file in files {
            let file = arrow_test_data(file);

            // Exercise both whole-block and partial-block batch sizes
            assert_eq!(read_file(&file, 8), expected);
            assert_eq!(read_file(&file, 3), expected);
        }
    }
}
311}