1use crate::codec::AvroFieldBuilder;
94use crate::schema::Schema as AvroSchema;
95use arrow_array::{RecordBatch, RecordBatchReader};
96use arrow_schema::{ArrowError, SchemaRef};
97use block::BlockDecoder;
98use header::{Header, HeaderDecoder};
99use record::RecordDecoder;
100use std::io::BufRead;
101
102mod block;
103mod cursor;
104mod header;
105mod record;
106mod vlq;
107
108fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
110 let mut decoder = HeaderDecoder::default();
111 loop {
112 let buf = reader.fill_buf()?;
113 if buf.is_empty() {
114 break;
115 }
116 let read = buf.len();
117 let decoded = decoder.decode(buf)?;
118 reader.consume(decoded);
119 if decoded != read {
120 break;
121 }
122 }
123 decoder.flush().ok_or_else(|| {
124 ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
125 })
126}
127
/// A low-level, push-based decoder that converts raw Avro-encoded record
/// bytes into Arrow [`RecordBatch`]es.
#[derive(Debug)]
pub struct Decoder {
    // Decodes individual Avro records into Arrow array builders
    record_decoder: RecordDecoder,
    // Maximum number of rows to accumulate before a batch is considered full
    batch_size: usize,
    // Number of rows decoded since the last `flush`
    decoded_rows: usize,
}
135
136impl Decoder {
137 fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
138 Self {
139 record_decoder,
140 batch_size,
141 decoded_rows: 0,
142 }
143 }
144
145 pub fn schema(&self) -> SchemaRef {
147 self.record_decoder.schema().clone()
148 }
149
150 pub fn batch_size(&self) -> usize {
152 self.batch_size
153 }
154
155 pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
161 let mut total_consumed = 0usize;
162 while total_consumed < data.len() && self.decoded_rows < self.batch_size {
163 let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
164 total_consumed += consumed;
169 self.decoded_rows += 1;
170 }
171 Ok(total_consumed)
172 }
173
174 pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
177 if self.decoded_rows == 0 {
178 Ok(None)
179 } else {
180 let batch = self.record_decoder.flush()?;
181 self.decoded_rows = 0;
182 Ok(Some(batch))
183 }
184 }
185
186 pub fn capacity(&self) -> usize {
188 self.batch_size.saturating_sub(self.decoded_rows)
189 }
190
191 pub fn batch_is_full(&self) -> bool {
193 self.capacity() == 0
194 }
195}
196
/// Builder that configures and constructs Avro [`Reader`]s and [`Decoder`]s.
#[derive(Debug)]
pub struct ReaderBuilder {
    // Maximum rows per emitted `RecordBatch` (defaults to 1024)
    batch_size: usize,
    // Forwarded to `AvroFieldBuilder::with_strict_mode` when building the
    // record decoder
    strict_mode: bool,
    // Decode Avro strings as `Utf8View` instead of `Utf8` when `true`
    utf8_view: bool,
    // Optional explicit schema; when set it takes precedence over the
    // schema found in the file header
    schema: Option<AvroSchema<'static>>,
}
206
207impl Default for ReaderBuilder {
208 fn default() -> Self {
209 Self {
210 batch_size: 1024,
211 strict_mode: false,
212 utf8_view: false,
213 schema: None,
214 }
215 }
216}
217
218impl ReaderBuilder {
219 pub fn new() -> Self {
225 Self::default()
226 }
227
228 fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
229 let root_field = AvroFieldBuilder::new(schema)
230 .with_utf8view(self.utf8_view)
231 .with_strict_mode(self.strict_mode)
232 .build()?;
233 RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
234 }
235
236 fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
237 let header = read_header(reader)?;
238 let record_decoder = if let Some(schema) = &self.schema {
239 self.make_record_decoder(schema)?
240 } else {
241 let avro_schema: Option<AvroSchema<'_>> = header
242 .schema()
243 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
244 let avro_schema = avro_schema.ok_or_else(|| {
245 ArrowError::ParseError("No Avro schema present in file header".to_string())
246 })?;
247 self.make_record_decoder(&avro_schema)?
248 };
249 let decoder = Decoder::new(record_decoder, self.batch_size);
250 Ok((header, decoder))
251 }
252
253 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
255 self.batch_size = batch_size;
256 self
257 }
258
259 pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
264 self.utf8_view = utf8_view;
265 self
266 }
267
268 pub fn use_utf8view(&self) -> bool {
270 self.utf8_view
271 }
272
273 pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
275 self.strict_mode = strict_mode;
276 self
277 }
278
279 pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
283 self.schema = Some(schema);
284 self
285 }
286
287 pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
289 let (header, decoder) = self.build_impl(&mut reader)?;
290 Ok(Reader {
291 reader,
292 header,
293 decoder,
294 block_decoder: BlockDecoder::default(),
295 block_data: Vec::new(),
296 block_cursor: 0,
297 finished: false,
298 })
299 }
300
301 pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
305 match self.schema {
306 Some(ref schema) => {
307 let record_decoder = self.make_record_decoder(schema)?;
308 Ok(Decoder::new(record_decoder, self.batch_size))
309 }
310 None => {
311 let (_, decoder) = self.build_impl(&mut reader)?;
312 Ok(decoder)
313 }
314 }
315 }
316}
317
/// A reader over an Avro object container file that yields Arrow
/// [`RecordBatch`]es.
#[derive(Debug)]
pub struct Reader<R: BufRead> {
    // Underlying buffered byte source
    reader: R,
    // File header (metadata, schema, compression codec, sync marker)
    header: Header,
    // Row-level decoder producing `RecordBatch`es
    decoder: Decoder,
    // Incremental decoder for the file's data blocks
    block_decoder: BlockDecoder,
    // Decompressed bytes of the current block
    block_data: Vec<u8>,
    // Read position within `block_data`
    block_cursor: usize,
    // Set once the underlying reader reaches EOF
    finished: bool,
}
330
impl<R: BufRead> Reader<R> {
    /// The Arrow schema of the batches produced by this reader.
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema()
    }

    /// The Avro file header read when this reader was built.
    pub fn avro_header(&self) -> &Header {
        &self.header
    }

    /// Decode the next batch of up to `batch_size` rows.
    ///
    /// Alternates between refilling `block_data` from the underlying reader
    /// (decompressing when the header declares a codec) and feeding the
    /// current block's bytes to the row decoder. Returns `Ok(None)` once
    /// input is exhausted and no rows remain buffered.
    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        'outer: while !self.finished && !self.decoder.batch_is_full() {
            // Refill the block buffer whenever it is fully consumed.
            while self.block_cursor == self.block_data.len() {
                let buf = self.reader.fill_buf()?;
                if buf.is_empty() {
                    // EOF: fall through to flush whatever rows are buffered.
                    self.finished = true;
                    break 'outer;
                }
                let consumed = self.block_decoder.decode(buf)?;
                self.reader.consume(consumed);
                if let Some(block) = self.block_decoder.flush() {
                    // A complete block is available; decompress if needed.
                    let block_data = if let Some(ref codec) = self.header.compression()? {
                        codec.decompress(&block.data)?
                    } else {
                        block.data
                    };
                    self.block_data = block_data;
                    self.block_cursor = 0;
                } else if consumed == 0 {
                    // No bytes consumed and no complete block: truncated input.
                    return Err(ArrowError::ParseError(
                        "Could not decode next Avro block from partial data".to_string(),
                    ));
                }
            }
            // Decode as many rows as fit in the current batch from this block.
            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
            self.block_cursor += consumed;
        }
        self.decoder.flush()
    }
}
377
378impl<R: BufRead> Iterator for Reader<R> {
379 type Item = Result<RecordBatch, ArrowError>;
380
381 fn next(&mut self) -> Option<Self::Item> {
382 self.read().transpose()
383 }
384}
385
386impl<R: BufRead> RecordBatchReader for Reader<R> {
387 fn schema(&self) -> SchemaRef {
388 self.schema()
389 }
390}
391
392#[cfg(test)]
393mod test {
394 use crate::codec::{AvroDataType, AvroField, Codec};
395 use crate::compression::CompressionCodec;
396 use crate::reader::record::RecordDecoder;
397 use crate::reader::vlq::VLQDecoder;
398 use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
399 use crate::test_util::arrow_test_data;
400 use arrow::array::ArrayDataBuilder;
401 use arrow_array::builder::{
402 ArrayBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int32Builder, Int64Builder,
403 ListBuilder, MapBuilder, StringBuilder, StructBuilder,
404 };
405 use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
406 use arrow_array::*;
407 use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
408 use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
409 use bytes::{Buf, BufMut, Bytes};
410 use futures::executor::block_on;
411 use futures::{stream, Stream, StreamExt, TryStreamExt};
412 use std::collections::HashMap;
413 use std::fs;
414 use std::fs::File;
415 use std::io::{BufReader, Cursor, Read};
416 use std::sync::Arc;
417 use std::task::{ready, Poll};
418
419 fn read_file(path: &str, batch_size: usize, utf8_view: bool) -> RecordBatch {
420 let file = File::open(path).unwrap();
421 let reader = ReaderBuilder::new()
422 .with_batch_size(batch_size)
423 .with_utf8_view(utf8_view)
424 .build(BufReader::new(file))
425 .unwrap();
426 let schema = reader.schema();
427 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
428 arrow::compute::concat_batches(&schema, &batches).unwrap()
429 }
430
431 fn read_file_strict(
432 path: &str,
433 batch_size: usize,
434 utf8_view: bool,
435 ) -> Result<Reader<BufReader<File>>, ArrowError> {
436 let file = File::open(path).unwrap();
437 ReaderBuilder::new()
438 .with_batch_size(batch_size)
439 .with_utf8_view(utf8_view)
440 .with_strict_mode(true)
441 .build(BufReader::new(file))
442 }
443
    /// Drive `decoder` with each chunk from `input`, yielding at most one
    /// flushed `RecordBatch` as a stream. Errors if a chunk is not fully
    /// consumed (e.g. trailing bytes the schema cannot account for).
    fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
        mut decoder: Decoder,
        mut input: S,
    ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
        async_stream::try_stream! {
            if let Some(data) = input.next().await {
                let consumed = decoder.decode(&data)?;
                if consumed < data.len() {
                    Err(ArrowError::ParseError(
                        "did not consume all bytes".to_string(),
                    ))?;
                }
            }
            if let Some(batch) = decoder.flush()? {
                yield batch
            }
        }
    }
462
    /// Verifies that an Avro `string` field can be surfaced as Arrow
    /// `Utf8View` by mapping `Utf8` fields to `Utf8View`.
    #[test]
    fn test_utf8view_support() {
        let schema_json = r#"{
            "type": "record",
            "name": "test",
            "fields": [{
                "name": "str_field",
                "type": "string"
            }]
        }"#;

        let schema: crate::schema::Schema = serde_json::from_str(schema_json).unwrap();
        let avro_field = AvroField::try_from(&schema).unwrap();

        let data_type = avro_field.data_type();

        // Local helper that rewrites Utf8 fields to Utf8View, preserving
        // name, nullability and metadata.
        struct TestHelper;
        impl TestHelper {
            fn with_utf8view(field: &Field) -> Field {
                match field.data_type() {
                    DataType::Utf8 => {
                        Field::new(field.name(), DataType::Utf8View, field.is_nullable())
                            .with_metadata(field.metadata().clone())
                    }
                    _ => field.clone(),
                }
            }
        }

        let field = TestHelper::with_utf8view(&Field::new("str_field", DataType::Utf8, false));

        assert_eq!(field.data_type(), &DataType::Utf8View);

        let array = StringViewArray::from(vec!["test1", "test2"]);
        let batch =
            RecordBatch::try_from_iter(vec![("str_field", Arc::new(array) as ArrayRef)]).unwrap();

        assert!(batch.column(0).as_any().is::<StringViewArray>());
    }
502
    /// Reads a file whose nullable `data` column contains a null, an empty
    /// byte string, and a non-empty byte string.
    #[test]
    fn test_read_zero_byte_avro_file() {
        let batch = read_file("test/data/zero_byte.avro", 3, false);
        let schema = batch.schema();
        assert_eq!(schema.fields().len(), 1);
        let field = schema.field(0);
        assert_eq!(field.name(), "data");
        assert_eq!(field.data_type(), &DataType::Binary);
        assert!(field.is_nullable());
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 1);
        let binary_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Row 0: null, row 1: empty bytes, row 2: non-empty bytes.
        assert!(binary_array.is_null(0));
        assert!(binary_array.is_valid(1));
        assert_eq!(binary_array.value(1), b"");
        assert!(binary_array.is_valid(2));
        assert_eq!(binary_array.value(2), b"some bytes");
    }
525
    /// Decodes the `alltypes_plain` fixture in every supported compression
    /// flavor and checks the result against a hand-built expected batch,
    /// for both a single-batch (8) and multi-batch (3) batch size.
    #[test]
    fn test_alltypes() {
        let files = [
            "avro/alltypes_plain.avro",
            "avro/alltypes_plain.snappy.avro",
            "avro/alltypes_plain.zstandard.avro",
            "avro/alltypes_plain.bzip2.avro",
            "avro/alltypes_plain.xz.avro",
        ];

        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "id",
                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
                true,
            ),
            (
                "bool_col",
                Arc::new(BooleanArray::from_iter((0..8).map(|x| Some(x % 2 == 0)))) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "int_col",
                Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from_iter_values((0..8).map(|x| (x % 2) * 10))) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f32 * 1.1),
                )) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from_iter_values(
                    (0..8).map(|x| (x % 2) as f64 * 10.1),
                )) as _,
                true,
            ),
            (
                "date_string_col",
                // ASCII bytes of "MM/01/09" date strings.
                Arc::new(BinaryArray::from_iter_values([
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 51, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 52, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 50, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                    [48, 49, 47, 48, 49, 47, 48, 57],
                ])) as _,
                true,
            ),
            (
                "string_col",
                Arc::new(BinaryArray::from_iter_values((0..8).map(|x| [48 + x % 2]))) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1235865600000000,
                        1235865660000000,
                        1238544000000000,
                        1238544060000000,
                        1233446400000000,
                        1233446460000000,
                        1230768000000000,
                        1230768060000000,
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();

        for file in files {
            let file = arrow_test_data(file);

            // Batch size 8 reads everything in one batch; 3 forces
            // multiple batches that are concatenated by `read_file`.
            assert_eq!(read_file(&file, 8, false), expected);
            assert_eq!(read_file(&file, 3, false), expected);
        }
    }
627
    /// Decodes the two-row `alltypes_dictionary` fixture and compares the
    /// result with a hand-built expected batch at two batch sizes.
    #[test]
    fn test_alltypes_dictionary() {
        let file = "avro/alltypes_dictionary.avro";
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("id", Arc::new(Int32Array::from(vec![0, 1])) as _, true),
            (
                "bool_col",
                Arc::new(BooleanArray::from(vec![Some(true), Some(false)])) as _,
                true,
            ),
            (
                "tinyint_col",
                Arc::new(Int32Array::from(vec![0, 1])) as _,
                true,
            ),
            (
                "smallint_col",
                Arc::new(Int32Array::from(vec![0, 1])) as _,
                true,
            ),
            ("int_col", Arc::new(Int32Array::from(vec![0, 1])) as _, true),
            (
                "bigint_col",
                Arc::new(Int64Array::from(vec![0, 10])) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from(vec![0.0, 1.1])) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from(vec![0.0, 10.1])) as _,
                true,
            ),
            (
                "date_string_col",
                Arc::new(BinaryArray::from_iter_values([b"01/01/09", b"01/01/09"])) as _,
                true,
            ),
            (
                "string_col",
                Arc::new(BinaryArray::from_iter_values([b"0", b"1"])) as _,
                true,
            ),
            (
                "timestamp_col",
                Arc::new(
                    TimestampMicrosecondArray::from_iter_values([
                        1230768000000000,
                        1230768060000000,
                    ])
                    .with_timezone("+00:00"),
                ) as _,
                true,
            ),
        ])
        .unwrap();
        let file_path = arrow_test_data(file);
        let batch_large = read_file(&file_path, 8, false);
        assert_eq!(
            batch_large, expected,
            "Decoded RecordBatch does not match for file {file}"
        );
        let batch_small = read_file(&file_path, 3, false);
        assert_eq!(
            batch_small, expected,
            "Decoded RecordBatch (batch size 3) does not match for file {file}"
        );
    }
699
    /// Decodes a single-row fixture where every nullable column is null.
    #[test]
    fn test_alltypes_nulls_plain() {
        let file = "avro/alltypes_nulls_plain.avro";
        let expected = RecordBatch::try_from_iter_with_nullable([
            (
                "string_col",
                Arc::new(StringArray::from(vec![None::<&str>])) as _,
                true,
            ),
            ("int_col", Arc::new(Int32Array::from(vec![None])) as _, true),
            (
                "bool_col",
                Arc::new(BooleanArray::from(vec![None])) as _,
                true,
            ),
            (
                "bigint_col",
                Arc::new(Int64Array::from(vec![None])) as _,
                true,
            ),
            (
                "float_col",
                Arc::new(Float32Array::from(vec![None])) as _,
                true,
            ),
            (
                "double_col",
                Arc::new(Float64Array::from(vec![None])) as _,
                true,
            ),
            (
                "bytes_col",
                Arc::new(BinaryArray::from(vec![None::<&[u8]>])) as _,
                true,
            ),
        ])
        .unwrap();
        let file_path = arrow_test_data(file);
        let batch_large = read_file(&file_path, 8, false);
        assert_eq!(
            batch_large, expected,
            "Decoded RecordBatch does not match for file {file}"
        );
        let batch_small = read_file(&file_path, 3, false);
        assert_eq!(
            batch_small, expected,
            "Decoded RecordBatch (batch size 3) does not match for file {file}"
        );
    }
749
    /// Decodes the `binary.avro` fixture: twelve single-byte binary values
    /// 0x00..=0x0b.
    #[test]
    fn test_binary() {
        let file = arrow_test_data("avro/binary.avro");
        let batch = read_file(&file, 8, false);
        let expected = RecordBatch::try_from_iter_with_nullable([(
            "foo",
            Arc::new(BinaryArray::from_iter_values(vec![
                b"\x00".as_ref(),
                b"\x01".as_ref(),
                b"\x02".as_ref(),
                b"\x03".as_ref(),
                b"\x04".as_ref(),
                b"\x05".as_ref(),
                b"\x06".as_ref(),
                b"\x07".as_ref(),
                b"\x08".as_ref(),
                b"\t".as_ref(),
                b"\n".as_ref(),
                b"\x0b".as_ref(),
            ])) as Arc<dyn Array>,
            true,
        )])
        .unwrap();
        assert_eq!(batch, expected);
    }
775
    /// Exercises `build_decoder` with an explicit schema against a raw
    /// (header-less) record stream, covering both a matching schema and a
    /// schema that cannot consume the payload.
    #[test]
    fn test_decode_stream_with_schema() {
        struct TestCase<'a> {
            // Test case label used in failure messages
            name: &'a str,
            // Reader schema supplied to the builder
            schema: &'a str,
            // Substring expected in the error, or `None` for success
            expected_error: Option<&'a str>,
        }
        let tests = vec![
            TestCase {
                name: "success",
                schema: r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"string"}]}"#,
                expected_error: None,
            },
            TestCase {
                name: "valid schema invalid data",
                schema: r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"long"}]}"#,
                expected_error: Some("did not consume all bytes"),
            },
        ];
        for test in tests {
            let schema_s2: crate::schema::Schema = serde_json::from_str(test.schema).unwrap();
            let record_val = "some_string";
            // Hand-encode one Avro string value: zig-zag varint length
            // (len << 1 for non-negative) followed by the UTF-8 bytes.
            let mut body = vec![];
            body.push((record_val.len() as u8) << 1);
            body.extend_from_slice(record_val.as_bytes());
            // Empty reader: with an explicit schema, build_decoder must not
            // need to read a header.
            let mut reader_placeholder = Cursor::new(&[] as &[u8]);
            let builder = ReaderBuilder::new()
                .with_batch_size(1)
                .with_schema(schema_s2);
            let decoder_result = builder.build_decoder(&mut reader_placeholder);
            let decoder = match decoder_result {
                Ok(decoder) => decoder,
                Err(e) => {
                    if let Some(expected) = test.expected_error {
                        assert!(
                            e.to_string().contains(expected),
                            "Test '{}' failed: unexpected error message at build.\nExpected to contain: '{expected}'\nActual: '{e}'",
                            test.name,
                        );
                        continue;
                    } else {
                        panic!("Test '{}' failed at decoder build: {e}", test.name);
                    }
                }
            };
            let stream = Box::pin(stream::once(async { Bytes::from(body) }));
            let decoded_stream = decode_stream(decoder, stream);
            let batches_result: Result<Vec<RecordBatch>, ArrowError> =
                block_on(decoded_stream.try_collect());
            match (batches_result, test.expected_error) {
                (Ok(batches), None) => {
                    let batch =
                        arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
                    let expected_field = Field::new("f2", DataType::Utf8, false);
                    let expected_schema = Arc::new(Schema::new(vec![expected_field]));
                    let expected_array = Arc::new(StringArray::from(vec![record_val]));
                    let expected_batch =
                        RecordBatch::try_new(expected_schema, vec![expected_array]).unwrap();
                    assert_eq!(batch, expected_batch, "Test '{}' failed", test.name);
                    assert_eq!(
                        batch.schema().field(0).name(),
                        "f2",
                        "Test '{}' failed",
                        test.name
                    );
                }
                (Err(e), Some(expected)) => {
                    assert!(
                        e.to_string().contains(expected),
                        "Test '{}' failed: unexpected error message at decode.\nExpected to contain: '{expected}'\nActual: '{e}'",
                        test.name,
                    );
                }
                (Ok(batches), Some(expected)) => {
                    panic!(
                        "Test '{}' was expected to fail with '{expected}', but it succeeded with: {:?}",
                        test.name, batches
                    );
                }
                (Err(e), None) => {
                    panic!(
                        "Test '{}' was not expected to fail, but it did with '{e}'",
                        test.name
                    );
                }
            }
        }
    }
864
    /// Decodes decimal fixtures backed by fixed, legacy-fixed, int and long
    /// physical encodings; all contain the values 100, 200, ..., 2400.
    #[test]
    fn test_decimal() {
        // (fixture path, precision, scale)
        let files = [
            ("avro/fixed_length_decimal.avro", 25, 2),
            ("avro/fixed_length_decimal_legacy.avro", 13, 2),
            ("avro/int32_decimal.avro", 4, 2),
            ("avro/int64_decimal.avro", 10, 2),
        ];
        let decimal_values: Vec<i128> = (1..=24).map(|n| n as i128 * 100).collect();
        for (file, precision, scale) in files {
            let file_path = arrow_test_data(file);
            let actual_batch = read_file(&file_path, 8, false);
            let expected_array = Decimal128Array::from_iter_values(decimal_values.clone())
                .with_precision_and_scale(precision, scale)
                .unwrap();
            // The reader surfaces precision/scale as field metadata too.
            let mut meta = HashMap::new();
            meta.insert("precision".to_string(), precision.to_string());
            meta.insert("scale".to_string(), scale.to_string());
            let field_with_meta = Field::new("value", DataType::Decimal128(precision, scale), true)
                .with_metadata(meta);
            let expected_schema = Arc::new(Schema::new(vec![field_with_meta]));
            let expected_batch =
                RecordBatch::try_new(expected_schema.clone(), vec![Arc::new(expected_array)])
                    .expect("Failed to build expected RecordBatch");
            assert_eq!(
                actual_batch, expected_batch,
                "Decoded RecordBatch does not match the expected Decimal128 data for file {file}"
            );
            let actual_batch_small = read_file(&file_path, 3, false);
            assert_eq!(
                actual_batch_small,
                expected_batch,
                "Decoded RecordBatch does not match the expected Decimal128 data for file {file} with batch size 3"
            );
        }
    }
901
    /// Decodes a fixture whose single column repeats the value 1552 for
    /// every row.
    #[test]
    fn test_dict_pages_offset_zero() {
        let file = arrow_test_data("avro/dict-page-offset-zero.avro");
        let batch = read_file(&file, 32, false);
        let num_rows = batch.num_rows();
        // Expected column mirrors whatever row count the file produced.
        let expected_field = Int32Array::from(vec![Some(1552); num_rows]);
        let expected = RecordBatch::try_from_iter_with_nullable([(
            "l_partkey",
            Arc::new(expected_field) as Arc<dyn Array>,
            true,
        )])
        .unwrap();
        assert_eq!(batch, expected);
    }
916
    /// Decodes `list_columns.avro`: a nullable int64 list column and a
    /// nullable utf8 list column, built row-by-row with `ListBuilder`s.
    #[test]
    fn test_list_columns() {
        let file = arrow_test_data("avro/list_columns.avro");
        let mut int64_list_builder = ListBuilder::new(Int64Builder::new());
        // Row 0: [1, 2, 3]
        {
            {
                let values = int64_list_builder.values();
                values.append_value(1);
                values.append_value(2);
                values.append_value(3);
            }
            int64_list_builder.append(true);
        }
        // Row 1: [null, 1]
        {
            {
                let values = int64_list_builder.values();
                values.append_null();
                values.append_value(1);
            }
            int64_list_builder.append(true);
        }
        // Row 2: [4]
        {
            {
                let values = int64_list_builder.values();
                values.append_value(4);
            }
            int64_list_builder.append(true);
        }
        let int64_list = int64_list_builder.finish();
        let mut utf8_list_builder = ListBuilder::new(StringBuilder::new());
        // Row 0: ["abc", "efg", "hij"]
        {
            {
                let values = utf8_list_builder.values();
                values.append_value("abc");
                values.append_value("efg");
                values.append_value("hij");
            }
            utf8_list_builder.append(true);
        }
        // Row 1: null list
        {
            utf8_list_builder.append(false);
        }
        // Row 2: ["efg", null, "hij", "xyz"]
        {
            {
                let values = utf8_list_builder.values();
                values.append_value("efg");
                values.append_null();
                values.append_value("hij");
                values.append_value("xyz");
            }
            utf8_list_builder.append(true);
        }
        let utf8_list = utf8_list_builder.finish();
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("int64_list", Arc::new(int64_list) as Arc<dyn Array>, true),
            ("utf8_list", Arc::new(utf8_list) as Arc<dyn Array>, true),
        ])
        .unwrap();
        let batch = read_file(&file, 8, false);
        assert_eq!(batch, expected);
    }
978
    /// Decodes the snappy-compressed `nested_lists` fixture: a three-level
    /// nested list column `a` plus an int column `b`, with expected data
    /// assembled manually via `ArrayDataBuilder`.
    #[test]
    fn test_nested_lists() {
        use arrow_data::ArrayDataBuilder;
        let file = arrow_test_data("avro/nested_lists.snappy.avro");
        let inner_values = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
            Some("f"),
        ]);
        let inner_offsets = Buffer::from_slice_ref([0, 2, 3, 3, 4, 6, 8, 8, 9, 11, 13, 14, 14, 15]);
        let inner_validity = [
            true, true, false, true, true, true, false, true, true, true, true, false, true,
        ];
        let inner_null_buffer = Buffer::from_iter(inner_validity.iter().copied());
        let inner_field = Field::new("item", DataType::Utf8, true);
        let inner_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(inner_field)))
            .len(13)
            .add_buffer(inner_offsets)
            .add_child_data(inner_values.to_data())
            .null_bit_buffer(Some(inner_null_buffer))
            .build()
            .unwrap();
        let inner_list_array = ListArray::from(inner_list_data);
        let middle_offsets = Buffer::from_slice_ref([0, 2, 4, 6, 8, 11, 13]);
        let middle_validity = [true; 6];
        let middle_null_buffer = Buffer::from_iter(middle_validity.iter().copied());
        let middle_field = Field::new("item", inner_list_array.data_type().clone(), true);
        let middle_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(middle_field)))
            .len(6)
            .add_buffer(middle_offsets)
            .add_child_data(inner_list_array.to_data())
            .null_bit_buffer(Some(middle_null_buffer))
            .build()
            .unwrap();
        let middle_list_array = ListArray::from(middle_list_data);
        let outer_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        // 0b111: all three outer rows are valid.
        let outer_null_buffer = Buffer::from_slice_ref([0b111]);
        let outer_field = Field::new("item", middle_list_array.data_type().clone(), true);
        let outer_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(outer_field)))
            .len(3)
            .add_buffer(outer_offsets)
            .add_child_data(middle_list_array.to_data())
            .null_bit_buffer(Some(outer_null_buffer))
            .build()
            .unwrap();
        let a_expected = ListArray::from(outer_list_data);
        let b_expected = Int32Array::from(vec![1, 1, 1]);
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("a", Arc::new(a_expected) as Arc<dyn Array>, true),
            ("b", Arc::new(b_expected) as Arc<dyn Array>, true),
        ])
        .unwrap();
        let left = read_file(&file, 8, false);
        assert_eq!(left, expected, "Mismatch for batch size=8");
        let left_small = read_file(&file, 3, false);
        assert_eq!(left_small, expected, "Mismatch for batch size=3");
    }
1048
    /// Decodes the `simple_enum` (Avro enums → dictionary arrays) and
    /// `simple_fixed` (Avro fixed → FixedSizeBinary) fixtures at two batch
    /// sizes each.
    #[test]
    fn test_simple() {
        // (fixture, primary batch size, expected batch, alternate batch size)
        let tests = [
            ("avro/simple_enum.avro", 4, build_expected_enum(), 2),
            ("avro/simple_fixed.avro", 2, build_expected_fixed(), 1),
        ];

        fn build_expected_enum() -> RecordBatch {
            let keys_f1 = Int32Array::from(vec![0, 1, 2, 3]);
            let vals_f1 = StringArray::from(vec!["a", "b", "c", "d"]);
            let f1_dict =
                DictionaryArray::<Int32Type>::try_new(keys_f1, Arc::new(vals_f1)).unwrap();
            let keys_f2 = Int32Array::from(vec![2, 3, 0, 1]);
            let vals_f2 = StringArray::from(vec!["e", "f", "g", "h"]);
            let f2_dict =
                DictionaryArray::<Int32Type>::try_new(keys_f2, Arc::new(vals_f2)).unwrap();
            let keys_f3 = Int32Array::from(vec![Some(1), Some(2), None, Some(0)]);
            let vals_f3 = StringArray::from(vec!["i", "j", "k"]);
            let f3_dict =
                DictionaryArray::<Int32Type>::try_new(keys_f3, Arc::new(vals_f3)).unwrap();
            let dict_type =
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
            // The enum symbol list is preserved as field metadata.
            let mut md_f1 = HashMap::new();
            md_f1.insert(
                "avro.enum.symbols".to_string(),
                r#"["a","b","c","d"]"#.to_string(),
            );
            let f1_field = Field::new("f1", dict_type.clone(), false).with_metadata(md_f1);
            let mut md_f2 = HashMap::new();
            md_f2.insert(
                "avro.enum.symbols".to_string(),
                r#"["e","f","g","h"]"#.to_string(),
            );
            let f2_field = Field::new("f2", dict_type.clone(), false).with_metadata(md_f2);
            let mut md_f3 = HashMap::new();
            md_f3.insert(
                "avro.enum.symbols".to_string(),
                r#"["i","j","k"]"#.to_string(),
            );
            let f3_field = Field::new("f3", dict_type.clone(), true).with_metadata(md_f3);
            let expected_schema = Arc::new(Schema::new(vec![f1_field, f2_field, f3_field]));
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(f1_dict) as Arc<dyn Array>,
                    Arc::new(f2_dict) as Arc<dyn Array>,
                    Arc::new(f3_dict) as Arc<dyn Array>,
                ],
            )
            .unwrap()
        }

        fn build_expected_fixed() -> RecordBatch {
            let f1 =
                FixedSizeBinaryArray::try_from_iter(vec![b"abcde", b"12345"].into_iter()).unwrap();
            let f2 =
                FixedSizeBinaryArray::try_from_iter(vec![b"fghijklmno", b"1234567890"].into_iter())
                    .unwrap();
            let f3 = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                vec![Some(b"ABCDEF" as &[u8]), None].into_iter(),
                6,
            )
            .unwrap();
            let expected_schema = Arc::new(Schema::new(vec![
                Field::new("f1", DataType::FixedSizeBinary(5), false),
                Field::new("f2", DataType::FixedSizeBinary(10), false),
                Field::new("f3", DataType::FixedSizeBinary(6), true),
            ]));
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(f1) as Arc<dyn Array>,
                    Arc::new(f2) as Arc<dyn Array>,
                    Arc::new(f3) as Arc<dyn Array>,
                ],
            )
            .unwrap()
        }
        for (file_name, batch_size, expected, alt_batch_size) in tests {
            let file = arrow_test_data(file_name);
            let actual = read_file(&file, batch_size, false);
            assert_eq!(actual, expected);
            let actual2 = read_file(&file, alt_batch_size, false);
            assert_eq!(actual2, expected);
        }
    }
1136
    /// Decodes a one-row file whose nullable `mycol` Float64 value is null.
    #[test]
    fn test_single_nan() {
        let file = arrow_test_data("avro/single_nan.avro");
        let actual = read_file(&file, 1, false);
        use arrow_array::Float64Array;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "mycol",
            DataType::Float64,
            true,
        )]));
        let col = Float64Array::from(vec![None]);
        let expected = RecordBatch::try_new(schema, vec![Arc::new(col)]).unwrap();
        assert_eq!(actual, expected);
        // Batch size larger than the row count must give the same result.
        let actual2 = read_file(&file, 2, false);
        assert_eq!(actual2, expected);
    }
1153
    /// Decodes a fixture combining an Avro `duration` logical type (mapped
    /// to Interval(MonthDayNano)) and a `uuid` (mapped to
    /// FixedSizeBinary(16)).
    #[test]
    fn test_duration_uuid() {
        let batch = read_file("test/data/duration_uuid.avro", 4, false);
        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "duration_field");
        assert_eq!(
            fields[0].data_type(),
            &DataType::Interval(IntervalUnit::MonthDayNano)
        );
        assert_eq!(fields[1].name(), "uuid_field");
        assert_eq!(fields[1].data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.num_columns(), 2);
        let duration_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<IntervalMonthDayNanoArray>()
            .unwrap();
        // (months, days, nanoseconds) triples expected per row.
        let expected_duration_array: IntervalMonthDayNanoArray = [
            Some(IntervalMonthDayNanoType::make_value(1, 15, 500_000_000)),
            Some(IntervalMonthDayNanoType::make_value(0, 5, 2_500_000_000)),
            Some(IntervalMonthDayNanoType::make_value(2, 0, 0)),
            Some(IntervalMonthDayNanoType::make_value(12, 31, 999_000_000)),
        ]
        .iter()
        .copied()
        .collect();
        assert_eq!(&expected_duration_array, duration_array);
        let uuid_array = batch
            .column(1)
            .as_any()
            .downcast_ref::<FixedSizeBinaryArray>()
            .unwrap();
        // Raw 16-byte UUID values expected per row.
        let expected_uuid_array = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
            [
                Some([
                    0xfe, 0x7b, 0xc3, 0x0b, 0x4c, 0xe8, 0x4c, 0x5e, 0xb6, 0x7c, 0x22, 0x34, 0xa2,
                    0xd3, 0x8e, 0x66,
                ]),
                Some([
                    0xb3, 0x3f, 0x2a, 0xd7, 0x97, 0xb4, 0x4d, 0xe1, 0x8b, 0xfe, 0x94, 0x94, 0x1d,
                    0x60, 0x15, 0x6e,
                ]),
                Some([
                    0x5f, 0x74, 0x92, 0x64, 0x07, 0x4b, 0x40, 0x05, 0x84, 0xbf, 0x11, 0x5e, 0xa8,
                    0x4e, 0xd2, 0x0a,
                ]),
                Some([
                    0x08, 0x26, 0xcc, 0x06, 0xd2, 0xe3, 0x45, 0x99, 0xb4, 0xad, 0xaf, 0x5f, 0xa6,
                    0x90, 0x5c, 0xdb,
                ]),
            ]
            .into_iter(),
            16,
        )
        .unwrap();
        assert_eq!(&expected_uuid_array, uuid_array);
    }
1214
    /// Decodes the snappy-compressed `datapage_v2` fixture: scalar columns
    /// a–d plus a nullable int list column e.
    #[test]
    fn test_datapage_v2() {
        let file = arrow_test_data("avro/datapage_v2.snappy.avro");
        let batch = read_file(&file, 8, false);
        let a = StringArray::from(vec![
            Some("abc"),
            Some("abc"),
            Some("abc"),
            None,
            Some("abc"),
        ]);
        let b = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let c = Float64Array::from(vec![Some(2.0), Some(3.0), Some(4.0), Some(5.0), Some(2.0)]);
        let d = BooleanArray::from(vec![
            Some(true),
            Some(true),
            Some(true),
            Some(false),
            Some(true),
        ]);
        let e_values = Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            Some(1),
            Some(2),
            Some(3),
            Some(1),
            Some(2),
        ]);
        // Offsets carve e_values into rows: [1,2,3], null, null, [1,2,3], [1,2].
        let e_offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 3, 3, 3, 6, 8]));
        let e_validity = Some(NullBuffer::from(vec![true, false, false, true, true]));
        let field_e = Arc::new(Field::new("item", DataType::Int32, true));
        let e = ListArray::new(field_e, e_offsets, Arc::new(e_values), e_validity);
        let expected = RecordBatch::try_from_iter_with_nullable([
            ("a", Arc::new(a) as Arc<dyn Array>, true),
            ("b", Arc::new(b) as Arc<dyn Array>, true),
            ("c", Arc::new(c) as Arc<dyn Array>, true),
            ("d", Arc::new(d) as Arc<dyn Array>, true),
            ("e", Arc::new(e) as Arc<dyn Array>, true),
        ])
        .unwrap();
        assert_eq!(batch, expected);
    }
1259
    #[test]
    fn test_nested_records() {
        // Hand-builds the expected RecordBatch for avro/nested_records.avro and
        // verifies the decoded output at two batch sizes (8 and 3).

        // f1: non-nullable struct { f1_1: utf8, f1_2: int32, f1_3: struct { f1_3_1: f64 } }
        let f1_f1_1 = StringArray::from(vec!["aaa", "bbb"]);
        let f1_f1_2 = Int32Array::from(vec![10, 20]);
        // PI rounded to two decimal places — matches the value stored in the test file.
        let rounded_pi = (std::f64::consts::PI * 100.0).round() / 100.0;
        let f1_f1_3_1 = Float64Array::from(vec![rounded_pi, rounded_pi]);
        let f1_f1_3 = StructArray::from(vec![(
            Arc::new(Field::new("f1_3_1", DataType::Float64, false)),
            Arc::new(f1_f1_3_1) as Arc<dyn Array>,
        )]);
        let f1_expected = StructArray::from(vec![
            (
                Arc::new(Field::new("f1_1", DataType::Utf8, false)),
                Arc::new(f1_f1_1) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new("f1_2", DataType::Int32, false)),
                Arc::new(f1_f1_2) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new(
                    "f1_3",
                    DataType::Struct(Fields::from(vec![Field::new(
                        "f1_3_1",
                        DataType::Float64,
                        false,
                    )])),
                    false,
                )),
                Arc::new(f1_f1_3) as Arc<dyn Array>,
            ),
        ]);

        // f2: list of struct { f2_1: bool, f2_2: f32 }
        let f2_fields = vec![
            Field::new("f2_1", DataType::Boolean, false),
            Field::new("f2_2", DataType::Float32, false),
        ];
        let f2_struct_builder = StructBuilder::new(
            f2_fields
                .iter()
                .map(|f| Arc::new(f.clone()))
                .collect::<Vec<Arc<Field>>>(),
            vec![
                Box::new(BooleanBuilder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
                Box::new(Float32Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
            ],
        );
        let mut f2_list_builder = ListBuilder::new(f2_struct_builder);
        // f2 row 0: [{true, 1.2}, {true, 2.2}]
        {
            let struct_builder = f2_list_builder.values();
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap();
                b.append_value(true);
            }
            {
                let b = struct_builder.field_builder::<Float32Builder>(1).unwrap();
                b.append_value(1.2_f32);
            }
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap();
                b.append_value(true);
            }
            {
                let b = struct_builder.field_builder::<Float32Builder>(1).unwrap();
                b.append_value(2.2_f32);
            }
            f2_list_builder.append(true);
        }
        // f2 row 1: [{false, 10.2}]
        {
            let struct_builder = f2_list_builder.values();
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<BooleanBuilder>(0).unwrap();
                b.append_value(false);
            }
            {
                let b = struct_builder.field_builder::<Float32Builder>(1).unwrap();
                b.append_value(10.2_f32);
            }
            f2_list_builder.append(true);
        }

        let list_array_with_nullable_items = f2_list_builder.finish();

        // Rebuild the list's ArrayData with a non-nullable "item" field (the
        // builder-produced item field is nullable) so the type matches the
        // decoder's schema exactly.
        let item_field = Arc::new(Field::new(
            "item",
            list_array_with_nullable_items.values().data_type().clone(),
            false,
        ));
        let list_data_type = DataType::List(item_field);

        let f2_array_data = list_array_with_nullable_items
            .to_data()
            .into_builder()
            .data_type(list_data_type)
            .build()
            .unwrap();
        let f2_expected = ListArray::from(f2_array_data);

        // f3: nullable struct { f3_1: utf8 }; row 0 = {"xyz"}, row 1 = null
        let mut f3_struct_builder = StructBuilder::new(
            vec![Arc::new(Field::new("f3_1", DataType::Utf8, false))],
            vec![Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>],
        );
        f3_struct_builder.append(true);
        {
            let b = f3_struct_builder.field_builder::<StringBuilder>(0).unwrap();
            b.append_value("xyz");
        }
        f3_struct_builder.append(false);
        {
            let b = f3_struct_builder.field_builder::<StringBuilder>(0).unwrap();
            b.append_null();
        }
        let f3_expected = f3_struct_builder.finish();

        // f4: list of nullable struct { f4_1: int64 }
        let f4_fields = [Field::new("f4_1", DataType::Int64, false)];
        let f4_struct_builder = StructBuilder::new(
            f4_fields
                .iter()
                .map(|f| Arc::new(f.clone()))
                .collect::<Vec<Arc<Field>>>(),
            vec![Box::new(Int64Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>],
        );
        let mut f4_list_builder = ListBuilder::new(f4_struct_builder);
        // f4 row 0: [{200}, null]
        {
            let struct_builder = f4_list_builder.values();
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_value(200);
            }
            struct_builder.append(false);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_null();
            }
            f4_list_builder.append(true);
        }
        // f4 row 1: [null, {300}]
        {
            let struct_builder = f4_list_builder.values();
            struct_builder.append(false);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_null();
            }
            struct_builder.append(true);
            {
                let b = struct_builder.field_builder::<Int64Builder>(0).unwrap();
                b.append_value(300);
            }
            f4_list_builder.append(true);
        }
        let f4_expected = f4_list_builder.finish();

        let expected = RecordBatch::try_from_iter_with_nullable([
            ("f1", Arc::new(f1_expected) as Arc<dyn Array>, false),
            ("f2", Arc::new(f2_expected) as Arc<dyn Array>, false),
            ("f3", Arc::new(f3_expected) as Arc<dyn Array>, true),
            ("f4", Arc::new(f4_expected) as Arc<dyn Array>, false),
        ])
        .unwrap();

        let file = arrow_test_data("avro/nested_records.avro");
        // Batch size larger than the row count: everything arrives in one batch.
        let batch_large = read_file(&file, 8, false);
        assert_eq!(
            batch_large, expected,
            "Decoded RecordBatch does not match expected data for nested records (batch size 8)"
        );
        // Smaller batch size exercises the batch-splitting path.
        let batch_small = read_file(&file, 3, false);
        assert_eq!(
            batch_small, expected,
            "Decoded RecordBatch does not match expected data for nested records (batch size 3)"
        );
    }
1435
    #[test]
    fn test_repeated_no_annotation() {
        // Expected data for avro/repeated_no_annotation.avro: six rows of
        // { id: int32, phoneNumbers: struct { phone: list<struct{number, kind}> } },
        // built at the ArrayData level so offsets and validity are explicit.
        let file = arrow_test_data("avro/repeated_no_annotation.avro");
        let batch_large = read_file(&file, 8, false);
        use arrow_array::{Int32Array, Int64Array, ListArray, StringArray, StructArray};
        use arrow_buffer::Buffer;
        use arrow_schema::{DataType, Field, Fields};
        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        // Five phone entries in total, distributed across rows by the list
        // offsets below.
        let number_array = Int64Array::from(vec![
            Some(5555555555),
            Some(1111111111),
            Some(1111111111),
            Some(2222222222),
            Some(3333333333),
        ]);
        let kind_array =
            StringArray::from(vec![None, Some("home"), Some("home"), None, Some("mobile")]);
        let phone_fields = Fields::from(vec![
            Field::new("number", DataType::Int64, true),
            Field::new("kind", DataType::Utf8, true),
        ]);
        let phone_struct_data = ArrayDataBuilder::new(DataType::Struct(phone_fields))
            .len(5)
            .child_data(vec![number_array.into_data(), kind_array.into_data()])
            .build()
            .unwrap();
        let phone_struct_array = StructArray::from(phone_struct_data);
        // Offsets: rows 0-2 have 0 entries, row 3 has 1, row 4 has 1, row 5 has 3.
        let phone_list_offsets = Buffer::from_slice_ref([0, 0, 0, 0, 1, 2, 5]);
        // Rows 0 and 1 carry a null phone list (not just an empty one).
        let phone_list_validity = Buffer::from_iter([false, false, true, true, true, true]);
        let phone_item_field = Field::new("item", phone_struct_array.data_type().clone(), true);
        let phone_list_data = ArrayDataBuilder::new(DataType::List(Arc::new(phone_item_field)))
            .len(6)
            .add_buffer(phone_list_offsets)
            .null_bit_buffer(Some(phone_list_validity))
            .child_data(vec![phone_struct_array.into_data()])
            .build()
            .unwrap();
        let phone_list_array = ListArray::from(phone_list_data);
        // The wrapping phoneNumbers struct is itself null for rows 0 and 1.
        let phone_numbers_validity = Buffer::from_iter([false, false, true, true, true, true]);
        let phone_numbers_field = Field::new("phone", phone_list_array.data_type().clone(), true);
        let phone_numbers_struct_data =
            ArrayDataBuilder::new(DataType::Struct(Fields::from(vec![phone_numbers_field])))
                .len(6)
                .null_bit_buffer(Some(phone_numbers_validity))
                .child_data(vec![phone_list_array.into_data()])
                .build()
                .unwrap();
        let phone_numbers_struct_array = StructArray::from(phone_numbers_struct_data);
        let expected = arrow_array::RecordBatch::try_from_iter_with_nullable([
            ("id", Arc::new(id_array) as _, true),
            (
                "phoneNumbers",
                Arc::new(phone_numbers_struct_array) as _,
                true,
            ),
        ])
        .unwrap();
        assert_eq!(batch_large, expected, "Mismatch for batch_size=8");
        // Re-read with a smaller batch size to exercise batch splitting.
        let batch_small = read_file(&file, 3, false);
        assert_eq!(batch_small, expected, "Mismatch for batch_size=3");
    }
1497
    #[test]
    fn test_nonnullable_impala() {
        // Hand-builds the expected single-row RecordBatch for
        // avro/nonnullable.impala.avro, which exercises nested lists, maps and
        // structs, then checks decoding at two batch sizes.
        let file = arrow_test_data("avro/nonnullable.impala.avro");
        let id = Int64Array::from(vec![Some(8)]);

        // Int_Array: one row, [-1]
        let mut int_array_builder = ListBuilder::new(Int32Builder::new());
        {
            let vb = int_array_builder.values();
            vb.append_value(-1);
        }
        int_array_builder.append(true);
        let int_array = int_array_builder.finish();

        // int_array_array: one row, [[-1, -2], []]
        let mut iaa_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
        {
            let inner_list_builder = iaa_builder.values();
            {
                let vb = inner_list_builder.values();
                vb.append_value(-1);
                vb.append_value(-2);
            }
            inner_list_builder.append(true);
            inner_list_builder.append(true);
        }
        iaa_builder.append(true);
        let int_array_array = iaa_builder.finish();

        use arrow_array::builder::MapFieldNames;
        // Map child field names matching the decoder's output ("entries"/"key"/"value").
        let field_names = MapFieldNames {
            entry: "entries".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        // Int_Map: one row, {"k1": -1}
        let mut int_map_builder =
            MapBuilder::new(Some(field_names), StringBuilder::new(), Int32Builder::new());
        {
            let (keys, vals) = int_map_builder.entries();
            keys.append_value("k1");
            vals.append_value(-1);
        }
        int_map_builder.append(true).unwrap();
        let int_map = int_map_builder.finish();

        // int_map_array: one row, [{}, {"k1": 1}, {}, {}]
        let field_names2 = MapFieldNames {
            entry: "entries".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let mut ima_builder = ListBuilder::new(MapBuilder::new(
            Some(field_names2),
            StringBuilder::new(),
            Int32Builder::new(),
        ));
        {
            let map_builder = ima_builder.values();
            map_builder.append(true).unwrap();
            {
                let (keys, vals) = map_builder.entries();
                keys.append_value("k1");
                vals.append_value(1);
            }
            map_builder.append(true).unwrap();
            map_builder.append(true).unwrap();
            map_builder.append(true).unwrap();
        }
        ima_builder.append(true);
        let int_map_array_ = ima_builder.finish();

        // nested_Struct: struct {
        //   a: int32,
        //   B: list<int32>,
        //   c: struct { D: list<list<struct{ e: int32, f: utf8 }>> },
        //   G: map<utf8, struct{ h: struct{ i: list<f64> } }>
        // }
        // The fields vector declares the full nested type; the builders vector
        // below must mirror it positionally.
        let mut nested_sb = StructBuilder::new(
            vec![
                Arc::new(Field::new("a", DataType::Int32, true)),
                Arc::new(Field::new(
                    "B",
                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(Field::new(
                    "c",
                    DataType::Struct(
                        vec![Field::new(
                            "D",
                            DataType::List(Arc::new(Field::new(
                                "item",
                                DataType::List(Arc::new(Field::new(
                                    "item",
                                    DataType::Struct(
                                        vec![
                                            Field::new("e", DataType::Int32, true),
                                            Field::new("f", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ))),
                                true,
                            ))),
                            true,
                        )]
                        .into(),
                    ),
                    true,
                )),
                Arc::new(Field::new(
                    "G",
                    DataType::Map(
                        Arc::new(Field::new(
                            "entries",
                            DataType::Struct(
                                vec![
                                    Field::new("key", DataType::Utf8, false),
                                    Field::new(
                                        "value",
                                        DataType::Struct(
                                            vec![Field::new(
                                                "h",
                                                DataType::Struct(
                                                    vec![Field::new(
                                                        "i",
                                                        DataType::List(Arc::new(Field::new(
                                                            "item",
                                                            DataType::Float64,
                                                            true,
                                                        ))),
                                                        true,
                                                    )]
                                                    .into(),
                                                ),
                                                true,
                                            )]
                                            .into(),
                                        ),
                                        true,
                                    ),
                                ]
                                .into(),
                            ),
                            false,
                        )),
                        false,
                    ),
                    true,
                )),
            ],
            vec![
                Box::new(Int32Builder::new()),
                Box::new(ListBuilder::new(Int32Builder::new())),
                // Builder for field "c": a struct with the single list-of-list field "D".
                {
                    let d_field = Field::new(
                        "D",
                        DataType::List(Arc::new(Field::new(
                            "item",
                            DataType::List(Arc::new(Field::new(
                                "item",
                                DataType::Struct(
                                    vec![
                                        Field::new("e", DataType::Int32, true),
                                        Field::new("f", DataType::Utf8, true),
                                    ]
                                    .into(),
                                ),
                                true,
                            ))),
                            true,
                        ))),
                        true,
                    );
                    Box::new(StructBuilder::new(
                        vec![Arc::new(d_field)],
                        vec![Box::new({
                            let ef_struct_builder = StructBuilder::new(
                                vec![
                                    Arc::new(Field::new("e", DataType::Int32, true)),
                                    Arc::new(Field::new("f", DataType::Utf8, true)),
                                ],
                                vec![
                                    Box::new(Int32Builder::new()),
                                    Box::new(StringBuilder::new()),
                                ],
                            );
                            let list_of_ef = ListBuilder::new(ef_struct_builder);
                            ListBuilder::new(list_of_ef)
                        })],
                    ))
                },
                // Builder for field "G": map<utf8, struct{ h: struct{ i: list<f64> } }>.
                {
                    let map_field_names = MapFieldNames {
                        entry: "entries".to_string(),
                        key: "key".to_string(),
                        value: "value".to_string(),
                    };
                    let i_list_builder = ListBuilder::new(Float64Builder::new());
                    let h_struct = StructBuilder::new(
                        vec![Arc::new(Field::new(
                            "i",
                            DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
                            true,
                        ))],
                        vec![Box::new(i_list_builder)],
                    );
                    let g_value_builder = StructBuilder::new(
                        vec![Arc::new(Field::new(
                            "h",
                            DataType::Struct(
                                vec![Field::new(
                                    "i",
                                    DataType::List(Arc::new(Field::new(
                                        "item",
                                        DataType::Float64,
                                        true,
                                    ))),
                                    true,
                                )]
                                .into(),
                            ),
                            true,
                        ))],
                        vec![Box::new(h_struct)],
                    );
                    Box::new(MapBuilder::new(
                        Some(map_field_names),
                        StringBuilder::new(),
                        g_value_builder,
                    ))
                },
            ],
        );
        // Populate the single nested_Struct row.
        nested_sb.append(true);
        {
            // a = -1
            let a_builder = nested_sb.field_builder::<Int32Builder>(0).unwrap();
            a_builder.append_value(-1);
        }
        {
            // B = [-1]
            let b_builder = nested_sb
                .field_builder::<ListBuilder<Int32Builder>>(1)
                .unwrap();
            {
                let vb = b_builder.values();
                vb.append_value(-1);
            }
            b_builder.append(true);
        }
        {
            // c.D = [[{e: -1, f: "nonnullable"}]]
            let c_struct_builder = nested_sb.field_builder::<StructBuilder>(2).unwrap();
            c_struct_builder.append(true);
            let d_list_builder = c_struct_builder
                .field_builder::<ListBuilder<ListBuilder<StructBuilder>>>(0)
                .unwrap();
            {
                let sub_list_builder = d_list_builder.values();
                {
                    let ef_struct = sub_list_builder.values();
                    ef_struct.append(true);
                    {
                        let e_b = ef_struct.field_builder::<Int32Builder>(0).unwrap();
                        e_b.append_value(-1);
                        let f_b = ef_struct.field_builder::<StringBuilder>(1).unwrap();
                        f_b.append_value("nonnullable");
                    }
                    sub_list_builder.append(true);
                }
                d_list_builder.append(true);
            }
        }
        {
            // G is present but has no entries.
            let g_map_builder = nested_sb
                .field_builder::<MapBuilder<StringBuilder, StructBuilder>>(3)
                .unwrap();
            g_map_builder.append(true).unwrap();
        }
        let nested_struct = nested_sb.finish();

        let expected = RecordBatch::try_from_iter_with_nullable([
            ("ID", Arc::new(id) as Arc<dyn Array>, true),
            ("Int_Array", Arc::new(int_array), true),
            ("int_array_array", Arc::new(int_array_array), true),
            ("Int_Map", Arc::new(int_map), true),
            ("int_map_array", Arc::new(int_map_array_), true),
            ("nested_Struct", Arc::new(nested_struct), true),
        ])
        .unwrap();
        let batch_large = read_file(&file, 8, false);
        assert_eq!(batch_large, expected, "Mismatch for batch_size=8");
        let batch_small = read_file(&file, 3, false);
        assert_eq!(batch_small, expected, "Mismatch for batch_size=3");
    }
1777
1778 #[test]
1779 fn test_nonnullable_impala_strict() {
1780 let file = arrow_test_data("avro/nonnullable.impala.avro");
1781 let err = read_file_strict(&file, 8, false).unwrap_err();
1782 assert!(err.to_string().contains(
1783 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1784 ));
1785 }
1786
1787 #[test]
1788 fn test_nullable_impala() {
1789 let file = arrow_test_data("avro/nullable.impala.avro");
1790 let batch1 = read_file(&file, 3, false);
1791 let batch2 = read_file(&file, 8, false);
1792 assert_eq!(batch1, batch2);
1793 let batch = batch1;
1794 assert_eq!(batch.num_rows(), 7);
1795 let id_array = batch
1796 .column(0)
1797 .as_any()
1798 .downcast_ref::<Int64Array>()
1799 .expect("id column should be an Int64Array");
1800 let expected_ids = [1, 2, 3, 4, 5, 6, 7];
1801 for (i, &expected_id) in expected_ids.iter().enumerate() {
1802 assert_eq!(id_array.value(i), expected_id, "Mismatch in id at row {i}",);
1803 }
1804 let int_array = batch
1805 .column(1)
1806 .as_any()
1807 .downcast_ref::<ListArray>()
1808 .expect("int_array column should be a ListArray");
1809 {
1810 let offsets = int_array.value_offsets();
1811 let start = offsets[0] as usize;
1812 let end = offsets[1] as usize;
1813 let values = int_array
1814 .values()
1815 .as_any()
1816 .downcast_ref::<Int32Array>()
1817 .expect("Values of int_array should be an Int32Array");
1818 let row0: Vec<Option<i32>> = (start..end).map(|i| Some(values.value(i))).collect();
1819 assert_eq!(
1820 row0,
1821 vec![Some(1), Some(2), Some(3)],
1822 "Mismatch in int_array row 0"
1823 );
1824 }
1825 let nested_struct = batch
1826 .column(5)
1827 .as_any()
1828 .downcast_ref::<StructArray>()
1829 .expect("nested_struct column should be a StructArray");
1830 let a_array = nested_struct
1831 .column_by_name("A")
1832 .expect("Field A should exist in nested_struct")
1833 .as_any()
1834 .downcast_ref::<Int32Array>()
1835 .expect("Field A should be an Int32Array");
1836 assert_eq!(a_array.value(0), 1, "Mismatch in nested_struct.A at row 0");
1837 assert!(
1838 !a_array.is_valid(1),
1839 "Expected null in nested_struct.A at row 1"
1840 );
1841 assert!(
1842 !a_array.is_valid(3),
1843 "Expected null in nested_struct.A at row 3"
1844 );
1845 assert_eq!(a_array.value(6), 7, "Mismatch in nested_struct.A at row 6");
1846 }
1847
1848 #[test]
1849 fn test_nullable_impala_strict() {
1850 let file = arrow_test_data("avro/nullable.impala.avro");
1851 let err = read_file_strict(&file, 8, false).unwrap_err();
1852 assert!(err.to_string().contains(
1853 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1854 ));
1855 }
1856}