1use crate::codec::AvroFieldBuilder;
147use crate::compression::CompressionCodec;
148use crate::errors::AvroError;
149use crate::schema::{
150 AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
151};
152use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
153use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
154use arrow_array::RecordBatch;
155use arrow_schema::{Schema, SchemaRef};
156use bytes::{Bytes, BytesMut};
157use std::io::Write;
158use std::sync::Arc;
159
160mod encoder;
162pub mod format;
164
/// Rows of Avro-encoded data plus the byte offsets delimiting each row.
///
/// `offsets` always holds `row_count + 1` entries: row `i` occupies
/// `data[offsets[i]..offsets[i + 1]]`.
#[derive(Debug, Clone)]
pub struct EncodedRows {
    // Contiguous buffer holding every encoded row back-to-back.
    data: Bytes,
    // Byte offsets into `data`; one more entry than the number of rows.
    offsets: Vec<usize>,
}
180
181impl EncodedRows {
182 pub fn new(data: Bytes, offsets: Vec<usize>) -> Self {
187 Self { data, offsets }
188 }
189
190 #[inline]
192 pub fn len(&self) -> usize {
193 self.offsets.len().saturating_sub(1)
194 }
195
196 #[inline]
198 pub fn is_empty(&self) -> bool {
199 self.len() == 0
200 }
201
202 #[inline]
211 pub fn bytes(&self) -> &Bytes {
212 &self.data
213 }
214
215 #[inline]
220 pub fn offsets(&self) -> &[usize] {
221 &self.offsets
222 }
223
224 pub fn row(&self, n: usize) -> Result<Bytes, AvroError> {
256 if n >= self.len() {
257 return Err(AvroError::General(format!(
258 "Row index {n} out of bounds for len {}",
259 self.len()
260 )));
261 }
262 let (start, end) = unsafe {
267 (
268 *self.offsets.get_unchecked(n),
269 *self.offsets.get_unchecked(n + 1),
270 )
271 };
272 if start > end || end > self.data.len() {
273 return Err(AvroError::General(format!(
274 "Invalid row offsets for row {n}: start={start}, end={end}, data_len={}",
275 self.data.len()
276 )));
277 }
278 Ok(self.data.slice(start..end))
279 }
280
281 #[inline]
311 pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
312 self.offsets.windows(2).map(|w| self.data.slice(w[0]..w[1]))
313 }
314}
315
/// Builder for Avro [`Writer`]s and stream [`Encoder`]s.
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    // Arrow schema the produced writer/encoder will accept.
    schema: Schema,
    // Optional block compression codec.
    codec: Option<CompressionCodec>,
    // Optional per-row capacity hint for the row encoder.
    row_capacity: Option<usize>,
    // Initial encode-buffer capacity in bytes (defaults to 1024).
    capacity: usize,
    // How single-object rows are prefixed; Rabin is used when unset.
    fingerprint_strategy: Option<FingerprintStrategy>,
}
325
326impl WriterBuilder {
327 pub fn new(schema: Schema) -> Self {
334 Self {
335 schema,
336 codec: None,
337 row_capacity: None,
338 capacity: 1024,
339 fingerprint_strategy: None,
340 }
341 }
342
343 pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
346 self.fingerprint_strategy = Some(strategy);
347 self
348 }
349
350 pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
352 self.codec = codec;
353 self
354 }
355
356 pub fn with_capacity(mut self, capacity: usize) -> Self {
360 self.capacity = capacity;
361 self
362 }
363
364 pub fn with_row_capacity(mut self, capacity: usize) -> Self {
369 self.row_capacity = Some(capacity);
370 self
371 }
372
373 fn prepare_encoder<F: AvroFormat>(&self) -> Result<(Arc<Schema>, RecordEncoder), AvroError> {
374 let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
375 Some(json) => AvroSchema::new(json.clone()),
376 None => AvroSchema::try_from(&self.schema)?,
377 };
378 let maybe_fingerprint = if F::NEEDS_PREFIX {
379 match &self.fingerprint_strategy {
380 Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(*id)),
381 Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(*id)),
382 Some(strategy) => {
383 Some(avro_schema.fingerprint(FingerprintAlgorithm::from(*strategy))?)
384 }
385 None => Some(
386 avro_schema
387 .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
388 ),
389 }
390 } else {
391 None
392 };
393 let mut md = self.schema.metadata().clone();
394 md.insert(
395 SCHEMA_METADATA_KEY.to_string(),
396 avro_schema.clone().json_string,
397 );
398 let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
399 let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
400 let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
401 .with_fingerprint(maybe_fingerprint)
402 .build()?;
403 Ok((schema, encoder))
404 }
405
406 pub fn build_encoder<F: AvroFormat>(self) -> Result<Encoder, AvroError> {
411 if F::default().sync_marker().is_some() {
412 return Err(AvroError::InvalidArgument(
413 "Encoder only supports stream formats (no OCF header/sync marker)".to_string(),
414 ));
415 }
416 let (schema, encoder) = self.prepare_encoder::<F>()?;
417 Ok(Encoder {
418 schema,
419 encoder,
420 row_capacity: self.row_capacity,
421 buffer: BytesMut::with_capacity(self.capacity),
422 offsets: vec![0],
423 })
424 }
425
426 pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, AvroError>
428 where
429 W: Write,
430 F: AvroFormat,
431 {
432 let mut format = F::default();
433 if format.sync_marker().is_none() && !F::NEEDS_PREFIX {
434 return Err(AvroError::InvalidArgument(
435 "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
436 .to_string(),
437 ));
438 }
439 let (schema, encoder) = self.prepare_encoder::<F>()?;
440 format.start_stream(&mut writer, &schema, self.codec)?;
441 Ok(Writer {
442 writer,
443 schema,
444 format,
445 compression: self.codec,
446 capacity: self.capacity,
447 encoder,
448 })
449 }
450}
451
/// Encodes [`RecordBatch`]es into framed Avro rows held in an internal buffer.
///
/// Built via [`WriterBuilder::build_encoder`]; buffered rows are retrieved
/// with [`Encoder::flush`] as [`EncodedRows`].
#[derive(Debug)]
pub struct Encoder {
    // Arrow schema (with Avro JSON in its metadata) accepted by `encode`.
    schema: SchemaRef,
    // Row encoder that writes each row (with any fingerprint prefix).
    encoder: RecordEncoder,
    // Optional per-row capacity hint passed to `encode_rows`.
    row_capacity: Option<usize>,
    // Accumulates encoded row bytes between flushes.
    buffer: BytesMut,
    // Row boundary offsets into `buffer`; always starts as `[0]`.
    offsets: Vec<usize>,
}
505
impl Encoder {
    /// Encodes every row of `batch` into the internal buffer.
    ///
    /// # Errors
    /// Returns a schema error when the batch's fields differ from this
    /// encoder's schema, or any error raised while encoding rows.
    pub fn encode(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(AvroError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
        self.encoder.encode_rows(
            batch,
            self.row_capacity.unwrap_or(0),
            &mut self.buffer,
            &mut self.offsets,
        )?;
        Ok(())
    }

    /// Encodes each batch in order, stopping at the first error.
    pub fn encode_batches(&mut self, batches: &[RecordBatch]) -> Result<(), AvroError> {
        for b in batches {
            self.encode(b)?;
        }
        Ok(())
    }

    /// Takes all buffered rows, leaving the encoder ready for further use.
    pub fn flush(&mut self) -> EncodedRows {
        let data = self.buffer.split().freeze();
        // Move the accumulated offsets out via `append` — this leaves
        // `self.offsets` empty but keeps its allocation for reuse — then
        // re-seed it with the `0` start sentinel.
        let mut offsets = Vec::with_capacity(self.offsets.len());
        offsets.append(&mut self.offsets);
        self.offsets.push(0);
        EncodedRows::new(data, offsets)
    }

    /// The Arrow schema rows are encoded against.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Number of rows currently buffered and not yet flushed.
    pub fn buffered_len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }
}
555
/// Writes [`RecordBatch`]es in Avro format `F` to an underlying [`Write`].
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    // Destination sink for header, blocks, and sync markers.
    writer: W,
    // Arrow schema (with Avro JSON metadata) this writer accepts.
    schema: SchemaRef,
    // Format implementation (container format with sync marker, or stream).
    format: F,
    // Optional block compression applied before writing each block.
    compression: Option<CompressionCodec>,
    // Initial capacity for the per-block encode buffer.
    capacity: usize,
    // Row encoder shared by both the block and stream write paths.
    encoder: RecordEncoder,
}
572
/// Writer for the Avro Object Container File (OCF) format.
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;

/// Writer for the Avro single-object-encoding (SOE) stream format.
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;
647
impl<W: Write> Writer<W, AvroOcfFormat> {
    /// Creates an OCF writer with default builder settings; the container
    /// header is written to `writer` immediately.
    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
    }

    /// The 16-byte sync marker used between blocks of this file, if any.
    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
        self.format.sync_marker()
    }
}
683
impl<W: Write> Writer<W, AvroSoeFormat> {
    /// Creates a single-object-encoding stream writer with default builder
    /// settings (rows prefixed per the default fingerprint strategy).
    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
    }
}
716
impl<W: Write, F: AvroFormat> Writer<W, F> {
    /// Encodes and writes one batch.
    ///
    /// # Errors
    /// Returns a schema error when the batch's fields differ from the
    /// writer's schema, plus any encoding or I/O errors.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(AvroError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
        // Formats with a sync marker write framed blocks; stream formats
        // emit encoded rows directly.
        match self.format.sync_marker() {
            Some(&sync) => self.write_ocf_block(batch, &sync),
            None => self.write_stream(batch),
        }
    }

    /// Writes each batch in order, stopping at the first error.
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), AvroError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Flushes the underlying writer. Writes no trailer, so calling it more
    /// than once is harmless.
    pub fn finish(&mut self) -> Result<(), AvroError> {
        self.writer
            .flush()
            .map_err(|e| AvroError::IoError(format!("Error flushing writer: {e}"), e))
    }

    /// Consumes the writer, returning the underlying sink.
    pub fn into_inner(self) -> W {
        self.writer
    }

    // Encodes the batch into a scratch buffer, optionally compresses it, and
    // writes one block: row count, byte length, payload, then sync marker.
    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), AvroError> {
        let mut buf = Vec::<u8>::with_capacity(self.capacity);
        self.encoder.encode(&mut buf, batch)?;
        let encoded = match self.compression {
            Some(codec) => codec.compress(&buf)?,
            None => buf,
        };
        write_long(&mut self.writer, batch.num_rows() as i64)?;
        write_long(&mut self.writer, encoded.len() as i64)?;
        self.writer
            .write_all(&encoded)
            .map_err(|e| AvroError::IoError(format!("Error writing Avro block: {e}"), e))?;
        self.writer
            .write_all(sync)
            .map_err(|e| AvroError::IoError(format!("Error writing Avro sync: {e}"), e))?;
        Ok(())
    }

    // Streams encoded rows (with any required prefix) straight to the sink.
    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        self.encoder.encode(&mut self.writer, batch)?;
        Ok(())
    }
}
776
777#[cfg(test)]
778mod tests {
779 use super::*;
780 use crate::compression::CompressionCodec;
781 use crate::reader::ReaderBuilder;
782 use crate::schema::AVRO_NAME_METADATA_KEY;
783 use crate::schema::{AvroSchema, SchemaStore};
784 use crate::test_util::arrow_test_data;
785 use arrow::datatypes::TimeUnit;
786 use arrow::util::pretty::pretty_format_batches;
787 #[cfg(not(feature = "avro_custom_types"))]
788 use arrow_array::Float32Array;
789 #[cfg(feature = "avro_custom_types")]
790 use arrow_array::RunArray;
791 use arrow_array::builder::{Int32Builder, ListBuilder};
792 use arrow_array::cast::AsArray;
793 #[cfg(feature = "avro_custom_types")]
794 use arrow_array::types::{Int16Type, Int64Type};
795 use arrow_array::types::{
796 Int32Type, Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
797 TimestampMillisecondType, TimestampNanosecondType,
798 };
799 use arrow_array::{
800 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Float16Array,
801 Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
802 IntervalMonthDayNanoArray, IntervalYearMonthArray, PrimitiveArray, RecordBatch,
803 StringArray, StructArray, Time32MillisecondArray, Time32SecondArray,
804 Time64MicrosecondArray, Time64NanosecondArray, TimestampMillisecondArray,
805 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array, UnionArray,
806 };
807 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
808 #[cfg(not(feature = "avro_custom_types"))]
809 use arrow_schema::{DataType, Field, Schema};
810 #[cfg(feature = "avro_custom_types")]
811 use arrow_schema::{DataType, Field, Schema};
812 use arrow_schema::{IntervalUnit, UnionMode};
813 use bytes::BytesMut;
814 use half::f16;
815 use serde_json::{Value, json};
816 use std::collections::HashMap;
817 use std::collections::HashSet;
818 use std::fs::File;
819 use std::io::{BufReader, Cursor};
820 use std::path::PathBuf;
821 use std::sync::Arc;
822 use tempfile::NamedTempFile;
823
    /// Reference files re-encoded by the round-trip tests, gated on the
    /// codec features required to read each one.
    // NOTE(review): the uncompressed `alltypes_plain.avro` entry is gated on
    // the `snappy` feature — this looks unintended (possibly a line dropped
    // in a refactor); confirm whether it should be unconditional.
    fn files() -> impl Iterator<Item = &'static str> {
        [
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }
840
841 fn make_schema() -> Schema {
842 Schema::new(vec![
843 Field::new("id", DataType::Int32, false),
844 Field::new("name", DataType::Binary, false),
845 ])
846 }
847
848 fn make_batch() -> RecordBatch {
849 let ids = Int32Array::from(vec![1, 2, 3]);
850 let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
851 RecordBatch::try_new(
852 Arc::new(make_schema()),
853 vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
854 )
855 .expect("failed to build test RecordBatch")
856 }
857
    #[test]
    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), AvroError> {
        // Round-trip: write a batch with the SOE stream writer, then decode
        // it back via a SchemaStore-backed decoder and compare values.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
        )?;
        let buf: Vec<u8> = Vec::new();
        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        // Register the writer schema so the decoder can resolve each row's
        // fingerprint prefix.
        let mut store = SchemaStore::new();
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _fp = store.register(avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _consumed = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 2);
        let col = decoded.column(0).as_primitive::<Int32Type>();
        assert_eq!(col, &Int32Array::from(vec![10, 20]));
        Ok(())
    }
885
    #[test]
    fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
        // A nullable struct whose child field is non-nullable must still
        // encode when the batch is written row-by-row via `slice`,
        // exercising sliced-offset handling in the encoder.
        use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let inner_struct_type = DataType::Struct(inner_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", inner_struct_type.clone(), true),
            Field::new("after", inner_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        // `before` is entirely null at the struct level.
        let before_ids = Int32Array::from(vec![None, None]);
        let before_names = StringArray::from(vec![None::<&str>, None]);
        let before_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(before_ids) as ArrayRef,
                Arc::new(before_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![false, false])),
        );
        let after_ids = Int32Array::from(vec![1, 2]);
        let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
        let after_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(after_ids) as ArrayRef,
                Arc::new(after_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create test batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        // Write one sliced row at a time to hit per-row offset handling.
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            let after_col = single_row.column(1);
            assert_eq!(
                after_col.null_count(),
                0,
                "after column should have no nulls in sliced row"
            );
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty(), "encoded output should not be empty");
    }
952
    #[test]
    fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
        // Same sliced-row scenario as above, but the struct carries decimal
        // and timestamp children to exercise those encoders per row.
        use arrow_array::{
            ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
            TimestampMicrosecondArray,
        };
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("category", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(10, 2), true),
            Field::new("stock_quantity", DataType::Int32, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
        ]);
        let row_struct_type = DataType::Struct(row_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", row_struct_type.clone(), true),
            Field::new("after", row_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        let before_struct = StructArray::new_null(row_fields.clone(), 2);
        let ids = Int32Array::from(vec![1, 2]);
        let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
        let categories = StringArray::from(vec![Some("Electronics"), Some("Electronics")]);
        let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let quantities = Int32Array::from(vec![Some(100), Some(50)]);
        let timestamps = TimestampMicrosecondArray::from(vec![
            Some(1700000000000000i64),
            Some(1700000001000000i64),
        ]);
        let after_struct = StructArray::new(
            row_fields.clone(),
            vec![
                Arc::new(ids) as ArrayRef,
                Arc::new(names) as ArrayRef,
                Arc::new(categories) as ArrayRef,
                Arc::new(prices) as ArrayRef,
                Arc::new(quantities) as ArrayRef,
                Arc::new(timestamps) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create products batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        // Encode each row from a one-row slice of the original batch.
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode product row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty());
    }
1029
    #[test]
    fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
        // Minimal repro: slicing a batch whose nullable struct holds a
        // non-nullable child must still encode a single row successfully.
        use arrow_array::{
            ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
        };
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let row_struct_dt = DataType::Struct(row_fields.clone());
        let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
        let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
        let after: ArrayRef = Arc::new(StructArray::new(
            row_fields.clone(),
            vec![id_col, name_col],
            None,
        ));
        let schema = Arc::new(Schema::new(vec![
            Field::new("before", row_struct_dt.clone(), true),
            Field::new("after", row_struct_dt, true),
            Field::new("op", DataType::Utf8, false),
            Field::new("ts_ms", DataType::Int64, false),
        ]));
        let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
        let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
        let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
        let mut buf = Vec::new();
        let mut writer = WriterBuilder::new(schema.as_ref().clone())
            .build::<_, AvroSoeFormat>(&mut buf)
            .unwrap();
        let single = batch.slice(0, 1);
        let res = writer.write(&single);
        assert!(
            res.is_ok(),
            "expected to encode successfully, got: {:?}",
            res.err()
        );
    }
1071
    #[test]
    fn test_union_nonzero_type_ids() -> Result<(), AvroError> {
        // Dense union whose type ids are 2 and 5 (not 0/1): writing must
        // handle arbitrary, non-contiguous type ids.
        use arrow_array::UnionArray;
        use arrow_buffer::Buffer;
        use arrow_schema::UnionFields;
        let union_fields = UnionFields::try_new(
            vec![2, 5],
            vec![
                Field::new("v_str", DataType::Utf8, true),
                Field::new("v_int", DataType::Int32, true),
            ],
        )
        .unwrap();
        let strings = StringArray::from(vec!["hello", "world"]);
        let ints = Int32Array::from(vec![10, 20, 30]);
        // Five logical rows alternating between the two branches.
        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
        let union_array = UnionArray::try_new(
            union_fields.clone(),
            type_ids.into(),
            Some(offsets.into()),
            vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
        )?;
        let schema = Schema::new(vec![Field::new(
            "union_col",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(union_array) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        assert!(
            writer.write(&batch).is_ok(),
            "Expected no error from writing"
        );
        // NOTE(review): `finish` is called twice; the second call effectively
        // checks that finishing is repeatable (it only flushes) — confirm
        // the duplication is intentional.
        writer.finish()?;
        assert!(
            writer.finish().is_ok(),
            "Expected no error from finishing writer"
        );
        Ok(())
    }
1116
    #[test]
    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), AvroError> {
        // Round-trip using a 32-bit schema-registry style id prefix instead
        // of the default Rabin fingerprint.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u32 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        // The decoder's store must use the same id-based algorithm and map
        // the id back to the writer schema.
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded.column(0).as_primitive::<Int32Type>();
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }
1146
    #[test]
    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), AvroError> {
        // Same as the Id round-trip above, but with a 64-bit id prefix.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u64 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded.column(0).as_primitive::<Int32Type>();
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }
1176
1177 #[test]
1178 fn test_ocf_writer_generates_header_and_sync() -> Result<(), AvroError> {
1179 let batch = make_batch();
1180 let buffer: Vec<u8> = Vec::new();
1181 let mut writer = AvroWriter::new(buffer, make_schema())?;
1182 writer.write(&batch)?;
1183 writer.finish()?;
1184 let out = writer.into_inner();
1185 assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
1186 let trailer = &out[out.len() - 16..];
1187 assert_eq!(trailer.len(), 16, "expected 16‑byte sync marker");
1188 Ok(())
1189 }
1190
1191 #[test]
1192 fn test_schema_mismatch_yields_error() {
1193 let batch = make_batch();
1194 let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
1195 let buffer = Vec::<u8>::new();
1196 let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
1197 let err = writer.write(&batch).unwrap_err();
1198 assert!(matches!(err, AvroError::SchemaError(_)));
1199 }
1200
1201 #[test]
1202 fn test_write_batches_accumulates_multiple() -> Result<(), AvroError> {
1203 let batch1 = make_batch();
1204 let batch2 = make_batch();
1205 let buffer = Vec::<u8>::new();
1206 let mut writer = AvroWriter::new(buffer, make_schema())?;
1207 writer.write_batches(&[&batch1, &batch2])?;
1208 writer.finish()?;
1209 let out = writer.into_inner();
1210 assert!(out.len() > 4, "combined batches produced tiny file");
1211 Ok(())
1212 }
1213
    #[test]
    fn test_finish_without_write_adds_header() -> Result<(), AvroError> {
        // Even with no batches written, constructing the OCF writer emits
        // the container header, so the output starts with the magic bytes.
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
        Ok(())
    }
1223
1224 #[test]
1225 fn test_write_long_encodes_zigzag_varint() -> Result<(), AvroError> {
1226 let mut buf = Vec::new();
1227 write_long(&mut buf, 0)?;
1228 write_long(&mut buf, -1)?;
1229 write_long(&mut buf, 1)?;
1230 write_long(&mut buf, -2)?;
1231 write_long(&mut buf, 2147483647)?;
1232 assert!(
1233 buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
1234 "zig‑zag varint encodings incorrect: {buf:?}"
1235 );
1236 Ok(())
1237 }
1238
    #[test]
    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), AvroError> {
        // For each reference file: read it, re-write it with the matching
        // compression codec (inferred from the file name), read the result
        // back, and verify the batches are identical.
        for rel in files() {
            let path = arrow_test_data(rel);
            let rdr_file = File::open(&path).expect("open input avro");
            let reader = ReaderBuilder::new()
                .build(BufReader::new(rdr_file))
                .expect("build reader");
            let schema = reader.schema();
            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            // Mirror the source file's codec so the rewrite is comparable.
            let codec = if rel.contains(".snappy.") {
                Some(CompressionCodec::Snappy)
            } else if rel.contains(".zstandard.") {
                Some(CompressionCodec::ZStandard)
            } else if rel.contains(".bzip2.") {
                Some(CompressionCodec::Bzip2)
            } else if rel.contains(".xz.") {
                Some(CompressionCodec::Xz)
            } else {
                None
            };
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(codec)
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
            drop(writer);
            let rt_file = File::open(&out_path).expect("open roundtrip avro");
            let rt_reader = ReaderBuilder::new()
                .build(BufReader::new(rt_file))
                .expect("build roundtrip reader");
            let rt_schema = rt_reader.schema();
            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
            assert_eq!(
                roundtrip, original,
                "Round-trip batch mismatch for file: {}",
                rel
            );
        }
        Ok(())
    }
1287
    #[test]
    fn test_roundtrip_nested_records_writer() -> Result<(), AvroError> {
        // Round-trips nested record (struct) data through the OCF writer.
        let path = arrow_test_data("avro/nested_records.avro");
        let rdr_file = File::open(&path).expect("open nested_records.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_records.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            // Scope the writer so the file is closed before re-reading.
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_records.avro"
        );
        Ok(())
    }
1320
    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_nested_lists_writer() -> Result<(), AvroError> {
        // Round-trips nested list data, rewriting with Snappy to match the
        // source file's compression.
        let path = arrow_test_data("avro/nested_lists.snappy.avro");
        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_lists.snappy.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            // Scope the writer so the file is closed before re-reading.
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(Some(CompressionCodec::Snappy))
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_lists.snappy.avro"
        );
        Ok(())
    }
1356
    #[test]
    fn test_round_trip_simple_fixed_ocf() -> Result<(), AvroError> {
        // Round-trips Avro `fixed` typed data through the OCF writer.
        let path = arrow_test_data("avro/simple_fixed.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build avro reader");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_file = File::create(tmp.path()).expect("create temp avro");
        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        // Drop the writer to close the file handle before re-reading.
        drop(writer);
        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, original);
        Ok(())
    }
1385
    #[test]
    #[cfg(feature = "canonical_extension_types")]
    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), AvroError> {
        // Verifies duration (Interval MonthDayNano) and uuid
        // (FixedSizeBinary(16)) typed columns survive an OCF write/read
        // round trip when canonical extension types are enabled.
        use arrow_schema::{DataType, IntervalUnit};
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();
        // Sanity-check the fixture actually contains both logical types.
        let has_mdn = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Interval(IntervalUnit::MonthDayNano)
            )
        });
        assert!(
            has_mdn,
            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
        );
        let has_uuid_fixed = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
        assert!(
            has_uuid_fixed,
            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, input);
        Ok(())
    }
1433
    /// With the canonical extension types feature disabled, uuid columns come
    /// back as plain FixedSizeBinary(16) carrying `logicalType` field
    /// metadata, and the values must still round-trip exactly.
    #[test]
    #[cfg(not(feature = "canonical_extension_types"))]
    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), AvroError> {
        use arrow::datatypes::{DataType, IntervalUnit};
        use std::io::BufReader;

        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();

        // Sanity-check the fixture shape: one Interval(MonthDayNano) column
        // (Avro `duration`) and one FixedSizeBinary(16) column (Avro `uuid`).
        assert!(
            in_schema.fields().iter().any(|f| {
                matches!(
                    f.data_type(),
                    DataType::Interval(IntervalUnit::MonthDayNano)
                )
            }),
            "expected at least one Interval(MonthDayNano) field"
        );
        assert!(
            in_schema
                .fields()
                .iter()
                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
            "expected a FixedSizeBinary(16) field (uuid)"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // Round-trip through an in-memory OCF.
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        // Values must be preserved exactly, column by column.
        assert_eq!(
            round_trip.column(0),
            input.column(0),
            "duration column values differ"
        );
        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");

        // Without the extension-type feature, the uuid logical type survives
        // only as field metadata rather than an Arrow extension type.
        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(
            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
            Some("uuid"),
            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
        );

        // The duration column keeps its interval mapping after the round trip.
        let dur_rt = rt_schema.field_with_name("duration_field")?;
        assert!(matches!(
            dur_rt.data_type(),
            DataType::Interval(IntervalUnit::MonthDayNano)
        ));

        Ok(())
    }
1511
1512 #[test]
1516 #[cfg(feature = "snappy")]
1518 fn test_nonnullable_impala_roundtrip_writer() -> Result<(), AvroError> {
1519 let path = arrow_test_data("avro/nonnullable.impala.avro");
1521 let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
1522 let reader = ReaderBuilder::new()
1523 .build(BufReader::new(rdr_file))
1524 .expect("build reader for nonnullable.impala.avro");
1525 let in_schema = reader.schema();
1527 let has_map = in_schema
1529 .fields()
1530 .iter()
1531 .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
1532 assert!(
1533 has_map,
1534 "expected at least one Map field in avro/nonnullable.impala.avro"
1535 );
1536
1537 let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1538 let original =
1539 arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1540 let buffer = Vec::<u8>::new();
1542 let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
1543 writer.write(&original)?;
1544 writer.finish()?;
1545 let out_bytes = writer.into_inner();
1546 let rt_reader = ReaderBuilder::new()
1548 .build(Cursor::new(out_bytes))
1549 .expect("build reader for round-tripped in-memory OCF");
1550 let rt_schema = rt_reader.schema();
1551 let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1552 let roundtrip =
1553 arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1554 assert_eq!(
1556 roundtrip, original,
1557 "Round-trip Avro map data mismatch for nonnullable.impala.avro"
1558 );
1559 Ok(())
1560 }
1561
1562 #[test]
1563 #[cfg(feature = "snappy")]
1565 fn test_roundtrip_decimals_via_writer() -> Result<(), AvroError> {
1566 let files: [(&str, bool); 8] = [
1568 ("avro/fixed_length_decimal.avro", true), ("avro/fixed_length_decimal_legacy.avro", true), ("avro/int32_decimal.avro", true), ("avro/int64_decimal.avro", true), ("test/data/int256_decimal.avro", false), ("test/data/fixed256_decimal.avro", false), ("test/data/fixed_length_decimal_legacy_32.avro", false), ("test/data/int128_decimal.avro", false), ];
1577 for (rel, in_test_data_dir) in files {
1578 let path: String = if in_test_data_dir {
1580 arrow_test_data(rel)
1581 } else {
1582 PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1583 .join(rel)
1584 .to_string_lossy()
1585 .into_owned()
1586 };
1587 let f_in = File::open(&path).expect("open input avro");
1589 let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
1590 let in_schema = rdr.schema();
1591 let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
1592 let original =
1593 arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
1594 let tmp = NamedTempFile::new().expect("create temp file");
1596 let out_path = tmp.into_temp_path();
1597 let out_file = File::create(&out_path).expect("create temp avro");
1598 let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
1599 writer.write(&original)?;
1600 writer.finish()?;
1601 let f_rt = File::open(&out_path).expect("open roundtrip avro");
1603 let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
1604 let rt_schema = rt_rdr.schema();
1605 let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
1606 let roundtrip =
1607 arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
1608 assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
1609 }
1610 Ok(())
1611 }
1612
    /// Round-trips a file whose Avro schema reuses named types (record, enum,
    /// fixed) by reference, and checks that reused definitions resolve to
    /// identical Arrow DataTypes both before and after the round trip.
    #[test]
    fn test_named_types_complex_roundtrip() -> Result<(), AvroError> {
        let path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
        let rdr_file = File::open(&path).expect("open avro/named_types_complex.avro");

        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for named_types_complex.avro");

        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // Verify that each named type referenced more than once in the Avro
        // schema maps to one and the same Arrow DataType.
        {
            let arrow_schema = original.schema();

            // 'author' is a record; 'editors' is an array whose items
            // reference the same record by name.
            let author_field = arrow_schema.field_with_name("author")?;
            let author_type = author_field.data_type();
            let editors_field = arrow_schema.field_with_name("editors")?;
            let editors_item_type = match editors_field.data_type() {
                DataType::List(item_field) => item_field.data_type(),
                other => panic!("Editors field should be a List, but was {:?}", other),
            };
            assert_eq!(
                author_type, editors_item_type,
                "The DataType for the 'author' struct and the 'editors' list items must be identical"
            );

            // 'status' and 'previous_status' reference the same named enum.
            let status_field = arrow_schema.field_with_name("status")?;
            let status_type = status_field.data_type();
            assert!(
                matches!(status_type, DataType::Dictionary(_, _)),
                "Status field should be a Dictionary (Enum)"
            );

            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
            let prev_status_type = prev_status_field.data_type();
            assert_eq!(
                status_type, prev_status_type,
                "The DataType for 'status' and 'previous_status' enums must be identical"
            );

            // 'content_hash' and 'thumbnail_hash' reference the same named
            // fixed(16) type.
            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
            let content_hash_type = content_hash_field.data_type();
            assert!(
                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
                "Content hash should be FixedSizeBinary(16)"
            );

            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
            let thumb_hash_type = thumb_hash_field.data_type();
            assert_eq!(
                content_hash_type, thumb_hash_type,
                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
            );
        }

        // Round-trip through an in-memory OCF.
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();

        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");

        assert_eq!(
            roundtrip, original,
            "Avro complex named types round-trip mismatch"
        );

        Ok(())
    }
1701
1702 fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
1709 assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");
1711
1712 assert_eq!(
1714 expected.fields().len(),
1715 actual.fields().len(),
1716 "Schema must have the same number of fields"
1717 );
1718
1719 for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
1720 assert_field_is_semantically_equivalent(expected_field, actual_field);
1721 }
1722 }
1723
1724 fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
1725 let context = format!("Field '{}'", expected.name());
1726
1727 assert_eq!(
1728 expected.name(),
1729 actual.name(),
1730 "{context}: names must match"
1731 );
1732 assert_eq!(
1733 expected.is_nullable(),
1734 actual.is_nullable(),
1735 "{context}: nullability must match"
1736 );
1737
1738 assert_datatype_is_semantically_equivalent(
1740 expected.data_type(),
1741 actual.data_type(),
1742 &context,
1743 );
1744
1745 assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
1747 }
1748
    /// Structural DataType comparison that recurses through nested types so
    /// nested field metadata is checked with the same tolerance as top-level
    /// fields (see `assert_metadata_is_superset`).
    fn assert_datatype_is_semantically_equivalent(
        expected: &DataType,
        actual: &DataType,
        context: &str,
    ) {
        match (expected, actual) {
            // Single-child container types: compare the child field.
            // NOTE(review): the Map `sorted` flag is not compared here —
            // presumably intentional; confirm if it ever matters.
            (DataType::List(expected_field), DataType::List(actual_field))
            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
                assert_field_is_semantically_equivalent(expected_field, actual_field);
            }
            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: struct must have same number of fields"
                );
                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
                    assert_field_is_semantically_equivalent(ef, af);
                }
            }
            (
                DataType::Union(expected_fields, expected_mode),
                DataType::Union(actual_fields, actual_mode),
            ) => {
                assert_eq!(
                    expected_mode, actual_mode,
                    "{context}: union mode must match"
                );
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: union must have same number of variants"
                );
                // Union variants are keyed by type id; both the ids and the
                // variant fields must line up pairwise.
                for ((exp_id, exp_field), (act_id, act_field)) in
                    expected_fields.iter().zip(actual_fields.iter())
                {
                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
                    assert_field_is_semantically_equivalent(exp_field, act_field);
                }
            }
            // Leaf types — and any mismatched pair of variants — must be
            // exactly equal.
            _ => {
                assert_eq!(expected, actual, "{context}: data types must be identical");
            }
        }
    }
1795
1796 fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
1797 assert_eq!(
1798 expected.num_columns(),
1799 actual.num_columns(),
1800 "RecordBatches must have the same number of columns"
1801 );
1802 assert_eq!(
1803 expected.num_rows(),
1804 actual.num_rows(),
1805 "RecordBatches must have the same number of rows"
1806 );
1807
1808 for i in 0..expected.num_columns() {
1809 let context = format!("Column {i}");
1810 let expected_col = expected.column(i);
1811 let actual_col = actual.column(i);
1812 assert_array_data_is_identical(expected_col, actual_col, &context);
1813 }
1814 }
1815
1816 fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
1818 assert_eq!(
1819 expected.nulls(),
1820 actual.nulls(),
1821 "{context}: null buffers must match"
1822 );
1823 assert_eq!(
1824 expected.len(),
1825 actual.len(),
1826 "{context}: array lengths must match"
1827 );
1828
1829 match (expected.data_type(), actual.data_type()) {
1830 (DataType::Union(expected_fields, _), DataType::Union(..)) => {
1831 let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
1832 let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();
1833
1834 assert_eq!(
1836 &expected.to_data().buffers()[0],
1837 &actual.to_data().buffers()[0],
1838 "{context}: union type_ids buffer mismatch"
1839 );
1840
1841 if expected.to_data().buffers().len() > 1 {
1843 assert_eq!(
1844 &expected.to_data().buffers()[1],
1845 &actual.to_data().buffers()[1],
1846 "{context}: union value_offsets buffer mismatch"
1847 );
1848 }
1849
1850 for (type_id, _) in expected_fields.iter() {
1852 let child_context = format!("{context} -> child variant {type_id}");
1853 assert_array_data_is_identical(
1854 expected_union.child(type_id),
1855 actual_union.child(type_id),
1856 &child_context,
1857 );
1858 }
1859 }
1860 (DataType::Struct(_), DataType::Struct(_)) => {
1861 let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
1862 let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
1863 for i in 0..expected_struct.num_columns() {
1864 let child_context = format!("{context} -> struct child {i}");
1865 assert_array_data_is_identical(
1866 expected_struct.column(i),
1867 actual_struct.column(i),
1868 &child_context,
1869 );
1870 }
1871 }
1872 _ => {
1874 assert_eq!(
1875 expected.to_data().buffers(),
1876 actual.to_data().buffers(),
1877 "{context}: data buffers must match"
1878 );
1879 }
1880 }
1881 }
1882
1883 fn assert_metadata_is_superset(
1886 expected_meta: &HashMap<String, String>,
1887 actual_meta: &HashMap<String, String>,
1888 context: &str,
1889 ) {
1890 let allowed_additions: HashSet<&str> =
1891 vec!["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
1892 .into_iter()
1893 .collect();
1894 for (key, expected_value) in expected_meta {
1895 match actual_meta.get(key) {
1896 Some(actual_value) => assert_eq!(
1897 expected_value, actual_value,
1898 "{context}: preserved metadata for key '{key}' must have the same value"
1899 ),
1900 None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
1901 }
1902 }
1903 for key in actual_meta.keys() {
1904 if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
1905 panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
1906 }
1907 }
1908 }
1909
    /// Round-trips a file with Avro union columns. The result is compared
    /// semantically (the writer may add union bookkeeping metadata) rather
    /// than with plain RecordBatch equality.
    #[test]
    fn test_union_roundtrip() -> Result<(), AvroError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/union_fields.avro")
            .to_string_lossy()
            .into_owned();
        let rdr_file = File::open(&file_path).expect("open avro/union_fields.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for union_fields.avro");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        // Round-trip through an in-memory OCF.
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        // Schemas are compared allowing writer-added metadata keys; the
        // physical data buffers themselves must be identical.
        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());
        assert_batch_data_is_identical(&original, &round_trip);
        Ok(())
    }
1943
1944 #[test]
1945 fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), AvroError> {
1946 let path = arrow_test_data("avro/simple_enum.avro");
1948 let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
1949 let reader = ReaderBuilder::new()
1950 .build(BufReader::new(rdr_file))
1951 .expect("build reader for simple_enum.avro");
1952 let in_schema = reader.schema();
1954 let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1955 let original =
1956 arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1957 let has_enum_dict = in_schema.fields().iter().any(|f| {
1959 matches!(
1960 f.data_type(),
1961 DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
1962 )
1963 });
1964 assert!(
1965 has_enum_dict,
1966 "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
1967 );
1968 let buffer: Vec<u8> = Vec::new();
1971 let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
1972 writer.write(&original)?;
1973 writer.finish()?;
1974 let bytes = writer.into_inner();
1975 let rt_reader = ReaderBuilder::new()
1977 .build(Cursor::new(bytes))
1978 .expect("reader for round-trip");
1979 let rt_schema = rt_reader.schema();
1980 let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1981 let roundtrip =
1982 arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1983 assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
1984 Ok(())
1985 }
1986
    /// Verifies `WriterBuilder::with_capacity` is carried through to the
    /// constructed OCF writer and the output is still a valid OCF stream.
    #[test]
    fn test_builder_propagates_capacity_to_writer() -> Result<(), AvroError> {
        let cap = 64 * 1024;
        let buffer = Vec::<u8>::new();
        let mut writer = WriterBuilder::new(make_schema())
            .with_capacity(cap)
            .build::<_, AvroOcfFormat>(buffer)?;
        // The builder's capacity hint must land on the writer unchanged.
        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
        let batch = make_batch();
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        // OCF output begins with the 4-byte magic "Obj\x01".
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
        Ok(())
    }
2002
2003 #[test]
2004 fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), AvroError> {
2005 use arrow_array::{ArrayRef, Int32Array};
2006 use arrow_schema::{DataType, Field, Schema};
2007 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2008 let batch = RecordBatch::try_new(
2009 Arc::new(schema.clone()),
2010 vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
2011 )?;
2012 let cap = 8192;
2013 let mut writer = WriterBuilder::new(schema)
2014 .with_capacity(cap)
2015 .build::<_, AvroSoeFormat>(Vec::new())?;
2016 assert_eq!(writer.capacity, cap);
2017 writer.write(&batch)?;
2018 let _bytes = writer.into_inner();
2019 Ok(())
2020 }
2021
    /// With `avro_custom_types` enabled, Duration columns of all four
    /// TimeUnits must round-trip through an OCF file unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), AvroError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/duration_logical_types.avro")
            .to_string_lossy()
            .into_owned();

        let in_file = File::open(&file_path)
            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));

        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_logical_types.avro");
        let in_schema = reader.schema();

        // The fixture is expected to carry one Duration column per TimeUnit.
        let expected_units: HashSet<TimeUnit> = [
            TimeUnit::Nanosecond,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Second,
        ]
        .into_iter()
        .collect();

        // Collect the units actually present in the decoded schema.
        let found_units: HashSet<TimeUnit> = in_schema
            .fields()
            .iter()
            .filter_map(|f| match f.data_type() {
                DataType::Duration(unit) => Some(*unit),
                _ => None,
            })
            .collect();

        assert_eq!(
            found_units, expected_units,
            "Expected to find all four Duration TimeUnits in the schema from the initial read"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // Write to a temp OCF; the block scope finishes and drops the writer
        // (and its file handle) before the file is reopened below.
        let tmp = NamedTempFile::new().expect("create temp file");
        {
            let out_file = File::create(tmp.path()).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
            writer.write(&input)?;
            writer.finish()?;
        }

        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(round_trip, input);
        Ok(())
    }
2085
2086 #[cfg(feature = "avro_custom_types")]
2087 #[test]
2088 fn test_run_end_encoded_roundtrip_writer() -> Result<(), AvroError> {
2089 let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
2090 let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
2091 let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
2092 let field = Field::new("x", ree.data_type().clone(), true);
2093 let schema = Schema::new(vec![field]);
2094 let batch = RecordBatch::try_new(
2095 Arc::new(schema.clone()),
2096 vec![Arc::new(ree.clone()) as ArrayRef],
2097 )?;
2098 let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2099 writer.write(&batch)?;
2100 writer.finish()?;
2101 let bytes = writer.into_inner();
2102 let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2103 let out_schema = reader.schema();
2104 let batches = reader.collect::<Result<Vec<_>, _>>()?;
2105 let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2106 assert_eq!(out.num_columns(), 1);
2107 assert_eq!(out.num_rows(), 8);
2108 match out.schema().field(0).data_type() {
2109 DataType::RunEndEncoded(run_ends_field, values_field) => {
2110 assert_eq!(run_ends_field.name(), "run_ends");
2111 assert_eq!(run_ends_field.data_type(), &DataType::Int32);
2112 assert_eq!(values_field.name(), "values");
2113 assert_eq!(values_field.data_type(), &DataType::Int32);
2114 assert!(values_field.is_nullable());
2115 let got_ree = out
2116 .column(0)
2117 .as_any()
2118 .downcast_ref::<RunArray<Int32Type>>()
2119 .expect("RunArray<Int32Type>");
2120 assert_eq!(got_ree, &ree);
2121 }
2122 other => panic!(
2123 "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
2124 other
2125 ),
2126 }
2127 Ok(())
2128 }
2129
2130 #[cfg(feature = "avro_custom_types")]
2131 #[test]
2132 fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), AvroError>
2133 {
2134 let run_ends = Int16Array::from(vec![2, 5, 7]); let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
2136 let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
2137 let field = Field::new("s", ree.data_type().clone(), true);
2138 let schema = Schema::new(vec![field]);
2139 let batch = RecordBatch::try_new(
2140 Arc::new(schema.clone()),
2141 vec![Arc::new(ree.clone()) as ArrayRef],
2142 )?;
2143 let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2144 writer.write(&batch)?;
2145 writer.finish()?;
2146 let bytes = writer.into_inner();
2147 let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2148 let out_schema = reader.schema();
2149 let batches = reader.collect::<Result<Vec<_>, _>>()?;
2150 let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2151 assert_eq!(out.num_columns(), 1);
2152 assert_eq!(out.num_rows(), 7);
2153 match out.schema().field(0).data_type() {
2154 DataType::RunEndEncoded(run_ends_field, values_field) => {
2155 assert_eq!(run_ends_field.data_type(), &DataType::Int16);
2156 assert_eq!(values_field.data_type(), &DataType::Utf8);
2157 assert!(
2158 values_field.is_nullable(),
2159 "REE 'values' child should be nullable"
2160 );
2161 let got = out
2162 .column(0)
2163 .as_any()
2164 .downcast_ref::<RunArray<Int16Type>>()
2165 .expect("RunArray<Int16Type>");
2166 assert_eq!(got, &ree);
2167 }
2168 other => panic!("Unexpected DataType: {:?}", other),
2169 }
2170 Ok(())
2171 }
2172
2173 #[cfg(feature = "avro_custom_types")]
2174 #[test]
2175 fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer() -> Result<(), AvroError>
2176 {
2177 let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
2178 let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
2179 let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
2180 let field = Field::new("y", ree.data_type().clone(), true);
2181 let schema = Schema::new(vec![field]);
2182 let batch = RecordBatch::try_new(
2183 Arc::new(schema.clone()),
2184 vec![Arc::new(ree.clone()) as ArrayRef],
2185 )?;
2186 let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2187 writer.write(&batch)?;
2188 writer.finish()?;
2189 let bytes = writer.into_inner();
2190 let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2191 let out_schema = reader.schema();
2192 let batches = reader.collect::<Result<Vec<_>, _>>()?;
2193 let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2194 assert_eq!(out.num_columns(), 1);
2195 assert_eq!(out.num_rows(), 8);
2196 match out.schema().field(0).data_type() {
2197 DataType::RunEndEncoded(run_ends_field, values_field) => {
2198 assert_eq!(run_ends_field.data_type(), &DataType::Int64);
2199 assert_eq!(values_field.data_type(), &DataType::Int32);
2200 assert!(values_field.is_nullable());
2201 let got = out
2202 .column(0)
2203 .as_any()
2204 .downcast_ref::<RunArray<Int64Type>>()
2205 .expect("RunArray<Int64Type>");
2206 assert_eq!(got, &ree);
2207 }
2208 other => panic!("Unexpected DataType for REE column: {:?}", other),
2209 }
2210 Ok(())
2211 }
2212
    /// Materializes the logical window a slice of a RunEndEncoded array would
    /// expose, re-encodes it as a fresh (offset-free) RunArray, round-trips it
    /// through an in-memory OCF, and compares the logical values.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), AvroError> {
        // Base runs: 1 x3, 2 x2, null x2, 3 x1 — 8 logical rows.
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        // Window of 6 logical rows starting at logical index 1.
        let offset = 1usize;
        let length = 6usize;
        let base_values = base.values().as_primitive::<Int32Type>();
        // Expand the window into plain Option<i32> logical values.
        let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
        for i in offset..offset + length {
            let phys = base.get_physical_index(i);
            let v = if base_values.is_null(phys) {
                None
            } else {
                Some(base_values.value(phys))
            };
            logical_window.push(v);
        }

        // Re-encodes a logical value sequence as (run_ends, values) pairs,
        // merging adjacent equal values into single runs.
        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
            if vals.is_empty() {
                return (Int32Array::new_null(0), Int32Array::new_null(0));
            }
            let mut run_ends_out: Vec<i32> = Vec::new();
            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
            let mut cur = vals[0];
            let mut len = 1i32;
            for v in &vals[1..] {
                if *v == cur {
                    len += 1;
                } else {
                    // Close the current run: its end is the previous run's
                    // end plus this run's length.
                    let last_end = run_ends_out.last().copied().unwrap_or(0);
                    run_ends_out.push(last_end + len);
                    run_vals_out.push(cur);
                    cur = *v;
                    len = 1;
                }
            }
            // Close the final run.
            let last_end = run_ends_out.last().copied().unwrap_or(0);
            run_ends_out.push(last_end + len);
            run_vals_out.push(cur);
            (
                Int32Array::from(run_ends_out),
                Int32Array::from(run_vals_out),
            )
        }
        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
        let field = Field::new("x", owned_slice.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(owned_slice.clone()) as ArrayRef],
        )?;
        // Round-trip through an in-memory OCF.
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), length);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                // Expands a RunArray back into plain logical values so the
                // comparison is independent of run boundaries.
                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
                    let vals = a.values().as_primitive::<Int32Type>();
                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
                    for i in 0..a.len() {
                        let phys = a.get_physical_index(i);
                        out.push(if vals.is_null(phys) {
                            None
                        } else {
                            Some(vals.value(phys))
                        });
                    }
                    Int32Array::from(out)
                }
                let got_logical = expand_ree_to_int32(got);
                let expected_logical = Int32Array::from(logical_window);
                assert_eq!(
                    got_logical, expected_logical,
                    "Logical values differ after REE slice round-trip"
                );
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }
2312
    /// With `avro_custom_types` disabled, a `RunEndEncoded<Int32, Int32>` column is
    /// written as plain values; reading the OCF back therefore yields a dense
    /// `Int32` column with the runs expanded and run-value nulls preserved.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        // Runs over 8 logical rows: 1 x3, 2 x2, null x2, 3 x1 (run ends 3,5,7,8).
        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        // Feature off: the REE encoding is materialized, so the reader sees Int32.
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out.column(0).as_primitive::<Int32Type>();
        let expected = Int32Array::from(vec![
            Some(1),
            Some(1),
            Some(1),
            Some(2),
            Some(2),
            None,
            None,
            Some(3),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }
2352
    /// With `avro_custom_types` disabled, a `RunEndEncoded<Int16, Utf8>` column
    /// round-trips as a dense `Utf8` column with the runs expanded.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
    -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        // Runs over 7 logical rows: "a" x2, null x3, "c" x2 (run ends 2,5,7).
        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        // Feature off: runs are materialized, so the reader sees plain Utf8.
        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .expect("StringArray");
        let expected = arrow_array::StringArray::from(vec![
            Some("a"),
            Some("a"),
            None,
            None,
            None,
            Some("c"),
            Some("c"),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }
2396
2397 #[cfg(not(feature = "avro_custom_types"))]
2398 #[test]
2399 fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
2400 -> Result<(), AvroError> {
2401 use arrow_schema::{DataType, Field, Schema};
2402 let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
2403 let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
2404 let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
2405 &run_ends,
2406 &run_values,
2407 )?;
2408 let field = Field::new("y", ree.data_type().clone(), true);
2409 let schema = Schema::new(vec![field]);
2410 let batch =
2411 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
2412 let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2413 writer.write(&batch)?;
2414 writer.finish()?;
2415 let bytes = writer.into_inner();
2416 let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2417 let out_schema = reader.schema();
2418 let batches = reader.collect::<Result<Vec<_>, _>>()?;
2419 let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2420 assert_eq!(out.num_columns(), 1);
2421 assert_eq!(out.num_rows(), 8);
2422 assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
2423 let got = out.column(0).as_primitive::<Int32Type>();
2424 let expected = Int32Array::from(vec![
2425 Some(999),
2426 Some(999),
2427 Some(999),
2428 Some(999),
2429 Some(-5),
2430 Some(-5),
2431 Some(-5),
2432 Some(-5),
2433 ]);
2434 assert_eq!(got, &expected);
2435 Ok(())
2436 }
2437
    /// REE round trip with the writer feature disabled; the reader is expected
    /// to produce a dense Int32 column.
    ///
    /// NOTE(review): despite the `sliced` name, no `slice()` is applied to the
    /// array or batch here — the full 6-row array is written. Confirm whether a
    /// slicing step was intended before the write.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        // Runs over 6 logical rows: 1 x2, 2 x2, null x2 (run ends 2,4,6).
        let run_ends = Int32Array::from(vec![2, 4, 6]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 6);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out.column(0).as_primitive::<Int32Type>();
        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
        assert_eq!(got, &expected);
        Ok(())
    }
2468
2469 #[test]
2470 #[cfg(feature = "snappy")]
2472 fn test_nullable_impala_roundtrip() -> Result<(), AvroError> {
2473 let path = arrow_test_data("avro/nullable.impala.avro");
2474 let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
2475 let reader = ReaderBuilder::new()
2476 .build(BufReader::new(rdr_file))
2477 .expect("build reader for nullable.impala.avro");
2478 let in_schema = reader.schema();
2479 assert!(
2480 in_schema.fields().iter().any(|f| f.is_nullable()),
2481 "expected at least one nullable field in avro/nullable.impala.avro"
2482 );
2483 let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2484 let original =
2485 arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2486 let buffer: Vec<u8> = Vec::new();
2487 let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
2488 writer.write(&original)?;
2489 writer.finish()?;
2490 let out_bytes = writer.into_inner();
2491 let rt_reader = ReaderBuilder::new()
2492 .build(Cursor::new(out_bytes))
2493 .expect("build reader for round-tripped in-memory OCF");
2494 let rt_schema = rt_reader.schema();
2495 let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2496 let roundtrip =
2497 arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2498 assert_eq!(
2499 roundtrip, original,
2500 "Round-trip Avro data mismatch for nullable.impala.avro"
2501 );
2502 Ok(())
2503 }
2504
    /// Round-trips `avro/datapage_v2.snappy.avro`: read the fixture, write it
    /// back through `AvroWriter`, re-read, and assert batch equality.
    #[test]
    #[cfg(feature = "snappy")]
    fn test_datapage_v2_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for datapage_v2.snappy.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for datapage_v2.snappy.avro"
        );
        Ok(())
    }
2534
2535 #[test]
2536 #[cfg(feature = "snappy")]
2537 fn test_single_nan_roundtrip() -> Result<(), AvroError> {
2538 let path = arrow_test_data("avro/single_nan.avro");
2539 let in_file = File::open(&path).expect("open avro/single_nan.avro");
2540 let reader = ReaderBuilder::new()
2541 .build(BufReader::new(in_file))
2542 .expect("build reader for single_nan.avro");
2543 let in_schema = reader.schema();
2544 let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2545 let original =
2546 arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2547 let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2548 writer.write(&original)?;
2549 writer.finish()?;
2550 let bytes = writer.into_inner();
2551 let rt_reader = ReaderBuilder::new()
2552 .build(Cursor::new(bytes))
2553 .expect("build round_trip reader");
2554 let rt_schema = rt_reader.schema();
2555 let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2556 let round_trip =
2557 arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2558 assert_eq!(
2559 round_trip, original,
2560 "Round-trip batch mismatch for avro/single_nan.avro"
2561 );
2562 Ok(())
2563 }
    /// Round-trips `avro/dict-page-offset-zero.avro` and asserts the re-read
    /// batch equals the original.
    #[test]
    #[cfg(feature = "snappy")]
    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for dict-page-offset-zero.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
        );
        Ok(())
    }
2595
    /// Round-trips `avro/repeated_no_annotation.avro` and asserts the re-read
    /// batch equals the original.
    #[test]
    #[cfg(feature = "snappy")]
    fn test_repeated_no_annotation_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/repeated_no_annotation.avro");
        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for repeated_no_annotation.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip buffer");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
        );
        Ok(())
    }
2625
2626 #[test]
2627 fn test_nested_record_type_reuse_roundtrip() -> Result<(), AvroError> {
2628 let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2629 .join("test/data/nested_record_reuse.avro")
2630 .to_string_lossy()
2631 .into_owned();
2632 let in_file = File::open(&path).expect("open avro/nested_record_reuse.avro");
2633 let reader = ReaderBuilder::new()
2634 .build(BufReader::new(in_file))
2635 .expect("build reader for nested_record_reuse.avro");
2636 let in_schema = reader.schema();
2637 let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2638 let input =
2639 arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2640 let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
2641 writer.write(&input)?;
2642 writer.finish()?;
2643 let bytes = writer.into_inner();
2644 let rt_reader = ReaderBuilder::new()
2645 .build(Cursor::new(bytes))
2646 .expect("build round_trip reader");
2647 let rt_schema = rt_reader.schema();
2648 let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2649 let round_trip =
2650 arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2651 assert_eq!(
2652 round_trip, input,
2653 "Round-trip batch mismatch for nested_record_reuse.avro"
2654 );
2655 Ok(())
2656 }
2657
2658 #[test]
2659 fn test_enum_type_reuse_roundtrip() -> Result<(), AvroError> {
2660 let path =
2661 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
2662 let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
2663 let reader = ReaderBuilder::new()
2664 .build(std::io::BufReader::new(rdr_file))
2665 .expect("build reader for enum_reuse.avro");
2666 let in_schema = reader.schema();
2667 let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2668 let original =
2669 arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2670 let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2671 writer.write(&original)?;
2672 writer.finish()?;
2673 let bytes = writer.into_inner();
2674 let rt_reader = ReaderBuilder::new()
2675 .build(std::io::Cursor::new(bytes))
2676 .expect("build round_trip reader");
2677 let rt_schema = rt_reader.schema();
2678 let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2679 let round_trip =
2680 arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2681 assert_eq!(
2682 round_trip, original,
2683 "Avro enum type reuse round-trip mismatch"
2684 );
2685 Ok(())
2686 }
2687
    /// Round-trips `test/data/comprehensive_e2e.avro` and asserts the re-read
    /// batch equals the input.
    ///
    /// NOTE(review): the name lacks the `test_` prefix used by every other test
    /// in this module; harmless (the `#[test]` attribute drives discovery) but
    /// inconsistent — consider renaming.
    #[test]
    fn comprehensive_e2e_test_roundtrip() -> Result<(), AvroError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/comprehensive_e2e.avro");
        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for comprehensive_e2e.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let sink: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for comprehensive_e2e.avro"
        );
        Ok(())
    }
2718
    /// Round-trips one column per temporal encoder (Date32, Time32[ms],
    /// Time64[us], and Timestamp in ms/us/ns without a timezone) and asserts
    /// the decoded batch equals the input.
    #[test]
    fn test_roundtrip_new_time_encoders_writer() -> Result<(), AvroError> {
        let schema = Schema::new(vec![
            Field::new("d32", DataType::Date32, false),
            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new(
                "ts_ms",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts_us",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_ns",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
        ]);
        // Boundary-flavored values: epoch, +/-1, and 86_399_999 ms /
        // 86_399_999_999 us (the last instant of a day in each unit).
        let d32 = Date32Array::from(vec![0, 1, -1]);
        let t32_ms: PrimitiveArray<Time32MillisecondType> =
            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
        let t64_us: PrimitiveArray<Time64MicrosecondType> =
            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(d32) as ArrayRef,
                Arc::new(t32_ms) as ArrayRef,
                Arc::new(t64_us) as ArrayRef,
                Arc::new(ts_ms) as ArrayRef,
                Arc::new(ts_us) as ArrayRef,
                Arc::new(ts_ns) as ArrayRef,
            ],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(std::io::Cursor::new(bytes))
            .expect("build reader for round-trip of new time encoders");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, batch);
        Ok(())
    }
2775
2776 fn make_encoder_schema() -> Schema {
2777 Schema::new(vec![
2778 Field::new("a", DataType::Int32, false),
2779 Field::new("b", DataType::Int32, false),
2780 ])
2781 }
2782
2783 fn make_encoder_batch(schema: &Schema) -> RecordBatch {
2784 let a = Int32Array::from(vec![1, 2, 3]);
2785 let b = Int32Array::from(vec![10, 20, 30]);
2786 RecordBatch::try_new(
2787 Arc::new(schema.clone()),
2788 vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2789 )
2790 .expect("failed to build test RecordBatch")
2791 }
2792
    /// Builds a matched triple for the row-encoder tests: an Avro record schema
    /// (`User`), an Arrow schema carrying that Avro JSON under
    /// `SCHEMA_METADATA_KEY`, and a 3-row `RecordBatch` conforming to both.
    fn make_real_avro_schema_and_batch() -> Result<(Schema, RecordBatch, AvroSchema), AvroError> {
        let avro_json = r#"
        {
          "type": "record",
          "name": "User",
          "fields": [
            { "name": "id", "type": "long" },
            { "name": "name", "type": "string" },
            { "name": "active", "type": "boolean" },
            { "name": "tags", "type": { "type": "array", "items": "int" } },
            { "name": "opt", "type": ["null", "string"], "default": null }
          ]
        }"#;
        let avro_schema = AvroSchema::new(avro_json.to_string());
        // Embed the Avro JSON in the Arrow schema metadata so the writer uses
        // this exact schema instead of deriving one.
        let mut md = HashMap::new();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.json_string.clone(),
        );
        // Non-nullable list items to match the Avro `array of int`.
        let item_field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            DataType::Int32,
            false,
        ));
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
                Field::new("active", DataType::Boolean, false),
                Field::new("tags", DataType::List(item_field.clone()), false),
                Field::new("opt", DataType::Utf8, true),
            ],
            md,
        );
        let id = Int64Array::from(vec![1, 2, 3]);
        let name = StringArray::from(vec!["alice", "bob", "carol"]);
        let active = BooleanArray::from(vec![true, false, true]);
        // tags: [[1, 2], [], [3]] — same item field as declared in the schema.
        let mut tags_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field);
        tags_builder.values().append_value(1);
        tags_builder.values().append_value(2);
        tags_builder.append(true);
        tags_builder.append(true);
        tags_builder.values().append_value(3);
        tags_builder.append(true);
        let tags = tags_builder.finish();
        let opt = StringArray::from(vec![Some("x"), None, Some("z")]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(id) as ArrayRef,
                Arc::new(name) as ArrayRef,
                Arc::new(active) as ArrayRef,
                Arc::new(tags) as ArrayRef,
                Arc::new(opt) as ArrayRef,
            ],
        )?;
        Ok((schema, batch, avro_schema))
    }
2851
2852 #[test]
2853 fn test_row_writer_matches_stream_writer_soe() -> Result<(), AvroError> {
2854 let schema = make_encoder_schema();
2855 let batch = make_encoder_batch(&schema);
2856 let mut stream = AvroStreamWriter::new(Vec::<u8>::new(), schema.clone())?;
2857 stream.write(&batch)?;
2858 stream.finish()?;
2859 let stream_bytes = stream.into_inner();
2860 let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2861 row_writer.encode(&batch)?;
2862 let rows = row_writer.flush();
2863 let row_bytes: Vec<u8> = rows.bytes().to_vec();
2864 assert_eq!(stream_bytes, row_bytes);
2865 Ok(())
2866 }
2867
2868 #[test]
2869 fn test_row_writer_flush_clears_buffer() -> Result<(), AvroError> {
2870 let schema = make_encoder_schema();
2871 let batch = make_encoder_batch(&schema);
2872 let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2873 row_writer.encode(&batch)?;
2874 assert_eq!(row_writer.buffered_len(), batch.num_rows());
2875 let out1 = row_writer.flush();
2876 assert_eq!(out1.len(), batch.num_rows());
2877 assert_eq!(row_writer.buffered_len(), 0);
2878 let out2 = row_writer.flush();
2879 assert_eq!(out2.len(), 0);
2880 Ok(())
2881 }
2882
    /// SOE row-encoder round trip: encode a realistic batch row-by-row, feed
    /// each framed row to the decoder, and compare pretty-printed output.
    #[test]
    fn test_row_writer_roundtrip_decoder_soe_real_avro_data() -> Result<(), AvroError> {
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        // The decoder resolves writer schemas via this store.
        let mut store = SchemaStore::new();
        store.register(avro_schema.clone())?;
        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        let rows = row_writer.flush();
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        // Each row is a complete SOE frame; the decoder must consume it whole.
        for row in rows.iter() {
            let consumed = decoder.decode(row.as_ref())?;
            assert_eq!(
                consumed,
                row.len(),
                "decoder should consume the full row frame"
            );
        }
        let out = decoder.flush()?.expect("decoded batch");
        // Compare via pretty-printing to get readable diffs on mismatch.
        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected, actual);
        Ok(())
    }
2909
    /// Streams SOE frames to the decoder in frame-aligned chunks of varying row
    /// counts, verifying the decoder consumes each chunk fully and reproduces
    /// the original batch.
    #[test]
    fn test_row_writer_roundtrip_decoder_soe_streaming_chunks() -> Result<(), AvroError> {
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let mut store = SchemaStore::new();
        store.register(avro_schema.clone())?;
        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        let rows = row_writer.flush();
        // Concatenate all frames into one byte stream, recording the byte
        // offset at the end of each frame so chunks can be cut on frame
        // boundaries below.
        let mut stream: Vec<u8> = Vec::new();
        let mut boundaries: Vec<usize> = Vec::with_capacity(rows.len() + 1);
        boundaries.push(0usize);
        for row in rows.iter() {
            stream.extend_from_slice(row.as_ref());
            boundaries.push(stream.len());
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        let mut buffered = BytesMut::new();
        // Cycle through varying chunk sizes (in rows) to exercise different
        // feed patterns.
        let chunk_rows = [1usize, 2, 3, 1, 4, 2];
        let mut row_idx = 0usize;
        let mut i = 0usize;
        let n_rows = rows.len();
        while row_idx < n_rows {
            let take = chunk_rows[i % chunk_rows.len()];
            i += 1;
            let end_row = (row_idx + take).min(n_rows);
            let byte_start = boundaries[row_idx];
            let byte_end = boundaries[end_row];
            buffered.extend_from_slice(&stream[byte_start..byte_end]);
            // Drain the buffer: decode until the decoder makes no progress.
            loop {
                let consumed = decoder.decode(&buffered)?;
                if consumed == 0 {
                    break;
                }
                let _ = buffered.split_to(consumed);
            }
            // Chunks are frame-aligned, so nothing should remain buffered.
            assert!(
                buffered.is_empty(),
                "expected decoder to consume the entire frame-aligned chunk"
            );
            row_idx = end_row;
        }
        let out = decoder.flush()?.expect("decoded batch");
        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected, actual);
        Ok(())
    }
2961
2962 #[test]
2963 fn test_row_writer_roundtrip_decoder_confluent_wire_format_id() -> Result<(), AvroError> {
2964 let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
2965 let schema_id: u32 = 42;
2966 let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
2967 store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
2968 let mut row_writer = WriterBuilder::new(schema)
2969 .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
2970 .build_encoder::<AvroSoeFormat>()?;
2971 row_writer.encode(&batch)?;
2972 let rows = row_writer.flush();
2973 let mut decoder = ReaderBuilder::new()
2974 .with_writer_schema_store(store)
2975 .with_batch_size(1024)
2976 .build_decoder()?;
2977 for row in rows.iter() {
2978 let consumed = decoder.decode(row.as_ref())?;
2979 assert_eq!(consumed, row.len());
2980 }
2981 let out = decoder.flush()?.expect("decoded batch");
2982 let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
2983 let actual = pretty_format_batches(&[out])?.to_string();
2984 assert_eq!(expected, actual);
2985 Ok(())
2986 }
2987 #[test]
2988 fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format()
2989 -> Result<(), AvroError> {
2990 use crate::writer::format::AvroBinaryFormat;
2991 use arrow_array::{ArrayRef, Int32Array, RecordBatch};
2992 use arrow_schema::{DataType, Field, Schema};
2993 use std::sync::Arc;
2994 let schema = Schema::new(vec![
2995 Field::new("a", DataType::Int32, false),
2996 Field::new("b", DataType::Int32, false),
2997 ]);
2998 let schema_ref = Arc::new(schema.clone());
2999 let batch1 = RecordBatch::try_new(
3000 schema_ref.clone(),
3001 vec![
3002 Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
3003 Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
3004 ],
3005 )?;
3006 let batch2 = RecordBatch::try_new(
3007 schema_ref,
3008 vec![
3009 Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
3010 Arc::new(Int32Array::from(vec![40, 50])) as ArrayRef,
3011 ],
3012 )?;
3013 let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
3014 let empty = Encoder::flush(&mut encoder);
3015 assert_eq!(EncodedRows::len(&empty), 0);
3016 assert!(EncodedRows::is_empty(&empty));
3017 assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]);
3018 assert_eq!(EncodedRows::offsets(&empty), &[0usize]);
3019 assert_eq!(EncodedRows::iter(&empty).count(), 0);
3020 let empty_vecs: Vec<Vec<u8>> = empty.iter().map(|b| b.to_vec()).collect();
3021 assert!(empty_vecs.is_empty());
3022 let batches = vec![batch1, batch2];
3023 Encoder::encode_batches(&mut encoder, &batches)?;
3024 assert_eq!(encoder.buffered_len(), 5);
3025 let rows = Encoder::flush(&mut encoder);
3026 assert_eq!(
3027 encoder.buffered_len(),
3028 0,
3029 "Encoder::flush should reset the internal offsets"
3030 );
3031 assert_eq!(EncodedRows::len(&rows), 5);
3032 assert!(!EncodedRows::is_empty(&rows));
3033 let expected_offsets: &[usize] = &[0, 2, 4, 6, 8, 10];
3034 assert_eq!(EncodedRows::offsets(&rows), expected_offsets);
3035 let expected_rows: Vec<Vec<u8>> = vec![
3036 vec![2, 20],
3037 vec![4, 40],
3038 vec![6, 60],
3039 vec![8, 80],
3040 vec![10, 100],
3041 ];
3042 let expected_stream: Vec<u8> = expected_rows.concat();
3043 assert_eq!(
3044 EncodedRows::bytes(&rows).as_ref(),
3045 expected_stream.as_slice()
3046 );
3047 for (i, expected) in expected_rows.iter().enumerate() {
3048 assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice());
3049 }
3050 let iter_rows: Vec<Vec<u8>> = EncodedRows::iter(&rows).map(|b| b.to_vec()).collect();
3051 assert_eq!(iter_rows, expected_rows);
3052 let recreated = EncodedRows::new(
3053 EncodedRows::bytes(&rows).clone(),
3054 EncodedRows::offsets(&rows).to_vec(),
3055 );
3056 assert_eq!(EncodedRows::len(&recreated), EncodedRows::len(&rows));
3057 assert_eq!(EncodedRows::bytes(&recreated), EncodedRows::bytes(&rows));
3058 assert_eq!(
3059 EncodedRows::offsets(&recreated),
3060 EncodedRows::offsets(&rows)
3061 );
3062 let rec_vecs: Vec<Vec<u8>> = recreated.iter().map(|b| b.to_vec()).collect();
3063 assert_eq!(rec_vecs, iter_rows);
3064 let empty_again = Encoder::flush(&mut encoder);
3065 assert!(EncodedRows::is_empty(&empty_again));
3066 Ok(())
3067 }
3068
3069 #[test]
3070 fn test_writer_builder_build_rejects_avro_binary_format() {
3071 use crate::writer::format::AvroBinaryFormat;
3072 use arrow_schema::{DataType, Field, Schema};
3073 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3074 let err = WriterBuilder::new(schema)
3075 .build::<_, AvroBinaryFormat>(Vec::<u8>::new())
3076 .unwrap_err();
3077 match err {
3078 AvroError::InvalidArgument(msg) => assert_eq!(
3079 msg,
3080 "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
3081 ),
3082 other => panic!("expected InvalidArgumentError, got {:?}", other),
3083 }
3084 }
    /// Verifies that `AvroBinaryFormat` rows are exactly the SOE rows minus the
    /// 10-byte prefix (magic `C3 01` + little-endian Rabin fingerprint), and
    /// that manually re-framing the binary rows decodes back to the input.
    #[test]
    fn test_row_encoder_avro_binary_format_roundtrip_decoder_with_soe_framing()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        // Encode the batch twice (full + a 2-row slice) to cover multi-batch input.
        let batches: Vec<RecordBatch> = vec![batch.clone(), batch.slice(1, 2)];
        let expected = arrow::compute::concat_batches(&batch.schema(), &batches)?;
        let mut binary_encoder =
            WriterBuilder::new(schema.clone()).build_encoder::<AvroBinaryFormat>()?;
        binary_encoder.encode_batches(&batches)?;
        let binary_rows = binary_encoder.flush();
        assert_eq!(
            binary_rows.len(),
            expected.num_rows(),
            "binary encoder row count mismatch"
        );
        // Same data through the SOE encoder for byte-level comparison.
        let mut soe_encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        soe_encoder.encode_batches(&batches)?;
        let soe_rows = soe_encoder.flush();
        assert_eq!(
            soe_rows.len(),
            binary_rows.len(),
            "SOE vs binary row count mismatch"
        );
        // Register the schema to obtain the Rabin fingerprint used in the prefix.
        let mut store = SchemaStore::new();
        let fp = store.register(avro_schema)?;
        let fp_le_bytes = match fp {
            Fingerprint::Rabin(v) => v.to_le_bytes(),
            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
        };
        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
        const SOE_PREFIX_LEN: usize = 2 + 8;
        // Row-by-row: SOE frame = magic + fingerprint + binary body.
        for i in 0..binary_rows.len() {
            let body = binary_rows.row(i)?;
            let soe = soe_rows.row(i)?;
            assert!(
                soe.len() >= SOE_PREFIX_LEN,
                "expected SOE row to include prefix"
            );
            assert_eq!(&soe.as_ref()[..2], &SOE_MAGIC);
            assert_eq!(&soe.as_ref()[2..SOE_PREFIX_LEN], &fp_le_bytes);
            assert_eq!(
                &soe.as_ref()[SOE_PREFIX_LEN..],
                body.as_ref(),
                "SOE body bytes differ from AvroBinaryFormat body bytes (row {i})"
            );
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        // Hand-frame each binary body as SOE and feed it to the decoder.
        for body in binary_rows.iter() {
            let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len());
            framed.extend_from_slice(&SOE_MAGIC);
            framed.extend_from_slice(&fp_le_bytes);
            framed.extend_from_slice(body.as_ref());
            let consumed = decoder.decode(&framed)?;
            assert_eq!(
                consumed,
                framed.len(),
                "decoder should consume the full SOE-framed message"
            );
        }
        let out = decoder.flush()?.expect("expected a decoded RecordBatch");
        let expected_str = pretty_format_batches(&[expected])?.to_string();
        let actual_str = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected_str, actual_str);
        Ok(())
    }
3154
    /// Encodes one batch as raw Avro binary rows, frames each row as a
    /// length-prefixed single-object-encoding (SOE) message, and then feeds
    /// the resulting byte stream to the decoder in irregularly sized chunks
    /// to exercise transport-level reassembly and partial-frame buffering.
    #[test]
    fn test_row_encoder_avro_binary_format_roundtrip_decoder_streaming_chunks()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
        encoder.encode(&batch)?;
        let rows = encoder.flush();
        let mut store = SchemaStore::new();
        let fp = store.register(avro_schema)?;
        // The SOE prefix carries the schema fingerprint in little-endian byte
        // order; SchemaStore::new() is expected to hand back a Rabin variant.
        let fp_le_bytes = match fp {
            Fingerprint::Rabin(v) => v.to_le_bytes(),
            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
        };
        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
        const SOE_PREFIX_LEN: usize = 2 + 8; // 2-byte magic + 8-byte fingerprint
        // Build a single contiguous transport stream:
        // [u32 LE message length][SOE magic][fingerprint][row body] per row.
        let mut stream: Vec<u8> = Vec::new();
        for body in rows.iter() {
            let msg_len: u32 = (SOE_PREFIX_LEN + body.len())
                .try_into()
                .expect("message length must fit in u32");
            stream.extend_from_slice(&msg_len.to_le_bytes());
            stream.extend_from_slice(&SOE_MAGIC);
            stream.extend_from_slice(&fp_le_bytes);
            stream.extend_from_slice(body.as_ref());
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        // Deliver the stream in cycling Fibonacci-sized chunks so frame
        // boundaries rarely coincide with chunk boundaries.
        let chunk_sizes = [1usize, 2, 3, 5, 8, 13, 21, 34];
        let mut pos = 0usize;
        let mut i = 0usize;
        let mut buffered = BytesMut::new();
        let mut decoded_frames = 0usize;
        while pos < stream.len() {
            let take = chunk_sizes[i % chunk_sizes.len()];
            i += 1;
            let end = (pos + take).min(stream.len());
            buffered.extend_from_slice(&stream[pos..end]);
            pos = end;
            // Drain every complete frame currently buffered; an incomplete
            // length header or payload stays buffered until more bytes arrive.
            loop {
                if buffered.len() < 4 {
                    break;
                }
                let msg_len =
                    u32::from_le_bytes([buffered[0], buffered[1], buffered[2], buffered[3]])
                        as usize;
                if buffered.len() < 4 + msg_len {
                    break;
                }
                let frame = buffered.split_to(4 + msg_len);
                let payload = &frame[4..]; // strip the transport length prefix
                let consumed = decoder.decode(payload)?;
                assert_eq!(
                    consumed,
                    payload.len(),
                    "decoder should consume the full SOE-framed message"
                );

                decoded_frames += 1;
            }
        }
        assert!(
            buffered.is_empty(),
            "expected transport framer to consume all bytes; leftover = {}",
            buffered.len()
        );
        assert_eq!(
            decoded_frames,
            rows.len(),
            "expected to decode exactly one frame per encoded row"
        );
        let out = decoder.flush()?.expect("expected decoded RecordBatch");
        // Compare via pretty-printed text so mismatches render readably.
        let expected_str = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual_str = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected_str, actual_str);
        Ok(())
    }
3234
3235 fn roundtrip_ocf(batch: &RecordBatch) -> Result<RecordBatch, AvroError> {
3237 let schema = batch.schema();
3238 let mut buffer = Vec::<u8>::new();
3239 let mut writer = AvroWriter::new(&mut buffer, schema.as_ref().clone())?;
3240 writer.write(batch)?;
3241 writer.finish()?;
3242 drop(writer);
3243 let reader = ReaderBuilder::new()
3244 .build(Cursor::new(buffer))
3245 .expect("build reader for roundtrip OCF");
3246 let avro_schema_json = reader
3248 .avro_header()
3249 .get(SCHEMA_METADATA_KEY)
3250 .map(|raw| std::str::from_utf8(raw).expect("valid UTF-8").to_string());
3251 let arrow_schema = reader.schema();
3253 let rt_schema = if let Some(json) = avro_schema_json {
3254 let mut metadata = arrow_schema.metadata().clone();
3255 metadata.insert(SCHEMA_METADATA_KEY.to_string(), json);
3256 Arc::new(Schema::new_with_metadata(
3257 arrow_schema.fields().clone(),
3258 metadata,
3259 ))
3260 } else {
3261 arrow_schema
3262 };
3263 let rt_batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
3264 Ok(arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip"))
3265 }
3266
    /// Asserts that `array` survives an OCF round-trip with its type and
    /// contents unchanged (identity case of [`assert_round_trip_widened`];
    /// only compiled with `avro_custom_types`, where Arrow-specific types are
    /// preserved exactly).
    #[cfg(feature = "avro_custom_types")]
    fn assert_round_trip(array: ArrayRef) {
        assert_round_trip_widened(array.clone(), array);
    }
3272
3273 fn assert_round_trip_widened(input: ArrayRef, expected: ArrayRef) {
3275 let schema = Schema::new(vec![Field::new("val", input.data_type().clone(), true)]);
3276 let batch =
3277 RecordBatch::try_new(Arc::new(schema), vec![input]).expect("failed to create batch");
3278 let roundtrip = roundtrip_ocf(&batch).expect("roundtrip failed");
3279 assert_eq!(
3280 roundtrip.column(0).data_type(),
3281 expected.data_type(),
3282 "output data type mismatch"
3283 );
3284 assert_eq!(
3285 roundtrip.column(0).to_data(),
3286 expected.to_data(),
3287 "output data mismatch"
3288 );
3289 }
3290
    /// With `avro_custom_types`, Int8 survives the OCF round-trip unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_int8_custom_types() {
        assert_round_trip(Arc::new(Int8Array::from(vec![
            Some(i8::MIN),
            Some(-1),
            Some(0),
            None,
            Some(1),
            Some(i8::MAX),
        ])));
    }
3303
    /// Without `avro_custom_types`, Int8 is widened to Int32 on round-trip
    /// (Avro `int` has no 8-bit logical-type annotation in this mode).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_int8_no_custom_widens_to_int32() {
        assert_round_trip_widened(
            Arc::new(Int8Array::from(vec![
                Some(i8::MIN),
                Some(-1),
                Some(0),
                None,
                Some(1),
                Some(i8::MAX),
            ])),
            Arc::new(Int32Array::from(vec![
                Some(i8::MIN as i32),
                Some(-1),
                Some(0),
                None,
                Some(1),
                Some(i8::MAX as i32),
            ])),
        );
    }
3326
    /// With `avro_custom_types`, Int16 survives the OCF round-trip unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_int16_custom_types() {
        assert_round_trip(Arc::new(Int16Array::from(vec![
            Some(i16::MIN),
            Some(-1),
            Some(0),
            None,
            Some(1),
            Some(i16::MAX),
        ])));
    }
3339
    /// Without `avro_custom_types`, Int16 is widened to Int32 on round-trip.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_int16_no_custom_widens_to_int32() {
        assert_round_trip_widened(
            Arc::new(Int16Array::from(vec![
                Some(i16::MIN),
                Some(-1),
                Some(0),
                None,
                Some(1),
                Some(i16::MAX),
            ])),
            Arc::new(Int32Array::from(vec![
                Some(i16::MIN as i32),
                Some(-1),
                Some(0),
                None,
                Some(1),
                Some(i16::MAX as i32),
            ])),
        );
    }
3362
    /// With `avro_custom_types`, UInt8 survives the OCF round-trip unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_uint8_custom_types() {
        assert_round_trip(Arc::new(UInt8Array::from(vec![
            Some(0u8),
            Some(1),
            None,
            Some(127),
            Some(u8::MAX),
        ])));
    }
3374
    /// Without `avro_custom_types`, UInt8 is widened to Int32 on round-trip
    /// (all u8 values fit losslessly in i32).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_uint8_no_custom_widens_to_int32() {
        assert_round_trip_widened(
            Arc::new(UInt8Array::from(vec![
                Some(0u8),
                Some(1),
                None,
                Some(127),
                Some(u8::MAX),
            ])),
            Arc::new(Int32Array::from(vec![
                Some(0i32),
                Some(1),
                None,
                Some(127),
                Some(u8::MAX as i32),
            ])),
        );
    }
3395
    /// With `avro_custom_types`, UInt16 survives the OCF round-trip unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_uint16_custom_types() {
        assert_round_trip(Arc::new(UInt16Array::from(vec![
            Some(0u16),
            Some(1),
            None,
            Some(32767),
            Some(u16::MAX),
        ])));
    }
3407
    /// Without `avro_custom_types`, UInt16 is widened to Int32 on round-trip
    /// (all u16 values fit losslessly in i32).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_uint16_no_custom_widens_to_int32() {
        assert_round_trip_widened(
            Arc::new(UInt16Array::from(vec![
                Some(0u16),
                Some(1),
                None,
                Some(32767),
                Some(u16::MAX),
            ])),
            Arc::new(Int32Array::from(vec![
                Some(0i32),
                Some(1),
                None,
                Some(32767),
                Some(u16::MAX as i32),
            ])),
        );
    }
3428
    /// With `avro_custom_types`, UInt32 survives the OCF round-trip unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_uint32_custom_types() {
        assert_round_trip(Arc::new(UInt32Array::from(vec![
            Some(0u32),
            Some(1),
            None,
            Some(i32::MAX as u32),
            Some(u32::MAX),
        ])));
    }
3440
    /// Without `avro_custom_types`, UInt32 is widened to Int64 on round-trip
    /// (all u32 values, including u32::MAX, fit losslessly in i64).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_uint32_no_custom_widens_to_int64() {
        assert_round_trip_widened(
            Arc::new(UInt32Array::from(vec![
                Some(0u32),
                Some(1),
                None,
                Some(i32::MAX as u32),
                Some(u32::MAX),
            ])),
            Arc::new(Int64Array::from(vec![
                Some(0i64),
                Some(1),
                None,
                Some(i32::MAX as i64),
                Some(u32::MAX as i64),
            ])),
        );
    }
3461
    /// With `avro_custom_types`, the full UInt64 range round-trips unchanged,
    /// including values above i64::MAX.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_uint64_custom_types() {
        assert_round_trip(Arc::new(UInt64Array::from(vec![
            Some(0u64),
            Some(1),
            None,
            Some(i64::MAX as u64),
            Some(u64::MAX),
        ])));
    }
3473
    /// Without `avro_custom_types`, UInt64 is widened to Int64 on round-trip.
    /// Values above i64::MAX are deliberately omitted here; that overflow case
    /// is covered by `test_roundtrip_uint64_overflow_errors_without_custom`.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_uint64_no_custom_widens_to_int64() {
        assert_round_trip_widened(
            Arc::new(UInt64Array::from(vec![
                Some(0u64),
                Some(1),
                None,
                Some(i64::MAX as u64),
            ])),
            Arc::new(Int64Array::from(vec![
                Some(0i64),
                Some(1),
                None,
                Some(i64::MAX),
            ])),
        );
    }
3492
    /// Without `avro_custom_types`, encoding a UInt64 value that does not fit
    /// in i64 must surface an error rather than silently wrapping.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_uint64_overflow_errors_without_custom() {
        use arrow_array::UInt64Array;
        let schema = Schema::new(vec![Field::new("val", DataType::UInt64, false)]);
        let values: Vec<u64> = vec![u64::MAX];
        let array = UInt64Array::from(values);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array) as ArrayRef])
            .expect("create batch");
        let result = roundtrip_ocf(&batch);
        assert!(
            result.is_err(),
            "Expected error when encoding UInt64 > i64::MAX without avro_custom_types"
        );
    }
3508
    /// With `avro_custom_types`, Float16 survives the OCF round-trip
    /// unchanged, including the extremes of the half-precision range.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_float16_custom_types() {
        assert_round_trip(Arc::new(Float16Array::from(vec![
            Some(f16::ZERO),
            Some(f16::ONE),
            None,
            Some(f16::NEG_ONE),
            Some(f16::MAX),
            Some(f16::MIN),
        ])));
    }
3521
    /// Without `avro_custom_types`, Float16 is widened to Float32 on
    /// round-trip (every f16 value is exactly representable as f32).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_float16_no_custom_widens_to_float32() {
        assert_round_trip_widened(
            Arc::new(Float16Array::from(vec![
                Some(f16::ZERO),
                Some(f16::ONE),
                None,
                Some(f16::NEG_ONE),
            ])),
            Arc::new(Float32Array::from(vec![
                Some(0.0f32),
                Some(1.0),
                None,
                Some(-1.0),
            ])),
        );
    }
3540
    /// With `avro_custom_types`, Date64 (milliseconds since epoch) survives
    /// the OCF round-trip unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_date64_custom_types() {
        assert_round_trip(Arc::new(Date64Array::from(vec![
            Some(0i64),
            Some(86_400_000), // one day
            None,
            Some(1_609_459_200_000), // 2021-01-01T00:00:00Z
        ])));
    }
3551
    /// Without `avro_custom_types`, Date64 round-trips as
    /// Timestamp(Millisecond) with the raw values preserved.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_date64_no_custom_as_timestamp_millis() {
        assert_round_trip_widened(
            Arc::new(Date64Array::from(vec![
                Some(0i64),
                Some(86_400_000),
                None,
                Some(1_609_459_200_000),
            ])),
            Arc::new(TimestampMillisecondArray::from(vec![
                Some(0i64),
                Some(86_400_000),
                None,
                Some(1_609_459_200_000),
            ])),
        );
    }
3570
    /// With `avro_custom_types`, Time64(Nanosecond) survives the OCF
    /// round-trip at full nanosecond precision.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_time64_nanosecond_custom_types() {
        assert_round_trip(Arc::new(Time64NanosecondArray::from(vec![
            Some(0i64),
            Some(1_000_000_000), // one second
            None,
            Some(86_399_999_999_999), // last nanosecond of the day
        ])));
    }
3581
    /// Without `avro_custom_types`, Time64(Nanosecond) is truncated to
    /// Time64(Microsecond). Inputs here are whole microseconds so the
    /// truncation is lossless and the expected values are exact.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_time64_nanos_no_custom_truncates_to_micros() {
        assert_round_trip_widened(
            Arc::new(Time64NanosecondArray::from(vec![
                Some(0i64),
                Some(1_000_000_000),
                None,
                Some(86_399_999_000_000),
            ])),
            Arc::new(Time64MicrosecondArray::from(vec![
                Some(0i64),
                Some(1_000_000),
                None,
                Some(86_399_999_000),
            ])),
        );
    }
3601
    /// With `avro_custom_types`, Time32(Second) survives the OCF round-trip
    /// unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_time32_second_custom_types() {
        assert_round_trip(Arc::new(Time32SecondArray::from(vec![
            Some(0i32),
            Some(3600), // one hour
            None,
            Some(86399), // last second of the day
        ])));
    }
3612
    /// Without `avro_custom_types`, Time32(Second) is scaled up to
    /// Time32(Millisecond) (values multiplied by 1000).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_time32_second_no_custom_scales_to_millis() {
        assert_round_trip_widened(
            Arc::new(Time32SecondArray::from(vec![
                Some(0i32),
                Some(3600),
                None,
                Some(86399),
            ])),
            Arc::new(Time32MillisecondArray::from(vec![
                Some(0i32),
                Some(3_600_000),
                None,
                Some(86_399_000),
            ])),
        );
    }
3631
    /// With `avro_custom_types`, Timestamp(Second, UTC) survives the OCF
    /// round-trip unchanged, including the timezone annotation.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_timestamp_second_custom_types() {
        assert_round_trip(Arc::new(
            TimestampSecondArray::from(vec![Some(0i64), Some(1609459200), None, Some(1735689600)])
                .with_timezone("+00:00"),
        ));
    }
3640
    /// Without `avro_custom_types`, Timestamp(Second, UTC) is scaled up to
    /// Timestamp(Millisecond, UTC), preserving the timezone annotation.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_timestamp_second_no_custom_scales_to_millis() {
        assert_round_trip_widened(
            Arc::new(
                TimestampSecondArray::from(vec![
                    Some(0i64),
                    Some(1609459200),
                    None,
                    Some(1735689600),
                ])
                .with_timezone("+00:00"),
            ),
            Arc::new(
                TimestampMillisecondArray::from(vec![
                    Some(0i64),
                    Some(1_609_459_200_000),
                    None,
                    Some(1_735_689_600_000),
                ])
                .with_timezone("+00:00"),
            ),
        );
    }
3665
    /// With `avro_custom_types`, Interval(YearMonth) survives the OCF
    /// round-trip unchanged, including negative month counts.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_interval_year_month_custom_types() {
        assert_round_trip(Arc::new(IntervalYearMonthArray::from(vec![
            Some(0i32),
            Some(12),
            None,
            Some(-6),
            Some(25),
        ])));
    }
3677
    /// Without `avro_custom_types`, Interval(YearMonth) round-trips as
    /// Interval(MonthDayNano) with only the month component populated.
    /// Note: only non-negative month counts are exercised here; negative
    /// intervals are covered by the custom-types variant above — presumably
    /// because the non-custom Avro encoding cannot represent them (TODO
    /// confirm against the writer's duration encoding).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_interval_year_month_no_custom() {
        assert_round_trip_widened(
            Arc::new(IntervalYearMonthArray::from(vec![
                Some(0i32),
                Some(12),
                None,
                Some(25),
            ])),
            Arc::new(IntervalMonthDayNanoArray::from(vec![
                Some(IntervalMonthDayNano::new(0, 0, 0)),
                Some(IntervalMonthDayNano::new(12, 0, 0)),
                None,
                Some(IntervalMonthDayNano::new(25, 0, 0)),
            ])),
        );
    }
3697
    /// With `avro_custom_types`, Interval(DayTime) survives the OCF
    /// round-trip unchanged.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_interval_day_time_custom_types() {
        assert_round_trip(Arc::new(IntervalDayTimeArray::from(vec![
            Some(IntervalDayTime::new(0, 0)),
            Some(IntervalDayTime::new(1, 1000)),
            None,
            Some(IntervalDayTime::new(30, 3600000)),
        ])));
    }
3708
    /// Without `avro_custom_types`, Interval(DayTime) round-trips as
    /// Interval(MonthDayNano): days map straight across and milliseconds are
    /// converted to nanoseconds.
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_interval_day_time_no_custom() {
        assert_round_trip_widened(
            Arc::new(IntervalDayTimeArray::from(vec![
                Some(IntervalDayTime::new(0, 0)),
                Some(IntervalDayTime::new(1, 1000)),
                None,
                Some(IntervalDayTime::new(30, 3600000)),
            ])),
            Arc::new(IntervalMonthDayNanoArray::from(vec![
                Some(IntervalMonthDayNano::new(0, 0, 0)),
                Some(IntervalMonthDayNano::new(0, 1, 1_000_000_000)),
                None,
                Some(IntervalMonthDayNano::new(0, 30, 3_600_000_000_000)),
            ])),
        );
    }
3727
    /// With `avro_custom_types`, Interval(MonthDayNano) survives the OCF
    /// round-trip unchanged, including negative components.
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_interval_month_day_nano_custom_types() {
        assert_round_trip(Arc::new(IntervalMonthDayNanoArray::from(vec![
            Some(IntervalMonthDayNano::new(0, 0, 0)),
            Some(IntervalMonthDayNano::new(1, 2, 3)),
            None,
            Some(IntervalMonthDayNano::new(-4, -5, -6)),
        ])));
    }
3738
    /// Without `avro_custom_types`, Interval(MonthDayNano) round-trips to the
    /// same type. Inputs here use whole-millisecond nanosecond components so
    /// the values come back exactly (input and expected are identical).
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_roundtrip_interval_month_day_nano_no_custom() {
        assert_round_trip_widened(
            Arc::new(IntervalMonthDayNanoArray::from(vec![
                Some(IntervalMonthDayNano::new(0, 0, 0)),
                Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
                None,
                Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
            ])),
            Arc::new(IntervalMonthDayNanoArray::from(vec![
                Some(IntervalMonthDayNano::new(0, 0, 0)),
                Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
                None,
                Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
            ])),
        );
    }
3758
3759 fn schemas_equal_ignoring_metadata(left: &Schema, right: &Schema) -> bool {
3760 if left.fields().len() != right.fields().len() {
3761 return false;
3762 }
3763 for (l, r) in left.fields().iter().zip(right.fields().iter()) {
3764 if l.name() != r.name()
3765 || l.data_type() != r.data_type()
3766 || l.is_nullable() != r.is_nullable()
3767 {
3768 return false;
3769 }
3770 }
3771 true
3772 }
3773
3774 fn avro_field_type<'a>(avro_schema: &'a Value, name: &str) -> &'a Value {
3775 let fields = avro_schema
3776 .get("fields")
3777 .and_then(|v| v.as_array())
3778 .expect("avro schema has 'fields' array");
3779 fields
3780 .iter()
3781 .find(|f| f.get("name").and_then(|n| n.as_str()) == Some(name))
3782 .unwrap_or_else(|| panic!("avro schema missing field '{name}'"))
3783 .get("type")
3784 .expect("field has 'type'")
3785 }
3786
    /// End-to-end check that every Arrow type with special Avro handling maps
    /// to the expected Avro schema JSON and decodes back to the expected
    /// Arrow schema and values, under both settings of `avro_custom_types`
    /// (the `cfg!` branches below cover each mode in one test body).
    #[test]
    fn e2e_types_and_schema_alignment() -> Result<(), AvroError> {
        // --- Input values, one Vec per column. Some columns pick different
        // values per feature flag so they stay representable by the target
        // Avro encoding (e.g. u64 > i64::MAX only encodes with custom types,
        // and negative intervals only appear in the custom-types branch —
        // presumably a limit of the plain `duration` encoding; TODO confirm).
        let i8_values: Vec<Option<i8>> = vec![Some(i8::MIN), Some(-1), Some(i8::MAX)];
        let i16_values: Vec<Option<i16>> = vec![Some(i16::MIN), Some(-1), Some(i16::MAX)];
        let u8_values: Vec<Option<u8>> = vec![Some(0), Some(1), Some(u8::MAX)];
        let u16_values: Vec<Option<u16>> = vec![Some(0), Some(1), Some(u16::MAX)];
        let u32_values: Vec<Option<u32>> = vec![Some(0), Some(1), Some(u32::MAX)];
        let u64_values: Vec<Option<u64>> = if cfg!(feature = "avro_custom_types") {
            vec![Some(0), Some(i64::MAX as u64), Some((i64::MAX as u64) + 1)]
        } else {
            vec![Some(0), Some((i64::MAX as u64) - 1), Some(i64::MAX as u64)]
        };
        let f16_values: Vec<Option<f16>> = vec![
            Some(f16::from_f32(1.5)),
            Some(f16::from_f32(-2.0)),
            Some(f16::from_f32(0.0)),
        ];
        let date64_values: Vec<Option<i64>> = vec![Some(-86_400_000), Some(0), Some(86_400_000)];
        let time32s_values: Vec<Option<i32>> = vec![Some(0), Some(1), Some(86_399)];
        let time64ns_values: Vec<Option<i64>> = vec![
            Some(0),
            Some(1_234_567_890),
            Some(86_399_000_000_123_i64),
        ];
        let ts_s_local_values: Vec<Option<i64>> = vec![Some(-1), Some(0), Some(1)];
        let ts_s_utc_values: Vec<Option<i64>> = vec![Some(1), Some(2), Some(3)];
        let iv_ym_values: Vec<Option<i32>> = if cfg!(feature = "avro_custom_types") {
            vec![Some(0), Some(-6), Some(25)]
        } else {
            vec![Some(0), Some(12), Some(25)]
        };
        let iv_dt_values: Vec<Option<IntervalDayTime>> = if cfg!(feature = "avro_custom_types") {
            vec![
                Some(IntervalDayTime::new(0, 0)),
                Some(IntervalDayTime::new(1, 1000)),
                Some(IntervalDayTime::new(-1, -1000)),
            ]
        } else {
            vec![
                Some(IntervalDayTime::new(0, 0)),
                Some(IntervalDayTime::new(1, 1000)),
                Some(IntervalDayTime::new(30, 3_600_000)),
            ]
        };
        let iv_mdn_values: Vec<Option<IntervalMonthDayNano>> =
            if cfg!(feature = "avro_custom_types") {
                vec![
                    Some(IntervalMonthDayNano::new(0, 0, 0)),
                    Some(IntervalMonthDayNano::new(1, 2, 3)),
                    Some(IntervalMonthDayNano::new(-1, -2, -3)),
                ]
            } else {
                // Whole-millisecond nano components so the non-custom
                // encoding round-trips these values exactly.
                vec![
                    Some(IntervalMonthDayNano::new(0, 0, 0)),
                    Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
                    Some(IntervalMonthDayNano::new(10, 20, 30_000_000_000)),
                ]
            };
        // --- Build the input batch (all columns marked non-nullable).
        let schema = Schema::new(vec![
            Field::new("i8", DataType::Int8, false),
            Field::new("i16", DataType::Int16, false),
            Field::new("u8", DataType::UInt8, false),
            Field::new("u16", DataType::UInt16, false),
            Field::new("u32", DataType::UInt32, false),
            Field::new("u64", DataType::UInt64, false),
            Field::new("f16", DataType::Float16, false),
            Field::new("date64", DataType::Date64, false),
            Field::new("time32s", DataType::Time32(TimeUnit::Second), false),
            Field::new("time64ns", DataType::Time64(TimeUnit::Nanosecond), false),
            Field::new(
                "ts_s_local",
                DataType::Timestamp(TimeUnit::Second, None),
                false,
            ),
            Field::new(
                "ts_s_utc",
                DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
                false,
            ),
            Field::new("iv_ym", DataType::Interval(IntervalUnit::YearMonth), false),
            Field::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new(
                "iv_mdn",
                DataType::Interval(IntervalUnit::MonthDayNano),
                false,
            ),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int8Array::from(i8_values.clone())) as ArrayRef,
                Arc::new(Int16Array::from(i16_values.clone())) as ArrayRef,
                Arc::new(UInt8Array::from(u8_values.clone())) as ArrayRef,
                Arc::new(UInt16Array::from(u16_values.clone())) as ArrayRef,
                Arc::new(UInt32Array::from(u32_values.clone())) as ArrayRef,
                Arc::new(UInt64Array::from(u64_values.clone())) as ArrayRef,
                Arc::new(Float16Array::from(f16_values.clone())) as ArrayRef,
                Arc::new(Date64Array::from(date64_values.clone())) as ArrayRef,
                Arc::new(Time32SecondArray::from(time32s_values.clone())) as ArrayRef,
                Arc::new(Time64NanosecondArray::from(time64ns_values.clone())) as ArrayRef,
                Arc::new(TimestampSecondArray::from(ts_s_local_values.clone())) as ArrayRef,
                Arc::new(
                    TimestampSecondArray::from(ts_s_utc_values.clone()).with_timezone("+00:00"),
                ) as ArrayRef,
                Arc::new(IntervalYearMonthArray::from(iv_ym_values.clone())) as ArrayRef,
                Arc::new(IntervalDayTimeArray::from(iv_dt_values.clone())) as ArrayRef,
                Arc::new(IntervalMonthDayNanoArray::from(iv_mdn_values.clone())) as ArrayRef,
            ],
        )?;
        // --- Round-trip and pull the writer-produced Avro schema JSON out of
        // the round-tripped batch metadata for inspection below.
        let rt = roundtrip_ocf(&batch)?;
        let rt_schema = rt.schema();
        let avro_schema_json = rt_schema
            .metadata()
            .get(SCHEMA_METADATA_KEY)
            .expect("avro.schema missing in round-tripped batch metadata");
        let avro_schema: Value =
            serde_json::from_str(avro_schema_json).expect("valid avro schema json");
        let rt_arrow_schema = rt.schema();
        // --- Check the decoded Arrow schema matches expectations per mode.
        if cfg!(feature = "avro_custom_types") {
            // Custom types: decoded schema is the original schema, and fields
            // encoded via named Avro types carry avro.name metadata.
            assert!(
                schemas_equal_ignoring_metadata(rt_arrow_schema.as_ref(), &schema),
                "Schema fields mismatch.\nExpected: {:?}\nGot: {:?}",
                schema,
                rt_arrow_schema
            );
            for field_name in ["u64", "f16", "iv_ym", "iv_dt", "iv_mdn"] {
                let field = rt_arrow_schema
                    .field_with_name(field_name)
                    .expect("field exists");
                assert!(
                    field.metadata().get(AVRO_NAME_METADATA_KEY).is_some(),
                    "Field '{}' should have avro.name metadata",
                    field_name
                );
            }
        } else {
            // Non-custom: every special type widens/rescales to its standard
            // Avro-compatible Arrow counterpart.
            let exp_schema = Schema::new(vec![
                Field::new("i8", DataType::Int32, false),
                Field::new("i16", DataType::Int32, false),
                Field::new("u8", DataType::Int32, false),
                Field::new("u16", DataType::Int32, false),
                Field::new("u32", DataType::Int64, false),
                Field::new("u64", DataType::Int64, false),
                Field::new("f16", DataType::Float32, false),
                Field::new(
                    "date64",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("time32s", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64ns", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new(
                    "ts_s_local",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new(
                    "ts_s_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                    false,
                ),
                Field::new(
                    "iv_ym",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    false,
                ),
                Field::new(
                    "iv_dt",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    false,
                ),
                Field::new(
                    "iv_mdn",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    false,
                ),
            ]);
            assert!(
                schemas_equal_ignoring_metadata(rt_arrow_schema.as_ref(), &exp_schema),
                "Schema fields mismatch.\nExpected: {:?}\nGot: {:?}",
                exp_schema,
                rt_arrow_schema
            );
            for field_name in ["iv_ym", "iv_dt", "iv_mdn"] {
                let field = rt_arrow_schema
                    .field_with_name(field_name)
                    .expect("field exists");
                assert!(
                    field.metadata().get(AVRO_NAME_METADATA_KEY).is_some(),
                    "Field '{}' should have avro.name metadata",
                    field_name
                );
            }
        }
        // --- Check the Avro schema JSON the writer emitted for each field.
        if cfg!(feature = "avro_custom_types") {
            // Custom types: each field is tagged with an arrow.* logical type.
            assert_eq!(
                avro_field_type(&avro_schema, "i8"),
                &json!({"type":"int","logicalType":"arrow.int8"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "i16"),
                &json!({"type":"int","logicalType":"arrow.int16"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u8"),
                &json!({"type":"int","logicalType":"arrow.uint8"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u16"),
                &json!({"type":"int","logicalType":"arrow.uint16"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u32"),
                &json!({"type":"long","logicalType":"arrow.uint32"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u64"),
                &json!({"type":"fixed","name":"u64","size":8,"logicalType":"arrow.uint64"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "f16"),
                &json!({"type":"fixed","name":"f16","size":2,"logicalType":"arrow.float16"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "date64"),
                &json!({"type":"long","logicalType":"arrow.date64"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time32s"),
                &json!({"type":"int","logicalType":"arrow.time32-second"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time64ns"),
                &json!({"type":"long","logicalType":"arrow.time64-nanosecond"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_local"),
                &json!({"type":"long","logicalType":"arrow.local-timestamp-second"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_utc"),
                &json!({"type":"long","logicalType":"arrow.timestamp-second"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_ym"),
                &json!({"type":"fixed","name":"iv_ym","size":4,"logicalType":"arrow.interval-year-month"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_dt"),
                &json!({"type":"fixed","name":"iv_dt","size":8,"logicalType":"arrow.interval-day-time"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_mdn"),
                &json!({"type":"fixed","name":"iv_mdn","size":16,"logicalType":"arrow.interval-month-day-nano"})
            );
        } else {
            // Non-custom: only standard Avro primitive and logical types.
            assert_eq!(avro_field_type(&avro_schema, "i8"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "i16"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "u8"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "u16"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "u32"), &json!("long"));
            assert_eq!(avro_field_type(&avro_schema, "u64"), &json!("long"));
            assert_eq!(avro_field_type(&avro_schema, "f16"), &json!("float"));
            assert_eq!(
                avro_field_type(&avro_schema, "date64"),
                &json!({"type":"long","logicalType":"local-timestamp-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time32s"),
                &json!({"type":"int","logicalType":"time-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time64ns"),
                &json!({"type":"long","logicalType":"time-micros"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_local"),
                &json!({"type":"long","logicalType":"local-timestamp-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_utc"),
                &json!({"type":"long","logicalType":"timestamp-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_ym"),
                &json!({"type":"fixed","name":"iv_ym","size":12,"logicalType":"duration"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_dt"),
                &json!({"type":"fixed","name":"iv_dt","size":12,"logicalType":"duration"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_mdn"),
                &json!({"type":"fixed","name":"iv_mdn","size":12,"logicalType":"duration"})
            );
        }
        // --- Check decoded column values per mode.
        if cfg!(feature = "avro_custom_types") {
            // Custom types: every column decodes back to its original array.
            assert_eq!(
                rt.column(0).as_ref(),
                &Int8Array::from(i8_values) as &dyn Array
            );
            assert_eq!(
                rt.column(1).as_ref(),
                &Int16Array::from(i16_values) as &dyn Array
            );
            assert_eq!(
                rt.column(2).as_ref(),
                &UInt8Array::from(u8_values) as &dyn Array
            );
            assert_eq!(
                rt.column(3).as_ref(),
                &UInt16Array::from(u16_values) as &dyn Array
            );
            assert_eq!(
                rt.column(4).as_ref(),
                &UInt32Array::from(u32_values) as &dyn Array
            );
            assert_eq!(
                rt.column(5).as_ref(),
                &UInt64Array::from(u64_values) as &dyn Array
            );
            assert_eq!(
                rt.column(6).as_ref(),
                &Float16Array::from(f16_values) as &dyn Array
            );
            assert_eq!(
                rt.column(7).as_ref(),
                &Date64Array::from(date64_values) as &dyn Array
            );
            assert_eq!(
                rt.column(8).as_ref(),
                &Time32SecondArray::from(time32s_values) as &dyn Array
            );
            assert_eq!(
                rt.column(9).as_ref(),
                &Time64NanosecondArray::from(time64ns_values) as &dyn Array
            );
            assert_eq!(
                rt.column(10).as_ref(),
                &TimestampSecondArray::from(ts_s_local_values) as &dyn Array
            );
            assert_eq!(
                rt.column(11).as_ref(),
                &TimestampSecondArray::from(ts_s_utc_values).with_timezone("+00:00") as &dyn Array
            );
            assert_eq!(
                rt.column(12).as_ref(),
                &IntervalYearMonthArray::from(iv_ym_values) as &dyn Array
            );
            assert_eq!(
                rt.column(13).as_ref(),
                &IntervalDayTimeArray::from(iv_dt_values) as &dyn Array
            );
            assert_eq!(
                rt.column(14).as_ref(),
                &IntervalMonthDayNanoArray::from(iv_mdn_values) as &dyn Array
            );
        } else {
            // Non-custom: build the widened/rescaled expectations from the
            // original inputs, then compare column by column.
            let exp_i8: Vec<Option<i32>> = i8_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_i16: Vec<Option<i32>> =
                i16_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_u8: Vec<Option<i32>> = u8_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_u16: Vec<Option<i32>> =
                u16_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_u32: Vec<Option<i64>> =
                u32_values.iter().map(|v| v.map(|x| x as i64)).collect();
            let exp_u64: Vec<Option<i64>> =
                u64_values.iter().map(|v| v.map(|x| x as i64)).collect();
            let exp_f16: Vec<Option<f32>> =
                f16_values.iter().map(|v| v.map(|x| x.to_f32())).collect();
            let exp_time32_ms: Vec<Option<i32>> = time32s_values
                .iter()
                .map(|v| v.map(|x| x.saturating_mul(1000)))
                .collect();
            let exp_time64_us: Vec<Option<i64>> = time64ns_values
                .iter()
                .map(|v| v.map(|x| x / 1000))
                .collect();
            let exp_ts_local_ms: Vec<Option<i64>> = ts_s_local_values
                .iter()
                .map(|v| v.map(|x| x * 1000))
                .collect();
            let exp_ts_utc_ms: Vec<Option<i64>> = ts_s_utc_values
                .iter()
                .map(|v| v.map(|x| x * 1000))
                .collect();
            let exp_iv_ym: Vec<Option<IntervalMonthDayNano>> = iv_ym_values
                .iter()
                .map(|v| v.map(|months| IntervalMonthDayNano::new(months, 0, 0)))
                .collect();
            let exp_iv_dt: Vec<Option<IntervalMonthDayNano>> = iv_dt_values
                .iter()
                .map(|v| {
                    v.map(|dt| {
                        IntervalMonthDayNano::new(0, dt.days, (dt.milliseconds as i64) * 1_000_000)
                    })
                })
                .collect();
            assert_eq!(
                rt.column(0).as_ref(),
                &Int32Array::from(exp_i8) as &dyn Array
            );
            assert_eq!(
                rt.column(1).as_ref(),
                &Int32Array::from(exp_i16) as &dyn Array
            );
            assert_eq!(
                rt.column(2).as_ref(),
                &Int32Array::from(exp_u8) as &dyn Array
            );
            assert_eq!(
                rt.column(3).as_ref(),
                &Int32Array::from(exp_u16) as &dyn Array
            );
            assert_eq!(
                rt.column(4).as_ref(),
                &arrow_array::Int64Array::from(exp_u32) as &dyn Array
            );
            assert_eq!(
                rt.column(5).as_ref(),
                &arrow_array::Int64Array::from(exp_u64) as &dyn Array
            );
            assert_eq!(
                rt.column(6).as_ref(),
                &arrow_array::Float32Array::from(exp_f16) as &dyn Array
            );
            assert_eq!(
                rt.column(7).as_ref(),
                &TimestampMillisecondArray::from(date64_values) as &dyn Array
            );
            assert_eq!(
                rt.column(8).as_ref(),
                &Time32MillisecondArray::from(exp_time32_ms) as &dyn Array
            );
            assert_eq!(
                rt.column(9).as_ref(),
                &Time64MicrosecondArray::from(exp_time64_us) as &dyn Array
            );
            assert_eq!(
                rt.column(10).as_ref(),
                &TimestampMillisecondArray::from(exp_ts_local_ms) as &dyn Array
            );
            assert_eq!(
                rt.column(11).as_ref(),
                &TimestampMillisecondArray::from(exp_ts_utc_ms).with_timezone("+00:00")
                    as &dyn Array
            );
            assert_eq!(
                rt.column(12).as_ref(),
                &IntervalMonthDayNanoArray::from(exp_iv_ym) as &dyn Array
            );
            assert_eq!(
                rt.column(13).as_ref(),
                &IntervalMonthDayNanoArray::from(exp_iv_dt) as &dyn Array
            );
            // iv_mdn inputs were chosen to survive the duration encoding
            // unchanged, so the expectation is the input itself.
            assert_eq!(
                rt.column(14).as_ref(),
                &IntervalMonthDayNanoArray::from(iv_mdn_values) as &dyn Array
            );
        }
        Ok(())
    }
4271
4272 #[cfg(not(feature = "avro_custom_types"))]
4273 #[test]
4274 fn non_custom_uint64_overflow_errors() -> Result<(), AvroError> {
4275 let schema = Schema::new(vec![Field::new("u64", DataType::UInt64, false)]);
4276 let values: Vec<Option<u64>> = vec![Some((i64::MAX as u64) + 1)];
4277 let batch = RecordBatch::try_new(
4278 Arc::new(schema.clone()),
4279 vec![Arc::new(UInt64Array::from(values)) as ArrayRef],
4280 )?;
4281 let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4282 let err = w
4283 .write(&batch)
4284 .expect_err("expected UInt64 overflow error when avro_custom_types is disabled");
4285 match err {
4286 AvroError::InvalidArgument(msg) => {
4287 assert_eq!(
4288 msg,
4289 "UInt64 value 9223372036854775808 exceeds i64::MAX; enable avro_custom_types feature for full UInt64 support"
4290 );
4291 }
4292 other => panic!("expected InvalidArgument, got {other:?}"),
4293 }
4294 Ok(())
4295 }
4296
4297 #[cfg(not(feature = "avro_custom_types"))]
4298 #[test]
4299 fn non_custom_interval_year_month_negative_errors() -> Result<(), AvroError> {
4300 let schema = Schema::new(vec![Field::new(
4301 "iv_ym",
4302 DataType::Interval(IntervalUnit::YearMonth),
4303 false,
4304 )]);
4305 let values: Vec<Option<i32>> = vec![Some(-1)];
4306 let batch = RecordBatch::try_new(
4307 Arc::new(schema.clone()),
4308 vec![Arc::new(IntervalYearMonthArray::from(values)) as ArrayRef],
4309 )?;
4310
4311 let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4312 let err = w
4313 .write(&batch)
4314 .expect_err("expected negative Interval(YearMonth) error");
4315 match err {
4316 AvroError::InvalidArgument(msg) => {
4317 assert_eq!(
4318 msg,
4319 "Avro 'duration' cannot encode negative months; enable `avro_custom_types` to round-trip signed Arrow Interval(YearMonth)"
4320 );
4321 }
4322 other => panic!("expected InvalidArgument, got {other:?}"),
4323 }
4324 Ok(())
4325 }
4326
4327 #[cfg(not(feature = "avro_custom_types"))]
4328 #[test]
4329 fn non_custom_interval_day_time_negative_errors() -> Result<(), AvroError> {
4330 let schema = Schema::new(vec![Field::new(
4331 "iv_dt",
4332 DataType::Interval(IntervalUnit::DayTime),
4333 false,
4334 )]);
4335 let values: Vec<Option<IntervalDayTime>> = vec![Some(IntervalDayTime::new(-1, 0))];
4336 let batch = RecordBatch::try_new(
4337 Arc::new(schema.clone()),
4338 vec![Arc::new(IntervalDayTimeArray::from(values)) as ArrayRef],
4339 )?;
4340 let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4341 let err = w
4342 .write(&batch)
4343 .expect_err("expected negative Interval(DayTime) error");
4344 match err {
4345 AvroError::InvalidArgument(msg) => {
4346 assert_eq!(
4347 msg,
4348 "Avro 'duration' cannot encode negative days or milliseconds; enable `avro_custom_types` to round-trip signed Arrow Interval(DayTime)"
4349 );
4350 }
4351 other => panic!("expected InvalidArgument, got {other:?}"),
4352 }
4353 Ok(())
4354 }
4355
4356 #[cfg(not(feature = "avro_custom_types"))]
4357 #[test]
4358 fn non_custom_interval_month_day_nano_negative_errors() -> Result<(), AvroError> {
4359 let schema = Schema::new(vec![Field::new(
4360 "iv_mdn",
4361 DataType::Interval(IntervalUnit::MonthDayNano),
4362 false,
4363 )]);
4364 let values: Vec<Option<IntervalMonthDayNano>> =
4365 vec![Some(IntervalMonthDayNano::new(-1, 0, 0))];
4366 let batch = RecordBatch::try_new(
4367 Arc::new(schema.clone()),
4368 vec![Arc::new(IntervalMonthDayNanoArray::from(values)) as ArrayRef],
4369 )?;
4370 let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4371 let err = w
4372 .write(&batch)
4373 .expect_err("expected negative Interval(MonthDayNano) error");
4374 match err {
4375 AvroError::InvalidArgument(msg) => {
4376 assert_eq!(
4377 msg,
4378 "Avro 'duration' cannot encode negative months/days/nanoseconds; enable `avro_custom_types` to round-trip signed Arrow intervals"
4379 );
4380 }
4381 other => panic!("expected InvalidArgument, got {other:?}"),
4382 }
4383 Ok(())
4384 }
4385
4386 #[cfg(not(feature = "avro_custom_types"))]
4387 #[test]
4388 fn non_custom_interval_month_day_nano_sub_millis_errors() -> Result<(), AvroError> {
4389 let schema = Schema::new(vec![Field::new(
4390 "iv_mdn",
4391 DataType::Interval(IntervalUnit::MonthDayNano),
4392 false,
4393 )]);
4394 let values: Vec<Option<IntervalMonthDayNano>> =
4395 vec![Some(IntervalMonthDayNano::new(0, 0, 1))];
4396 let batch = RecordBatch::try_new(
4397 Arc::new(schema.clone()),
4398 vec![Arc::new(IntervalMonthDayNanoArray::from(values)) as ArrayRef],
4399 )?;
4400 let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4401 let err = w
4402 .write(&batch)
4403 .expect_err("expected sub-millisecond Interval(MonthDayNano) error");
4404 match err {
4405 AvroError::InvalidArgument(msg) => {
4406 assert_eq!(
4407 msg,
4408 "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000 (enable `avro_custom_types` to preserve nanosecond intervals)"
4409 );
4410 }
4411 other => panic!("expected InvalidArgument, got {other:?}"),
4412 }
4413 Ok(())
4414 }
4415
4416 #[cfg(not(feature = "avro_custom_types"))]
4417 #[test]
4418 fn non_custom_time32_second_scaling_overflow_errors() -> Result<(), AvroError> {
4419 let schema = Schema::new(vec![Field::new(
4420 "time32s",
4421 DataType::Time32(TimeUnit::Second),
4422 false,
4423 )]);
4424 let values: Vec<Option<i32>> = vec![Some((i32::MAX / 1000) + 1)];
4425 let batch = RecordBatch::try_new(
4426 Arc::new(schema.clone()),
4427 vec![Arc::new(Time32SecondArray::from(values)) as ArrayRef],
4428 )?;
4429 let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4430 let err = w
4431 .write(&batch)
4432 .expect_err("expected time32 seconds->millis overflow error");
4433 match err {
4434 AvroError::InvalidArgument(msg) => {
4435 assert_eq!(msg, "time32(secs) * 1000 overflowed");
4436 }
4437 other => panic!("expected InvalidArgument, got {other:?}"),
4438 }
4439 Ok(())
4440 }
4441
4442 #[cfg(not(feature = "avro_custom_types"))]
4443 #[test]
4444 fn non_custom_timestamp_second_scaling_overflow_errors() -> Result<(), AvroError> {
4445 let schema = Schema::new(vec![Field::new(
4446 "ts_s_local",
4447 DataType::Timestamp(TimeUnit::Second, None),
4448 false,
4449 )]);
4450 let values: Vec<Option<i64>> = vec![Some((i64::MAX / 1000) + 1)];
4452 let batch = RecordBatch::try_new(
4453 Arc::new(schema.clone()),
4454 vec![Arc::new(TimestampSecondArray::from(values)) as ArrayRef],
4455 )?;
4456 let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4457 let err = w
4458 .write(&batch)
4459 .expect_err("expected timestamp seconds->millis overflow error");
4460 match err {
4461 AvroError::InvalidArgument(msg) => {
4462 assert_eq!(msg, "timestamp(secs) * 1000 overflowed");
4463 }
4464 other => panic!("expected InvalidArgument, got {other:?}"),
4465 }
4466 Ok(())
4467 }
4468}