use crate::codec::AvroFieldBuilder;
use crate::compression::CompressionCodec;
use crate::errors::AvroError;
use crate::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
};
use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use bytes::{Bytes, BytesMut};
use std::io::Write;
use std::sync::Arc;

mod encoder;
pub mod format;

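/// A batch of Avro-encoded rows: one contiguous [`Bytes`] buffer plus row
/// offsets, where row `i` occupies `offsets[i]..offsets[i + 1]` of `data`.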
#[derive(Debug, Clone)]
pub struct EncodedRows {
    data: Bytes,
    offsets: Vec<usize>,
}

impl EncodedRows {
    pub fn new(data: Bytes, offsets: Vec<usize>) -> Self {
        Self { data, offsets }
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    #[inline]
    pub fn bytes(&self) -> &Bytes {
        &self.data
    }

    #[inline]
    pub fn offsets(&self) -> &[usize] {
        &self.offsets
    }

    pub fn row(&self, n: usize) -> Result<Bytes, AvroError> {
        if n >= self.len() {
            return Err(AvroError::General(format!(
                "Row index {n} out of bounds for len {}",
                self.len()
            )));
        }
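        // SAFETY: `n < self.len()` implies `n + 1 < self.offsets.len()`,
        // so both unchecked reads are in bounds.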
        let (start, end) = unsafe {
            (
                *self.offsets.get_unchecked(n),
                *self.offsets.get_unchecked(n + 1),
            )
        };
        if start > end || end > self.data.len() {
            return Err(AvroError::General(format!(
                "Invalid row offsets for row {n}: start={start}, end={end}, data_len={}",
                self.data.len()
            )));
        }
        Ok(self.data.slice(start..end))
    }

    #[inline]
    pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
        self.offsets.windows(2).map(|w| self.data.slice(w[0]..w[1]))
    }
}

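/// Builder for [`Writer`]s (OCF or stream formats) and stream-format
/// [`Encoder`]s, configuring optional compression, buffer capacities, and the
/// single-object-encoding fingerprint strategy.
///
/// A minimal sketch of building an OCF writer over an in-memory sink, assuming
/// a `schema` and `batch` like those constructed in the tests below:
///
/// ```ignore
/// let mut writer = WriterBuilder::new(schema)
///     .with_compression(None)
///     .build::<_, AvroOcfFormat>(Vec::<u8>::new())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes = writer.into_inner();
/// ```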
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    schema: Schema,
    codec: Option<CompressionCodec>,
    row_capacity: Option<usize>,
    capacity: usize,
    fingerprint_strategy: Option<FingerprintStrategy>,
}

impl WriterBuilder {
    pub fn new(schema: Schema) -> Self {
        Self {
            schema,
            codec: None,
            row_capacity: None,
            capacity: 1024,
            fingerprint_strategy: None,
        }
    }

    pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
        self.fingerprint_strategy = Some(strategy);
        self
    }

    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
        self.codec = codec;
        self
    }

    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = capacity;
        self
    }

    pub fn with_row_capacity(mut self, capacity: usize) -> Self {
        self.row_capacity = Some(capacity);
        self
    }

    fn prepare_encoder<F: AvroFormat>(&self) -> Result<(Arc<Schema>, RecordEncoder), AvroError> {
        let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
            Some(json) => AvroSchema::new(json.clone()),
            None => AvroSchema::try_from(&self.schema)?,
        };
        let maybe_fingerprint = if F::NEEDS_PREFIX {
            match &self.fingerprint_strategy {
                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(*id)),
                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(*id)),
                Some(strategy) => {
                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(*strategy))?)
                }
                None => Some(
                    avro_schema
                        .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
                ),
            }
        } else {
            None
        };
        let mut md = self.schema.metadata().clone();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.clone().json_string,
        );
        let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
        let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
        let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
            .with_fingerprint(maybe_fingerprint)
            .build()?;
        Ok((schema, encoder))
    }

    pub fn build_encoder<F: AvroFormat>(self) -> Result<Encoder, AvroError> {
        if F::default().sync_marker().is_some() {
            return Err(AvroError::InvalidArgument(
                "Encoder only supports stream formats (no OCF header/sync marker)".to_string(),
            ));
        }
        let (schema, encoder) = self.prepare_encoder::<F>()?;
        Ok(Encoder {
            schema,
            encoder,
            row_capacity: self.row_capacity,
            buffer: BytesMut::with_capacity(self.capacity),
            offsets: vec![0],
        })
    }

    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, AvroError>
    where
        W: Write,
        F: AvroFormat,
    {
        let mut format = F::default();
        if format.sync_marker().is_none() && !F::NEEDS_PREFIX {
            return Err(AvroError::InvalidArgument(
                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
                    .to_string(),
            ));
        }
        let (schema, encoder) = self.prepare_encoder::<F>()?;
        format.start_stream(&mut writer, &schema, self.codec)?;
        Ok(Writer {
            writer,
            schema,
            format,
            compression: self.codec,
            capacity: self.capacity,
            encoder,
        })
    }
}

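/// Encodes [`RecordBatch`]es into a buffer of Avro-encoded rows that
/// [`Encoder::flush`] hands back as [`EncodedRows`].
///
/// A minimal sketch, assuming a stream format such as `AvroSoeFormat` and a
/// `schema`/`batch` like those in the tests below:
///
/// ```ignore
/// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
/// encoder.encode(&batch)?;
/// let rows = encoder.flush();
/// for row in rows.iter() {
///     // each `row` is one self-contained encoded record
/// }
/// ```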
#[derive(Debug)]
pub struct Encoder {
    schema: SchemaRef,
    encoder: RecordEncoder,
    row_capacity: Option<usize>,
    buffer: BytesMut,
    offsets: Vec<usize>,
}

impl Encoder {
    pub fn encode(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(AvroError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
        self.encoder.encode_rows(
            batch,
            self.row_capacity.unwrap_or(0),
            &mut self.buffer,
            &mut self.offsets,
        )?;
        Ok(())
    }

    pub fn encode_batches(&mut self, batches: &[RecordBatch]) -> Result<(), AvroError> {
        for b in batches {
            self.encode(b)?;
        }
        Ok(())
    }

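    /// Takes ownership of all rows buffered so far, returning them as
    /// [`EncodedRows`] and resetting this encoder; the trailing `push(0)`
    /// re-seeds the offsets so the next `encode` starts a fresh buffer.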
    pub fn flush(&mut self) -> EncodedRows {
        let data = self.buffer.split().freeze();
        let mut offsets = Vec::with_capacity(self.offsets.len());
        offsets.append(&mut self.offsets);
        self.offsets.push(0);
        EncodedRows::new(data, offsets)
    }

    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    pub fn buffered_len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }
}

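/// Writes [`RecordBatch`]es to `W` in the Avro format selected by `F`: either
/// block-based OCF (header plus per-block sync markers) or a raw byte stream.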
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    writer: W,
    schema: SchemaRef,
    format: F,
    compression: Option<CompressionCodec>,
    capacity: usize,
    encoder: RecordEncoder,
}

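/// Convenience alias for an Avro Object Container File (OCF) writer.
///
/// A minimal sketch writing one batch to an in-memory buffer, mirroring the
/// round-trip tests below:
///
/// ```ignore
/// let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes = writer.into_inner(); // starts with the OCF magic `Obj\x01`
/// ```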
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;

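/// Convenience alias for a single-object-encoding (SOE) stream writer, which
/// prefixes each record with a schema fingerprint instead of an OCF header.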
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;

impl<W: Write> Writer<W, AvroOcfFormat> {
    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
    }

    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
        self.format.sync_marker()
    }
}

impl<W: Write> Writer<W, AvroSoeFormat> {
    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
    }
}

impl<W: Write, F: AvroFormat> Writer<W, F> {
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(AvroError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
        match self.format.sync_marker() {
            Some(&sync) => self.write_ocf_block(batch, &sync),
            None => self.write_stream(batch),
        }
    }

    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), AvroError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    pub fn finish(&mut self) -> Result<(), AvroError> {
        self.writer
            .flush()
            .map_err(|e| AvroError::IoError(format!("Error flushing writer: {e}"), e))
    }

    pub fn into_inner(self) -> W {
        self.writer
    }

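    /// Encodes `batch` as one OCF block: row count, encoded payload length,
    /// the (optionally compressed) payload, then the 16-byte sync marker.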
    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), AvroError> {
        let mut buf = Vec::<u8>::with_capacity(self.capacity);
        self.encoder.encode(&mut buf, batch)?;
        let encoded = match self.compression {
            Some(codec) => codec.compress(&buf)?,
            None => buf,
        };
        write_long(&mut self.writer, batch.num_rows() as i64)?;
        write_long(&mut self.writer, encoded.len() as i64)?;
        self.writer
            .write_all(&encoded)
            .map_err(|e| AvroError::IoError(format!("Error writing Avro block: {e}"), e))?;
        self.writer
            .write_all(sync)
            .map_err(|e| AvroError::IoError(format!("Error writing Avro sync: {e}"), e))?;
        Ok(())
    }

    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        self.encoder.encode(&mut self.writer, batch)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;
    use crate::reader::ReaderBuilder;
    use crate::schema::{AvroSchema, SchemaStore};
    use crate::test_util::arrow_test_data;
    use arrow::datatypes::TimeUnit;
    use arrow::util::pretty::pretty_format_batches;
    use arrow_array::builder::{Int32Builder, ListBuilder};
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::types::{Int16Type, Int32Type, Int64Type};
    use arrow_array::types::{
        Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
        TimestampMillisecondType, TimestampNanosecondType,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Int32Array, Int64Array,
        PrimitiveArray, RecordBatch, StringArray, StructArray, UnionArray,
    };
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::{Int16Array, RunArray};
    use arrow_schema::UnionMode;
    #[cfg(not(feature = "avro_custom_types"))]
    use arrow_schema::{DataType, Field, Schema};
    #[cfg(feature = "avro_custom_types")]
    use arrow_schema::{DataType, Field, Schema};
    use bytes::BytesMut;
    use std::collections::HashMap;
    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufReader, Cursor};
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::NamedTempFile;

    fn files() -> impl Iterator<Item = &'static str> {
        [
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }

    fn make_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Binary, false),
        ])
    }

    fn make_batch() -> RecordBatch {
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
        RecordBatch::try_new(
            Arc::new(make_schema()),
            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
        )
        .expect("failed to build test RecordBatch")
    }

    #[test]
    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), AvroError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
        )?;
        let buf: Vec<u8> = Vec::new();
        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new();
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _fp = store.register(avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _consumed = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 2);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![10, 20]));
        Ok(())
    }

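    // Regression test: row-by-row slices of a batch whose nullable struct
    // column masks a non-nullable child must still encode successfully.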
    #[test]
    fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
        use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let inner_struct_type = DataType::Struct(inner_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", inner_struct_type.clone(), true),
            Field::new("after", inner_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        let before_ids = Int32Array::from(vec![None, None]);
        let before_names = StringArray::from(vec![None::<&str>, None]);
        let before_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(before_ids) as ArrayRef,
                Arc::new(before_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![false, false])),
        );
        let after_ids = Int32Array::from(vec![1, 2]);
        let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
        let after_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(after_ids) as ArrayRef,
                Arc::new(after_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create test batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            let after_col = single_row.column(1);
            assert_eq!(
                after_col.null_count(),
                0,
                "after column should have no nulls in sliced row"
            );
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty(), "encoded output should not be empty");
    }

    #[test]
    fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
        use arrow_array::{
            ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
            TimestampMicrosecondArray,
        };
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("category", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(10, 2), true),
            Field::new("stock_quantity", DataType::Int32, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
        ]);
        let row_struct_type = DataType::Struct(row_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", row_struct_type.clone(), true),
            Field::new("after", row_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        let before_struct = StructArray::new_null(row_fields.clone(), 2);
        let ids = Int32Array::from(vec![1, 2]);
        let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
        let categories = StringArray::from(vec![Some("Electronics"), Some("Electronics")]);
        let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let quantities = Int32Array::from(vec![Some(100), Some(50)]);
        let timestamps = TimestampMicrosecondArray::from(vec![
            Some(1700000000000000i64),
            Some(1700000001000000i64),
        ]);
        let after_struct = StructArray::new(
            row_fields.clone(),
            vec![
                Arc::new(ids) as ArrayRef,
                Arc::new(names) as ArrayRef,
                Arc::new(categories) as ArrayRef,
                Arc::new(prices) as ArrayRef,
                Arc::new(quantities) as ArrayRef,
                Arc::new(timestamps) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create products batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode product row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty());
    }

    #[test]
    fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
        use arrow_array::{
            ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
        };
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let row_struct_dt = DataType::Struct(row_fields.clone());
        let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
        let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
        let after: ArrayRef = Arc::new(StructArray::new(
            row_fields.clone(),
            vec![id_col, name_col],
            None,
        ));
        let schema = Arc::new(Schema::new(vec![
            Field::new("before", row_struct_dt.clone(), true),
            Field::new("after", row_struct_dt, true),
            Field::new("op", DataType::Utf8, false),
            Field::new("ts_ms", DataType::Int64, false),
        ]));
        let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
        let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
        let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
        let mut buf = Vec::new();
        let mut writer = WriterBuilder::new(schema.as_ref().clone())
            .build::<_, AvroSoeFormat>(&mut buf)
            .unwrap();
        let single = batch.slice(0, 1);
        let res = writer.write(&single);
        assert!(
            res.is_ok(),
            "expected to encode successfully, got: {:?}",
            res.err()
        );
    }

    #[test]
    fn test_union_nonzero_type_ids() -> Result<(), AvroError> {
        use arrow_array::UnionArray;
        use arrow_buffer::Buffer;
        use arrow_schema::UnionFields;
        let union_fields = UnionFields::try_new(
            vec![2, 5],
            vec![
                Field::new("v_str", DataType::Utf8, true),
                Field::new("v_int", DataType::Int32, true),
            ],
        )
        .unwrap();
        let strings = StringArray::from(vec!["hello", "world"]);
        let ints = Int32Array::from(vec![10, 20, 30]);
        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
        let union_array = UnionArray::try_new(
            union_fields.clone(),
            type_ids.into(),
            Some(offsets.into()),
            vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
        )?;
        let schema = Schema::new(vec![Field::new(
            "union_col",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(union_array) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        assert!(
            writer.write(&batch).is_ok(),
            "Expected no error from writing"
        );
        writer.finish()?;
        assert!(
            writer.finish().is_ok(),
            "Expected no error from finishing writer"
        );
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), AvroError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u32 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), AvroError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u64 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_ocf_writer_generates_header_and_sync() -> Result<(), AvroError> {
        let batch = make_batch();
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
        let trailer = &out[out.len() - 16..];
        assert_eq!(trailer.len(), 16, "expected 16-byte sync marker");
        Ok(())
    }

    #[test]
    fn test_schema_mismatch_yields_error() {
        let batch = make_batch();
        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
        let err = writer.write(&batch).unwrap_err();
        assert!(matches!(err, AvroError::SchemaError(_)));
    }

    #[test]
    fn test_write_batches_accumulates_multiple() -> Result<(), AvroError> {
        let batch1 = make_batch();
        let batch2 = make_batch();
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write_batches(&[&batch1, &batch2])?;
        writer.finish()?;
        let out = writer.into_inner();
        assert!(out.len() > 4, "combined batches produced tiny file");
        Ok(())
    }

    #[test]
    fn test_finish_without_write_adds_header() -> Result<(), AvroError> {
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
        Ok(())
    }

    #[test]
    fn test_write_long_encodes_zigzag_varint() -> Result<(), AvroError> {
        let mut buf = Vec::new();
        write_long(&mut buf, 0)?;
        write_long(&mut buf, -1)?;
        write_long(&mut buf, 1)?;
        write_long(&mut buf, -2)?;
        write_long(&mut buf, 2147483647)?;
        assert!(
            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
            "zig-zag varint encodings incorrect: {buf:?}"
        );
        Ok(())
    }

    #[test]
    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), AvroError> {
        for rel in files() {
            let path = arrow_test_data(rel);
            let rdr_file = File::open(&path).expect("open input avro");
            let reader = ReaderBuilder::new()
                .build(BufReader::new(rdr_file))
                .expect("build reader");
            let schema = reader.schema();
            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let codec = if rel.contains(".snappy.") {
                Some(CompressionCodec::Snappy)
            } else if rel.contains(".zstandard.") {
                Some(CompressionCodec::ZStandard)
            } else if rel.contains(".bzip2.") {
                Some(CompressionCodec::Bzip2)
            } else if rel.contains(".xz.") {
                Some(CompressionCodec::Xz)
            } else {
                None
            };
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(codec)
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
            drop(writer);
            let rt_file = File::open(&out_path).expect("open roundtrip avro");
            let rt_reader = ReaderBuilder::new()
                .build(BufReader::new(rt_file))
                .expect("build roundtrip reader");
            let rt_schema = rt_reader.schema();
            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
            assert_eq!(
                roundtrip, original,
                "Round-trip batch mismatch for file: {}",
                rel
            );
        }
        Ok(())
    }

    #[test]
    fn test_roundtrip_nested_records_writer() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nested_records.avro");
        let rdr_file = File::open(&path).expect("open nested_records.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_records.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_records.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_nested_lists_writer() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nested_lists.snappy.avro");
        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_lists.snappy.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(Some(CompressionCodec::Snappy))
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_lists.snappy.avro"
        );
        Ok(())
    }

    #[test]
    fn test_round_trip_simple_fixed_ocf() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/simple_fixed.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build avro reader");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_file = File::create(tmp.path()).expect("create temp avro");
        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        drop(writer);
        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, original);
        Ok(())
    }

    #[test]
    #[cfg(feature = "canonical_extension_types")]
    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), AvroError> {
        use arrow_schema::{DataType, IntervalUnit};
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();
        let has_mdn = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Interval(IntervalUnit::MonthDayNano)
            )
        });
        assert!(
            has_mdn,
            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
        );
        let has_uuid_fixed = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
        assert!(
            has_uuid_fixed,
            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, input);
        Ok(())
    }

    #[test]
    #[cfg(not(feature = "canonical_extension_types"))]
    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), AvroError> {
        use arrow::datatypes::{DataType, IntervalUnit};
        use std::io::BufReader;

        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();

        assert!(
            in_schema.fields().iter().any(|f| {
                matches!(
                    f.data_type(),
                    DataType::Interval(IntervalUnit::MonthDayNano)
                )
            }),
            "expected at least one Interval(MonthDayNano) field"
        );
        assert!(
            in_schema
                .fields()
                .iter()
                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
            "expected a FixedSizeBinary(16) field (uuid)"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(
            round_trip.column(0),
            input.column(0),
            "duration column values differ"
        );
        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");

        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(
            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
            Some("uuid"),
            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
        );

        let dur_rt = rt_schema.field_with_name("duration_field")?;
        assert!(matches!(
            dur_rt.data_type(),
            DataType::Interval(IntervalUnit::MonthDayNano)
        ));

        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nonnullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nonnullable.impala.avro");
        let in_schema = reader.schema();
        let has_map = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
        assert!(
            has_map,
            "expected at least one Map field in avro/nonnullable.impala.avro"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_decimals_via_writer() -> Result<(), AvroError> {
        let files: [(&str, bool); 8] = [
            ("avro/fixed_length_decimal.avro", true),
            ("avro/fixed_length_decimal_legacy.avro", true),
            ("avro/int32_decimal.avro", true),
            ("avro/int64_decimal.avro", true),
            ("test/data/int256_decimal.avro", false),
            ("test/data/fixed256_decimal.avro", false),
            ("test/data/fixed_length_decimal_legacy_32.avro", false),
            ("test/data/int128_decimal.avro", false),
        ];
        for (rel, in_test_data_dir) in files {
            let path: String = if in_test_data_dir {
                arrow_test_data(rel)
            } else {
                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                    .join(rel)
                    .to_string_lossy()
                    .into_owned()
            };
            let f_in = File::open(&path).expect("open input avro");
            let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
            let in_schema = rdr.schema();
            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
            let f_rt = File::open(&out_path).expect("open roundtrip avro");
            let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
            let rt_schema = rt_rdr.schema();
            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
        }
        Ok(())
    }

    #[test]
    fn test_named_types_complex_roundtrip() -> Result<(), AvroError> {
        let path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
        let rdr_file = File::open(&path).expect("open avro/named_types_complex.avro");

        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for named_types_complex.avro");

        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        {
            let arrow_schema = original.schema();

            let author_field = arrow_schema.field_with_name("author")?;
            let author_type = author_field.data_type();
            let editors_field = arrow_schema.field_with_name("editors")?;
            let editors_item_type = match editors_field.data_type() {
                DataType::List(item_field) => item_field.data_type(),
                other => panic!("Editors field should be a List, but was {:?}", other),
            };
            assert_eq!(
                author_type, editors_item_type,
                "The DataType for the 'author' struct and the 'editors' list items must be identical"
            );

            let status_field = arrow_schema.field_with_name("status")?;
            let status_type = status_field.data_type();
            assert!(
                matches!(status_type, DataType::Dictionary(_, _)),
                "Status field should be a Dictionary (Enum)"
            );

            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
            let prev_status_type = prev_status_field.data_type();
            assert_eq!(
                status_type, prev_status_type,
                "The DataType for 'status' and 'previous_status' enums must be identical"
            );

            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
            let content_hash_type = content_hash_field.data_type();
            assert!(
                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
                "Content hash should be FixedSizeBinary(16)"
            );

            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
            let thumb_hash_type = thumb_hash_field.data_type();
            assert_eq!(
                content_hash_type, thumb_hash_type,
                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
            );
        }

        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();

        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");

        assert_eq!(
            roundtrip, original,
            "Avro complex named types round-trip mismatch"
        );

        Ok(())
    }

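    // The helpers below compare schemas "semantically": the writer may add
    // bookkeeping metadata (e.g. `arrowUnionMode`, `arrowUnionTypeIds`,
    // `avro.name`) during a round trip, so exact `Schema` equality is too strict.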
    fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
        assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");

        assert_eq!(
            expected.fields().len(),
            actual.fields().len(),
            "Schema must have the same number of fields"
        );

        for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
            assert_field_is_semantically_equivalent(expected_field, actual_field);
        }
    }

    fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
        let context = format!("Field '{}'", expected.name());

        assert_eq!(
            expected.name(),
            actual.name(),
            "{context}: names must match"
        );
        assert_eq!(
            expected.is_nullable(),
            actual.is_nullable(),
            "{context}: nullability must match"
        );

        assert_datatype_is_semantically_equivalent(
            expected.data_type(),
            actual.data_type(),
            &context,
        );

        assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
    }

    fn assert_datatype_is_semantically_equivalent(
        expected: &DataType,
        actual: &DataType,
        context: &str,
    ) {
        match (expected, actual) {
            (DataType::List(expected_field), DataType::List(actual_field))
            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
                assert_field_is_semantically_equivalent(expected_field, actual_field);
            }
            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: struct must have same number of fields"
                );
                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
                    assert_field_is_semantically_equivalent(ef, af);
                }
            }
            (
                DataType::Union(expected_fields, expected_mode),
                DataType::Union(actual_fields, actual_mode),
            ) => {
                assert_eq!(
                    expected_mode, actual_mode,
                    "{context}: union mode must match"
                );
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: union must have same number of variants"
                );
                for ((exp_id, exp_field), (act_id, act_field)) in
                    expected_fields.iter().zip(actual_fields.iter())
                {
                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
                    assert_field_is_semantically_equivalent(exp_field, act_field);
                }
            }
            _ => {
                assert_eq!(expected, actual, "{context}: data types must be identical");
            }
        }
    }

    fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
        assert_eq!(
            expected.num_columns(),
            actual.num_columns(),
            "RecordBatches must have the same number of columns"
        );
        assert_eq!(
            expected.num_rows(),
            actual.num_rows(),
            "RecordBatches must have the same number of rows"
        );

        for i in 0..expected.num_columns() {
            let context = format!("Column {i}");
            let expected_col = expected.column(i);
            let actual_col = actual.column(i);
            assert_array_data_is_identical(expected_col, actual_col, &context);
        }
    }

    fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
        assert_eq!(
            expected.nulls(),
            actual.nulls(),
            "{context}: null buffers must match"
        );
        assert_eq!(
            expected.len(),
            actual.len(),
            "{context}: array lengths must match"
        );

        match (expected.data_type(), actual.data_type()) {
            (DataType::Union(expected_fields, _), DataType::Union(..)) => {
                let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
                let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();

                assert_eq!(
                    &expected.to_data().buffers()[0],
                    &actual.to_data().buffers()[0],
                    "{context}: union type_ids buffer mismatch"
                );

                if expected.to_data().buffers().len() > 1 {
                    assert_eq!(
                        &expected.to_data().buffers()[1],
                        &actual.to_data().buffers()[1],
                        "{context}: union value_offsets buffer mismatch"
                    );
                }

                for (type_id, _) in expected_fields.iter() {
                    let child_context = format!("{context} -> child variant {type_id}");
                    assert_array_data_is_identical(
                        expected_union.child(type_id),
                        actual_union.child(type_id),
                        &child_context,
                    );
                }
            }
            (DataType::Struct(_), DataType::Struct(_)) => {
                let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
                let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
                for i in 0..expected_struct.num_columns() {
                    let child_context = format!("{context} -> struct child {i}");
                    assert_array_data_is_identical(
                        expected_struct.column(i),
                        actual_struct.column(i),
                        &child_context,
                    );
                }
            }
            _ => {
                assert_eq!(
                    expected.to_data().buffers(),
                    actual.to_data().buffers(),
                    "{context}: data buffers must match"
                );
            }
        }
    }

    fn assert_metadata_is_superset(
        expected_meta: &HashMap<String, String>,
        actual_meta: &HashMap<String, String>,
        context: &str,
    ) {
        let allowed_additions: HashSet<&str> =
            vec!["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
                .into_iter()
                .collect();
        for (key, expected_value) in expected_meta {
            match actual_meta.get(key) {
                Some(actual_value) => assert_eq!(
                    expected_value, actual_value,
                    "{context}: preserved metadata for key '{key}' must have the same value"
                ),
                None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
            }
        }
        for key in actual_meta.keys() {
            if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
                panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
            }
        }
    }

    #[test]
    fn test_union_roundtrip() -> Result<(), AvroError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/union_fields.avro")
            .to_string_lossy()
            .into_owned();
        let rdr_file = File::open(&file_path).expect("open avro/union_fields.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for union_fields.avro");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());

        assert_batch_data_is_identical(&original, &round_trip);
        Ok(())
    }

    #[test]
    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/simple_enum.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for simple_enum.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let has_enum_dict = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
            )
        });
        assert!(
            has_enum_dict,
            "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
        );
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
        Ok(())
    }

    #[test]
    fn test_builder_propagates_capacity_to_writer() -> Result<(), AvroError> {
        let cap = 64 * 1024;
        let buffer = Vec::<u8>::new();
        let mut writer = WriterBuilder::new(make_schema())
            .with_capacity(cap)
            .build::<_, AvroOcfFormat>(buffer)?;
        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
        let batch = make_batch();
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
        Ok(())
    }

    #[test]
    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), AvroError> {
        use arrow_array::{ArrayRef, Int32Array};
        use arrow_schema::{DataType, Field, Schema};
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let cap = 8192;
        let mut writer = WriterBuilder::new(schema)
            .with_capacity(cap)
            .build::<_, AvroSoeFormat>(Vec::new())?;
        assert_eq!(writer.capacity, cap);
        writer.write(&batch)?;
        let _bytes = writer.into_inner();
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), AvroError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/duration_logical_types.avro")
            .to_string_lossy()
            .into_owned();

        let in_file = File::open(&file_path)
            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));

        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_logical_types.avro");
        let in_schema = reader.schema();

        let expected_units: HashSet<TimeUnit> = [
            TimeUnit::Nanosecond,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Second,
        ]
        .into_iter()
        .collect();

        let found_units: HashSet<TimeUnit> = in_schema
            .fields()
            .iter()
            .filter_map(|f| match f.data_type() {
                DataType::Duration(unit) => Some(*unit),
                _ => None,
            })
            .collect();

        assert_eq!(
            found_units, expected_units,
            "Expected to find all four Duration TimeUnits in the schema from the initial read"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let tmp = NamedTempFile::new().expect("create temp file");
        {
            let out_file = File::create(tmp.path()).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
            writer.write(&input)?;
            writer.finish()?;
        }

        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(round_trip, input);
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_roundtrip_writer() -> Result<(), AvroError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.name(), "run_ends");
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.name(), "values");
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got_ree = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                assert_eq!(got_ree, &ree);
            }
            other => panic!(
                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
                other
            ),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), AvroError>
    {
        let run_ends = Int16Array::from(vec![2, 5, 7]);
        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
                assert_eq!(values_field.data_type(), &DataType::Utf8);
                assert!(
                    values_field.is_nullable(),
                    "REE 'values' child should be nullable"
                );
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int16Type>>()
                    .expect("RunArray<Int16Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType: {:?}", other),
        }
        Ok(())
    }

2174 #[cfg(feature = "avro_custom_types")]
2175 #[test]
2176 fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer() -> Result<(), AvroError>
2177 {
2178 let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
2179 let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
2180 let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
2181 let field = Field::new("y", ree.data_type().clone(), true);
2182 let schema = Schema::new(vec![field]);
2183 let batch = RecordBatch::try_new(
2184 Arc::new(schema.clone()),
2185 vec![Arc::new(ree.clone()) as ArrayRef],
2186 )?;
2187 let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2188 writer.write(&batch)?;
2189 writer.finish()?;
2190 let bytes = writer.into_inner();
2191 let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2192 let out_schema = reader.schema();
2193 let batches = reader.collect::<Result<Vec<_>, _>>()?;
2194 let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2195 assert_eq!(out.num_columns(), 1);
2196 assert_eq!(out.num_rows(), 8);
2197 match out.schema().field(0).data_type() {
2198 DataType::RunEndEncoded(run_ends_field, values_field) => {
2199 assert_eq!(run_ends_field.data_type(), &DataType::Int64);
2200 assert_eq!(values_field.data_type(), &DataType::Int32);
2201 assert!(values_field.is_nullable());
2202 let got = out
2203 .column(0)
2204 .as_any()
2205 .downcast_ref::<RunArray<Int64Type>>()
2206 .expect("RunArray<Int64Type>");
2207 assert_eq!(got, &ree);
2208 }
2209 other => panic!("Unexpected DataType for REE column: {:?}", other),
2210 }
2211 Ok(())
2212 }
2213
2214 #[cfg(feature = "avro_custom_types")]
2215 #[test]
2216 fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), AvroError> {
2217 let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
2218 let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
2219 let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
2220 let offset = 1usize;
2221 let length = 6usize;
2222 let base_values = base
2223 .values()
2224 .as_any()
2225 .downcast_ref::<Int32Array>()
2226 .expect("REE values as Int32Array");
2227 let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
2228 for i in offset..offset + length {
2229 let phys = base.get_physical_index(i);
2230 let v = if base_values.is_null(phys) {
2231 None
2232 } else {
2233 Some(base_values.value(phys))
2234 };
2235 logical_window.push(v);
2236 }
2237
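        // Re-run-length-encode the sliced logical window into fresh `run_ends`/`values`
        // buffers, so the expected RunArray starts at offset zero regardless of the
        // physical layout of `base`.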
        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
            if vals.is_empty() {
                return (Int32Array::new_null(0), Int32Array::new_null(0));
            }
            let mut run_ends_out: Vec<i32> = Vec::new();
            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
            let mut cur = vals[0];
            let mut len = 1i32;
            for v in &vals[1..] {
                if *v == cur {
                    len += 1;
                } else {
                    let last_end = run_ends_out.last().copied().unwrap_or(0);
                    run_ends_out.push(last_end + len);
                    run_vals_out.push(cur);
                    cur = *v;
                    len = 1;
                }
            }
            let last_end = run_ends_out.last().copied().unwrap_or(0);
            run_ends_out.push(last_end + len);
            run_vals_out.push(cur);
            (
                Int32Array::from(run_ends_out),
                Int32Array::from(run_vals_out),
            )
        }
        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
        let field = Field::new("x", owned_slice.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(owned_slice.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), length);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
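                // Compare logical values rather than the RunArrays themselves: a
                // round-tripped array may represent the same logical sequence with
                // different run boundaries.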
                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
                    let vals = a
                        .values()
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("REE values as Int32Array");
                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
                    for i in 0..a.len() {
                        let phys = a.get_physical_index(i);
                        out.push(if vals.is_null(phys) {
                            None
                        } else {
                            Some(vals.value(phys))
                        });
                    }
                    Int32Array::from(out)
                }
                let got_logical = expand_ree_to_int32(got);
                let expected_logical = Int32Array::from(logical_window);
                assert_eq!(
                    got_logical, expected_logical,
                    "Logical values differ after REE slice round-trip"
                );
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

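    // The `feature = "avro_custom_types"` counterparts below verify the fallback path:
    // without the feature, REE columns are written as their value type, so the reader
    // returns plain arrays with every run fully materialized.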
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(1),
            Some(1),
            Some(1),
            Some(2),
            Some(2),
            None,
            None,
            Some(3),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
    -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .expect("StringArray");
        let expected = arrow_array::StringArray::from(vec![
            Some("a"),
            Some("a"),
            None,
            None,
            None,
            Some("c"),
            Some("c"),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
    -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(999),
            Some(999),
            Some(999),
            Some(999),
            Some(-5),
            Some(-5),
            Some(-5),
            Some(-5),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = Int32Array::from(vec![2, 4, 6]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 6);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_nullable_impala_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nullable.impala.avro");
        let in_schema = reader.schema();
        assert!(
            in_schema.fields().iter().any(|f| f.is_nullable()),
            "expected at least one nullable field in avro/nullable.impala.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro data mismatch for nullable.impala.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_datapage_v2_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for datapage_v2.snappy.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for datapage_v2.snappy.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_single_nan_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/single_nan.avro");
        let in_file = File::open(&path).expect("open avro/single_nan.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for single_nan.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/single_nan.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for dict-page-offset-zero.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_repeated_no_annotation_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/repeated_no_annotation.avro");
        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for repeated_no_annotation.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip buffer");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
        );
        Ok(())
    }

    #[test]
    fn test_nested_record_type_reuse_roundtrip() -> Result<(), AvroError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/nested_record_reuse.avro")
            .to_string_lossy()
            .into_owned();
        let in_file = File::open(&path).expect("open test/data/nested_record_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for nested_record_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, input,
            "Round-trip batch mismatch for nested_record_reuse.avro"
        );
        Ok(())
    }

    #[test]
    fn test_enum_type_reuse_roundtrip() -> Result<(), AvroError> {
        let path =
            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
        let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(std::io::BufReader::new(rdr_file))
            .expect("build reader for enum_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(std::io::Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Avro enum type reuse round-trip mismatch"
        );
        Ok(())
    }

    #[test]
    fn comprehensive_e2e_test_roundtrip() -> Result<(), AvroError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/comprehensive_e2e.avro");
        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for comprehensive_e2e.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let sink: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for comprehensive_e2e.avro"
        );
        Ok(())
    }

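    // Round-trips one column per temporal encoder (Date32, Time32(ms), Time64(us),
    // and Timestamp in ms/us/ns), including zero, negative, and end-of-day boundary
    // values, to confirm each unit survives the write/read cycle unchanged.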
    #[test]
    fn test_roundtrip_new_time_encoders_writer() -> Result<(), AvroError> {
        let schema = Schema::new(vec![
            Field::new("d32", DataType::Date32, false),
            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new(
                "ts_ms",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts_us",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_ns",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
        ]);
        let d32 = Date32Array::from(vec![0, 1, -1]);
        let t32_ms: PrimitiveArray<Time32MillisecondType> =
            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
        let t64_us: PrimitiveArray<Time64MicrosecondType> =
            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(d32) as ArrayRef,
                Arc::new(t32_ms) as ArrayRef,
                Arc::new(t64_us) as ArrayRef,
                Arc::new(ts_ms) as ArrayRef,
                Arc::new(ts_us) as ArrayRef,
                Arc::new(ts_ns) as ArrayRef,
            ],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(std::io::Cursor::new(bytes))
            .expect("build reader for round-trip of new time encoders");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, batch);
        Ok(())
    }

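    // Helpers shared by the row-encoder tests below: a two-column Int32 schema and a
    // small three-row batch with easily recognizable values.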
    fn make_encoder_schema() -> Schema {
        Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ])
    }

    fn make_encoder_batch(schema: &Schema) -> RecordBatch {
        let a = Int32Array::from(vec![1, 2, 3]);
        let b = Int32Array::from(vec![10, 20, 30]);
        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
        )
        .expect("failed to build test RecordBatch")
    }

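    // Builds an Arrow schema whose metadata carries the Avro record schema under
    // SCHEMA_METADATA_KEY, so the writer uses exactly this writer schema and a
    // SchemaStore-backed decoder can resolve it by fingerprint.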
    fn make_real_avro_schema_and_batch() -> Result<(Schema, RecordBatch, AvroSchema), AvroError> {
        let avro_json = r#"
        {
          "type": "record",
          "name": "User",
          "fields": [
            { "name": "id", "type": "long" },
            { "name": "name", "type": "string" },
            { "name": "active", "type": "boolean" },
            { "name": "tags", "type": { "type": "array", "items": "int" } },
            { "name": "opt", "type": ["null", "string"], "default": null }
          ]
        }"#;
        let avro_schema = AvroSchema::new(avro_json.to_string());
        let mut md = HashMap::new();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.json_string.clone(),
        );
        let item_field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            DataType::Int32,
            false,
        ));
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
                Field::new("active", DataType::Boolean, false),
                Field::new("tags", DataType::List(item_field.clone()), false),
                Field::new("opt", DataType::Utf8, true),
            ],
            md,
        );
        let id = Int64Array::from(vec![1, 2, 3]);
        let name = StringArray::from(vec!["alice", "bob", "carol"]);
        let active = BooleanArray::from(vec![true, false, true]);
        let mut tags_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field);
        tags_builder.values().append_value(1);
        tags_builder.values().append_value(2);
        tags_builder.append(true);
        tags_builder.append(true);
        tags_builder.values().append_value(3);
        tags_builder.append(true);
        let tags = tags_builder.finish();
        let opt = StringArray::from(vec![Some("x"), None, Some("z")]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(id) as ArrayRef,
                Arc::new(name) as ArrayRef,
                Arc::new(active) as ArrayRef,
                Arc::new(tags) as ArrayRef,
                Arc::new(opt) as ArrayRef,
            ],
        )?;
        Ok((schema, batch, avro_schema))
    }

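    // For single-object encoding, a row-oriented Encoder and the AvroStreamWriter
    // should produce byte-identical output for the same batch.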
    #[test]
    fn test_row_writer_matches_stream_writer_soe() -> Result<(), AvroError> {
        let schema = make_encoder_schema();
        let batch = make_encoder_batch(&schema);
        let mut stream = AvroStreamWriter::new(Vec::<u8>::new(), schema.clone())?;
        stream.write(&batch)?;
        stream.finish()?;
        let stream_bytes = stream.into_inner();
        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        let rows = row_writer.flush();
        let row_bytes: Vec<u8> = rows.bytes().to_vec();
        assert_eq!(stream_bytes, row_bytes);
        Ok(())
    }

    #[test]
    fn test_row_writer_flush_clears_buffer() -> Result<(), AvroError> {
        let schema = make_encoder_schema();
        let batch = make_encoder_batch(&schema);
        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        assert_eq!(row_writer.buffered_len(), batch.num_rows());
        let out1 = row_writer.flush();
        assert_eq!(out1.len(), batch.num_rows());
        assert_eq!(row_writer.buffered_len(), 0);
        let out2 = row_writer.flush();
        assert_eq!(out2.len(), 0);
        Ok(())
    }

    #[test]
    fn test_row_writer_roundtrip_decoder_soe_real_avro_data() -> Result<(), AvroError> {
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let mut store = SchemaStore::new();
        store.register(avro_schema.clone())?;
        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        let rows = row_writer.flush();
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        for row in rows.iter() {
            let consumed = decoder.decode(row.as_ref())?;
            assert_eq!(
                consumed,
                row.len(),
                "decoder should consume the full row frame"
            );
        }
        let out = decoder.flush()?.expect("decoded batch");
        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected, actual);
        Ok(())
    }

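    // Feeds the concatenated SOE rows to the Decoder in irregular, frame-aligned
    // chunks; `decode` reports how many bytes it consumed, so the loop re-offers the
    // remaining buffer until no further progress is made.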
    #[test]
    fn test_row_writer_roundtrip_decoder_soe_streaming_chunks() -> Result<(), AvroError> {
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let mut store = SchemaStore::new();
        store.register(avro_schema.clone())?;
        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        let rows = row_writer.flush();
        let mut stream: Vec<u8> = Vec::new();
        let mut boundaries: Vec<usize> = Vec::with_capacity(rows.len() + 1);
        boundaries.push(0usize);
        for row in rows.iter() {
            stream.extend_from_slice(row.as_ref());
            boundaries.push(stream.len());
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        let mut buffered = BytesMut::new();
        let chunk_rows = [1usize, 2, 3, 1, 4, 2];
        let mut row_idx = 0usize;
        let mut i = 0usize;
        let n_rows = rows.len();
        while row_idx < n_rows {
            let take = chunk_rows[i % chunk_rows.len()];
            i += 1;
            let end_row = (row_idx + take).min(n_rows);
            let byte_start = boundaries[row_idx];
            let byte_end = boundaries[end_row];
            buffered.extend_from_slice(&stream[byte_start..byte_end]);
            loop {
                let consumed = decoder.decode(&buffered)?;
                if consumed == 0 {
                    break;
                }
                let _ = buffered.split_to(consumed);
            }
            assert!(
                buffered.is_empty(),
                "expected decoder to consume the entire frame-aligned chunk"
            );
            row_idx = end_row;
        }
        let out = decoder.flush()?.expect("decoded batch");
        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    fn test_row_writer_roundtrip_decoder_confluent_wire_format_id() -> Result<(), AvroError> {
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let schema_id: u32 = 42;
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
        store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
        let mut row_writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
            .build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        let rows = row_writer.flush();
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        for row in rows.iter() {
            let consumed = decoder.decode(row.as_ref())?;
            assert_eq!(consumed, row.len());
        }
        let out = decoder.flush()?.expect("decoded batch");
        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected, actual);
        Ok(())
    }

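    // Exercises the full EncodedRows surface (len/is_empty/bytes/offsets/row/iter/new)
    // against AvroBinaryFormat output. Each Int32 pair is written as Avro zig-zag
    // varints, so e.g. the row (1, 10) encodes to the two bytes 0x02 0x14.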
    #[test]
    fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        use arrow_array::{ArrayRef, Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let schema_ref = Arc::new(schema.clone());
        let batch1 = RecordBatch::try_new(
            schema_ref.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
            ],
        )?;
        let batch2 = RecordBatch::try_new(
            schema_ref,
            vec![
                Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
                Arc::new(Int32Array::from(vec![40, 50])) as ArrayRef,
            ],
        )?;
        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
        let empty = Encoder::flush(&mut encoder);
        assert_eq!(EncodedRows::len(&empty), 0);
        assert!(EncodedRows::is_empty(&empty));
        assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]);
        assert_eq!(EncodedRows::offsets(&empty), &[0usize]);
        assert_eq!(EncodedRows::iter(&empty).count(), 0);
        let empty_vecs: Vec<Vec<u8>> = empty.iter().map(|b| b.to_vec()).collect();
        assert!(empty_vecs.is_empty());
        let batches = vec![batch1, batch2];
        Encoder::encode_batches(&mut encoder, &batches)?;
        assert_eq!(encoder.buffered_len(), 5);
        let rows = Encoder::flush(&mut encoder);
        assert_eq!(
            encoder.buffered_len(),
            0,
            "Encoder::flush should reset the internal offsets"
        );
        assert_eq!(EncodedRows::len(&rows), 5);
        assert!(!EncodedRows::is_empty(&rows));
        let expected_offsets: &[usize] = &[0, 2, 4, 6, 8, 10];
        assert_eq!(EncodedRows::offsets(&rows), expected_offsets);
        let expected_rows: Vec<Vec<u8>> = vec![
            vec![2, 20],
            vec![4, 40],
            vec![6, 60],
            vec![8, 80],
            vec![10, 100],
        ];
        let expected_stream: Vec<u8> = expected_rows.concat();
        assert_eq!(
            EncodedRows::bytes(&rows).as_ref(),
            expected_stream.as_slice()
        );
        for (i, expected) in expected_rows.iter().enumerate() {
            assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice());
        }
        let iter_rows: Vec<Vec<u8>> = EncodedRows::iter(&rows).map(|b| b.to_vec()).collect();
        assert_eq!(iter_rows, expected_rows);
        let recreated = EncodedRows::new(
            EncodedRows::bytes(&rows).clone(),
            EncodedRows::offsets(&rows).to_vec(),
        );
        assert_eq!(EncodedRows::len(&recreated), EncodedRows::len(&rows));
        assert_eq!(EncodedRows::bytes(&recreated), EncodedRows::bytes(&rows));
        assert_eq!(
            EncodedRows::offsets(&recreated),
            EncodedRows::offsets(&rows)
        );
        let rec_vecs: Vec<Vec<u8>> = recreated.iter().map(|b| b.to_vec()).collect();
        assert_eq!(rec_vecs, iter_rows);
        let empty_again = Encoder::flush(&mut encoder);
        assert!(EncodedRows::is_empty(&empty_again));
        Ok(())
    }

    #[test]
    fn test_writer_builder_build_rejects_avro_binary_format() {
        use crate::writer::format::AvroBinaryFormat;
        use arrow_schema::{DataType, Field, Schema};
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let err = WriterBuilder::new(schema)
            .build::<_, AvroBinaryFormat>(Vec::<u8>::new())
            .unwrap_err();
        match err {
            AvroError::InvalidArgument(msg) => assert_eq!(
                msg,
                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
            ),
            other => panic!("expected AvroError::InvalidArgument, got {:?}", other),
        }
    }

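    // AvroBinaryFormat emits bare Avro record bodies, while AvroSoeFormat prepends the
    // single-object prefix to the same bodies. The next test checks that relationship
    // byte-for-byte, then decodes manually framed binary rows.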
    #[test]
    fn test_row_encoder_avro_binary_format_roundtrip_decoder_with_soe_framing()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let batches: Vec<RecordBatch> = vec![batch.clone(), batch.slice(1, 2)];
        let expected = arrow::compute::concat_batches(&batch.schema(), &batches)?;
        let mut binary_encoder =
            WriterBuilder::new(schema.clone()).build_encoder::<AvroBinaryFormat>()?;
        binary_encoder.encode_batches(&batches)?;
        let binary_rows = binary_encoder.flush();
        assert_eq!(
            binary_rows.len(),
            expected.num_rows(),
            "binary encoder row count mismatch"
        );
        let mut soe_encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        soe_encoder.encode_batches(&batches)?;
        let soe_rows = soe_encoder.flush();
        assert_eq!(
            soe_rows.len(),
            binary_rows.len(),
            "SOE vs binary row count mismatch"
        );
        let mut store = SchemaStore::new();
        let fp = store.register(avro_schema)?;
        let fp_le_bytes = match fp {
            Fingerprint::Rabin(v) => v.to_le_bytes(),
            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
        };
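        // Avro single-object encoding prefix: the two magic bytes 0xC3 0x01 followed by
        // the 8-byte little-endian CRC-64-AVRO (Rabin) fingerprint of the writer schema.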
        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
        const SOE_PREFIX_LEN: usize = 2 + 8;
        for i in 0..binary_rows.len() {
            let body = binary_rows.row(i)?;
            let soe = soe_rows.row(i)?;
            assert!(
                soe.len() >= SOE_PREFIX_LEN,
                "expected SOE row to include prefix"
            );
            assert_eq!(&soe.as_ref()[..2], &SOE_MAGIC);
            assert_eq!(&soe.as_ref()[2..SOE_PREFIX_LEN], &fp_le_bytes);
            assert_eq!(
                &soe.as_ref()[SOE_PREFIX_LEN..],
                body.as_ref(),
                "SOE body bytes differ from AvroBinaryFormat body bytes (row {i})"
            );
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        for body in binary_rows.iter() {
            let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len());
            framed.extend_from_slice(&SOE_MAGIC);
            framed.extend_from_slice(&fp_le_bytes);
            framed.extend_from_slice(body.as_ref());
            let consumed = decoder.decode(&framed)?;
            assert_eq!(
                consumed,
                framed.len(),
                "decoder should consume the full SOE-framed message"
            );
        }
        let out = decoder.flush()?.expect("expected a decoded RecordBatch");
        let expected_str = pretty_format_batches(&[expected])?.to_string();
        let actual_str = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected_str, actual_str);
        Ok(())
    }

    #[test]
    fn test_row_encoder_avro_binary_format_roundtrip_decoder_streaming_chunks()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
        encoder.encode(&batch)?;
        let rows = encoder.flush();
        let mut store = SchemaStore::new();
        let fp = store.register(avro_schema)?;
        let fp_le_bytes = match fp {
            Fingerprint::Rabin(v) => v.to_le_bytes(),
            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
        };
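        // Simulate a length-prefixed transport: each SOE message is wrapped in a
        // little-endian u32 length header, then the byte stream is replayed to the
        // decoder in irregular chunk sizes that deliberately split frames.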
        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
        const SOE_PREFIX_LEN: usize = 2 + 8;
        let mut stream: Vec<u8> = Vec::new();
        for body in rows.iter() {
            let msg_len: u32 = (SOE_PREFIX_LEN + body.len())
                .try_into()
                .expect("message length must fit in u32");
            stream.extend_from_slice(&msg_len.to_le_bytes());
            stream.extend_from_slice(&SOE_MAGIC);
            stream.extend_from_slice(&fp_le_bytes);
            stream.extend_from_slice(body.as_ref());
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        let chunk_sizes = [1usize, 2, 3, 5, 8, 13, 21, 34];
        let mut pos = 0usize;
        let mut i = 0usize;
        let mut buffered = BytesMut::new();
        let mut decoded_frames = 0usize;
        while pos < stream.len() {
            let take = chunk_sizes[i % chunk_sizes.len()];
            i += 1;
            let end = (pos + take).min(stream.len());
            buffered.extend_from_slice(&stream[pos..end]);
            pos = end;
            loop {
                if buffered.len() < 4 {
                    break;
                }
                let msg_len =
                    u32::from_le_bytes([buffered[0], buffered[1], buffered[2], buffered[3]])
                        as usize;
                if buffered.len() < 4 + msg_len {
                    break;
                }
                let frame = buffered.split_to(4 + msg_len);
                let payload = &frame[4..];
                let consumed = decoder.decode(payload)?;
                assert_eq!(
                    consumed,
                    payload.len(),
                    "decoder should consume the full SOE-framed message"
                );
                decoded_frames += 1;
            }
        }
        assert!(
            buffered.is_empty(),
            "expected transport framer to consume all bytes; leftover = {}",
            buffered.len()
        );
        assert_eq!(
            decoded_frames,
            rows.len(),
            "expected to decode exactly one frame per encoded row"
        );
        let out = decoder.flush()?.expect("expected decoded RecordBatch");
        let expected_str = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual_str = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected_str, actual_str);
        Ok(())
    }
}