// arrow_avro/reader/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Read Avro data to Arrow

use crate::reader::block::{Block, BlockDecoder};
use crate::reader::header::{Header, HeaderDecoder};
use arrow_schema::ArrowError;
use std::io::BufRead;

mod block;
mod cursor;
mod header;
mod record;
mod vlq;
32
33/// Read a [`Header`] from the provided [`BufRead`]
34fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
35    let mut decoder = HeaderDecoder::default();
36    loop {
37        let buf = reader.fill_buf()?;
38        if buf.is_empty() {
39            break;
40        }
41        let read = buf.len();
42        let decoded = decoder.decode(buf)?;
43        reader.consume(decoded);
44        if decoded != read {
45            break;
46        }
47    }
48
49    decoder
50        .flush()
51        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
52}
53
54/// Return an iterator of [`Block`] from the provided [`BufRead`]
55fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
56    let mut decoder = BlockDecoder::default();
57
58    let mut try_next = move || {
59        loop {
60            let buf = reader.fill_buf()?;
61            if buf.is_empty() {
62                break;
63            }
64            let read = buf.len();
65            let decoded = decoder.decode(buf)?;
66            reader.consume(decoded);
67            if decoded != read {
68                break;
69            }
70        }
71        Ok(decoder.flush())
72    };
73    std::iter::from_fn(move || try_next().transpose())
74}
#[cfg(test)]
mod test {
    use crate::codec::AvroField;
    use crate::compression::CompressionCodec;
    use crate::reader::record::RecordDecoder;
    use crate::reader::{read_blocks, read_header};
    use crate::test_util::arrow_test_data;
    use arrow_array::*;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    /// Decode an entire Avro object container file into a single
    /// [`RecordBatch`], feeding the row decoder at most `batch_size`
    /// rows per `decode` call to exercise partial-batch decoding.
    fn read_file(file: &str, batch_size: usize) -> RecordBatch {
        let file = File::open(file).unwrap();
        let mut reader = BufReader::new(file);
        let header = read_header(&mut reader).unwrap();
        let compression = header.compression().unwrap();
        let schema = header.schema().unwrap().unwrap();
        let root = AvroField::try_from(&schema).unwrap();
        let mut decoder = RecordDecoder::try_new(root.data_type()).unwrap();

        for result in read_blocks(reader) {
            let block = result.unwrap();
            assert_eq!(block.sync, header.sync());
            // FIX: uncompressed blocks must be decoded too — previously the
            // `None` case fell through and the block was silently skipped,
            // yielding an empty batch for plain (uncompressed) files.
            let data = match compression {
                Some(c) => c.decompress(&block.data).unwrap(),
                None => block.data,
            };

            let mut offset = 0;
            let mut remaining = block.count;
            while remaining > 0 {
                // FIX: decode at most `batch_size` rows per call. The prior
                // `remaining.max(batch_size)` ignored the batch size and
                // underflowed `remaining` whenever batch_size > remaining.
                let to_read = remaining.min(batch_size);
                // FIX: pass `to_read` (not `block.count`) so each call
                // decodes only the requested slice of rows.
                offset += decoder.decode(&data[offset..], to_read).unwrap();
                remaining -= to_read;
            }
            // Every byte of the block body must have been consumed.
            assert_eq!(offset, data.len());
        }
        decoder.flush().unwrap()
    }

    #[test]
    fn test_alltypes() {
        // Same logical content, three compression codecs (none/snappy/zstd).
        let files = [
            "avro/alltypes_plain.avro",
            "avro/alltypes_plain.snappy.avro",
            "avro/alltypes_plain.zstandard.avro",
        ];

        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "id",
                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
                true,
            ),
            (
                "bool_col",
                Arc::new(BooleanArray::from_iter((0..8).map(|x| Some(x % 2 == 0)))) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "int_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from_iter_values((0..8).map(|x| (x % 2) * 10))) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f32 * 1.1),
                )) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f64 * 10.1),
                )) as _,
                true,
            ),
            (
                // ASCII bytes of "MM/01/09" date strings, e.g. the first row
                // is [48, 51, ...] = "03/01/09".
                "date_string_col",
                Arc::new(BinaryArray::from_iter_values([
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                ])) as _,
                true,
            ),
            (
                // Single ASCII digit "0" or "1" per row.
                "string_col",
                Arc::new(BinaryArray::from_iter_values((0..8).map(|x| [48 + x % 2]))) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1235865600000000, // 2009-03-01T00:00:00.000
                        1235865660000000, // 2009-03-01T00:01:00.000
                        1238544000000000, // 2009-04-01T00:00:00.000
                        1238544060000000, // 2009-04-01T00:01:00.000
                        1233446400000000, // 2009-02-01T00:00:00.000
                        1233446460000000, // 2009-02-01T00:01:00.000
                        1230768000000000, // 2009-01-01T00:00:00.000
                        1230768060000000, // 2009-01-01T00:01:00.000
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();

        for file in files {
            let file = arrow_test_data(file);

            // batch_size >= row count (single decode call) and a batch_size
            // that forces partial decodes must produce the same batch.
            assert_eq!(read_file(&file, 8), expected);
            assert_eq!(read_file(&file, 3), expected);
        }
    }
}