use crate::codec::AvroFieldBuilder;
use crate::compression::CompressionCodec;
use crate::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
};
use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Schema};
use std::io::Write;
use std::sync::Arc;

mod encoder;
pub mod format;

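/// Builds an Avro [`Writer`] with custom options.
///
/// A minimal usage sketch (marked `ignore` because the import paths depend on
/// how this module is re-exported; the schema is illustrative):
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
/// let mut writer = WriterBuilder::new(schema)
///     .with_compression(Some(CompressionCodec::Snappy))
///     .build::<_, AvroOcfFormat>(Vec::<u8>::new())?;
/// ```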
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    schema: Schema,
    codec: Option<CompressionCodec>,
    capacity: usize,
    fingerprint_strategy: Option<FingerprintStrategy>,
}

impl WriterBuilder {
    /// Create a new builder with defaults: no compression, a 1024-byte
    /// block buffer, and no explicit fingerprint strategy.
    pub fn new(schema: Schema) -> Self {
        Self {
            schema,
            codec: None,
            capacity: 1024,
            fingerprint_strategy: None,
        }
    }

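    /// Select how each single-object-encoded record is prefixed. Formats
    /// that need no prefix (such as OCF) ignore this setting; SOE streams
    /// default to a Rabin fingerprint of the writer schema when it is unset.
    ///
    /// A sketch pairing the writer with a schema-registry style id (the id
    /// `42` is illustrative):
    ///
    /// ```ignore
    /// let writer = WriterBuilder::new(schema)
    ///     .with_fingerprint_strategy(FingerprintStrategy::Id(42))
    ///     .build::<_, AvroSoeFormat>(Vec::new())?;
    /// ```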
    pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
        self.fingerprint_strategy = Some(strategy);
        self
    }

    /// Set the compression codec applied to OCF blocks (`None` disables
    /// compression).
    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
        self.codec = codec;
        self
    }

    /// Set the initial capacity, in bytes, of the per-block encode buffer.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = capacity;
        self
    }

    /// Consume the builder and produce a [`Writer`] for the chosen format
    /// `F`, writing any required stream preamble to `writer`.
    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
    where
        W: Write,
        F: AvroFormat,
    {
        let mut format = F::default();
        let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
            Some(json) => AvroSchema::new(json.clone()),
            None => AvroSchema::try_from(&self.schema)?,
        };
        let maybe_fingerprint = if F::NEEDS_PREFIX {
            match self.fingerprint_strategy {
                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(id)),
                Some(strategy) => {
                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
                }
                None => Some(
                    avro_schema
                        .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
                ),
            }
        } else {
            None
        };
        let mut md = self.schema.metadata().clone();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.json_string.clone(),
        );
        let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
        format.start_stream(&mut writer, &schema, self.codec)?;
        let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
        let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
            .with_fingerprint(maybe_fingerprint)
            .build()?;
        Ok(Writer {
            writer,
            schema,
            format,
            compression: self.codec,
            capacity: self.capacity,
            encoder,
        })
    }
}

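/// Avro writer, generic over the output [`AvroFormat`].
///
/// With [`AvroOcfFormat`] it produces an Object Container File (header,
/// optionally compressed blocks, and sync markers); with [`AvroSoeFormat`] it
/// emits a bare stream of single-object-encoded rows.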
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    writer: W,
    schema: Arc<Schema>,
    format: F,
    compression: Option<CompressionCodec>,
    capacity: usize,
    encoder: RecordEncoder,
}

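/// An Avro Object Container File (OCF) writer.
///
/// A minimal sketch, reusing the kind of schema/batch setup shown in the
/// tests below:
///
/// ```ignore
/// let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes = writer.into_inner(); // begins with the OCF magic `Obj\x01`
/// ```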
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;

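/// A single-object-encoding (SOE) stream writer.
///
/// Each row is written with a fingerprint prefix (Rabin by default; see
/// [`WriterBuilder::with_fingerprint_strategy`] to override). A minimal
/// sketch:
///
/// ```ignore
/// let mut writer = AvroStreamWriter::new(Vec::<u8>::new(), schema)?;
/// writer.write(&batch)?;
/// let bytes = writer.into_inner();
/// ```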
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;

impl<W: Write> Writer<W, AvroOcfFormat> {
    /// Create an OCF writer with default settings for the given `schema`.
    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
    }

    /// Return the 16-byte sync marker generated for this file, if any.
    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
        self.format.sync_marker()
    }
}

impl<W: Write> Writer<W, AvroSoeFormat> {
    /// Create an SOE stream writer with default settings for the given
    /// `schema`.
    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
    }
}

impl<W: Write, F: AvroFormat> Writer<W, F> {
    /// Encode `batch` and append it to the output, as an OCF block or as
    /// prefixed SOE rows depending on the format.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(ArrowError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
        match self.format.sync_marker() {
            Some(&sync) => self.write_ocf_block(batch, &sync),
            None => self.write_stream(batch),
        }
    }

    /// Write several batches in order.
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Flush any buffered bytes to the underlying writer.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        self.writer
            .flush()
            .map_err(|e| ArrowError::IoError(format!("Error flushing writer: {e}"), e))
    }

    /// Consume the writer, returning the underlying output.
    pub fn into_inner(self) -> W {
        self.writer
    }

    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> {
        let mut buf = Vec::<u8>::with_capacity(self.capacity);
        self.encoder.encode(&mut buf, batch)?;
        let encoded = match self.compression {
            Some(codec) => codec.compress(&buf)?,
            None => buf,
        };
        write_long(&mut self.writer, batch.num_rows() as i64)?;
        write_long(&mut self.writer, encoded.len() as i64)?;
        self.writer
            .write_all(&encoded)
            .map_err(|e| ArrowError::IoError(format!("Error writing Avro block: {e}"), e))?;
        self.writer
            .write_all(sync)
            .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: {e}"), e))?;
        Ok(())
    }

    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.encoder.encode(&mut self.writer, batch)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;
    use crate::reader::ReaderBuilder;
    use crate::schema::{AvroSchema, SchemaStore};
    use crate::test_util::arrow_test_data;
    use arrow::datatypes::TimeUnit;
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::types::{Int16Type, Int32Type, Int64Type};
    use arrow_array::types::{
        Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
        TimestampMillisecondType, TimestampNanosecondType,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
        StringArray, StructArray, UnionArray,
    };
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::{Int16Array, Int64Array, RunArray};
    use arrow_schema::UnionMode;
    use arrow_schema::{DataType, Field, Schema};
    use std::collections::HashMap;
    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufReader, Cursor};
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::NamedTempFile;

    fn files() -> impl Iterator<Item = &'static str> {
        [
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }

    fn make_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Binary, false),
        ])
    }

    fn make_batch() -> RecordBatch {
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
        RecordBatch::try_new(
            Arc::new(make_schema()),
            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
        )
        .expect("failed to build test RecordBatch")
    }

    #[test]
    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
        )?;
        let buf: Vec<u8> = Vec::new();
        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new();
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _fp = store.register(avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _consumed = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 2);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![10, 20]));
        Ok(())
    }

    #[test]
    fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
        use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let inner_struct_type = DataType::Struct(inner_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", inner_struct_type.clone(), true),
            Field::new("after", inner_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        let before_ids = Int32Array::from(vec![None, None]);
        let before_names = StringArray::from(vec![None::<&str>, None]);
        let before_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(before_ids) as ArrayRef,
                Arc::new(before_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![false, false])),
        );
        let after_ids = Int32Array::from(vec![1, 2]);
        let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
        let after_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(after_ids) as ArrayRef,
                Arc::new(after_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create test batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            let after_col = single_row.column(1);
            assert_eq!(
                after_col.null_count(),
                0,
                "after column should have no nulls in sliced row"
            );
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty(), "encoded output should not be empty");
    }

    #[test]
    fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
        use arrow_array::{
            ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
            TimestampMicrosecondArray,
        };
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("category", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(10, 2), true),
            Field::new("stock_quantity", DataType::Int32, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
        ]);
        let row_struct_type = DataType::Struct(row_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", row_struct_type.clone(), true),
            Field::new("after", row_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        let before_struct = StructArray::new_null(row_fields.clone(), 2);
        let ids = Int32Array::from(vec![1, 2]);
        let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
        let categories = StringArray::from(vec![Some("Electronics"), Some("Electronics")]);
        let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let quantities = Int32Array::from(vec![Some(100), Some(50)]);
        let timestamps = TimestampMicrosecondArray::from(vec![
            Some(1700000000000000i64),
            Some(1700000001000000i64),
        ]);
        let after_struct = StructArray::new(
            row_fields.clone(),
            vec![
                Arc::new(ids) as ArrayRef,
                Arc::new(names) as ArrayRef,
                Arc::new(categories) as ArrayRef,
                Arc::new(prices) as ArrayRef,
                Arc::new(quantities) as ArrayRef,
                Arc::new(timestamps) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create products batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode product row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty());
    }

    #[test]
    fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
        use arrow_array::{
            ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
        };
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let row_struct_dt = DataType::Struct(row_fields.clone());
        let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
        let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
        let after: ArrayRef = Arc::new(StructArray::new(
            row_fields.clone(),
            vec![id_col, name_col],
            None,
        ));
        let schema = Arc::new(Schema::new(vec![
            Field::new("before", row_struct_dt.clone(), true),
            Field::new("after", row_struct_dt, true),
            Field::new("op", DataType::Utf8, false),
            Field::new("ts_ms", DataType::Int64, false),
        ]));
        let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
        let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
        let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
        let mut buf = Vec::new();
        let mut writer = WriterBuilder::new(schema.as_ref().clone())
            .build::<_, AvroSoeFormat>(&mut buf)
            .unwrap();
        let single = batch.slice(0, 1);
        let res = writer.write(&single);
        assert!(
            res.is_ok(),
            "expected to encode successfully, got: {:?}",
            res.err()
        );
    }

    #[test]
    fn test_union_nonzero_type_ids() -> Result<(), ArrowError> {
        use arrow_array::UnionArray;
        use arrow_buffer::Buffer;
        use arrow_schema::UnionFields;
        let union_fields = UnionFields::try_new(
            vec![2, 5],
            vec![
                Field::new("v_str", DataType::Utf8, true),
                Field::new("v_int", DataType::Int32, true),
            ],
        )
        .unwrap();
        let strings = StringArray::from(vec!["hello", "world"]);
        let ints = Int32Array::from(vec![10, 20, 30]);
        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
        let union_array = UnionArray::try_new(
            union_fields.clone(),
            type_ids.into(),
            Some(offsets.into()),
            vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
        )?;
        let schema = Schema::new(vec![Field::new(
            "union_col",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(union_array) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        assert!(
            writer.write(&batch).is_ok(),
            "Expected no error from writing"
        );
        assert!(
            writer.finish().is_ok(),
            "Expected no error from finishing writer"
        );
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u32 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u64 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
        let batch = make_batch();
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
        let trailer = &out[out.len() - 16..];
        assert_eq!(trailer.len(), 16, "expected 16-byte sync marker");
        Ok(())
    }

    #[test]
    fn test_schema_mismatch_yields_error() {
        let batch = make_batch();
        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
        let err = writer.write(&batch).unwrap_err();
        assert!(matches!(err, ArrowError::SchemaError(_)));
    }

    #[test]
    fn test_write_batches_accumulates_multiple() -> Result<(), ArrowError> {
        let batch1 = make_batch();
        let batch2 = make_batch();
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write_batches(&[&batch1, &batch2])?;
        writer.finish()?;
        let out = writer.into_inner();
        assert!(out.len() > 4, "combined batches produced tiny file");
        Ok(())
    }

    #[test]
    fn test_finish_without_write_adds_header() -> Result<(), ArrowError> {
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
        Ok(())
    }

    #[test]
    fn test_write_long_encodes_zigzag_varint() -> Result<(), ArrowError> {
        let mut buf = Vec::new();
        write_long(&mut buf, 0)?;
        write_long(&mut buf, -1)?;
        write_long(&mut buf, 1)?;
        write_long(&mut buf, -2)?;
        write_long(&mut buf, 2147483647)?;
        assert!(
            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
            "zig-zag varint encodings incorrect: {buf:?}"
        );
        Ok(())
    }

    #[test]
    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), ArrowError> {
        for rel in files() {
            let path = arrow_test_data(rel);
            let rdr_file = File::open(&path).expect("open input avro");
            let reader = ReaderBuilder::new()
                .build(BufReader::new(rdr_file))
                .expect("build reader");
            let schema = reader.schema();
            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let codec = if rel.contains(".snappy.") {
                Some(CompressionCodec::Snappy)
            } else if rel.contains(".zstandard.") {
                Some(CompressionCodec::ZStandard)
            } else if rel.contains(".bzip2.") {
                Some(CompressionCodec::Bzip2)
            } else if rel.contains(".xz.") {
                Some(CompressionCodec::Xz)
            } else {
                None
            };
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(codec)
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
            drop(writer);
            let rt_file = File::open(&out_path).expect("open roundtrip avro");
            let rt_reader = ReaderBuilder::new()
                .build(BufReader::new(rt_file))
                .expect("build roundtrip reader");
            let rt_schema = rt_reader.schema();
            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
            assert_eq!(
                roundtrip, original,
                "Round-trip batch mismatch for file: {}",
                rel
            );
        }
        Ok(())
    }

    #[test]
    fn test_roundtrip_nested_records_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nested_records.avro");
        let rdr_file = File::open(&path).expect("open nested_records.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_records.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_records.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_nested_lists_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nested_lists.snappy.avro");
        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_lists.snappy.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(Some(CompressionCodec::Snappy))
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_lists.snappy.avro"
        );
        Ok(())
    }

    #[test]
    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/simple_fixed.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build avro reader");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_file = File::create(tmp.path()).expect("create temp avro");
        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        drop(writer);
        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, original);
        Ok(())
    }

    #[test]
    #[cfg(feature = "canonical_extension_types")]
    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, IntervalUnit};
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();
        let has_mdn = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Interval(IntervalUnit::MonthDayNano)
            )
        });
        assert!(
            has_mdn,
            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
        );
        let has_uuid_fixed = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
        assert!(
            has_uuid_fixed,
            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, input);
        Ok(())
    }

    #[test]
    #[cfg(not(feature = "canonical_extension_types"))]
    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), ArrowError>
    {
        use arrow::datatypes::{DataType, IntervalUnit};
        use std::io::BufReader;

        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();

        assert!(
            in_schema.fields().iter().any(|f| {
                matches!(
                    f.data_type(),
                    DataType::Interval(IntervalUnit::MonthDayNano)
                )
            }),
            "expected at least one Interval(MonthDayNano) field"
        );
        assert!(
            in_schema
                .fields()
                .iter()
                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
            "expected a FixedSizeBinary(16) field (uuid)"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(
            round_trip.column(0),
            input.column(0),
            "duration column values differ"
        );
        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");

        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(
            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
            Some("uuid"),
            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
        );

        let dur_rt = rt_schema.field_with_name("duration_field")?;
        assert!(matches!(
            dur_rt.data_type(),
            DataType::Interval(IntervalUnit::MonthDayNano)
        ));

        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nonnullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nonnullable.impala.avro");
        let in_schema = reader.schema();
        let has_map = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
        assert!(
            has_map,
            "expected at least one Map field in avro/nonnullable.impala.avro"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
        let files: [(&str, bool); 8] = [
            ("avro/fixed_length_decimal.avro", true),
            ("avro/fixed_length_decimal_legacy.avro", true),
            ("avro/int32_decimal.avro", true),
            ("avro/int64_decimal.avro", true),
            ("test/data/int256_decimal.avro", false),
            ("test/data/fixed256_decimal.avro", false),
            ("test/data/fixed_length_decimal_legacy_32.avro", false),
            ("test/data/int128_decimal.avro", false),
        ];
        for (rel, in_test_data_dir) in files {
            let path: String = if in_test_data_dir {
                arrow_test_data(rel)
            } else {
                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                    .join(rel)
                    .to_string_lossy()
                    .into_owned()
            };
            let f_in = File::open(&path).expect("open input avro");
            let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
            let in_schema = rdr.schema();
            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
            let f_rt = File::open(&out_path).expect("open roundtrip avro");
            let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
            let rt_schema = rt_rdr.schema();
            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
        }
        Ok(())
    }

    #[test]
    fn test_named_types_complex_roundtrip() -> Result<(), ArrowError> {
        let path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
        let rdr_file = File::open(&path).expect("open test/data/named_types_complex.avro");

        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for named_types_complex.avro");

        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        {
            let arrow_schema = original.schema();

            let author_field = arrow_schema.field_with_name("author")?;
            let author_type = author_field.data_type();
            let editors_field = arrow_schema.field_with_name("editors")?;
            let editors_item_type = match editors_field.data_type() {
                DataType::List(item_field) => item_field.data_type(),
                other => panic!("Editors field should be a List, but was {:?}", other),
            };
            assert_eq!(
                author_type, editors_item_type,
                "The DataType for the 'author' struct and the 'editors' list items must be identical"
            );

            let status_field = arrow_schema.field_with_name("status")?;
            let status_type = status_field.data_type();
            assert!(
                matches!(status_type, DataType::Dictionary(_, _)),
                "Status field should be a Dictionary (Enum)"
            );

            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
            let prev_status_type = prev_status_field.data_type();
            assert_eq!(
                status_type, prev_status_type,
                "The DataType for 'status' and 'previous_status' enums must be identical"
            );

            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
            let content_hash_type = content_hash_field.data_type();
            assert!(
                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
                "Content hash should be FixedSizeBinary(16)"
            );

            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
            let thumb_hash_type = thumb_hash_field.data_type();
            assert_eq!(
                content_hash_type, thumb_hash_type,
                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
            );
        }

        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();

        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");

        assert_eq!(
            roundtrip, original,
            "Avro complex named types round-trip mismatch"
        );

        Ok(())
    }

    fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
        assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");

        assert_eq!(
            expected.fields().len(),
            actual.fields().len(),
            "Schema must have the same number of fields"
        );

        for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
            assert_field_is_semantically_equivalent(expected_field, actual_field);
        }
    }

    fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
        let context = format!("Field '{}'", expected.name());

        assert_eq!(
            expected.name(),
            actual.name(),
            "{context}: names must match"
        );
        assert_eq!(
            expected.is_nullable(),
            actual.is_nullable(),
            "{context}: nullability must match"
        );

        assert_datatype_is_semantically_equivalent(
            expected.data_type(),
            actual.data_type(),
            &context,
        );

        assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
    }

    fn assert_datatype_is_semantically_equivalent(
        expected: &DataType,
        actual: &DataType,
        context: &str,
    ) {
        match (expected, actual) {
            (DataType::List(expected_field), DataType::List(actual_field))
            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
                assert_field_is_semantically_equivalent(expected_field, actual_field);
            }
            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: struct must have same number of fields"
                );
                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
                    assert_field_is_semantically_equivalent(ef, af);
                }
            }
            (
                DataType::Union(expected_fields, expected_mode),
                DataType::Union(actual_fields, actual_mode),
            ) => {
                assert_eq!(
                    expected_mode, actual_mode,
                    "{context}: union mode must match"
                );
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: union must have same number of variants"
                );
                for ((exp_id, exp_field), (act_id, act_field)) in
                    expected_fields.iter().zip(actual_fields.iter())
                {
                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
                    assert_field_is_semantically_equivalent(exp_field, act_field);
                }
            }
            _ => {
                assert_eq!(expected, actual, "{context}: data types must be identical");
            }
        }
    }

    fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
        assert_eq!(
            expected.num_columns(),
            actual.num_columns(),
            "RecordBatches must have the same number of columns"
        );
        assert_eq!(
            expected.num_rows(),
            actual.num_rows(),
            "RecordBatches must have the same number of rows"
        );

        for i in 0..expected.num_columns() {
            let context = format!("Column {i}");
            let expected_col = expected.column(i);
            let actual_col = actual.column(i);
            assert_array_data_is_identical(expected_col, actual_col, &context);
        }
    }

    fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
        assert_eq!(
            expected.nulls(),
            actual.nulls(),
            "{context}: null buffers must match"
        );
        assert_eq!(
            expected.len(),
            actual.len(),
            "{context}: array lengths must match"
        );

        match (expected.data_type(), actual.data_type()) {
            (DataType::Union(expected_fields, _), DataType::Union(..)) => {
                let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
                let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();

                assert_eq!(
                    &expected.to_data().buffers()[0],
                    &actual.to_data().buffers()[0],
                    "{context}: union type_ids buffer mismatch"
                );

                if expected.to_data().buffers().len() > 1 {
                    assert_eq!(
                        &expected.to_data().buffers()[1],
                        &actual.to_data().buffers()[1],
                        "{context}: union value_offsets buffer mismatch"
                    );
                }

                for (type_id, _) in expected_fields.iter() {
                    let child_context = format!("{context} -> child variant {type_id}");
                    assert_array_data_is_identical(
                        expected_union.child(type_id),
                        actual_union.child(type_id),
                        &child_context,
                    );
                }
            }
            (DataType::Struct(_), DataType::Struct(_)) => {
                let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
                let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
                for i in 0..expected_struct.num_columns() {
                    let child_context = format!("{context} -> struct child {i}");
                    assert_array_data_is_identical(
                        expected_struct.column(i),
                        actual_struct.column(i),
                        &child_context,
                    );
                }
            }
            _ => {
                assert_eq!(
                    expected.to_data().buffers(),
                    actual.to_data().buffers(),
                    "{context}: data buffers must match"
                );
            }
        }
    }

    fn assert_metadata_is_superset(
        expected_meta: &HashMap<String, String>,
        actual_meta: &HashMap<String, String>,
        context: &str,
    ) {
        let allowed_additions: HashSet<&str> =
            vec!["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
                .into_iter()
                .collect();
        for (key, expected_value) in expected_meta {
            match actual_meta.get(key) {
                Some(actual_value) => assert_eq!(
                    expected_value, actual_value,
                    "{context}: preserved metadata for key '{key}' must have the same value"
                ),
                None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
            }
        }
        for key in actual_meta.keys() {
            if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
                panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
            }
        }
    }

    #[test]
    fn test_union_roundtrip() -> Result<(), ArrowError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/union_fields.avro")
            .to_string_lossy()
            .into_owned();
        let rdr_file = File::open(&file_path).expect("open test/data/union_fields.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for union_fields.avro");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());

        assert_batch_data_is_identical(&original, &round_trip);
        Ok(())
    }

    #[test]
    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/simple_enum.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for simple_enum.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let has_enum_dict = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
            )
        });
        assert!(
            has_enum_dict,
            "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
        );
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
        Ok(())
    }

    #[test]
    fn test_builder_propagates_capacity_to_writer() -> Result<(), ArrowError> {
        let cap = 64 * 1024;
        let buffer = Vec::<u8>::new();
        let mut writer = WriterBuilder::new(make_schema())
            .with_capacity(cap)
            .build::<_, AvroOcfFormat>(buffer)?;
        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
        let batch = make_batch();
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
        Ok(())
    }

    #[test]
    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), ArrowError> {
        use arrow_array::{ArrayRef, Int32Array};
        use arrow_schema::{DataType, Field, Schema};
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let cap = 8192;
        let mut writer = WriterBuilder::new(schema)
            .with_capacity(cap)
            .build::<_, AvroSoeFormat>(Vec::new())?;
        assert_eq!(writer.capacity, cap);
        writer.write(&batch)?;
        let _bytes = writer.into_inner();
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), ArrowError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/duration_logical_types.avro")
            .to_string_lossy()
            .into_owned();

        let in_file = File::open(&file_path)
            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));

        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_logical_types.avro");
        let in_schema = reader.schema();

        let expected_units: HashSet<TimeUnit> = [
            TimeUnit::Nanosecond,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Second,
        ]
        .into_iter()
        .collect();

        let found_units: HashSet<TimeUnit> = in_schema
            .fields()
            .iter()
            .filter_map(|f| match f.data_type() {
                DataType::Duration(unit) => Some(*unit),
                _ => None,
            })
            .collect();

        assert_eq!(
            found_units, expected_units,
            "Expected to find all four Duration TimeUnits in the schema from the initial read"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let tmp = NamedTempFile::new().expect("create temp file");
        {
            let out_file = File::create(tmp.path()).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
            writer.write(&input)?;
            writer.finish()?;
        }

        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(round_trip, input);
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.name(), "run_ends");
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.name(), "values");
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got_ree = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                assert_eq!(got_ree, &ree);
            }
            other => panic!(
                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
                other
            ),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError>
    {
        let run_ends = Int16Array::from(vec![2, 5, 7]);
        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
                assert_eq!(values_field.data_type(), &DataType::Utf8);
                assert!(
                    values_field.is_nullable(),
                    "REE 'values' child should be nullable"
                );
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int16Type>>()
                    .expect("RunArray<Int16Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType: {:?}", other),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
    -> Result<(), ArrowError> {
        let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int64);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int64Type>>()
                    .expect("RunArray<Int64Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }
1830
1831 #[cfg(feature = "avro_custom_types")]
1832 #[test]
1833 fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> {
1834 let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
1835 let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
1836 let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
1837 let offset = 1usize;
1838 let length = 6usize;
1839 let base_values = base
1840 .values()
1841 .as_any()
1842 .downcast_ref::<Int32Array>()
1843 .expect("REE values as Int32Array");
1844 let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
1845 for i in offset..offset + length {
1846 let phys = base.get_physical_index(i);
1847 let v = if base_values.is_null(phys) {
1848 None
1849 } else {
1850 Some(base_values.value(phys))
1851 };
1852 logical_window.push(v);
1853 }
1854
        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
            if vals.is_empty() {
                return (Int32Array::new_null(0), Int32Array::new_null(0));
            }
            let mut run_ends_out: Vec<i32> = Vec::new();
            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
            let mut cur = vals[0];
            let mut len = 1i32;
            for v in &vals[1..] {
                if *v == cur {
                    len += 1;
                } else {
                    let last_end = run_ends_out.last().copied().unwrap_or(0);
                    run_ends_out.push(last_end + len);
                    run_vals_out.push(cur);
                    cur = *v;
                    len = 1;
                }
            }
            let last_end = run_ends_out.last().copied().unwrap_or(0);
            run_ends_out.push(last_end + len);
            run_vals_out.push(cur);
            (
                Int32Array::from(run_ends_out),
                Int32Array::from(run_vals_out),
            )
        }
        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
        let field = Field::new("x", owned_slice.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(owned_slice.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), length);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
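                // Compare logically rather than structurally: the physical
                // run layout may differ after the round trip, so expand both
                // sides to plain values first.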
                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
                    let vals = a
                        .values()
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("REE values as Int32Array");
                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
                    for i in 0..a.len() {
                        let phys = a.get_physical_index(i);
                        out.push(if vals.is_null(phys) {
                            None
                        } else {
                            Some(vals.value(phys))
                        });
                    }
                    Int32Array::from(out)
                }
                let got_logical = expand_ree_to_int32(got);
                let expected_logical = Int32Array::from(logical_window);
                assert_eq!(
                    got_logical, expected_logical,
                    "Logical values differ after REE slice round-trip"
                );
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

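    // Without `avro_custom_types`, run-end-encoded input is materialized on
    // write: the reader hands back a plain, expanded column.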
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(1),
            Some(1),
            Some(1),
            Some(2),
            Some(2),
            None,
            None,
            Some(3),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

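    // Feature off, Utf8 run values with Int16 run ends: the column comes
    // back as a plain StringArray.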
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
    -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .expect("StringArray");
        let expected = arrow_array::StringArray::from(vec![
            Some("a"),
            Some("a"),
            None,
            None,
            None,
            Some("c"),
            Some("c"),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

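    // Feature off, Int64 run ends with Int32 values: values come back
    // expanded as Int32.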
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
    -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(999),
            Some(999),
            Some(999),
            Some(999),
            Some(-5),
            Some(-5),
            Some(-5),
            Some(-5),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

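    // Feature off with a null run: the expanded Int32 column should
    // preserve the nulls.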
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = Int32Array::from(vec![2, 4, 6]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 6);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
        assert_eq!(got, &expected);
        Ok(())
    }

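    // The tests below round-trip reference files (from arrow-testing and
    // this crate's test/data directory) through the OCF writer and assert
    // the re-read batches equal the originals.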
    #[test]
    #[cfg(feature = "snappy")]
    fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nullable.impala.avro");
        let in_schema = reader.schema();
        assert!(
            in_schema.fields().iter().any(|f| f.is_nullable()),
            "expected at least one nullable field in avro/nullable.impala.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro data mismatch for nullable.impala.avro"
        );
        Ok(())
    }

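    // Same write-then-read-back check for datapage_v2.snappy.avro.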
    #[test]
    #[cfg(feature = "snappy")]
    fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for datapage_v2.snappy.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for datapage_v2.snappy.avro"
        );
        Ok(())
    }

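    // single_nan.avro exercises a float column containing NaN through the
    // same round trip.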
    #[test]
    #[cfg(feature = "snappy")]
    fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/single_nan.avro");
        let in_file = File::open(&path).expect("open avro/single_nan.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for single_nan.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/single_nan.avro"
        );
        Ok(())
    }
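
    // dict-page-offset-zero.avro through the same write/read-back cycle.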
    #[test]
    #[cfg(feature = "snappy")]
    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for dict-page-offset-zero.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
        );
        Ok(())
    }

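    // repeated_no_annotation.avro exercises nested list/record structures.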
    #[test]
    #[cfg(feature = "snappy")]
    fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/repeated_no_annotation.avro");
        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for repeated_no_annotation.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip buffer");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
        );
        Ok(())
    }

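    // The next two fixtures live in this crate's test/data directory;
    // nested_record_reuse.avro reuses a named record type in more than one
    // place in the schema.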
    #[test]
    fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/nested_record_reuse.avro")
            .to_string_lossy()
            .into_owned();
        let in_file = File::open(&path).expect("open test/data/nested_record_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for nested_record_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, input,
            "Round-trip batch mismatch for nested_record_reuse.avro"
        );
        Ok(())
    }

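    // enum_reuse.avro does the same for a named enum type.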
    #[test]
    fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
        let path =
            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
        let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(std::io::BufReader::new(rdr_file))
            .expect("build reader for enum_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(std::io::Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Avro enum type reuse round-trip mismatch"
        );
        Ok(())
    }

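    // comprehensive_e2e.avro aggregates many supported types into a single
    // end-to-end round-trip check.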
    #[test]
    fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/comprehensive_e2e.avro");
        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for comprehensive_e2e.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let sink: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for comprehensive_e2e.avro"
        );
        Ok(())
    }

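    // Builds a batch in memory covering Date32, Time32/Time64, and
    // millisecond/microsecond/nanosecond timestamps, then round-trips it
    // through the OCF writer.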
    #[test]
    fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![
            Field::new("d32", DataType::Date32, false),
            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new(
                "ts_ms",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts_us",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_ns",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
        ]);
        let d32 = Date32Array::from(vec![0, 1, -1]);
        let t32_ms: PrimitiveArray<Time32MillisecondType> =
            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
        let t64_us: PrimitiveArray<Time64MicrosecondType> =
            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(d32) as ArrayRef,
                Arc::new(t32_ms) as ArrayRef,
                Arc::new(t64_us) as ArrayRef,
                Arc::new(ts_ms) as ArrayRef,
                Arc::new(ts_us) as ArrayRef,
                Arc::new(ts_ns) as ArrayRef,
            ],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(std::io::Cursor::new(bytes))
            .expect("build reader for round-trip of new time encoders");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, batch);
        Ok(())
    }
}