arrow_avro/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro reader
19//!
20//! This module provides facilities to read Apache Avro-encoded files or streams
21//! into Arrow's `RecordBatch` format. In particular, it introduces:
22//!
23//! * `ReaderBuilder`: Configures Avro reading, e.g., batch size
24//! * `Reader`: Yields `RecordBatch` values, implementing `Iterator`
25//! * `Decoder`: A low-level push-based decoder for Avro records
26//!
27//! # Basic Usage
28//!
29//! `Reader` can be used directly with synchronous data sources, such as [`std::fs::File`].
30//!
31//! ## Reading a Single Batch
32//!
33//! ```
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use arrow_avro::reader::ReaderBuilder;
37//! # let path = "avro/alltypes_plain.avro";
38//! # let path = match std::env::var("ARROW_TEST_DATA") {
39//! #   Ok(dir) => format!("{dir}/{path}"),
40//! #   Err(_) => format!("../testing/data/{path}")
41//! # };
42//! let file = File::open(path).unwrap();
43//! let mut avro = ReaderBuilder::new().build(BufReader::new(file)).unwrap();
44//! let batch = avro.next().unwrap();
45//! ```
46//!
47//! # Async Usage
48//!
49//! The lower-level `Decoder` can be integrated with various forms of async data streams,
50//! and is designed to be agnostic to different async IO primitives within
51//! the Rust ecosystem. It works by incrementally decoding Avro data from byte slices.
52//!
53//! For example, see below for how it could be used with an arbitrary `Stream` of `Bytes`:
54//!
55//! ```
56//! # use std::task::{Poll, ready};
57//! # use bytes::{Buf, Bytes};
58//! # use arrow_schema::ArrowError;
59//! # use futures::stream::{Stream, StreamExt};
60//! # use arrow_array::RecordBatch;
61//! # use arrow_avro::reader::Decoder;
62//!
63//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
64//!     mut decoder: Decoder,
65//!     mut input: S,
66//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
67//!     let mut buffered = Bytes::new();
68//!     futures::stream::poll_fn(move |cx| {
69//!         loop {
70//!             if buffered.is_empty() {
71//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
72//!                     Some(b) => b,
73//!                     None => break,
74//!                 };
75//!             }
76//!             let decoded = match decoder.decode(buffered.as_ref()) {
77//!                 Ok(decoded) => decoded,
78//!                 Err(e) => return Poll::Ready(Some(Err(e))),
79//!             };
80//!             let read = buffered.len();
81//!             buffered.advance(decoded);
82//!             if decoded != read {
83//!                 break
84//!             }
85//!         }
86//!         // Convert any fully-decoded rows to a RecordBatch, if available
87//!         Poll::Ready(decoder.flush().transpose())
88//!     })
89//! }
90//! ```
91//!
92
93use crate::codec::AvroFieldBuilder;
94use crate::schema::Schema as AvroSchema;
95use arrow_array::{RecordBatch, RecordBatchReader};
96use arrow_schema::{ArrowError, SchemaRef};
97use block::BlockDecoder;
98use header::{Header, HeaderDecoder};
99use record::RecordDecoder;
100use std::io::BufRead;
101
102mod block;
103mod cursor;
104mod header;
105mod record;
106mod vlq;
107
108/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
109fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
110    let mut decoder = HeaderDecoder::default();
111    loop {
112        let buf = reader.fill_buf()?;
113        if buf.is_empty() {
114            break;
115        }
116        let read = buf.len();
117        let decoded = decoder.decode(buf)?;
118        reader.consume(decoded);
119        if decoded != read {
120            break;
121        }
122    }
123    decoder.flush().ok_or_else(|| {
124        ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
125    })
126}
127
/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
#[derive(Debug)]
pub struct Decoder {
    /// Row-level decoder that accumulates decoded values until flushed.
    record_decoder: RecordDecoder,
    /// Maximum number of rows to accumulate before a flush is expected.
    batch_size: usize,
    /// Number of rows decoded since the last flush.
    decoded_rows: usize,
}
135
136impl Decoder {
137    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
138        Self {
139            record_decoder,
140            batch_size,
141            decoded_rows: 0,
142        }
143    }
144
145    /// Return the Arrow schema for the rows decoded by this decoder
146    pub fn schema(&self) -> SchemaRef {
147        self.record_decoder.schema().clone()
148    }
149
150    /// Return the configured maximum number of rows per batch
151    pub fn batch_size(&self) -> usize {
152        self.batch_size
153    }
154
155    /// Feed `data` into the decoder row by row until we either:
156    /// - consume all bytes in `data`, or
157    /// - reach `batch_size` decoded rows.
158    ///
159    /// Returns the number of bytes consumed.
160    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
161        let mut total_consumed = 0usize;
162        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
163            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
164            // A successful call to record_decoder.decode means one row was decoded.
165            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
166            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
167            // We add `consumed` (which can be 0) to `total_consumed`.
168            total_consumed += consumed;
169            self.decoded_rows += 1;
170        }
171        Ok(total_consumed)
172    }
173
174    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
175    /// `Ok(None)` if no new rows are available.
176    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
177        if self.decoded_rows == 0 {
178            Ok(None)
179        } else {
180            let batch = self.record_decoder.flush()?;
181            self.decoded_rows = 0;
182            Ok(Some(batch))
183        }
184    }
185
186    /// Returns the number of rows that can be added to this decoder before it is full.
187    pub fn capacity(&self) -> usize {
188        self.batch_size.saturating_sub(self.decoded_rows)
189    }
190
191    /// Returns true if the decoder has reached its capacity for the current batch.
192    pub fn batch_is_full(&self) -> bool {
193        self.capacity() == 0
194    }
195}
196
/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
/// into Arrow `RecordBatch`.
#[derive(Debug)]
pub struct ReaderBuilder {
    /// Maximum number of rows per emitted `RecordBatch` (default 1024).
    batch_size: usize,
    /// When true, certain Avro unions of the form `[T, "null"]` produce an error.
    strict_mode: bool,
    /// When true, decode Avro strings to `StringViewArray` instead of `StringArray`.
    utf8_view: bool,
    /// Optional explicit schema; when `None`, the schema is read from the file header.
    schema: Option<AvroSchema<'static>>,
}
206
207impl Default for ReaderBuilder {
208    fn default() -> Self {
209        Self {
210            batch_size: 1024,
211            strict_mode: false,
212            utf8_view: false,
213            schema: None,
214        }
215    }
216}
217
218impl ReaderBuilder {
219    /// Creates a new [`ReaderBuilder`] with default settings:
220    /// - `batch_size` = 1024
221    /// - `strict_mode` = false
222    /// - `utf8_view` = false
223    /// - `schema` = None
224    pub fn new() -> Self {
225        Self::default()
226    }
227
228    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
229        let root_field = AvroFieldBuilder::new(schema)
230            .with_utf8view(self.utf8_view)
231            .with_strict_mode(self.strict_mode)
232            .build()?;
233        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
234    }
235
236    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
237        let header = read_header(reader)?;
238        let record_decoder = if let Some(schema) = &self.schema {
239            self.make_record_decoder(schema)?
240        } else {
241            let avro_schema: Option<AvroSchema<'_>> = header
242                .schema()
243                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
244            let avro_schema = avro_schema.ok_or_else(|| {
245                ArrowError::ParseError("No Avro schema present in file header".to_string())
246            })?;
247            self.make_record_decoder(&avro_schema)?
248        };
249        let decoder = Decoder::new(record_decoder, self.batch_size);
250        Ok((header, decoder))
251    }
252
253    /// Sets the row-based batch size
254    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
255        self.batch_size = batch_size;
256        self
257    }
258
259    /// Set whether to use StringViewArray for string data
260    ///
261    /// When enabled, string data from Avro files will be loaded into
262    /// Arrow's StringViewArray instead of the standard StringArray.
263    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
264        self.utf8_view = utf8_view;
265        self
266    }
267
268    /// Get whether StringViewArray is enabled for string data
269    pub fn use_utf8view(&self) -> bool {
270        self.utf8_view
271    }
272
273    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
274    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
275        self.strict_mode = strict_mode;
276        self
277    }
278
279    /// Sets the Avro schema.
280    ///
281    /// If a schema is not provided, the schema will be read from the Avro file header.
282    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
283        self.schema = Some(schema);
284        self
285    }
286
287    /// Create a [`Reader`] from this builder and a `BufRead`
288    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
289        let (header, decoder) = self.build_impl(&mut reader)?;
290        Ok(Reader {
291            reader,
292            header,
293            decoder,
294            block_decoder: BlockDecoder::default(),
295            block_data: Vec::new(),
296            block_cursor: 0,
297            finished: false,
298        })
299    }
300
301    /// Create a [`Decoder`] from this builder and a `BufRead` by
302    /// reading and parsing the Avro file's header. This will
303    /// not create a full [`Reader`].
304    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
305        match self.schema {
306            Some(ref schema) => {
307                let record_decoder = self.make_record_decoder(schema)?;
308                Ok(Decoder::new(record_decoder, self.batch_size))
309            }
310            None => {
311                let (_, decoder) = self.build_impl(&mut reader)?;
312                Ok(decoder)
313            }
314        }
315    }
316}
317
/// A high-level Avro `Reader` that reads container-file blocks
/// and feeds them into a row-level [`Decoder`].
#[derive(Debug)]
pub struct Reader<R: BufRead> {
    /// Underlying buffered byte source, positioned after the file header.
    reader: R,
    /// Parsed container-file header (metadata, compression codec, sync marker).
    header: Header,
    /// Row-level decoder that accumulates rows into batches.
    decoder: Decoder,
    /// Incremental decoder for container-file blocks.
    block_decoder: BlockDecoder,
    /// Contents of the current block (decompressed if a codec is configured).
    block_data: Vec<u8>,
    /// Offset of the next undecoded byte within `block_data`.
    block_cursor: usize,
    /// Set once the underlying reader reaches EOF.
    finished: bool,
}
330
impl<R: BufRead> Reader<R> {
    /// Return the Arrow schema discovered from the Avro file header
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema()
    }

    /// Return the Avro container-file header
    pub fn avro_header(&self) -> &Header {
        &self.header
    }

    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        // Keep decoding until EOF or until the decoder holds `batch_size` rows.
        'outer: while !self.finished && !self.decoder.batch_is_full() {
            // Inner loop: the current block is exhausted, so fetch the next one.
            while self.block_cursor == self.block_data.len() {
                let buf = self.reader.fill_buf()?;
                if buf.is_empty() {
                    // EOF: emit whatever rows were decoded as the final batch.
                    self.finished = true;
                    break 'outer;
                }
                // Try to decode another block from the buffered reader.
                let consumed = self.block_decoder.decode(buf)?;
                self.reader.consume(consumed);
                if let Some(block) = self.block_decoder.flush() {
                    // Successfully decoded a block.
                    let block_data = if let Some(ref codec) = self.header.compression()? {
                        codec.decompress(&block.data)?
                    } else {
                        block.data
                    };
                    self.block_data = block_data;
                    self.block_cursor = 0;
                } else if consumed == 0 {
                    // The block decoder made no progress on a non-empty buffer.
                    return Err(ArrowError::ParseError(
                        "Could not decode next Avro block from partial data".to_string(),
                    ));
                }
            }
            // Try to decode more rows from the current block.
            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
            self.block_cursor += consumed;
        }
        // Returns Ok(None) when no rows were decoded (clean EOF).
        self.decoder.flush()
    }
}
377
378impl<R: BufRead> Iterator for Reader<R> {
379    type Item = Result<RecordBatch, ArrowError>;
380
381    fn next(&mut self) -> Option<Self::Item> {
382        self.read().transpose()
383    }
384}
385
386impl<R: BufRead> RecordBatchReader for Reader<R> {
387    fn schema(&self) -> SchemaRef {
388        self.schema()
389    }
390}
391
392#[cfg(test)]
393mod test {
394    use crate::codec::{AvroDataType, AvroField, Codec};
395    use crate::compression::CompressionCodec;
396    use crate::reader::record::RecordDecoder;
397    use crate::reader::vlq::VLQDecoder;
398    use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
399    use crate::test_util::arrow_test_data;
400    use arrow::array::ArrayDataBuilder;
401    use arrow_array::builder::{
402        ArrayBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int32Builder, Int64Builder,
403        ListBuilder, MapBuilder, StringBuilder, StructBuilder,
404    };
405    use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
406    use arrow_array::*;
407    use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
408    use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
409    use bytes::{Buf, BufMut, Bytes};
410    use futures::executor::block_on;
411    use futures::{stream, Stream, StreamExt, TryStreamExt};
412    use std::collections::HashMap;
413    use std::fs;
414    use std::fs::File;
415    use std::io::{BufReader, Cursor, Read};
416    use std::sync::Arc;
417    use std::task::{ready, Poll};
418
    /// Read the entire Avro file at `path` into a single `RecordBatch`,
    /// concatenating every batch produced with the given `batch_size`.
    fn read_file(path: &str, batch_size: usize, utf8_view: bool) -> RecordBatch {
        let file = File::open(path).unwrap();
        let reader = ReaderBuilder::new()
            .with_batch_size(batch_size)
            .with_utf8_view(utf8_view)
            .build(BufReader::new(file))
            .unwrap();
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        arrow::compute::concat_batches(&schema, &batches).unwrap()
    }
430
    /// Build a strict-mode `Reader` for `path`, returning the builder error
    /// (if any) to the caller instead of panicking.
    fn read_file_strict(
        path: &str,
        batch_size: usize,
        utf8_view: bool,
    ) -> Result<Reader<BufReader<File>>, ArrowError> {
        let file = File::open(path).unwrap();
        ReaderBuilder::new()
            .with_batch_size(batch_size)
            .with_utf8_view(utf8_view)
            .with_strict_mode(true)
            .build(BufReader::new(file))
    }
443
    /// Drive `decoder` with a stream of byte chunks, yielding decoded batches.
    /// Errors if a chunk is not fully consumed (i.e. schema/data mismatch).
    fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
        mut decoder: Decoder,
        mut input: S,
    ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
        async_stream::try_stream! {
            if let Some(data) = input.next().await {
                let consumed = decoder.decode(&data)?;
                // A partial consume means the decoder rejected trailing bytes.
                if consumed < data.len() {
                    Err(ArrowError::ParseError(
                        "did not consume all bytes".to_string(),
                    ))?;
                }
            }
            if let Some(batch) = decoder.flush()? {
                yield batch
            }
        }
    }
462
    /// Verify that a Utf8 field can be mapped to Utf8View and that a
    /// `StringViewArray` round-trips through a `RecordBatch`.
    #[test]
    fn test_utf8view_support() {
        let schema_json = r#"{
            "type": "record",
            "name": "test",
            "fields": [{
                "name": "str_field",
                "type": "string"
            }]
        }"#;

        let schema: crate::schema::Schema = serde_json::from_str(schema_json).unwrap();
        let avro_field = AvroField::try_from(&schema).unwrap();

        let data_type = avro_field.data_type();

        // Local helper that rewrites a Utf8 field to Utf8View, preserving
        // name, nullability, and metadata.
        struct TestHelper;
        impl TestHelper {
            fn with_utf8view(field: &Field) -> Field {
                match field.data_type() {
                    DataType::Utf8 => {
                        Field::new(field.name(), DataType::Utf8View, field.is_nullable())
                            .with_metadata(field.metadata().clone())
                    }
                    _ => field.clone(),
                }
            }
        }

        let field = TestHelper::with_utf8view(&Field::new("str_field", DataType::Utf8, false));

        assert_eq!(field.data_type(), &DataType::Utf8View);

        let array = StringViewArray::from(vec!["test1", "test2"]);
        let batch =
            RecordBatch::try_from_iter(vec![("str_field", Arc::new(array) as ArrayRef)]).unwrap();

        assert!(batch.column(0).as_any().is::<StringViewArray>());
    }
502
    /// Read a file containing nullable binary values, including a null,
    /// an empty value, and a non-empty value.
    #[test]
    fn test_read_zero_byte_avro_file() {
        let batch = read_file("test/data/zero_byte.avro", 3, false);
        let schema = batch.schema();
        assert_eq!(schema.fields().len(), 1);
        let field = schema.field(0);
        assert_eq!(field.name(), "data");
        assert_eq!(field.data_type(), &DataType::Binary);
        assert!(field.is_nullable());
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 1);
        let binary_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Row 0: null; row 1: zero-length value; row 2: non-empty value.
        assert!(binary_array.is_null(0));
        assert!(binary_array.is_valid(1));
        assert_eq!(binary_array.value(1), b"");
        assert!(binary_array.is_valid(2));
        assert_eq!(binary_array.value(2), b"some bytes");
    }
525
    /// Decode the `alltypes_plain` fixture across all supported compression
    /// codecs and compare against a hand-built expected batch, with both a
    /// batch size that fits all 8 rows and one that forces multiple batches.
    #[test]
    fn test_alltypes() {
        let files = [
            "avro/alltypes_plain.avro",
            "avro/alltypes_plain.snappy.avro",
            "avro/alltypes_plain.zstandard.avro",
            "avro/alltypes_plain.bzip2.avro",
            "avro/alltypes_plain.xz.avro",
        ];

        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "id",
                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
                true,
            ),
            (
                "bool_col",
                Arc::new(BooleanArray::from_iter((0..8).map(|x| Some(x % 2 == 0)))) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "int_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from_iter_values((0..8).map(|x| (x % 2) * 10))) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f32 * 1.1),
                )) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f64 * 10.1),
                )) as _,
                true,
            ),
            (
                // ASCII bytes of "MM/01/09" date strings.
                "date_string_col",
                Arc::new(BinaryArray::from_iter_values([
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                ])) as _,
                true,
            ),
            (
                "string_col",
                Arc::new(BinaryArray::from_iter_values((0..8).map(|x| [48 + x % 2]))) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1235865600000000, // 2009-03-01T00:00:00.000
                        1235865660000000, // 2009-03-01T00:01:00.000
                        1238544000000000, // 2009-04-01T00:00:00.000
                        1238544060000000, // 2009-04-01T00:01:00.000
                        1233446400000000, // 2009-02-01T00:00:00.000
                        1233446460000000, // 2009-02-01T00:01:00.000
                        1230768000000000, // 2009-01-01T00:00:00.000
                        1230768060000000, // 2009-01-01T00:01:00.000
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();

        for file in files {
            let file = arrow_test_data(file);

            assert_eq!(read_file(&file, 8, false), expected);
            assert_eq!(read_file(&file, 3, false), expected);
        }
    }
627
    /// Decode the two-row `alltypes_dictionary` fixture and compare against
    /// an expected batch, at two different batch sizes.
    #[test]
    fn test_alltypes_dictionary() {
        let file = "avro/alltypes_dictionary.avro";
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("id", Arc::new(Int32Array::from(vec![0, 1])) as _, true),
            (
                "bool_col",
                Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from(vec![0, 1])) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from(vec![0, 1])) as _,
                true,
            ),
            ("int_col", Arc::new(Int32Array::from(vec![0, 1])) as _, true),
            (
                "bigint_col",
                Arc::new(Int64Array::from(vec![0, 10])) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from(vec![0.0, 1.1])) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from(vec![0.0, 10.1])) as _,
                true,
            ),
            (
                "date_string_col",
                Arc::new(BinaryArray::from_iter_values([b"01/01/09", b"01/01/09"])) as _,
                true,
            ),
            (
                "string_col",
                Arc::new(BinaryArray::from_iter_values([b"0", b"1"])) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1230768000000000, // 2009-01-01T00:00:00.000
                        1230768060000000, // 2009-01-01T00:01:00.000
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();
        let file_path = arrow_test_data(file);
        let batch_large = read_file(&file_path, 8, false);
        assert_eq!(
            batch_large, expected,
            "Decoded RecordBatch does not match for file {file}"
        );
        // Batch size 3 forces row accumulation across decode calls.
        let batch_small = read_file(&file_path, 3, false);
        assert_eq!(
            batch_small, expected,
            "Decoded RecordBatch (batch size 3) does not match for file {file}"
        );
    }
699
    /// Decode a single-row fixture where every column is null, verifying
    /// null handling for each primitive type.
    #[test]
    fn test_alltypes_nulls_plain() {
        let file = "avro/alltypes_nulls_plain.avro";
        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "string_col",
                Arc::new(StringArray::from(vec![None::<&str>])) as _,
                true,
            ),
            ("int_col", Arc::new(Int32Array::from(vec![None])) as _, true),
            (
                "bool_col",
                Arc::new(BooleanArray::from(vec![None])) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from(vec![None])) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from(vec![None])) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from(vec![None])) as _,
                true,
            ),
            (
                "bytes_col",
                Arc::new(BinaryArray::from(vec![None::<&[u8]>])) as _,
                true,
            ),
        ])
        .unwrap();
        let file_path = arrow_test_data(file);
        let batch_large = read_file(&file_path, 8, false);
        assert_eq!(
            batch_large, expected,
            "Decoded RecordBatch does not match for file {file}"
        );
        let batch_small = read_file(&file_path, 3, false);
        assert_eq!(
            batch_small, expected,
            "Decoded RecordBatch (batch size 3) does not match for file {file}"
        );
    }
749
    /// Decode the `binary.avro` fixture containing 12 one-byte binary values.
    #[test]
    fn test_binary() {
        let file = arrow_test_data("avro/binary.avro");
        let batch = read_file(&file, 8, false);
        let expected = RecordBatch::try_from_iter_with_nullable([(
            "foo",
            Arc::new(BinaryArray::from_iter_values(vec![
                b"\x00".as_ref(),
                b"\x01".as_ref(),
                b"\x02".as_ref(),
                b"\x03".as_ref(),
                b"\x04".as_ref(),
                b"\x05".as_ref(),
                b"\x06".as_ref(),
                b"\x07".as_ref(),
                b"\x08".as_ref(),
                b"\t".as_ref(),
                b"\n".as_ref(),
                b"\x0b".as_ref(),
            ])) as Arc<dyn Array>,
            true,
        )])
        .unwrap();
        assert_eq!(batch, expected);
    }
775
    /// Exercise `build_decoder` with an explicit schema against a raw Avro
    /// record body streamed in chunks: one case should succeed, the other
    /// should fail decoding because the schema does not match the data.
    #[test]
    fn test_decode_stream_with_schema() {
        struct TestCase<'a> {
            name: &'a str,
            schema: &'a str,
            expected_error: Option<&'a str>,
        }
        let tests = vec![
            TestCase {
                name: "success",
                schema: r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"string"}]}"#,
                expected_error: None,
            },
            TestCase {
                name: "valid schema invalid data",
                schema: r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"long"}]}"#,
                expected_error: Some("did not consume all bytes"),
            },
        ];
        for test in tests {
            let schema_s2: crate::schema::Schema = serde_json::from_str(test.schema).unwrap();
            let record_val = "some_string";
            // Avro string encoding: zig-zag length prefix followed by UTF-8 bytes.
            let mut body = vec![];
            body.push((record_val.len() as u8) << 1);
            body.extend_from_slice(record_val.as_bytes());
            // An empty reader: with an explicit schema, no header is consumed.
            let mut reader_placeholder = Cursor::new(&[] as &[u8]);
            let builder = ReaderBuilder::new()
                .with_batch_size(1)
                .with_schema(schema_s2);
            let decoder_result = builder.build_decoder(&mut reader_placeholder);
            let decoder = match decoder_result {
                Ok(decoder) => decoder,
                Err(e) => {
                    if let Some(expected) = test.expected_error {
                        assert!(
                            e.to_string().contains(expected),
                            "Test '{}' failed: unexpected error message at build.\nExpected to contain: '{expected}'\nActual: '{e}'",
                            test.name,
                        );
                        continue;
                    } else {
                        panic!("Test '{}' failed at decoder build: {e}", test.name);
                    }
                }
            };
            let stream = Box::pin(stream::once(async { Bytes::from(body) }));
            let decoded_stream = decode_stream(decoder, stream);
            let batches_result: Result<Vec<RecordBatch>, ArrowError> =
                block_on(decoded_stream.try_collect());
            // Cross-check the outcome against the expected-error declaration.
            match (batches_result, test.expected_error) {
                (Ok(batches), None) => {
                    let batch =
                        arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
                    let expected_field = Field::new("f2", DataType::Utf8, false);
                    let expected_schema = Arc::new(Schema::new(vec![expected_field]));
                    let expected_array = Arc::new(StringArray::from(vec![record_val]));
                    let expected_batch =
                        RecordBatch::try_new(expected_schema, vec![expected_array]).unwrap();
                    assert_eq!(batch, expected_batch, "Test '{}' failed", test.name);
                    assert_eq!(
                        batch.schema().field(0).name(),
                        "f2",
                        "Test '{}' failed",
                        test.name
                    );
                }
                (Err(e), Some(expected)) => {
                    assert!(
                        e.to_string().contains(expected),
                        "Test '{}' failed: unexpected error message at decode.\nExpected to contain: '{expected}'\nActual: '{e}'",
                        test.name,
                    );
                }
                (Ok(batches), Some(expected)) => {
                    panic!(
                        "Test '{}' was expected to fail with '{expected}', but it succeeded with: {:?}",
                        test.name, batches
                    );
                }
                (Err(e), None) => {
                    panic!(
                        "Test '{}' was not expected to fail, but it did with '{e}'",
                        test.name
                    );
                }
            }
        }
    }
864
865    #[test]
866    fn test_decimal() {
867        let files = [
868            ("avro/fixed_length_decimal.avro", 25, 2),
869            ("avro/fixed_length_decimal_legacy.avro", 13, 2),
870            ("avro/int32_decimal.avro", 4, 2),
871            ("avro/int64_decimal.avro", 10, 2),
872        ];
873        let decimal_values: Vec<i128> = (1..=24).map(|n| n as i128 * 100).collect();
874        for (file, precision, scale) in files {
875            let file_path = arrow_test_data(file);
876            let actual_batch = read_file(&file_path, 8, false);
877            let expected_array = Decimal128Array::from_iter_values(decimal_values.clone())
878                .with_precision_and_scale(precision, scale)
879                .unwrap();
880            let mut meta = HashMap::new();
881            meta.insert("precision".to_string(), precision.to_string());
882            meta.insert("scale".to_string(), scale.to_string());
883            let field_with_meta = Field::new("value", DataType::Decimal128(precision, scale), true)
884                .with_metadata(meta);
885            let expected_schema = Arc::new(Schema::new(vec![field_with_meta]));
886            let expected_batch =
887                RecordBatch::try_new(expected_schema.clone(), vec![Arc::new(expected_array)])
888                    .expect("Failed to build expected RecordBatch");
889            assert_eq!(
890                actual_batch, expected_batch,
891                "Decoded RecordBatch does not match the expected Decimal128 data for file {file}"
892            );
893            let actual_batch_small = read_file(&file_path, 3, false);
894            assert_eq!(
895                actual_batch_small,
896                expected_batch,
897                "Decoded RecordBatch does not match the expected Decimal128 data for file {file} with batch size 3"
898            );
899        }
900    }
901
902    #[test]
903    fn test_dict_pages_offset_zero() {
904        let file = arrow_test_data("avro/dict-page-offset-zero.avro");
905        let batch = read_file(&file, 32, false);
906        let num_rows = batch.num_rows();
907        let expected_field = Int32Array::from(vec![Some(1552); num_rows]);
908        let expected = RecordBatch::try_from_iter_with_nullable([(
909            "l_partkey",
910            Arc::new(expected_field) as Arc<dyn Array>,
911            true,
912        )])
913        .unwrap();
914        assert_eq!(batch, expected);
915    }
916
    #[test]
    fn test_list_columns() {
        // Verifies two nullable list columns: `int64_list` (nullable int64
        // items) and `utf8_list` (nullable utf8 items, one null row).
        let file = arrow_test_data("avro/list_columns.avro");
        let mut int64_list_builder = ListBuilder::new(Int64Builder::new());
        // Row 0: [1, 2, 3]
        {
            {
                let values = int64_list_builder.values();
                values.append_value(1);
                values.append_value(2);
                values.append_value(3);
            }
            int64_list_builder.append(true);
        }
        // Row 1: [null, 1]
        {
            {
                let values = int64_list_builder.values();
                values.append_null();
                values.append_value(1);
            }
            int64_list_builder.append(true);
        }
        // Row 2: [4]
        {
            {
                let values = int64_list_builder.values();
                values.append_value(4);
            }
            int64_list_builder.append(true);
        }
        let int64_list = int64_list_builder.finish();
        let mut utf8_list_builder = ListBuilder::new(StringBuilder::new());
        // Row 0: ["abc", "efg", "hij"]
        {
            {
                let values = utf8_list_builder.values();
                values.append_value("abc");
                values.append_value("efg");
                values.append_value("hij");
            }
            utf8_list_builder.append(true);
        }
        // Row 1: null list
        {
            utf8_list_builder.append(false);
        }
        // Row 2: ["efg", null, "hij", "xyz"]
        {
            {
                let values = utf8_list_builder.values();
                values.append_value("efg");
                values.append_null();
                values.append_value("hij");
                values.append_value("xyz");
            }
            utf8_list_builder.append(true);
        }
        let utf8_list = utf8_list_builder.finish();
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("int64_list", Arc::new(int64_list) as Arc<dyn Array>, true),
            ("utf8_list", Arc::new(utf8_list) as Arc<dyn Array>, true),
        ])
        .unwrap();
        let batch = read_file(&file, 8, false);
        assert_eq!(batch, expected);
    }
978
    #[test]
    fn test_nested_lists() {
        use arrow_data::ArrayDataBuilder;
        // Builds the expected triply-nested list column `a` bottom-up from raw
        // offset/validity buffers, plus an int32 column `b` of all 1s, and
        // compares against the decoded snappy-compressed file at two batch sizes.
        let file = arrow_test_data("avro/nested_lists.snappy.avro");
        // Flattened innermost string values (15 in total).
        let inner_values = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
            Some("f"),
        ]);
        // 14 offsets delimit 13 innermost lists over the 15 values above.
        let inner_offsets = Buffer::from_slice_ref([0, 2, 3, 3, 4, 6, 8, 8, 9, 11, 13, 14, 14, 15]);
        // Validity of each of the 13 innermost lists (false => null list).
        let inner_validity = [
            true, true, false, true, true, true, false, true, true, true, true, false, true,
        ];
        let inner_null_buffer = Buffer::from_iter(inner_validity.iter().copied());
        let inner_field = Field::new("item", DataType::Utf8, true);
        let inner_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(inner_field)))
            .len(13)
            .add_buffer(inner_offsets)
            .add_child_data(inner_values.to_data())
            .null_bit_buffer(Some(inner_null_buffer))
            .build()
            .unwrap();
        let inner_list_array = ListArray::from(inner_list_data);
        // Middle level: 6 lists, all valid, grouping the 13 inner lists.
        let middle_offsets = Buffer::from_slice_ref([0, 2, 4, 6, 8, 11, 13]);
        let middle_validity = [true; 6];
        let middle_null_buffer = Buffer::from_iter(middle_validity.iter().copied());
        let middle_field = Field::new("item", inner_list_array.data_type().clone(), true);
        let middle_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(middle_field)))
            .len(6)
            .add_buffer(middle_offsets)
            .add_child_data(inner_list_array.to_data())
            .null_bit_buffer(Some(middle_null_buffer))
            .build()
            .unwrap();
        let middle_list_array = ListArray::from(middle_list_data);
        // Outer level: 3 rows of 2 middle lists each.
        let outer_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let outer_null_buffer = Buffer::from_slice_ref([0b111]); // all 3 rows valid
        let outer_field = Field::new("item", middle_list_array.data_type().clone(), true);
        let outer_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(outer_field)))
            .len(3)
            .add_buffer(outer_offsets)
            .add_child_data(middle_list_array.to_data())
            .null_bit_buffer(Some(outer_null_buffer))
            .build()
            .unwrap();
        let a_expected = ListArray::from(outer_list_data);
        let b_expected = Int32Array::from(vec![1, 1, 1]);
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("a", Arc::new(a_expected) as Arc<dyn Array>, true),
            ("b", Arc::new(b_expected) as Arc<dyn Array>, true),
        ])
        .unwrap();
        let left = read_file(&file, 8, false);
        assert_eq!(left, expected, "Mismatch for batch size=8");
        let left_small = read_file(&file, 3, false);
        assert_eq!(left_small, expected, "Mismatch for batch size=3");
    }
1048
    #[test]
    fn test_simple() {
        // Each tuple is (file, batch_size, expected batch, alternate batch_size);
        // every file is decoded twice to exercise both batch-size paths.
        let tests = [
            ("avro/simple_enum.avro", 4, build_expected_enum(), 2),
            ("avro/simple_fixed.avro", 2, build_expected_fixed(), 1),
        ];

        // Expected output for simple_enum.avro: three enum columns decoded as
        // Dictionary<Int32, Utf8>, each carrying its symbol list in the field
        // metadata under "avro.enum.symbols". f3 is nullable (row 2 is null).
        fn build_expected_enum() -> RecordBatch {
            // Build the DictionaryArrays for f1, f2, f3
            let keys_f1 = Int32Array::from(vec![0, 1, 2, 3]);
            let vals_f1 = StringArray::from(vec!["a", "b", "c", "d"]);
            let f1_dict =
                DictionaryArray::<Int32Type>::try_new(keys_f1, Arc::new(vals_f1)).unwrap();
            let keys_f2 = Int32Array::from(vec![2, 3, 0, 1]);
            let vals_f2 = StringArray::from(vec!["e", "f", "g", "h"]);
            let f2_dict =
                DictionaryArray::<Int32Type>::try_new(keys_f2, Arc::new(vals_f2)).unwrap();
            let keys_f3 = Int32Array::from(vec![Some(1), Some(2), None, Some(0)]);
            let vals_f3 = StringArray::from(vec!["i", "j", "k"]);
            let f3_dict =
                DictionaryArray::<Int32Type>::try_new(keys_f3, Arc::new(vals_f3)).unwrap();
            let dict_type =
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
            let mut md_f1 = HashMap::new();
            md_f1.insert(
                "avro.enum.symbols".to_string(),
                r#"["a","b","c","d"]"#.to_string(),
            );
            let f1_field = Field::new("f1", dict_type.clone(), false).with_metadata(md_f1);
            let mut md_f2 = HashMap::new();
            md_f2.insert(
                "avro.enum.symbols".to_string(),
                r#"["e","f","g","h"]"#.to_string(),
            );
            let f2_field = Field::new("f2", dict_type.clone(), false).with_metadata(md_f2);
            let mut md_f3 = HashMap::new();
            md_f3.insert(
                "avro.enum.symbols".to_string(),
                r#"["i","j","k"]"#.to_string(),
            );
            let f3_field = Field::new("f3", dict_type.clone(), true).with_metadata(md_f3);
            let expected_schema = Arc::new(Schema::new(vec![f1_field, f2_field, f3_field]));
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(f1_dict) as Arc<dyn Array>,
                    Arc::new(f2_dict) as Arc<dyn Array>,
                    Arc::new(f3_dict) as Arc<dyn Array>,
                ],
            )
            .unwrap()
        }

        // Expected output for simple_fixed.avro: fixed(5), fixed(10) and a
        // nullable fixed(6) column decoded as FixedSizeBinary.
        fn build_expected_fixed() -> RecordBatch {
            let f1 =
                FixedSizeBinaryArray::try_from_iter(vec![b"abcde", b"12345"].into_iter()).unwrap();
            let f2 =
                FixedSizeBinaryArray::try_from_iter(vec![b"fghijklmno", b"1234567890"].into_iter())
                    .unwrap();
            let f3 = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![Some(b"ABCDEF" as &[u8]), None].into_iter(),
                6,
            )
            .unwrap();
            let expected_schema = Arc::new(Schema::new(vec![
                Field::new("f1", DataType::FixedSizeBinary(5), false),
                Field::new("f2", DataType::FixedSizeBinary(10), false),
                Field::new("f3", DataType::FixedSizeBinary(6), true),
            ]));
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(f1) as Arc<dyn Array>,
                    Arc::new(f2) as Arc<dyn Array>,
                    Arc::new(f3) as Arc<dyn Array>,
                ],
            )
            .unwrap()
        }
        for (file_name, batch_size, expected, alt_batch_size) in tests {
            let file = arrow_test_data(file_name);
            let actual = read_file(&file, batch_size, false);
            assert_eq!(actual, expected);
            // Re-read with the alternate batch size; result must be identical.
            let actual2 = read_file(&file, alt_batch_size, false);
            assert_eq!(actual2, expected);
        }
    }
1136
1137    #[test]
1138    fn test_single_nan() {
1139        let file = arrow_test_data("avro/single_nan.avro");
1140        let actual = read_file(&file, 1, false);
1141        use arrow_array::Float64Array;
1142        let schema = Arc::new(Schema::new(vec![Field::new(
1143            "mycol",
1144            DataType::Float64,
1145            true,
1146        )]));
1147        let col = Float64Array::from(vec![None]);
1148        let expected = RecordBatch::try_new(schema, vec![Arc::new(col)]).unwrap();
1149        assert_eq!(actual, expected);
1150        let actual2 = read_file(&file, 2, false);
1151        assert_eq!(actual2, expected);
1152    }
1153
    #[test]
    fn test_duration_uuid() {
        // Decodes a local fixture whose two columns arrive as
        // Interval(MonthDayNano) ("duration_field") and FixedSizeBinary(16)
        // ("uuid_field"), then checks schema, row/column counts, and values.
        let batch = read_file("test/data/duration_uuid.avro", 4, false);
        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "duration_field");
        assert_eq!(
            fields[0].data_type(),
            &DataType::Interval(IntervalUnit::MonthDayNano)
        );
        assert_eq!(fields[1].name(), "uuid_field");
        assert_eq!(fields[1].data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.num_columns(), 2);
        let duration_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .unwrap();
        // Expected (months, days, nanoseconds) triples for the four rows.
        let expected_duration_array: IntervalMonthDayNanoArray = [
            Some(IntervalMonthDayNanoType::make_value(1, 15, 500_000_000)),
            Some(IntervalMonthDayNanoType::make_value(0, 5, 2_500_000_000)),
            Some(IntervalMonthDayNanoType::make_value(2, 0, 0)),
            Some(IntervalMonthDayNanoType::make_value(12, 31, 999_000_000)),
        ]
        .iter()
        .copied()
        .collect();
        assert_eq!(&expected_duration_array, duration_array);
        let uuid_array = batch
            .column(1)
            .as_any()
            .downcast_ref::<FixedSizeBinaryArray>()
            .unwrap();
        // Raw 16-byte payloads of the four expected UUID values.
        let expected_uuid_array = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
            [
                Some([
                    0xfe, 0x7b, 0xc3, 0x0b, 0x4c, 0xe8, 0x4c, 0x5e, 0xb6, 0x7c, 0x22, 0x34, 0xa2,
                    0xd3, 0x8e, 0x66,
                ]),
                Some([
                    0xb3, 0x3f, 0x2a, 0xd7, 0x97, 0xb4, 0x4d, 0xe1, 0x8b, 0xfe, 0x94, 0x94, 0x1d,
                    0x60, 0x15, 0x6e,
                ]),
                Some([
                    0x5f, 0x74, 0x92, 0x64, 0x07, 0x4b, 0x40, 0x05, 0x84, 0xbf, 0x11, 0x5e, 0xa8,
                    0x4e, 0xd2, 0x0a,
                ]),
                Some([
                    0x08, 0x26, 0xcc, 0x06, 0xd2, 0xe3, 0x45, 0x99, 0xb4, 0xad, 0xaf, 0x5f, 0xa6,
                    0x90, 0x5c, 0xdb,
                ]),
            ]
            .into_iter(),
            16,
        )
        .unwrap();
        assert_eq!(&expected_uuid_array, uuid_array);
    }
1214
    #[test]
    fn test_datapage_v2() {
        // Five rows with nullable columns a (utf8), b (int32), c (float64),
        // d (bool) and e (list<int32>); rows 1-2 of `e` are null lists.
        let file = arrow_test_data("avro/datapage_v2.snappy.avro");
        let batch = read_file(&file, 8, false);
        let a = StringArray::from(vec![
            Some("abc"),
            Some("abc"),
            Some("abc"),
            None,
            Some("abc"),
        ]);
        let b = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let c = Float64Array::from(vec![Some(2.0), Some(3.0), Some(4.0), Some(5.0), Some(2.0)]);
        let d = BooleanArray::from(vec![
            Some(true),
            Some(true),
            Some(true),
            Some(false),
            Some(true),
        ]);
        // Flattened list items for `e`; the offsets below carve them into rows.
        let e_values = Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            Some(1),
            Some(2),
            Some(3),
            Some(1),
            Some(2),
        ]);
        // Row layout: [1,2,3], null, null, [1,2,3], [1,2]
        let e_offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 3, 3, 3, 6, 8]));
        let e_validity = Some(NullBuffer::from(vec![true, false, false, true, true]));
        let field_e = Arc::new(Field::new("item", DataType::Int32, true));
        let e = ListArray::new(field_e, e_offsets, Arc::new(e_values), e_validity);
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("a", Arc::new(a) as Arc<dyn Array>, true),
            ("b", Arc::new(b) as Arc<dyn Array>, true),
            ("c", Arc::new(c) as Arc<dyn Array>, true),
            ("d", Arc::new(d) as Arc<dyn Array>, true),
            ("e", Arc::new(e) as Arc<dyn Array>, true),
        ])
        .unwrap();
        assert_eq!(batch, expected);
    }
1259
    #[test]
    fn test_nested_records() {
        // Expected column f1: a non-null struct with fields f1_1 (utf8),
        // f1_2 (int32) and a nested struct f1_3 holding a single float64.
        let f1_f1_1 = StringArray::from(vec!["aaa", "bbb"]);
        let f1_f1_2 = Int32Array::from(vec![10, 20]);
        // Pi rounded to two decimals — presumably the value stored in the
        // fixture; confirm against the file if this assertion ever drifts.
        let rounded_pi = (std::f64::consts::PI * 100.0).round() / 100.0;
        let f1_f1_3_1 = Float64Array::from(vec![rounded_pi, rounded_pi]);
        let f1_f1_3 = StructArray::from(vec![(
            Arc::new(Field::new("f1_3_1", DataType::Float64, false)),
            Arc::new(f1_f1_3_1) as Arc<dyn Array>,
        )]);
        let f1_expected = StructArray::from(vec![
            (
                Arc::new(Field::new("f1_1", DataType::Utf8, false)),
                Arc::new(f1_f1_1) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new("f1_2", DataType::Int32, false)),
                Arc::new(f1_f1_2) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new(
                    "f1_3",
                    DataType::Struct(Fields::from(vec![Field::new(
                        "f1_3_1",
                        DataType::Float64,
                        false,
                    )])),
                    false,
                )),
                Arc::new(f1_f1_3) as Arc<dyn Array>,
            ),
        ]);

        // Expected column f2: a list of structs {f2_1: bool, f2_2: f32}.
        let f2_fields = vec![
            Field::new("f2_1", DataType::Boolean, false),
            Field::new("f2_2", DataType::Float32, false),
        ];
        let f2_struct_builder = StructBuilder::new(
            f2_fields
                .iter()
                .map(|f| Arc::new(f.clone()))
                .collect::<Vec<Arc<Field>>>(),
            vec![
                Box::new(BooleanBuilder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
                Box::new(Float32Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
            ],
        );
        let mut f2_list_builder = ListBuilder::new(f2_struct_builder);
        // Row 0: [{true, 1.2}, {true, 2.2}]
        {
            let struct_builder = f2_list_builder.values();
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap();
                b.append_value(true);
            }
            {
                let b = struct_builder.field_builder::<Float32Builder>(1).unwrap();
                b.append_value(1.2_f32);
            }
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap();
                b.append_value(true);
            }
            {
                let b = struct_builder.field_builder::<Float32Builder>(1).unwrap();
                b.append_value(2.2_f32);
            }
            f2_list_builder.append(true);
        }
        // Row 1: [{false, 10.2}]
        {
            let struct_builder = f2_list_builder.values();
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap();
                b.append_value(false);
            }
            {
                let b = struct_builder.field_builder::<Float32Builder>(1).unwrap();
                b.append_value(10.2_f32);
            }
            f2_list_builder.append(true);
        }

        let list_array_with_nullable_items = f2_list_builder.finish();

        // ListBuilder always produces a nullable item field; rebuild the list
        // data with a non-nullable item field to match the decoded schema.
        let item_field = Arc::new(Field::new(
            "item",
            list_array_with_nullable_items.values().data_type().clone(),
            false,
        ));
        let list_data_type = DataType::List(item_field);

        let f2_array_data = list_array_with_nullable_items
            .to_data()
            .into_builder()
            .data_type(list_data_type)
            .build()
            .unwrap();
        let f2_expected = ListArray::from(f2_array_data);

        // Expected column f3: a nullable struct {f3_1: utf8}; row 1 is null.
        let mut f3_struct_builder = StructBuilder::new(
            vec![Arc::new(Field::new("f3_1", DataType::Utf8, false))],
            vec![Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>],
        );
        f3_struct_builder.append(true);
        {
            let b = f3_struct_builder.field_builder::<StringBuilder>(0).unwrap();
            b.append_value("xyz");
        }
        f3_struct_builder.append(false);
        {
            let b = f3_struct_builder.field_builder::<StringBuilder>(0).unwrap();
            b.append_null();
        }
        let f3_expected = f3_struct_builder.finish();
        // Expected column f4: a list of nullable structs {f4_1: i64}.
        let f4_fields = [Field::new("f4_1", DataType::Int64, false)];
        let f4_struct_builder = StructBuilder::new(
            f4_fields
                .iter()
                .map(|f| Arc::new(f.clone()))
                .collect::<Vec<Arc<Field>>>(),
            vec![Box::new(Int64Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>],
        );
        let mut f4_list_builder = ListBuilder::new(f4_struct_builder);
        // Row 0: [{200}, null]
        {
            let struct_builder = f4_list_builder.values();
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_value(200);
            }
            struct_builder.append(false);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_null();
            }
            f4_list_builder.append(true);
        }
        // Row 1: [null, {300}]
        {
            let struct_builder = f4_list_builder.values();
            struct_builder.append(false);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_null();
            }
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_value(300);
            }
            f4_list_builder.append(true);
        }
        let f4_expected = f4_list_builder.finish();

        let expected = RecordBatch::try_from_iter_with_nullable([
            ("f1", Arc::new(f1_expected) as Arc<dyn Array>, false),
            ("f2", Arc::new(f2_expected) as Arc<dyn Array>, false),
            ("f3", Arc::new(f3_expected) as Arc<dyn Array>, true),
            ("f4", Arc::new(f4_expected) as Arc<dyn Array>, false),
        ])
        .unwrap();

        // Decode with two batch sizes; both must reproduce the same batch.
        let file = arrow_test_data("avro/nested_records.avro");
        let batch_large = read_file(&file, 8, false);
        assert_eq!(
            batch_large, expected,
            "Decoded RecordBatch does not match expected data for nested records (batch size 8)"
        );
        let batch_small = read_file(&file, 3, false);
        assert_eq!(
            batch_small, expected,
            "Decoded RecordBatch does not match expected data for nested records (batch size 3)"
        );
    }
1435
    #[test]
    fn test_repeated_no_annotation() {
        // Six rows with an int32 `id` and a nullable `phoneNumbers` struct
        // wrapping a list of {number: i64, kind: utf8} structs. The
        // expectation is assembled from raw buffers so validity and offsets
        // can be controlled precisely at every nesting level.
        let file = arrow_test_data("avro/repeated_no_annotation.avro");
        let batch_large = read_file(&file, 8, false);
        use arrow_array::{Int32Array, Int64Array, ListArray, StringArray, StructArray};
        use arrow_buffer::Buffer;
        use arrow_schema::{DataType, Field, Fields};
        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        // Five phone entries in total across all rows.
        let number_array = Int64Array::from(vec![
            Some(5555555555),
            Some(1111111111),
            Some(1111111111),
            Some(2222222222),
            Some(3333333333),
        ]);
        let kind_array =
            StringArray::from(vec![None, Some("home"), Some("home"), None, Some("mobile")]);
        let phone_fields = Fields::from(vec![
            Field::new("number", DataType::Int64, true),
            Field::new("kind", DataType::Utf8, true),
        ]);
        let phone_struct_data = ArrayDataBuilder::new(DataType::Struct(phone_fields))
            .len(5)
            .child_data(vec![number_array.into_data(), kind_array.into_data()])
            .build()
            .unwrap();
        let phone_struct_array = StructArray::from(phone_struct_data);
        // Rows 0-1: null lists; row 2: empty; rows 3-4: one entry each;
        // row 5: three entries.
        let phone_list_offsets = Buffer::from_slice_ref([0, 0, 0, 0, 1, 2, 5]);
        let phone_list_validity = Buffer::from_iter([false, false, true, true, true, true]);
        let phone_item_field = Field::new("item", phone_struct_array.data_type().clone(), true);
        let phone_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(phone_item_field)))
            .len(6)
            .add_buffer(phone_list_offsets)
            .null_bit_buffer(Some(phone_list_validity))
            .child_data(vec![phone_struct_array.into_data()])
            .build()
            .unwrap();
        let phone_list_array = ListArray::from(phone_list_data);
        // The wrapping struct is null exactly where the list is null (rows 0-1).
        let phone_numbers_validity = Buffer::from_iter([false, false, true, true, true, true]);
        let phone_numbers_field = Field::new("phone", phone_list_array.data_type().clone(), true);
        let phone_numbers_struct_data =
            ArrayDataBuilder::new(DataType::Struct(Fields::from(vec![phone_numbers_field])))
                .len(6)
                .null_bit_buffer(Some(phone_numbers_validity))
                .child_data(vec![phone_list_array.into_data()])
                .build()
                .unwrap();
        let phone_numbers_struct_array = StructArray::from(phone_numbers_struct_data);
        let expected = arrow_array::RecordBatch::try_from_iter_with_nullable([
            ("id", Arc::new(id_array) as _, true),
            (
                "phoneNumbers",
                Arc::new(phone_numbers_struct_array) as _,
                true,
            ),
        ])
        .unwrap();
        assert_eq!(batch_large, expected, "Mismatch for batch_size=8");
        let batch_small = read_file(&file, 3, false);
        assert_eq!(batch_small, expected, "Mismatch for batch_size=3");
    }
1497
1498    #[test]
1499    fn test_nonnullable_impala() {
1500        let file = arrow_test_data("avro/nonnullable.impala.avro");
1501        let id = Int64Array::from(vec![Some(8)]);
1502        let mut int_array_builder = ListBuilder::new(Int32Builder::new());
1503        {
1504            let vb = int_array_builder.values();
1505            vb.append_value(-1);
1506        }
1507        int_array_builder.append(true); // finalize one sub-list
1508        let int_array = int_array_builder.finish();
1509        let mut iaa_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
1510        {
1511            let inner_list_builder = iaa_builder.values();
1512            {
1513                let vb = inner_list_builder.values();
1514                vb.append_value(-1);
1515                vb.append_value(-2);
1516            }
1517            inner_list_builder.append(true);
1518            inner_list_builder.append(true);
1519        }
1520        iaa_builder.append(true);
1521        let int_array_array = iaa_builder.finish();
1522        use arrow_array::builder::MapFieldNames;
1523        let field_names = MapFieldNames {
1524            entry: "entries".to_string(),
1525            key: "key".to_string(),
1526            value: "value".to_string(),
1527        };
1528        let mut int_map_builder =
1529            MapBuilder::new(Some(field_names), StringBuilder::new(), Int32Builder::new());
1530        {
1531            let (keys, vals) = int_map_builder.entries();
1532            keys.append_value("k1");
1533            vals.append_value(-1);
1534        }
1535        int_map_builder.append(true).unwrap(); // finalize map for row 0
1536        let int_map = int_map_builder.finish();
1537        let field_names2 = MapFieldNames {
1538            entry: "entries".to_string(),
1539            key: "key".to_string(),
1540            value: "value".to_string(),
1541        };
1542        let mut ima_builder = ListBuilder::new(MapBuilder::new(
1543            Some(field_names2),
1544            StringBuilder::new(),
1545            Int32Builder::new(),
1546        ));
1547        {
1548            let map_builder = ima_builder.values();
1549            map_builder.append(true).unwrap();
1550            {
1551                let (keys, vals) = map_builder.entries();
1552                keys.append_value("k1");
1553                vals.append_value(1);
1554            }
1555            map_builder.append(true).unwrap();
1556            map_builder.append(true).unwrap();
1557            map_builder.append(true).unwrap();
1558        }
1559        ima_builder.append(true);
1560        let int_map_array_ = ima_builder.finish();
1561        let mut nested_sb = StructBuilder::new(
1562            vec![
1563                Arc::new(Field::new("a", DataType::Int32, true)),
1564                Arc::new(Field::new(
1565                    "B",
1566                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
1567                    true,
1568                )),
1569                Arc::new(Field::new(
1570                    "c",
1571                    DataType::Struct(
1572                        vec![Field::new(
1573                            "D",
1574                            DataType::List(Arc::new(Field::new(
1575                                "item",
1576                                DataType::List(Arc::new(Field::new(
1577                                    "item",
1578                                    DataType::Struct(
1579                                        vec![
1580                                            Field::new("e", DataType::Int32, true),
1581                                            Field::new("f", DataType::Utf8, true),
1582                                        ]
1583                                        .into(),
1584                                    ),
1585                                    true,
1586                                ))),
1587                                true,
1588                            ))),
1589                            true,
1590                        )]
1591                        .into(),
1592                    ),
1593                    true,
1594                )),
1595                Arc::new(Field::new(
1596                    "G",
1597                    DataType::Map(
1598                        Arc::new(Field::new(
1599                            "entries",
1600                            DataType::Struct(
1601                                vec![
1602                                    Field::new("key", DataType::Utf8, false),
1603                                    Field::new(
1604                                        "value",
1605                                        DataType::Struct(
1606                                            vec![Field::new(
1607                                                "h",
1608                                                DataType::Struct(
1609                                                    vec![Field::new(
1610                                                        "i",
1611                                                        DataType::List(Arc::new(Field::new(
1612                                                            "item",
1613                                                            DataType::Float64,
1614                                                            true,
1615                                                        ))),
1616                                                        true,
1617                                                    )]
1618                                                    .into(),
1619                                                ),
1620                                                true,
1621                                            )]
1622                                            .into(),
1623                                        ),
1624                                        true,
1625                                    ),
1626                                ]
1627                                .into(),
1628                            ),
1629                            false,
1630                        )),
1631                        false,
1632                    ),
1633                    true,
1634                )),
1635            ],
1636            vec![
1637                Box::new(Int32Builder::new()),
1638                Box::new(ListBuilder::new(Int32Builder::new())),
1639                {
1640                    let d_field = Field::new(
1641                        "D",
1642                        DataType::List(Arc::new(Field::new(
1643                            "item",
1644                            DataType::List(Arc::new(Field::new(
1645                                "item",
1646                                DataType::Struct(
1647                                    vec![
1648                                        Field::new("e", DataType::Int32, true),
1649                                        Field::new("f", DataType::Utf8, true),
1650                                    ]
1651                                    .into(),
1652                                ),
1653                                true,
1654                            ))),
1655                            true,
1656                        ))),
1657                        true,
1658                    );
1659                    Box::new(StructBuilder::new(
1660                        vec![Arc::new(d_field)],
1661                        vec![Box::new({
1662                            let ef_struct_builder = StructBuilder::new(
1663                                vec![
1664                                    Arc::new(Field::new("e", DataType::Int32, true)),
1665                                    Arc::new(Field::new("f", DataType::Utf8, true)),
1666                                ],
1667                                vec![
1668                                    Box::new(Int32Builder::new()),
1669                                    Box::new(StringBuilder::new()),
1670                                ],
1671                            );
1672                            let list_of_ef = ListBuilder::new(ef_struct_builder);
1673                            ListBuilder::new(list_of_ef)
1674                        })],
1675                    ))
1676                },
1677                {
1678                    let map_field_names = MapFieldNames {
1679                        entry: "entries".to_string(),
1680                        key: "key".to_string(),
1681                        value: "value".to_string(),
1682                    };
1683                    let i_list_builder = ListBuilder::new(Float64Builder::new());
1684                    let h_struct = StructBuilder::new(
1685                        vec![Arc::new(Field::new(
1686                            "i",
1687                            DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
1688                            true,
1689                        ))],
1690                        vec![Box::new(i_list_builder)],
1691                    );
1692                    let g_value_builder = StructBuilder::new(
1693                        vec![Arc::new(Field::new(
1694                            "h",
1695                            DataType::Struct(
1696                                vec![Field::new(
1697                                    "i",
1698                                    DataType::List(Arc::new(Field::new(
1699                                        "item",
1700                                        DataType::Float64,
1701                                        true,
1702                                    ))),
1703                                    true,
1704                                )]
1705                                .into(),
1706                            ),
1707                            true,
1708                        ))],
1709                        vec![Box::new(h_struct)],
1710                    );
1711                    Box::new(MapBuilder::new(
1712                        Some(map_field_names),
1713                        StringBuilder::new(),
1714                        g_value_builder,
1715                    ))
1716                },
1717            ],
1718        );
1719        nested_sb.append(true);
1720        {
1721            let a_builder = nested_sb.field_builder::<Int32Builder>(0).unwrap();
1722            a_builder.append_value(-1);
1723        }
1724        {
1725            let b_builder = nested_sb
1726                .field_builder::<ListBuilder<Int32Builder>>(1)
1727                .unwrap();
1728            {
1729                let vb = b_builder.values();
1730                vb.append_value(-1);
1731            }
1732            b_builder.append(true);
1733        }
1734        {
1735            let c_struct_builder = nested_sb.field_builder::<StructBuilder>(2).unwrap();
1736            c_struct_builder.append(true);
1737            let d_list_builder = c_struct_builder
1738                .field_builder::<ListBuilder<ListBuilder<StructBuilder>>>(0)
1739                .unwrap();
1740            {
1741                let sub_list_builder = d_list_builder.values();
1742                {
1743                    let ef_struct = sub_list_builder.values();
1744                    ef_struct.append(true);
1745                    {
1746                        let e_b = ef_struct.field_builder::<Int32Builder>(0).unwrap();
1747                        e_b.append_value(-1);
1748                        let f_b = ef_struct.field_builder::<StringBuilder>(1).unwrap();
1749                        f_b.append_value("nonnullable");
1750                    }
1751                    sub_list_builder.append(true);
1752                }
1753                d_list_builder.append(true);
1754            }
1755        }
1756        {
1757            let g_map_builder = nested_sb
1758                .field_builder::<MapBuilder<StringBuilder, StructBuilder>>(3)
1759                .unwrap();
1760            g_map_builder.append(true).unwrap();
1761        }
1762        let nested_struct = nested_sb.finish();
1763        let expected = RecordBatch::try_from_iter_with_nullable([
1764            ("ID", Arc::new(id) as Arc<dyn Array>, true),
1765            ("Int_Array", Arc::new(int_array), true),
1766            ("int_array_array", Arc::new(int_array_array), true),
1767            ("Int_Map", Arc::new(int_map), true),
1768            ("int_map_array", Arc::new(int_map_array_), true),
1769            ("nested_Struct", Arc::new(nested_struct), true),
1770        ])
1771        .unwrap();
1772        let batch_large = read_file(&file, 8, false);
1773        assert_eq!(batch_large, expected, "Mismatch for batch_size=8");
1774        let batch_small = read_file(&file, 3, false);
1775        assert_eq!(batch_small, expected, "Mismatch for batch_size=3");
1776    }
1777
1778    #[test]
1779    fn test_nonnullable_impala_strict() {
1780        let file = arrow_test_data("avro/nonnullable.impala.avro");
1781        let err = read_file_strict(&file, 8, false).unwrap_err();
1782        assert!(err.to_string().contains(
1783            "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1784        ));
1785    }
1786
1787    #[test]
1788    fn test_nullable_impala() {
1789        let file = arrow_test_data("avro/nullable.impala.avro");
1790        let batch1 = read_file(&file, 3, false);
1791        let batch2 = read_file(&file, 8, false);
1792        assert_eq!(batch1, batch2);
1793        let batch = batch1;
1794        assert_eq!(batch.num_rows(), 7);
1795        let id_array = batch
1796            .column(0)
1797            .as_any()
1798            .downcast_ref::<Int64Array>()
1799            .expect("id column should be an Int64Array");
1800        let expected_ids = [1, 2, 3, 4, 5, 6, 7];
1801        for (i, &expected_id) in expected_ids.iter().enumerate() {
1802            assert_eq!(id_array.value(i), expected_id, "Mismatch in id at row {i}",);
1803        }
1804        let int_array = batch
1805            .column(1)
1806            .as_any()
1807            .downcast_ref::<ListArray>()
1808            .expect("int_array column should be a ListArray");
1809        {
1810            let offsets = int_array.value_offsets();
1811            let start = offsets[0] as usize;
1812            let end = offsets[1] as usize;
1813            let values = int_array
1814                .values()
1815                .as_any()
1816                .downcast_ref::<Int32Array>()
1817                .expect("Values of int_array should be an Int32Array");
1818            let row0: Vec<Option<i32>> = (start..end).map(|i| Some(values.value(i))).collect();
1819            assert_eq!(
1820                row0,
1821                vec![Some(1), Some(2), Some(3)],
1822                "Mismatch in int_array row 0"
1823            );
1824        }
1825        let nested_struct = batch
1826            .column(5)
1827            .as_any()
1828            .downcast_ref::<StructArray>()
1829            .expect("nested_struct column should be a StructArray");
1830        let a_array = nested_struct
1831            .column_by_name("A")
1832            .expect("Field A should exist in nested_struct")
1833            .as_any()
1834            .downcast_ref::<Int32Array>()
1835            .expect("Field A should be an Int32Array");
1836        assert_eq!(a_array.value(0), 1, "Mismatch in nested_struct.A at row 0");
1837        assert!(
1838            !a_array.is_valid(1),
1839            "Expected null in nested_struct.A at row 1"
1840        );
1841        assert!(
1842            !a_array.is_valid(3),
1843            "Expected null in nested_struct.A at row 3"
1844        );
1845        assert_eq!(a_array.value(6), 7, "Mismatch in nested_struct.A at row 6");
1846    }
1847
1848    #[test]
1849    fn test_nullable_impala_strict() {
1850        let file = arrow_test_data("avro/nullable.impala.avro");
1851        let err = read_file_strict(&file, 8, false).unwrap_err();
1852        assert!(err.to_string().contains(
1853            "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1854        ));
1855    }
1856}