1use crate::reader::block::{Block, BlockDecoder};
21use crate::reader::header::{Header, HeaderDecoder};
22use arrow_schema::ArrowError;
23use std::io::BufRead;
24
25mod block;
26mod cursor;
27mod header;
28mod record;
29mod vlq;
30
/// Options for controlling how Avro data is decoded into Arrow arrays
///
/// Constructed via [`ReadOptions::new`] or [`ReadOptions::default`] and
/// configured with the builder-style `with_*` setters.
#[derive(Default, Debug, Clone)]
pub struct ReadOptions {
    /// Requests that string data be decoded to `Utf8View` arrays instead of
    /// `Utf8` (see [`ReadOptions::with_utf8view`]); defaults to `false`.
    /// Interpreted by `RecordDecoder::try_new_with_options`.
    use_utf8view: bool,
}
52
53impl ReadOptions {
54 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
64 self.use_utf8view = use_utf8view;
65 self
66 }
67
68 pub fn use_utf8view(&self) -> bool {
70 self.use_utf8view
71 }
72}
73
74fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
76 let mut decoder = HeaderDecoder::default();
77 loop {
78 let buf = reader.fill_buf()?;
79 if buf.is_empty() {
80 break;
81 }
82 let read = buf.len();
83 let decoded = decoder.decode(buf)?;
84 reader.consume(decoded);
85 if decoded != read {
86 break;
87 }
88 }
89
90 decoder
91 .flush()
92 .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
93}
94
95fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
97 let mut decoder = BlockDecoder::default();
98
99 let mut try_next = move || {
100 loop {
101 let buf = reader.fill_buf()?;
102 if buf.is_empty() {
103 break;
104 }
105 let read = buf.len();
106 let decoded = decoder.decode(buf)?;
107 reader.consume(decoded);
108 if decoded != read {
109 break;
110 }
111 }
112 Ok(decoder.flush())
113 };
114 std::iter::from_fn(move || try_next().transpose())
115}
116
#[cfg(test)]
mod test {
    use crate::codec::AvroField;
    use crate::reader::record::RecordDecoder;
    use crate::reader::{read_blocks, read_header};
    use crate::test_util::arrow_test_data;
    use arrow_array::*;
    use arrow_schema::{DataType, Field};
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    /// Read an entire Avro file into a single [`RecordBatch`] with default
    /// [`crate::ReadOptions`].
    fn read_file(file: &str, batch_size: usize) -> RecordBatch {
        read_file_with_options(file, batch_size, &crate::ReadOptions::default())
    }

    /// Read an entire Avro file into a single [`RecordBatch`], decoding at
    /// most `batch_size` records per call to [`RecordDecoder::decode`] so the
    /// incremental decode path is actually exercised.
    fn read_file_with_options(
        file: &str,
        batch_size: usize,
        options: &crate::ReadOptions,
    ) -> RecordBatch {
        let file = File::open(file).unwrap();
        let mut reader = BufReader::new(file);
        let header = read_header(&mut reader).unwrap();
        let compression = header.compression().unwrap();
        let schema = header.schema().unwrap().unwrap();
        let root = AvroField::try_from(&schema).unwrap();

        let mut decoder =
            RecordDecoder::try_new_with_options(root.data_type(), options.clone()).unwrap();

        for result in read_blocks(reader) {
            let block = result.unwrap();
            assert_eq!(block.sync, header.sync());
            let count = block.count;
            // Decompress only when the file declares a codec; an uncompressed
            // block's payload is already the raw record data. (Previously the
            // decode loop was inside `if let Some(c)`, so uncompressed files
            // were silently skipped.)
            let data = if let Some(c) = compression {
                c.decompress(&block.data).unwrap()
            } else {
                block.data
            };

            let mut offset = 0;
            let mut remaining = count;
            while remaining > 0 {
                // `min`, not `max`: decode at most `batch_size` records per
                // call. With `max` the first call consumed the whole block
                // (ignoring `batch_size`) and `remaining -= to_read` would
                // underflow for blocks smaller than the batch size.
                let to_read = remaining.min(batch_size);
                offset += decoder.decode(&data[offset..], to_read).unwrap();
                remaining -= to_read;
            }
            // Every byte of the block payload must have been consumed
            assert_eq!(offset, data.len());
        }
        decoder.flush().unwrap()
    }

    #[test]
    fn test_utf8view_support() {
        let schema_json = r#"{
            "type": "record",
            "name": "test",
            "fields": [{
                "name": "str_field",
                "type": "string"
            }]
        }"#;

        let schema: crate::schema::Schema = serde_json::from_str(schema_json).unwrap();
        let avro_field = AvroField::try_from(&schema).unwrap();

        let data_type = avro_field.data_type();

        // Helper mirroring the Utf8 -> Utf8View promotion applied when the
        // `use_utf8view` read option is enabled
        struct TestHelper;
        impl TestHelper {
            fn with_utf8view(field: &Field) -> Field {
                match field.data_type() {
                    DataType::Utf8 => {
                        Field::new(field.name(), DataType::Utf8View, field.is_nullable())
                            .with_metadata(field.metadata().clone())
                    }
                    _ => field.clone(),
                }
            }
        }

        let field = TestHelper::with_utf8view(&Field::new("str_field", DataType::Utf8, false));

        assert_eq!(field.data_type(), &DataType::Utf8View);

        let array = StringViewArray::from(vec!["test1", "test2"]);
        let batch =
            RecordBatch::try_from_iter(vec![("str_field", Arc::new(array) as ArrayRef)]).unwrap();

        assert!(batch.column(0).as_any().is::<StringViewArray>());
    }

    #[test]
    fn test_alltypes() {
        // Same logical data in three container codecs: null, snappy, zstandard
        let files = [
            "avro/alltypes_plain.avro",
            "avro/alltypes_plain.snappy.avro",
            "avro/alltypes_plain.zstandard.avro",
        ];

        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "id",
                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
                true,
            ),
            (
                "bool_col",
                Arc::new(BooleanArray::from_iter((0..8).map(|x| Some(x % 2 == 0)))) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "int_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from_iter_values((0..8).map(|x| (x % 2) * 10))) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f32 * 1.1),
                )) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f64 * 10.1),
                )) as _,
                true,
            ),
            (
                // ASCII bytes spelling "MM/DD/09" dates
                "date_string_col",
                Arc::new(BinaryArray::from_iter_values([
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                ])) as _,
                true,
            ),
            (
                "string_col",
                Arc::new(BinaryArray::from_iter_values((0..8).map(|x| [48 + x % 2]))) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1235865600000000, // 2009-03-01T00:00:00
                        1235865660000000, // 2009-03-01T00:01:00
                        1238544000000000, // 2009-04-01T00:00:00
                        1238544060000000, // 2009-04-01T00:01:00
                        1233446400000000, // 2009-02-01T00:00:00
                        1233446460000000, // 2009-02-01T00:01:00
                        1230768000000000, // 2009-01-01T00:00:00
                        1230768060000000, // 2009-01-01T00:01:00
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();

        for file in files {
            let file = arrow_test_data(file);

            // Batch size both larger than and smaller than the row count to
            // cover single-shot and incremental decoding
            assert_eq!(read_file(&file, 8), expected);
            assert_eq!(read_file(&file, 3), expected);
        }
    }
}