1use crate::reader::block::{Block, BlockDecoder};
21use crate::reader::header::{Header, HeaderDecoder};
22use arrow_schema::ArrowError;
23use std::io::BufRead;
24
25mod header;
26
27mod block;
28
29mod cursor;
30mod record;
31mod vlq;
32
33fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
35 let mut decoder = HeaderDecoder::default();
36 loop {
37 let buf = reader.fill_buf()?;
38 if buf.is_empty() {
39 break;
40 }
41 let read = buf.len();
42 let decoded = decoder.decode(buf)?;
43 reader.consume(decoded);
44 if decoded != read {
45 break;
46 }
47 }
48
49 decoder
50 .flush()
51 .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
52}
53
54fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
56 let mut decoder = BlockDecoder::default();
57
58 let mut try_next = move || {
59 loop {
60 let buf = reader.fill_buf()?;
61 if buf.is_empty() {
62 break;
63 }
64 let read = buf.len();
65 let decoded = decoder.decode(buf)?;
66 reader.consume(decoded);
67 if decoded != read {
68 break;
69 }
70 }
71 Ok(decoder.flush())
72 };
73 std::iter::from_fn(move || try_next().transpose())
74}
75
#[cfg(test)]
mod test {
    use crate::codec::AvroField;
    use crate::compression::CompressionCodec;
    use crate::reader::record::RecordDecoder;
    use crate::reader::{read_blocks, read_header};
    use crate::test_util::arrow_test_data;
    use arrow_array::*;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    /// Decodes an entire Avro file into a single [`RecordBatch`],
    /// decoding at most `batch_size` records per call to exercise the
    /// chunked decode path.
    fn read_file(file: &str, batch_size: usize) -> RecordBatch {
        let file = File::open(file).unwrap();
        let mut reader = BufReader::new(file);
        let header = read_header(&mut reader).unwrap();
        let compression = header.compression().unwrap();
        let schema = header.schema().unwrap().unwrap();
        let root = AvroField::try_from(&schema).unwrap();
        let mut decoder = RecordDecoder::try_new(root.data_type()).unwrap();

        for result in read_blocks(reader) {
            let block = result.unwrap();
            assert_eq!(block.sync, header.sync());
            // Uncompressed files (null codec) store raw record bytes in the
            // block; otherwise decompress with the codec from the header.
            let data = match compression {
                Some(c) => c.decompress(&block.data).unwrap(),
                None => block.data,
            };

            let mut offset = 0;
            let mut remaining = block.count;
            while remaining > 0 {
                // Never request more records than remain in the block,
                // otherwise `remaining -= to_read` would underflow.
                let to_read = remaining.min(batch_size);
                offset += decoder.decode(&data[offset..], to_read).unwrap();
                remaining -= to_read;
            }
            // The decoder must have consumed every byte of the block.
            assert_eq!(offset, data.len());
        }
        decoder.flush().unwrap()
    }

    #[test]
    fn test_alltypes() {
        // Same logical rows encoded with three different codecs.
        let files = [
            "avro/alltypes_plain.avro",
            "avro/alltypes_plain.snappy.avro",
            "avro/alltypes_plain.zstandard.avro",
        ];

        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "id",
                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
                true,
            ),
            (
                "bool_col",
                Arc::new(BooleanArray::from_iter((0..8).map(|x| Some(x % 2 == 0)))) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "int_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from_iter_values((0..8).map(|x| (x % 2) * 10))) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f32 * 1.1),
                )) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f64 * 10.1),
                )) as _,
                true,
            ),
            (
                // ASCII bytes of "MM/01/09" date strings.
                "date_string_col",
                Arc::new(BinaryArray::from_iter_values([
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                ])) as _,
                true,
            ),
            (
                "string_col",
                Arc::new(BinaryArray::from_iter_values((0..8).map(|x| [48 + x % 2]))) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1235865600000000, // 2009-03-01T00:00:00.000
                        1235865660000000, // 2009-03-01T00:01:00.000
                        1238544000000000, // 2009-04-01T00:00:00.000
                        1238544060000000, // 2009-04-01T00:01:00.000
                        1233446400000000, // 2009-02-01T00:00:00.000
                        1233446460000000, // 2009-02-01T00:01:00.000
                        1230768000000000, // 2009-01-01T00:00:00.000
                        1230768060000000, // 2009-01-01T00:01:00.000
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();

        for file in files {
            let file = arrow_test_data(file);

            // Batch size equal to the row count, then smaller than it, so
            // both the single-shot and partial-batch paths are covered.
            assert_eq!(read_file(&file, 8), expected);
            assert_eq!(read_file(&file, 3), expected);
        }
    }
}