use crate::codec::AvroFieldBuilder;
use crate::compression::CompressionCodec;
use crate::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
};
use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Schema};
use std::io::Write;
use std::sync::Arc;

mod encoder;
pub mod format;

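/// A builder to configure and create a [`Writer`].
///
/// A minimal sketch of writing an Avro Object Container File (OCF) to an
/// in-memory buffer; the `arrow_avro` import paths and the availability of
/// the `Snappy` codec are assumptions for illustration:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::compression::CompressionCodec;
/// use arrow_avro::writer::{WriterBuilder, format::AvroOcfFormat};
///
/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
/// )?;
/// let mut writer = WriterBuilder::new(schema)
///     .with_compression(Some(CompressionCodec::Snappy))
///     .build::<_, AvroOcfFormat>(Vec::new())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let ocf_bytes = writer.into_inner();
/// # Ok::<(), arrow_schema::ArrowError>(())
/// ```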
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    schema: Schema,
    codec: Option<CompressionCodec>,
    capacity: usize,
    fingerprint_strategy: Option<FingerprintStrategy>,
}

impl WriterBuilder {
    /// Create a new builder with default settings: no compression, an initial
    /// block-buffer capacity of 1024 bytes, and no fingerprint strategy.
    pub fn new(schema: Schema) -> Self {
        Self {
            schema,
            codec: None,
            capacity: 1024,
            fingerprint_strategy: None,
        }
    }

    /// Set the fingerprint strategy used to build the per-record prefix for
    /// single-object encoding formats.
    pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
        self.fingerprint_strategy = Some(strategy);
        self
    }

    /// Set the compression codec applied to each data block (used by the OCF
    /// block writer).
    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
        self.codec = codec;
        self
    }

    /// Set the initial capacity, in bytes, of the per-block encode buffer.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = capacity;
        self
    }

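    /// Consume the builder and create a [`Writer`] that emits format `F` into
    /// `writer`.
    ///
    /// A minimal sketch of building a single-object-encoding stream writer
    /// with a schema-id prefix, mirroring the fingerprint tests below; the
    /// `arrow_avro` import paths are assumptions for illustration:
    ///
    /// ```ignore
    /// use arrow_avro::schema::FingerprintStrategy;
    /// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let writer = WriterBuilder::new(schema)
    ///     .with_fingerprint_strategy(FingerprintStrategy::Id(42))
    ///     .build::<_, AvroSoeFormat>(Vec::new())?;
    /// # Ok::<(), arrow_schema::ArrowError>(())
    /// ```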
    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
    where
        W: Write,
        F: AvroFormat,
    {
        let mut format = F::default();
        // Prefer an Avro schema embedded in the Arrow schema metadata;
        // otherwise derive one from the Arrow schema itself.
        let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
            Some(json) => AvroSchema::new(json.clone()),
            None => AvroSchema::try_from(&self.schema)?,
        };
        // Formats that prefix each record (single-object encoding) need a
        // fingerprint; default to Rabin when no strategy was configured.
        let maybe_fingerprint = if F::NEEDS_PREFIX {
            match self.fingerprint_strategy {
                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(id)),
                Some(strategy) => {
                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
                }
                None => Some(
                    avro_schema
                        .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
                ),
            }
        } else {
            None
        };
        let mut md = self.schema.metadata().clone();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.clone().json_string,
        );
        let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
        format.start_stream(&mut writer, &schema, self.codec)?;
        let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
        let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
            .with_fingerprint(maybe_fingerprint)
            .build()?;
        Ok(Writer {
            writer,
            schema,
            format,
            compression: self.codec,
            capacity: self.capacity,
            encoder,
        })
    }
}

/// Avro writer that encodes Arrow [`RecordBatch`] data into an Avro output
/// format `F` (object container file or single-object-encoded stream).
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    writer: W,
    schema: Arc<Schema>,
    format: F,
    compression: Option<CompressionCodec>,
    capacity: usize,
    encoder: RecordEncoder,
}

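/// An Avro Object Container File (OCF) writer.
///
/// A minimal round-trip sketch, mirroring the tests below; the `arrow_avro`
/// import path is an assumption for illustration:
///
/// ```ignore
/// use arrow_avro::writer::AvroWriter;
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
/// )?;
/// let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let ocf_bytes = writer.into_inner();
/// assert_eq!(&ocf_bytes[..4], b"Obj\x01"); // OCF magic
/// # Ok::<(), arrow_schema::ArrowError>(())
/// ```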
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;

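/// An Avro single-object-encoding (SOE) stream writer.
///
/// Each record is written with a fingerprint prefix, so a decoder configured
/// with a matching `SchemaStore` can resolve the writer schema. A sketch
/// mirroring `test_stream_writer_writes_prefix_per_row_rt` below; the
/// `arrow_avro` import path is an assumption for illustration:
///
/// ```ignore
/// use arrow_avro::writer::AvroStreamWriter;
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
/// )?;
/// let mut writer = AvroStreamWriter::new(Vec::<u8>::new(), schema)?;
/// writer.write(&batch)?;
/// let soe_bytes = writer.into_inner();
/// # Ok::<(), arrow_schema::ArrowError>(())
/// ```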
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;

impl<W: Write> Writer<W, AvroOcfFormat> {
    /// Create a new OCF writer with default options.
    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
    }

    /// Return the 16-byte sync marker generated for this file, if any.
    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
        self.format.sync_marker()
    }
}

impl<W: Write> Writer<W, AvroSoeFormat> {
    /// Create a new single-object-encoding stream writer with default options.
    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
    }
}

impl<W: Write, F: AvroFormat> Writer<W, F> {
    /// Serialize one [`RecordBatch`]; its fields must match the writer schema.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(ArrowError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
        match self.format.sync_marker() {
            Some(&sync) => self.write_ocf_block(batch, &sync),
            None => self.write_stream(batch),
        }
    }

    /// Serialize a sequence of [`RecordBatch`] values in order.
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Flush any buffered bytes to the underlying writer.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        self.writer
            .flush()
            .map_err(|e| ArrowError::IoError(format!("Error flushing writer: {e}"), e))
    }

    /// Consume the writer, returning the inner `W`.
    pub fn into_inner(self) -> W {
        self.writer
    }

    /// Write one OCF block: row count, encoded byte length, the (optionally
    /// compressed) record data, then the 16-byte sync marker.
    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> {
        let mut buf = Vec::<u8>::with_capacity(self.capacity);
        self.encoder.encode(&mut buf, batch)?;
        let encoded = match self.compression {
            Some(codec) => codec.compress(&buf)?,
            None => buf,
        };
        write_long(&mut self.writer, batch.num_rows() as i64)?;
        write_long(&mut self.writer, encoded.len() as i64)?;
        self.writer
            .write_all(&encoded)
            .map_err(|e| ArrowError::IoError(format!("Error writing Avro block: {e}"), e))?;
        self.writer
            .write_all(sync)
            .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: {e}"), e))?;
        Ok(())
    }

    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.encoder.encode(&mut self.writer, batch)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;
    use crate::reader::ReaderBuilder;
    use crate::schema::{AvroSchema, SchemaStore};
    use crate::test_util::arrow_test_data;
    use arrow::datatypes::TimeUnit;
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::types::{Int16Type, Int32Type, Int64Type};
    use arrow_array::types::{
        Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
        TimestampMillisecondType, TimestampNanosecondType,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
        StructArray, UnionArray,
    };
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::collections::HashMap;
    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufReader, Cursor};
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::NamedTempFile;

    fn files() -> impl Iterator<Item = &'static str> {
        [
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }

    fn make_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Binary, false),
        ])
    }

    fn make_batch() -> RecordBatch {
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
        RecordBatch::try_new(
            Arc::new(make_schema()),
            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
        )
        .expect("failed to build test RecordBatch")
    }

    #[test]
    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
        )?;
        let buf: Vec<u8> = Vec::new();
        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new();
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _fp = store.register(avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _consumed = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 2);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![10, 20]));
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u32 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u64 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
        let batch = make_batch();
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
        let trailer = &out[out.len() - 16..];
        assert_eq!(trailer.len(), 16, "expected 16-byte sync marker");
        Ok(())
    }

    #[test]
    fn test_schema_mismatch_yields_error() {
        let batch = make_batch();
        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
        let err = writer.write(&batch).unwrap_err();
        assert!(matches!(err, ArrowError::SchemaError(_)));
    }

    #[test]
    fn test_write_batches_accumulates_multiple() -> Result<(), ArrowError> {
        let batch1 = make_batch();
        let batch2 = make_batch();
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write_batches(&[&batch1, &batch2])?;
        writer.finish()?;
        let out = writer.into_inner();
        assert!(out.len() > 4, "combined batches produced tiny file");
        Ok(())
    }

    #[test]
    fn test_finish_without_write_adds_header() -> Result<(), ArrowError> {
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
        Ok(())
    }

    #[test]
    fn test_write_long_encodes_zigzag_varint() -> Result<(), ArrowError> {
        let mut buf = Vec::new();
        write_long(&mut buf, 0)?;
        write_long(&mut buf, -1)?;
        write_long(&mut buf, 1)?;
        write_long(&mut buf, -2)?;
        write_long(&mut buf, 2147483647)?;
        // Zig-zag maps 0, -1, 1, -2 to 0x00, 0x01, 0x02, 0x03 respectively.
        assert!(
            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
            "zig-zag varint encodings incorrect: {buf:?}"
        );
        Ok(())
    }

    #[test]
    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), ArrowError> {
        for rel in files() {
            let path = arrow_test_data(rel);
            let rdr_file = File::open(&path).expect("open input avro");
            let reader = ReaderBuilder::new()
                .build(BufReader::new(rdr_file))
                .expect("build reader");
            let schema = reader.schema();
            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let codec = if rel.contains(".snappy.") {
                Some(CompressionCodec::Snappy)
            } else if rel.contains(".zstandard.") {
                Some(CompressionCodec::ZStandard)
            } else if rel.contains(".bzip2.") {
                Some(CompressionCodec::Bzip2)
            } else if rel.contains(".xz.") {
                Some(CompressionCodec::Xz)
            } else {
                None
            };
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(codec)
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
            drop(writer);
            let rt_file = File::open(&out_path).expect("open roundtrip avro");
            let rt_reader = ReaderBuilder::new()
                .build(BufReader::new(rt_file))
                .expect("build roundtrip reader");
            let rt_schema = rt_reader.schema();
            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
            assert_eq!(
                roundtrip, original,
                "Round-trip batch mismatch for file: {}",
                rel
            );
        }
        Ok(())
    }

    #[test]
    fn test_roundtrip_nested_records_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nested_records.avro");
        let rdr_file = File::open(&path).expect("open nested_records.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_records.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_records.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_nested_lists_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nested_lists.snappy.avro");
        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_lists.snappy.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(Some(CompressionCodec::Snappy))
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_lists.snappy.avro"
        );
        Ok(())
    }

    #[test]
    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/simple_fixed.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build avro reader");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_file = File::create(tmp.path()).expect("create temp avro");
        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        drop(writer);
        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, original);
        Ok(())
    }

    #[test]
    #[cfg(feature = "canonical_extension_types")]
    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, IntervalUnit};
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();
        let has_mdn = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Interval(IntervalUnit::MonthDayNano)
            )
        });
        assert!(
            has_mdn,
            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
        );
        let has_uuid_fixed = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
        assert!(
            has_uuid_fixed,
            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, input);
        Ok(())
    }

    #[test]
    #[cfg(not(feature = "canonical_extension_types"))]
    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), ArrowError>
    {
        use arrow::datatypes::{DataType, IntervalUnit};
        use std::io::BufReader;

        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();

        assert!(
            in_schema.fields().iter().any(|f| {
                matches!(
                    f.data_type(),
                    DataType::Interval(IntervalUnit::MonthDayNano)
                )
            }),
            "expected at least one Interval(MonthDayNano) field"
        );
        assert!(
            in_schema
                .fields()
                .iter()
                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
            "expected a FixedSizeBinary(16) field (uuid)"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(
            round_trip.column(0),
            input.column(0),
            "duration column values differ"
        );
        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");

        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(
            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
            Some("uuid"),
            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
        );

        let dur_rt = rt_schema.field_with_name("duration_field")?;
        assert!(matches!(
            dur_rt.data_type(),
            DataType::Interval(IntervalUnit::MonthDayNano)
        ));

        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nonnullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nonnullable.impala.avro");
        let in_schema = reader.schema();
        let has_map = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
        assert!(
            has_map,
            "expected at least one Map field in avro/nonnullable.impala.avro"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
        let files: [(&str, bool); 8] = [
            ("avro/fixed_length_decimal.avro", true),
            ("avro/fixed_length_decimal_legacy.avro", true),
            ("avro/int32_decimal.avro", true),
            ("avro/int64_decimal.avro", true),
            ("test/data/int256_decimal.avro", false),
            ("test/data/fixed256_decimal.avro", false),
            ("test/data/fixed_length_decimal_legacy_32.avro", false),
            ("test/data/int128_decimal.avro", false),
        ];
        for (rel, in_test_data_dir) in files {
            let path: String = if in_test_data_dir {
                arrow_test_data(rel)
            } else {
                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                    .join(rel)
                    .to_string_lossy()
                    .into_owned()
            };
            let f_in = File::open(&path).expect("open input avro");
            let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
            let in_schema = rdr.schema();
            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
            let f_rt = File::open(&out_path).expect("open roundtrip avro");
            let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
            let rt_schema = rt_rdr.schema();
            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
        }
        Ok(())
    }

    #[test]
    fn test_named_types_complex_roundtrip() -> Result<(), ArrowError> {
        let path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
        let rdr_file = File::open(&path).expect("open avro/named_types_complex.avro");

        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for named_types_complex.avro");

        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        {
            let arrow_schema = original.schema();

            let author_field = arrow_schema.field_with_name("author")?;
            let author_type = author_field.data_type();
            let editors_field = arrow_schema.field_with_name("editors")?;
            let editors_item_type = match editors_field.data_type() {
                DataType::List(item_field) => item_field.data_type(),
                other => panic!("Editors field should be a List, but was {:?}", other),
            };
            assert_eq!(
                author_type, editors_item_type,
                "The DataType for the 'author' struct and the 'editors' list items must be identical"
            );

            let status_field = arrow_schema.field_with_name("status")?;
            let status_type = status_field.data_type();
            assert!(
                matches!(status_type, DataType::Dictionary(_, _)),
                "Status field should be a Dictionary (Enum)"
            );

            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
            let prev_status_type = prev_status_field.data_type();
            assert_eq!(
                status_type, prev_status_type,
                "The DataType for 'status' and 'previous_status' enums must be identical"
            );

            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
            let content_hash_type = content_hash_field.data_type();
            assert!(
                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
                "Content hash should be FixedSizeBinary(16)"
            );

            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
            let thumb_hash_type = thumb_hash_field.data_type();
            assert_eq!(
                content_hash_type, thumb_hash_type,
                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
            );
        }

        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();

        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");

        assert_eq!(
            roundtrip, original,
            "Avro complex named types round-trip mismatch"
        );

        Ok(())
    }

    /// Assert that `actual` is semantically equivalent to `expected`: the same
    /// fields in the same order, allowing the writer to add a known set of
    /// metadata keys.
    fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
        assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");

        assert_eq!(
            expected.fields().len(),
            actual.fields().len(),
            "Schema must have the same number of fields"
        );

        for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
            assert_field_is_semantically_equivalent(expected_field, actual_field);
        }
    }

    fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
        let context = format!("Field '{}'", expected.name());

        assert_eq!(
            expected.name(),
            actual.name(),
            "{context}: names must match"
        );
        assert_eq!(
            expected.is_nullable(),
            actual.is_nullable(),
            "{context}: nullability must match"
        );

        assert_datatype_is_semantically_equivalent(
            expected.data_type(),
            actual.data_type(),
            &context,
        );

        assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
    }

    fn assert_datatype_is_semantically_equivalent(
        expected: &DataType,
        actual: &DataType,
        context: &str,
    ) {
        match (expected, actual) {
            (DataType::List(expected_field), DataType::List(actual_field))
            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
                assert_field_is_semantically_equivalent(expected_field, actual_field);
            }
            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: struct must have same number of fields"
                );
                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
                    assert_field_is_semantically_equivalent(ef, af);
                }
            }
            (
                DataType::Union(expected_fields, expected_mode),
                DataType::Union(actual_fields, actual_mode),
            ) => {
                assert_eq!(
                    expected_mode, actual_mode,
                    "{context}: union mode must match"
                );
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: union must have same number of variants"
                );
                for ((exp_id, exp_field), (act_id, act_field)) in
                    expected_fields.iter().zip(actual_fields.iter())
                {
                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
                    assert_field_is_semantically_equivalent(exp_field, act_field);
                }
            }
            _ => {
                assert_eq!(expected, actual, "{context}: data types must be identical");
            }
        }
    }

    fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
        assert_eq!(
            expected.num_columns(),
            actual.num_columns(),
            "RecordBatches must have the same number of columns"
        );
        assert_eq!(
            expected.num_rows(),
            actual.num_rows(),
            "RecordBatches must have the same number of rows"
        );

        for i in 0..expected.num_columns() {
            let context = format!("Column {i}");
            let expected_col = expected.column(i);
            let actual_col = actual.column(i);
            assert_array_data_is_identical(expected_col, actual_col, &context);
        }
    }

    fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
        assert_eq!(
            expected.nulls(),
            actual.nulls(),
            "{context}: null buffers must match"
        );
        assert_eq!(
            expected.len(),
            actual.len(),
            "{context}: array lengths must match"
        );

        match (expected.data_type(), actual.data_type()) {
            (DataType::Union(expected_fields, _), DataType::Union(..)) => {
                let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
                let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();

                // The first buffer holds the union type ids.
                assert_eq!(
                    &expected.to_data().buffers()[0],
                    &actual.to_data().buffers()[0],
                    "{context}: union type_ids buffer mismatch"
                );

                // Dense unions carry a second buffer of value offsets.
                if expected.to_data().buffers().len() > 1 {
                    assert_eq!(
                        &expected.to_data().buffers()[1],
                        &actual.to_data().buffers()[1],
                        "{context}: union value_offsets buffer mismatch"
                    );
                }

                // Recurse into each child variant.
                for (type_id, _) in expected_fields.iter() {
                    let child_context = format!("{context} -> child variant {type_id}");
                    assert_array_data_is_identical(
                        expected_union.child(type_id),
                        actual_union.child(type_id),
                        &child_context,
                    );
                }
            }
            (DataType::Struct(_), DataType::Struct(_)) => {
                let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
                let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
                for i in 0..expected_struct.num_columns() {
                    let child_context = format!("{context} -> struct child {i}");
                    assert_array_data_is_identical(
                        expected_struct.column(i),
                        actual_struct.column(i),
                        &child_context,
                    );
                }
            }
            _ => {
                assert_eq!(
                    expected.to_data().buffers(),
                    actual.to_data().buffers(),
                    "{context}: data buffers must match"
                );
            }
        }
    }

    /// Assert every metadata key in `expected_meta` survives with the same
    /// value, and that any additions are limited to keys the writer is known
    /// to introduce.
    fn assert_metadata_is_superset(
        expected_meta: &HashMap<String, String>,
        actual_meta: &HashMap<String, String>,
        context: &str,
    ) {
        let allowed_additions: HashSet<&str> =
            vec!["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
                .into_iter()
                .collect();
        for (key, expected_value) in expected_meta {
            match actual_meta.get(key) {
                Some(actual_value) => assert_eq!(
                    expected_value, actual_value,
                    "{context}: preserved metadata for key '{key}' must have the same value"
                ),
                None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
            }
        }
        for key in actual_meta.keys() {
            if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
                panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
            }
        }
    }

    #[test]
    fn test_union_roundtrip() -> Result<(), ArrowError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/union_fields.avro")
            .to_string_lossy()
            .into_owned();
        let rdr_file = File::open(&file_path).expect("open avro/union_fields.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for union_fields.avro");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());

        assert_batch_data_is_identical(&original, &round_trip);
        Ok(())
    }

    #[test]
    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/simple_enum.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for simple_enum.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let has_enum_dict = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
            )
        });
        assert!(
            has_enum_dict,
            "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
        );
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
        Ok(())
    }

    #[test]
    fn test_builder_propagates_capacity_to_writer() -> Result<(), ArrowError> {
        let cap = 64 * 1024;
        let buffer = Vec::<u8>::new();
        let mut writer = WriterBuilder::new(make_schema())
            .with_capacity(cap)
            .build::<_, AvroOcfFormat>(buffer)?;
        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
        let batch = make_batch();
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
        Ok(())
    }

    #[test]
    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), ArrowError> {
        use arrow_array::{ArrayRef, Int32Array};
        use arrow_schema::{DataType, Field, Schema};
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let cap = 8192;
        let mut writer = WriterBuilder::new(schema)
            .with_capacity(cap)
            .build::<_, AvroSoeFormat>(Vec::new())?;
        assert_eq!(writer.capacity, cap);
        writer.write(&batch)?;
        let _bytes = writer.into_inner();
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), ArrowError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/duration_logical_types.avro")
            .to_string_lossy()
            .into_owned();

        let in_file = File::open(&file_path)
            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));

        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_logical_types.avro");
        let in_schema = reader.schema();

        let expected_units: HashSet<TimeUnit> = [
            TimeUnit::Nanosecond,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Second,
        ]
        .into_iter()
        .collect();

        let found_units: HashSet<TimeUnit> = in_schema
            .fields()
            .iter()
            .filter_map(|f| match f.data_type() {
                DataType::Duration(unit) => Some(*unit),
                _ => None,
            })
            .collect();

        assert_eq!(
            found_units, expected_units,
            "Expected to find all four Duration TimeUnits in the schema from the initial read"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let tmp = NamedTempFile::new().expect("create temp file");
        {
            let out_file = File::create(tmp.path()).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
            writer.write(&input)?;
            writer.finish()?;
        }

        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(round_trip, input);
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.name(), "run_ends");
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.name(), "values");
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got_ree = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                assert_eq!(got_ree, &ree);
            }
            other => panic!(
                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
                other
            ),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError>
    {
        let run_ends = Int16Array::from(vec![2, 5, 7]);
        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
                assert_eq!(values_field.data_type(), &DataType::Utf8);
                assert!(
                    values_field.is_nullable(),
                    "REE 'values' child should be nullable"
                );
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int16Type>>()
                    .expect("RunArray<Int16Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType: {:?}", other),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
    -> Result<(), ArrowError> {
        let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int64);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int64Type>>()
                    .expect("RunArray<Int64Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let offset = 1usize;
        let length = 6usize;
        let base_values = base
            .values()
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("REE values as Int32Array");
        let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
        for i in offset..offset + length {
            let phys = base.get_physical_index(i);
            let v = if base_values.is_null(phys) {
                None
            } else {
                Some(base_values.value(phys))
            };
            logical_window.push(v);
        }

        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
            if vals.is_empty() {
                return (Int32Array::new_null(0), Int32Array::new_null(0));
            }
            let mut run_ends_out: Vec<i32> = Vec::new();
            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
            let mut cur = vals[0];
            let mut len = 1i32;
            for v in &vals[1..] {
                if *v == cur {
                    len += 1;
                } else {
                    let last_end = run_ends_out.last().copied().unwrap_or(0);
                    run_ends_out.push(last_end + len);
                    run_vals_out.push(cur);
                    cur = *v;
                    len = 1;
                }
            }
            let last_end = run_ends_out.last().copied().unwrap_or(0);
            run_ends_out.push(last_end + len);
            run_vals_out.push(cur);
            (
                Int32Array::from(run_ends_out),
                Int32Array::from(run_vals_out),
            )
        }
        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
        let field = Field::new("x", owned_slice.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(owned_slice.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), length);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
                    let vals = a
                        .values()
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("REE values as Int32Array");
                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
                    for i in 0..a.len() {
                        let phys = a.get_physical_index(i);
                        out.push(if vals.is_null(phys) {
                            None
                        } else {
                            Some(vals.value(phys))
                        });
                    }
                    Int32Array::from(out)
                }
                let got_logical = expand_ree_to_int32(got);
                let expected_logical = Int32Array::from(logical_window);
                assert_eq!(
                    got_logical, expected_logical,
                    "Logical values differ after REE slice round-trip"
                );
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(1),
            Some(1),
            Some(1),
            Some(2),
            Some(2),
            None,
            None,
            Some(3),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
    -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .expect("StringArray");
        let expected = arrow_array::StringArray::from(vec![
            Some("a"),
            Some("a"),
            None,
            None,
            None,
            Some("c"),
            Some("c"),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
    -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(999),
            Some(999),
            Some(999),
            Some(999),
            Some(-5),
            Some(-5),
            Some(-5),
            Some(-5),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = Int32Array::from(vec![2, 4, 6]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 6);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
        assert_eq!(got, &expected);
        Ok(())
    }
1874
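    // Round-trips avro/nullable.impala.avro, which carries nullable fields,
    // through an in-memory OCF and checks the batches come back unchanged.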
    #[test]
    #[cfg(feature = "snappy")]
    fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nullable.impala.avro");
        let in_schema = reader.schema();
        assert!(
            in_schema.fields().iter().any(|f| f.is_nullable()),
            "expected at least one nullable field in avro/nullable.impala.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro data mismatch for nullable.impala.avro"
        );
        Ok(())
    }

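    // Round-trips avro/datapage_v2.snappy.avro (requires the `snappy` feature)
    // and checks the re-read batches match the originals.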
    #[test]
    #[cfg(feature = "snappy")]
    fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for datapage_v2.snappy.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for datapage_v2.snappy.avro"
        );
        Ok(())
    }

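    // Round-trips avro/single_nan.avro to make sure NaN floating-point values
    // survive the writer unchanged.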
    #[test]
    #[cfg(feature = "snappy")]
    fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/single_nan.avro");
        let in_file = File::open(&path).expect("open avro/single_nan.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for single_nan.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/single_nan.avro"
        );
        Ok(())
    }

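    // Round-trips avro/dict-page-offset-zero.avro through an in-memory OCF and
    // checks the batches come back unchanged.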
    #[test]
    #[cfg(feature = "snappy")]
    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for dict-page-offset-zero.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
        );
        Ok(())
    }

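    // Round-trips avro/repeated_no_annotation.avro and checks the re-read
    // batches match the input.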
    #[test]
    #[cfg(feature = "snappy")]
    fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/repeated_no_annotation.avro");
        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for repeated_no_annotation.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip buffer");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
        );
        Ok(())
    }

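    // A named record type that is defined once and referenced again elsewhere in
    // the schema should survive a write/read round trip.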
    #[test]
    fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/nested_record_reuse.avro")
            .to_string_lossy()
            .into_owned();
        let in_file = File::open(&path).expect("open test/data/nested_record_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for nested_record_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, input,
            "Round-trip batch mismatch for nested_record_reuse.avro"
        );
        Ok(())
    }

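    // A named enum type that is reused across fields should survive a write/read
    // round trip.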
    #[test]
    fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
        let path =
            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
        let rdr_file = File::open(&path).expect("open test/data/enum_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for enum_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Avro enum type reuse round-trip mismatch"
        );
        Ok(())
    }

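    // End-to-end round trip over test/data/comprehensive_e2e.avro, a file
    // intended to exercise a broad mix of types in one pass.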
    #[test]
    fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/comprehensive_e2e.avro");
        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for comprehensive_e2e.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let sink: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for comprehensive_e2e.avro"
        );
        Ok(())
    }

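    // Builds a batch covering Date32, Time32/Time64, and millisecond- through
    // nanosecond-precision timestamps, then checks the writer's temporal
    // encoders round-trip every value exactly.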
    #[test]
    fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![
            Field::new("d32", DataType::Date32, false),
            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new(
                "ts_ms",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts_us",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_ns",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
        ]);
        let d32 = Date32Array::from(vec![0, 1, -1]);
        let t32_ms: PrimitiveArray<Time32MillisecondType> =
            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
        let t64_us: PrimitiveArray<Time64MicrosecondType> =
            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(d32) as ArrayRef,
                Arc::new(t32_ms) as ArrayRef,
                Arc::new(t64_us) as ArrayRef,
                Arc::new(ts_ms) as ArrayRef,
                Arc::new(ts_us) as ArrayRef,
                Arc::new(ts_ns) as ArrayRef,
            ],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip of new time encoders");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, batch);
        Ok(())
    }
}