1use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::errors::AvroError;
22use crate::schema::{Fingerprint, Nullability, Prefix};
23use arrow_array::Float16Array;
24use arrow_array::cast::AsArray;
25use arrow_array::types::{
26 ArrowPrimitiveType, Date32Type, Date64Type, DurationMicrosecondType, DurationMillisecondType,
27 DurationNanosecondType, DurationSecondType, Float16Type, Float32Type, Float64Type, Int8Type,
28 Int16Type, Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType,
29 IntervalYearMonthType, RunEndIndexType, Time32MillisecondType, Time32SecondType,
30 Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
31 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
32 UInt32Type, UInt64Type,
33};
34use arrow_array::{
35 Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
36 FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
37 GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
38 ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
39 StringViewArray, StructArray, UnionArray,
40};
41#[cfg(feature = "small_decimals")]
42use arrow_array::{Decimal32Array, Decimal64Array};
43use arrow_buffer::{ArrowNativeType, NullBuffer};
44use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode};
45use bytes::{BufMut, BytesMut};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
/// Runs `$body` once for every row index `$row` in `0..$n`.
///
/// When `$prefix` evaluates to `Some(bytes)`, `bytes` are written to `$out`
/// before each row's body; a failed write is returned early (via `?`) as
/// `AvroError::IoError`, so the macro must be expanded inside a function that
/// returns `Result<_, AvroError>`. The `Some`/`None` test is made once, before
/// either loop starts, rather than once per row.
macro_rules! for_rows_with_prefix {
    ($n:expr, $prefix:expr, $out:ident, |$row:ident| $body:block) => {{
        if let Some(row_prefix) = $prefix {
            for $row in 0..$n {
                $out.write_all(row_prefix)
                    .map_err(|e| AvroError::IoError(format!("write prefix: {e}"), e))?;
                $body
            }
        } else {
            for $row in 0..$n {
                $body
            }
        }
    }};
}
68
69#[inline]
73pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), AvroError> {
74 let mut zz = ((value << 1) ^ (value >> 63)) as u64;
75 let mut buf = [0u8; 10];
77 let mut i = 0;
78 while (zz & !0x7F) != 0 {
79 buf[i] = ((zz & 0x7F) as u8) | 0x80;
80 i += 1;
81 zz >>= 7;
82 }
83 buf[i] = (zz & 0x7F) as u8;
84 i += 1;
85 out.write_all(&buf[..i])
86 .map_err(|e| AvroError::IoError(format!("write long: {e}"), e))
87}
88
89#[inline]
90fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), AvroError> {
91 write_long(out, value as i64)
92}
93
94#[inline]
95fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
96 write_long(out, bytes.len() as i64)?;
97 out.write_all(bytes)
98 .map_err(|e| AvroError::IoError(format!("write bytes: {e}"), e))
99}
100
101#[inline]
102fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), AvroError> {
103 out.write_all(&[if v { 1 } else { 0 }])
104 .map_err(|e| AvroError::IoError(format!("write bool: {e}"), e))
105}
106
/// Returns the shortest suffix of the big-endian two's-complement integer `be`
/// that still represents the same value.
///
/// Leading sign-fill bytes (`0x00` for non-negative, `0xFF` for negative) are
/// dropped, except that one is kept when the first remaining byte's high bit
/// would otherwise flip the sign. An all-fill input collapses to its last byte,
/// and an empty input is returned unchanged.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&first) = be.first() else {
        return be;
    };
    let fill: u8 = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
    match be.iter().position(|&b| b != fill) {
        // Every byte is sign fill: one byte suffices.
        None => &be[be.len() - 1..],
        // First byte is already significant; nothing to trim.
        Some(0) => be,
        Some(first_significant) => {
            let sign_would_flip = ((be[first_significant] ^ fill) & 0x80) != 0;
            let start = if sign_would_flip {
                first_significant - 1
            } else {
                first_significant
            };
            &be[start..]
        }
    }
}
138
139#[inline]
151fn write_sign_extended<W: Write + ?Sized>(
152 out: &mut W,
153 src_be: &[u8],
154 n: usize,
155) -> Result<(), AvroError> {
156 let len = src_be.len();
157 if len == n {
158 out.write_all(src_be)?;
159 return Ok(());
160 }
161 let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
162 0xFF
163 } else {
164 0x00
165 };
166 if len > n {
167 let extra = len - n;
168 if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
169 return Ok(());
170 }
171 if src_be[..extra].iter().any(|&b| b != sign_byte)
174 || ((src_be[extra] ^ sign_byte) & 0x80) != 0
175 {
176 return Err(AvroError::InvalidArgument(format!(
177 "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
178 )));
179 }
180 return out
181 .write_all(&src_be[extra..])
182 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e));
183 }
184 let pad_len = n - len;
186 const ZPAD: [u8; 64] = [0x00; 64];
188 const FPAD: [u8; 64] = [0xFF; 64];
189 let pad = if sign_byte == 0x00 {
190 &ZPAD[..]
191 } else {
192 &FPAD[..]
193 };
194 let mut rem = pad_len;
197 while rem >= pad.len() {
198 out.write_all(pad)
199 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
200 rem -= pad.len();
201 }
202 if rem > 0 {
203 out.write_all(&pad[..rem])
204 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
205 }
206 out.write_all(src_be)
207 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))
208}
209
210fn write_optional_index<W: Write + ?Sized>(
216 out: &mut W,
217 is_null: bool,
218 null_order: Nullability,
219) -> Result<(), AvroError> {
220 let byte = union_value_branch_byte(null_order, is_null);
221 out.write_all(&[byte])
222 .map_err(|e| AvroError::IoError(format!("write union branch: {e}"), e))
223}
224
225#[derive(Debug, Clone)]
226enum NullState<'a> {
227 NonNullable,
228 NullableNoNulls {
229 union_value_byte: u8,
230 },
231 Nullable {
232 nulls: &'a NullBuffer,
233 null_order: Nullability,
234 },
235}
236
237pub(crate) struct FieldEncoder<'a> {
241 encoder: Encoder<'a>,
242 null_state: NullState<'a>,
243}
244
impl<'a> FieldEncoder<'a> {
    /// Builds the encoder for one Avro field site backed by `array`.
    ///
    /// `plan` decides which Arrow types are accepted at this site and which
    /// `Encoder` variant wraps the array. `FieldPlan::Scalar` dispatches on
    /// `array.data_type()`. The other plans downcast to the concrete Arrow array
    /// type they require, and nested plans recurse into child arrays.
    ///
    /// `nullability` is `None` for a non-nullable site. When it is
    /// `Some(null_order)`, the resulting `NullState` makes `encode` write a union
    /// branch byte before each value:
    /// - a per-row null/value byte if the array has a null buffer with at least one null;
    /// - otherwise, a constant value-branch byte.
    ///
    /// # Errors
    /// - `AvroError::SchemaError` when the Arrow type does not match the plan,
    ///   or an enum dictionary does not match the schema symbols.
    /// - `AvroError::InvalidArgument` for Time32/Time64 units that Arrow does not define.
    /// - `AvroError::NYI` for scalar types without an encoder.
    ///
    /// Errors from nested encoder constructors are propagated.
    fn make_encoder(
        array: &'a dyn Array,
        plan: &FieldPlan,
        nullability: Option<Nullability>,
    ) -> Result<Self, AvroError> {
        let encoder = match plan {
            FieldPlan::Scalar => match array.data_type() {
                DataType::Null => Encoder::Null,
                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
                DataType::Utf8 => {
                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
                }
                DataType::LargeUtf8 => {
                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
                }
                DataType::Utf8View => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<StringViewArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected StringViewArray".into()))?;
                    Encoder::Utf8View(Utf8ViewEncoder(arr))
                }
                DataType::BinaryView => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<BinaryViewArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected BinaryViewArray".into()))?;
                    Encoder::BinaryView(BinaryViewEncoder(arr))
                }
                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
                // Narrow and unsigned integers: with `avro_custom_types` each gets a
                // dedicated encoder variant; without it they are routed to the
                // `*ToInt` / `*ToLong` encoder variants.
                #[cfg(feature = "avro_custom_types")]
                DataType::Int8 => Encoder::Int8(Int8Encoder(array.as_primitive::<Int8Type>())),
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::Int8 => {
                    Encoder::Int8ToInt(Int8ToIntEncoder(array.as_primitive::<Int8Type>()))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::Int16 => Encoder::Int16(Int16Encoder(array.as_primitive::<Int16Type>())),
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::Int16 => {
                    Encoder::Int16ToInt(Int16ToIntEncoder(array.as_primitive::<Int16Type>()))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::UInt8 => Encoder::UInt8(UInt8Encoder(array.as_primitive::<UInt8Type>())),
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::UInt8 => {
                    Encoder::UInt8ToInt(UInt8ToIntEncoder(array.as_primitive::<UInt8Type>()))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::UInt16 => {
                    Encoder::UInt16(UInt16Encoder(array.as_primitive::<UInt16Type>()))
                }
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::UInt16 => {
                    Encoder::UInt16ToInt(UInt16ToIntEncoder(array.as_primitive::<UInt16Type>()))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::UInt32 => {
                    Encoder::UInt32(UInt32Encoder(array.as_primitive::<UInt32Type>()))
                }
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::UInt32 => {
                    Encoder::UInt32ToLong(UInt32ToLongEncoder(array.as_primitive::<UInt32Type>()))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::UInt64 => {
                    Encoder::UInt64Fixed(UInt64FixedEncoder(array.as_primitive::<UInt64Type>()))
                }
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::UInt64 => {
                    Encoder::UInt64ToLong(UInt64ToLongEncoder(array.as_primitive::<UInt64Type>()))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::Float16 => {
                    Encoder::Float16Fixed(Float16FixedEncoder(array.as_primitive::<Float16Type>()))
                }
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::Float16 => Encoder::Float16ToFloat(Float16ToFloatEncoder(
                    array.as_primitive::<Float16Type>(),
                )),
                DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::Date64 => {
                    Encoder::Date64ToLong(Date64ToLongEncoder(array.as_primitive::<Date64Type>()))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::Date64 => {
                    Encoder::Date64(LongEncoder(array.as_primitive::<Date64Type>()))
                }
                // Without `avro_custom_types`, Time32(Second) has no Scalar arm here;
                // it is expected to arrive via `FieldPlan::TimeMillisFromSecs` instead.
                #[cfg(feature = "avro_custom_types")]
                DataType::Time32(TimeUnit::Second) => {
                    Encoder::Time32Secs(IntEncoder(array.as_primitive::<Time32SecondType>()))
                }
                DataType::Time32(TimeUnit::Millisecond) => {
                    Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
                }
                // Arrow only defines Time32 for seconds and milliseconds.
                DataType::Time32(TimeUnit::Microsecond) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
                            .into(),
                    ));
                }
                DataType::Time32(TimeUnit::Nanosecond) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
                    array.as_primitive::<Time64MicrosecondType>(),
                )),
                #[cfg(not(feature = "avro_custom_types"))]
                DataType::Time64(TimeUnit::Nanosecond) => {
                    Encoder::Time64NanosToMicros(Time64NanosToMicrosEncoder(
                        array.as_primitive::<Time64NanosecondType>(),
                    ))
                }
                #[cfg(feature = "avro_custom_types")]
                DataType::Time64(TimeUnit::Nanosecond) => {
                    Encoder::Time64Nanos(LongEncoder(array.as_primitive::<Time64NanosecondType>()))
                }
                // Arrow only defines Time64 for microseconds and nanoseconds.
                DataType::Time64(TimeUnit::Millisecond) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Second) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
                            .into(),
                    ));
                }
                DataType::Float32 => {
                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
                }
                DataType::Float64 => {
                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
                }
                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
                DataType::LargeBinary => {
                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
                }
                DataType::FixedSizeBinary(_len) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeBinaryArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
                        })?;
                    Encoder::Fixed(FixedEncoder(arr))
                }
                DataType::Timestamp(unit, _) => match unit {
                    // Exactly one of the two cfg-gated blocks below is compiled in and
                    // becomes the arm's value.
                    TimeUnit::Second => {
                        #[cfg(not(feature = "avro_custom_types"))]
                        {
                            Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
                                array.as_primitive::<TimestampSecondType>(),
                            ))
                        }
                        #[cfg(feature = "avro_custom_types")]
                        {
                            Encoder::TimestampSecs(LongEncoder(
                                array.as_primitive::<TimestampSecondType>(),
                            ))
                        }
                    }
                    TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
                        array.as_primitive::<TimestampMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
                        array.as_primitive::<TimestampMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
                        array.as_primitive::<TimestampNanosecondType>(),
                    )),
                },
                // Without `avro_custom_types`, Interval arrays have no Scalar arm; they
                // are handled by `FieldPlan::Duration` below.
                #[cfg(feature = "avro_custom_types")]
                DataType::Interval(unit) => match unit {
                    IntervalUnit::MonthDayNano => {
                        Encoder::IntervalMonthDayNanoFixed(IntervalMonthDayNanoFixedEncoder(
                            array.as_primitive::<IntervalMonthDayNanoType>(),
                        ))
                    }
                    IntervalUnit::YearMonth => {
                        Encoder::IntervalYearMonthFixed(IntervalYearMonthFixedEncoder(
                            array.as_primitive::<IntervalYearMonthType>(),
                        ))
                    }
                    IntervalUnit::DayTime => Encoder::IntervalDayTimeFixed(
                        IntervalDayTimeFixedEncoder(array.as_primitive::<IntervalDayTimeType>()),
                    ),
                },
                DataType::Duration(tu) => match tu {
                    TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
                        array.as_primitive::<DurationSecondType>(),
                    )),
                    TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
                        array.as_primitive::<DurationMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
                        array.as_primitive::<DurationMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
                        array.as_primitive::<DurationNanosecondType>(),
                    )),
                },
                other => {
                    return Err(AvroError::NYI(format!(
                        "Avro scalar type not yet supported: {other:?}"
                    )));
                }
            },
            FieldPlan::Struct { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .ok_or_else(|| AvroError::SchemaError("Expected StructArray".into()))?;
                Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
            }
            // An Avro array may be backed by any of Arrow's five list layouts; each
            // gets its own encoder over the concrete array type.
            FieldPlan::List {
                items_nullability,
                item_plan,
            } => match array.data_type() {
                DataType::List(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected ListArray".into()))?;
                    Encoder::List(Box::new(ListEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeList(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected LargeListArray".into()))?;
                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::ListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListViewArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected ListViewArray".into()))?;
                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListViewArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected LargeListViewArray".into())
                        })?;
                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::FixedSizeList(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeListArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected FixedSizeListArray".into())
                        })?;
                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                    )));
                }
            },
            // The const generic passed to `DecimalEncoder` matches the byte width of
            // the Arrow decimal type (4/8/16/32). `size` comes from the plan;
            // presumably it is the Avro fixed size, or None for bytes-backed
            // decimals (TODO confirm in DecimalEncoder).
            FieldPlan::Decimal { size } => match array.data_type() {
                #[cfg(feature = "small_decimals")]
                DataType::Decimal32(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal32Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal32Array".into()))?;
                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
                }
                #[cfg(feature = "small_decimals")]
                DataType::Decimal64(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal64Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal64Array".into()))?;
                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
                }
                DataType::Decimal128(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal128Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal128Array".into()))?;
                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
                }
                DataType::Decimal256(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal256Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal256Array".into()))?;
                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
                }
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
                    )));
                }
            },
            FieldPlan::Uuid => {
                let arr = array
                    .as_any()
                    .downcast_ref::<FixedSizeBinaryArray>()
                    .ok_or_else(|| {
                        AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
                    })?;
                Encoder::Uuid(UuidEncoder(arr))
            }
            FieldPlan::Map {
                values_nullability,
                value_plan,
            } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<MapArray>()
                    .ok_or_else(|| AvroError::SchemaError("Expected MapArray".into()))?;
                Encoder::Map(Box::new(MapEncoder::try_new(
                    arr,
                    *values_nullability,
                    value_plan.as_ref(),
                )?))
            }
            FieldPlan::Enum { symbols } => match array.data_type() {
                DataType::Dictionary(key_dt, value_dt) => {
                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
                        return Err(AvroError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    let dict = array
                        .as_any()
                        .downcast_ref::<DictionaryArray<Int32Type>>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected DictionaryArray<Int32>".into())
                        })?;
                    let values = dict
                        .values()
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Dictionary values must be Utf8".into())
                        })?;
                    // The dictionary must list exactly the schema's symbols, in the same
                    // order. Presumably this lets the dictionary keys be written
                    // directly as Avro enum indices (only `keys` is kept below).
                    if values.len() != symbols.len() {
                        return Err(AvroError::SchemaError(format!(
                            "Enum symbol length {} != dictionary size {}",
                            symbols.len(),
                            values.len()
                        )));
                    }
                    for i in 0..values.len() {
                        if values.value(i) != symbols[i].as_str() {
                            return Err(AvroError::SchemaError(format!(
                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
                                symbols[i],
                                values.value(i)
                            )));
                        }
                    }
                    let keys = dict.keys();
                    Encoder::Enum(EnumEncoder { keys })
                }
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
                    )));
                }
            },
            FieldPlan::Union { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .ok_or_else(|| AvroError::SchemaError("Expected UnionArray".into()))?;

                Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
            }
            FieldPlan::RunEndEncoded {
                values_nullability,
                value_plan,
            } => {
                // Try each supported run-end index width (Int16, Int32, Int64) in turn.
                // The run array's values child gets its own encoder, built from
                // `value_plan` with `values_nullability`.
                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, AvroError> {
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
                            Int16Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
                            Int32Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
                            Int64Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    Err(AvroError::SchemaError(
                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
                            .into(),
                    ))
                };
                build(array)?
            }
            FieldPlan::Duration => match array.data_type() {
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    Encoder::IntervalMonthDayNanoDuration(DurationEncoder(
                        array.as_primitive::<IntervalMonthDayNanoType>(),
                    ))
                }
                DataType::Interval(IntervalUnit::YearMonth) => Encoder::IntervalYearMonthDuration(
                    DurationEncoder(array.as_primitive::<IntervalYearMonthType>()),
                ),
                DataType::Interval(IntervalUnit::DayTime) => Encoder::IntervalDayTimeDuration(
                    DurationEncoder(array.as_primitive::<IntervalDayTimeType>()),
                ),
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro duration requires Arrow Interval type, found: {other:?}"
                    )));
                }
            },
            FieldPlan::TimeMillisFromSecs => match array.data_type() {
                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
                ),
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro time-millis-from-seconds requires Arrow Time32(Second), found: {other:?}"
                    )));
                }
            },
        };
        // Per-row null checks are only needed when the array has a null buffer
        // containing at least one null. Otherwise every row of a nullable site takes
        // the value branch, so its byte is computed once here.
        let null_state = match nullability {
            None => NullState::NonNullable,
            Some(null_order) => match array.nulls() {
                Some(nulls) if array.null_count() > 0 => NullState::Nullable { nulls, null_order },
                _ => NullState::NullableNoNulls {
                    union_value_byte: union_value_branch_byte(null_order, false),
                },
            },
        };
        Ok(Self {
            encoder,
            null_state,
        })
    }

    /// Writes row `idx` to `out`.
    ///
    /// The union branch byte required by the `NullState` (if any) comes first.
    /// For a null row only the null-branch byte is written; otherwise the value
    /// encoder then writes the row's value.
    ///
    /// # Errors
    /// Returns `AvroError::IoError` if writing the branch byte fails; errors from
    /// the value encoder are propagated.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        match &self.null_state {
            NullState::NonNullable => {}
            NullState::NullableNoNulls { union_value_byte } => out
                .write_all(&[*union_value_byte])
                .map_err(|e| AvroError::IoError(format!("write union value branch: {e}"), e))?,
            // A null row is fully represented by its branch byte; no value follows.
            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
                return write_optional_index(out, true, *null_order);
            }
            NullState::Nullable { null_order, .. } => {
                write_optional_index(out, false, *null_order)?;
            }
        }
        self.encoder.encode(out, idx)
    }
}
762
763fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
764 let nulls_first = null_order == Nullability::default();
765 if nulls_first == is_null { 0x00 } else { 0x02 }
766}
767
768#[derive(Debug, Clone)]
771enum FieldPlan {
772 Scalar,
774 Struct { bindings: Vec<FieldBinding> },
776 List {
778 items_nullability: Option<Nullability>,
779 item_plan: Box<FieldPlan>,
780 },
781 Decimal { size: Option<usize> },
783 Uuid,
785 Map {
787 values_nullability: Option<Nullability>,
788 value_plan: Box<FieldPlan>,
789 },
790 Enum { symbols: Arc<[String]> },
793 Union { bindings: Vec<FieldBinding> },
795 RunEndEncoded {
798 values_nullability: Option<Nullability>,
799 value_plan: Box<FieldPlan>,
800 },
801 Duration,
805 TimeMillisFromSecs,
809}
810
811#[derive(Debug, Clone)]
812struct FieldBinding {
813 arrow_index: usize,
815 nullability: Option<Nullability>,
817 plan: FieldPlan,
819}
820
821#[derive(Debug)]
823pub(crate) struct RecordEncoderBuilder<'a> {
824 avro_root: &'a AvroField,
825 arrow_schema: &'a ArrowSchema,
826 fingerprint: Option<Fingerprint>,
827}
828
829impl<'a> RecordEncoderBuilder<'a> {
830 pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
832 Self {
833 avro_root,
834 arrow_schema,
835 fingerprint: None,
836 }
837 }
838
839 pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
840 self.fingerprint = fingerprint;
841 self
842 }
843
844 pub(crate) fn build(self) -> Result<RecordEncoder, AvroError> {
847 let avro_root_dt = self.avro_root.data_type();
848 let Codec::Struct(root_fields) = avro_root_dt.codec() else {
849 return Err(AvroError::SchemaError(
850 "Top-level Avro schema must be a record/struct".into(),
851 ));
852 };
853 let mut columns = Vec::with_capacity(root_fields.len());
854 for root_field in root_fields.as_ref() {
855 let name = root_field.name();
856 let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
857 AvroError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
858 })?;
859 columns.push(FieldBinding {
860 arrow_index,
861 nullability: root_field.data_type().nullability(),
862 plan: FieldPlan::build(
863 root_field.data_type(),
864 self.arrow_schema.field(arrow_index),
865 )?,
866 });
867 }
868 Ok(RecordEncoder {
869 columns,
870 prefix: self.fingerprint.map(|fp| fp.make_prefix()),
871 })
872 }
873}
874
875#[derive(Debug, Clone)]
881pub(crate) struct RecordEncoder {
882 columns: Vec<FieldBinding>,
883 prefix: Option<Prefix>,
885}
886
impl RecordEncoder {
    /// Creates one `FieldEncoder` per bound column of `batch`, in `self.columns` order.
    ///
    /// # Errors
    /// Returns `AvroError::SchemaError` if a binding's `arrow_index` is outside the
    /// batch's columns. Errors from `FieldEncoder::make_encoder` are propagated.
    fn prepare_for_batch<'a>(
        &'a self,
        batch: &'a RecordBatch,
    ) -> Result<Vec<FieldEncoder<'a>>, AvroError> {
        let arrays = batch.columns();
        let mut out = Vec::with_capacity(self.columns.len());
        for col_plan in self.columns.iter() {
            let arrow_index = col_plan.arrow_index;
            let array = arrays.get(arrow_index).ok_or_else(|| {
                AvroError::SchemaError(format!("Column index {arrow_index} out of range"))
            })?;
            // Without `avro_custom_types`, `FieldPlan::build` already hands a
            // run-end-encoded column's nullability to its values encoder (as
            // `values_nullability`), so the outer site must not write a second
            // branch byte.
            #[cfg(not(feature = "avro_custom_types"))]
            let site_nullability = match &col_plan.plan {
                FieldPlan::RunEndEncoded { .. } => None,
                _ => col_plan.nullability,
            };
            #[cfg(feature = "avro_custom_types")]
            let site_nullability = col_plan.nullability;
            out.push(FieldEncoder::make_encoder(
                array.as_ref(),
                &col_plan.plan,
                site_nullability,
            )?);
        }
        Ok(out)
    }

    /// Encodes every row of `batch` into `out`.
    ///
    /// For each row, the configured prefix (if any) is written first, followed by
    /// each column's value in binding order.
    ///
    /// # Errors
    /// Propagates schema/encoder construction errors and any write error.
    pub(crate) fn encode<W: Write>(
        &self,
        out: &mut W,
        batch: &RecordBatch,
    ) -> Result<(), AvroError> {
        let mut column_encoders = self.prepare_for_batch(batch)?;
        let n = batch.num_rows();
        let prefix = self.prefix.as_ref().map(|p| p.as_slice());
        for_rows_with_prefix!(n, prefix, out, |row| {
            for enc in column_encoders.iter_mut() {
                enc.encode(out, row)?;
            }
        });
        Ok(())
    }

    /// Appends each row of `batch` to `out` and pushes, for every row, the
    /// absolute position in `out` where that row ends onto `offsets`.
    ///
    /// Preconditions: `offsets` is non-empty, starts with `0`, and ends at
    /// `out.len()`.
    ///
    /// `row_capacity` is only a per-row size hint used to pre-reserve space in
    /// `out`. An empty batch returns immediately, without building column encoders.
    ///
    /// If writing any row fails, `out` and `offsets` are truncated back to the
    /// lengths they had before the first row was written.
    ///
    /// # Errors
    /// - `AvroError::General` if the offsets precondition fails or `offsets` cannot
    ///   grow by the number of rows.
    /// - Encoder construction errors are returned before anything is written.
    /// - Row-writing errors are returned after the rollback above.
    pub(crate) fn encode_rows(
        &self,
        batch: &RecordBatch,
        row_capacity: usize,
        out: &mut BytesMut,
        offsets: &mut Vec<usize>,
    ) -> Result<(), AvroError> {
        let out_len = out.len();
        if offsets.first() != Some(&0) || offsets.last() != Some(&out_len) {
            return Err(AvroError::General(
                "encode_rows requires offsets to start with 0 and end at out.len()".to_string(),
            ));
        }
        let n = batch.num_rows();
        if n == 0 {
            return Ok(());
        }
        if offsets.len().checked_add(n).is_none() {
            return Err(AvroError::General(
                "encode_rows cannot append offsets: too many rows".to_string(),
            ));
        }
        let mut column_encoders = self.prepare_for_batch(batch)?;
        offsets.reserve(n);
        let prefix_bytes = self.prefix.as_ref().map(|p| p.as_slice());
        let prefix_len = prefix_bytes.map_or(0, |p| p.len());
        let per_row_hint = row_capacity.max(prefix_len);
        // Best-effort pre-reservation; skipped when the size computation overflows.
        if let Some(additional) = n
            .checked_mul(per_row_hint)
            .filter(|&a| out_len.checked_add(a).is_some())
        {
            out.reserve(additional);
        }
        // Lengths to roll back to if any row fails to encode.
        let start_out_len = out.len();
        let start_offsets_len = offsets.len();
        let res = (|| -> Result<(), AvroError> {
            let mut w = out.writer();
            // Separate loop for the single-column case; otherwise identical to the
            // general loop below.
            if let [enc0] = column_encoders.as_mut_slice() {
                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
                    enc0.encode(&mut w, row)?;
                    offsets.push(w.get_ref().len());
                });
            } else {
                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
                    for enc in column_encoders.iter_mut() {
                        enc.encode(&mut w, row)?;
                    }
                    offsets.push(w.get_ref().len());
                });
            }
            Ok(())
        })();
        if res.is_err() {
            out.truncate(start_out_len);
            offsets.truncate(start_offsets_len);
        } else {
            debug_assert_eq!(
                *offsets.last().unwrap(),
                out.len(),
                "encode_rows: offsets/out length mismatch after successful encode"
            );
        }
        res
    }
}
1007
1008fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
1009 fields.iter().position(|f| f.name() == name)
1010}
1011
1012fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
1013 find_struct_child_index(fields, "value")
1015 .or_else(|| find_struct_child_index(fields, "values"))
1016 .or_else(|| if fields.len() == 2 { Some(1) } else { None })
1017}
1018
1019impl FieldPlan {
1020 fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, AvroError> {
1021 #[cfg(not(feature = "avro_custom_types"))]
1022 if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
1023 let values_nullability = avro_dt.nullability();
1024 let value_site_dt: &AvroDataType = match avro_dt.codec() {
1025 Codec::Union(branches, _, _) => branches
1026 .iter()
1027 .find(|b| !matches!(b.codec(), Codec::Null))
1028 .ok_or_else(|| {
1029 AvroError::SchemaError(
1030 "Avro union at RunEndEncoded site has no non-null branch".into(),
1031 )
1032 })?,
1033 _ => avro_dt,
1034 };
1035 return Ok(FieldPlan::RunEndEncoded {
1036 values_nullability,
1037 value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
1038 });
1039 }
1040 if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
1041 let ext_is_uuid = {
1043 #[cfg(feature = "canonical_extension_types")]
1044 {
1045 matches!(
1046 arrow_field.extension_type_name(),
1047 Some("arrow.uuid") | Some("uuid")
1048 )
1049 }
1050 #[cfg(not(feature = "canonical_extension_types"))]
1051 {
1052 false
1053 }
1054 };
1055 let md_is_uuid = arrow_field
1056 .metadata()
1057 .get("logicalType")
1058 .map(|s| s.as_str())
1059 == Some("uuid");
1060 if ext_is_uuid || md_is_uuid {
1061 if *len != 16 {
1062 return Err(AvroError::InvalidArgument(
1063 "logicalType=uuid requires FixedSizeBinary(16)".into(),
1064 ));
1065 }
1066 return Ok(FieldPlan::Uuid);
1067 }
1068 }
1069 match avro_dt.codec() {
1070 Codec::Struct(avro_fields) => {
1071 let fields = match arrow_field.data_type() {
1072 DataType::Struct(struct_fields) => struct_fields,
1073 other => {
1074 return Err(AvroError::SchemaError(format!(
1075 "Avro struct maps to Arrow Struct, found: {other:?}"
1076 )));
1077 }
1078 };
1079 let mut bindings = Vec::with_capacity(avro_fields.len());
1080 for avro_field in avro_fields.iter() {
1081 let name = avro_field.name().to_string();
1082 let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
1083 AvroError::SchemaError(format!(
1084 "Struct field '{name}' not present in Arrow field '{}'",
1085 arrow_field.name()
1086 ))
1087 })?;
1088 bindings.push(FieldBinding {
1089 arrow_index: idx,
1090 nullability: avro_field.data_type().nullability(),
1091 plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
1092 });
1093 }
1094 Ok(FieldPlan::Struct { bindings })
1095 }
1096 Codec::List(items_dt) => match arrow_field.data_type() {
1097 DataType::List(field_ref)
1098 | DataType::LargeList(field_ref)
1099 | DataType::ListView(field_ref)
1100 | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
1101 items_nullability: items_dt.nullability(),
1102 item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
1103 }),
1104 DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
1105 items_nullability: items_dt.nullability(),
1106 item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
1107 }),
1108 other => Err(AvroError::SchemaError(format!(
1109 "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
1110 ))),
1111 },
1112 Codec::Map(values_dt) => {
1113 let entries_field = match arrow_field.data_type() {
1114 DataType::Map(entries, _sorted) => entries.as_ref(),
1115 other => {
1116 return Err(AvroError::SchemaError(format!(
1117 "Avro map maps to Arrow DataType::Map, found: {other:?}"
1118 )));
1119 }
1120 };
1121 let entries_struct_fields = match entries_field.data_type() {
1122 DataType::Struct(fs) => fs,
1123 other => {
1124 return Err(AvroError::SchemaError(format!(
1125 "Arrow Map entries must be Struct, found: {other:?}"
1126 )));
1127 }
1128 };
1129 let value_idx =
1130 find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
1131 AvroError::SchemaError("Map entries struct missing value field".into())
1132 })?;
1133 let value_field = entries_struct_fields[value_idx].as_ref();
1134 let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
1135 Ok(FieldPlan::Map {
1136 values_nullability: values_dt.nullability(),
1137 value_plan: Box::new(value_plan),
1138 })
1139 }
1140 Codec::Enum(symbols) => match arrow_field.data_type() {
1141 DataType::Dictionary(key_dt, value_dt) => {
1142 if **key_dt != DataType::Int32 {
1143 return Err(AvroError::SchemaError(
1144 "Avro enum requires Dictionary<Int32, Utf8>".into(),
1145 ));
1146 }
1147 if **value_dt != DataType::Utf8 {
1148 return Err(AvroError::SchemaError(
1149 "Avro enum requires Dictionary<Int32, Utf8>".into(),
1150 ));
1151 }
1152 Ok(FieldPlan::Enum {
1153 symbols: symbols.clone(),
1154 })
1155 }
1156 other => Err(AvroError::SchemaError(format!(
1157 "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
1158 ))),
1159 },
1160 Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
1162 let (ap, as_) = match arrow_field.data_type() {
1163 #[cfg(feature = "small_decimals")]
1164 DataType::Decimal32(p, s) => (*p as usize, *s as i32),
1165 #[cfg(feature = "small_decimals")]
1166 DataType::Decimal64(p, s) => (*p as usize, *s as i32),
1167 DataType::Decimal128(p, s) => (*p as usize, *s as i32),
1168 DataType::Decimal256(p, s) => (*p as usize, *s as i32),
1169 other => {
1170 return Err(AvroError::SchemaError(format!(
1171 "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
1172 arrow_field.name()
1173 )));
1174 }
1175 };
1176 let sc = scale_opt.unwrap_or(0) as i32; if ap != *precision || as_ != sc {
1178 return Err(AvroError::SchemaError(format!(
1179 "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
1180 arrow_field.name()
1181 )));
1182 }
1183 Ok(FieldPlan::Decimal {
1184 size: *fixed_size_opt,
1185 })
1186 }
1187 Codec::Interval => match arrow_field.data_type() {
1188 DataType::Interval(
1189 IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
1190 ) => Ok(FieldPlan::Duration),
1191 other => Err(AvroError::SchemaError(format!(
1192 "Avro 'duration' logical type requires an Arrow Interval (MonthDayNano, YearMonth, or DayTime), found: {other:?}"
1193 ))),
1194 },
1195 Codec::Union(avro_branches, _, UnionMode::Dense) => {
1196 let arrow_union_fields = match arrow_field.data_type() {
1197 DataType::Union(fields, UnionMode::Dense) => fields,
1198 DataType::Union(_, UnionMode::Sparse) => {
1199 return Err(AvroError::NYI(
1200 "Sparse Arrow unions are not yet supported".to_string(),
1201 ));
1202 }
1203 other => {
1204 return Err(AvroError::SchemaError(format!(
1205 "Avro union maps to Arrow Union, found: {other:?}"
1206 )));
1207 }
1208 };
1209 if avro_branches.len() != arrow_union_fields.len() {
1210 return Err(AvroError::SchemaError(format!(
1211 "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
1212 avro_branches.len(),
1213 arrow_union_fields.len(),
1214 arrow_field.name()
1215 )));
1216 }
1217 let bindings = avro_branches
1218 .iter()
1219 .zip(arrow_union_fields.iter())
1220 .enumerate()
1221 .map(|(i, (avro_branch, (_, arrow_child_field)))| {
1222 Ok(FieldBinding {
1223 arrow_index: i,
1224 nullability: avro_branch.nullability(),
1225 plan: FieldPlan::build(avro_branch, arrow_child_field)?,
1226 })
1227 })
1228 .collect::<Result<Vec<_>, AvroError>>()?;
1229 Ok(FieldPlan::Union { bindings })
1230 }
1231 Codec::Union(_, _, UnionMode::Sparse) => Err(AvroError::NYI(
1232 "Sparse Arrow unions are not yet supported".to_string(),
1233 )),
1234 #[cfg(feature = "avro_custom_types")]
1235 Codec::RunEndEncoded(values_dt, _width_code) => {
1236 let values_field = match arrow_field.data_type() {
1237 DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
1238 other => {
1239 return Err(AvroError::SchemaError(format!(
1240 "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
1241 )));
1242 }
1243 };
1244 Ok(FieldPlan::RunEndEncoded {
1245 values_nullability: values_dt.nullability(),
1246 value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
1247 })
1248 }
1249 Codec::TimeMillis => match arrow_field.data_type() {
1250 DataType::Time32(TimeUnit::Second) => Ok(FieldPlan::TimeMillisFromSecs),
1251 _ => Ok(FieldPlan::Scalar),
1252 },
1253 _ => Ok(FieldPlan::Scalar),
1254 }
1255 }
1256}
1257
/// Per-column value encoder, one variant per supported Arrow representation.
///
/// Each variant wraps a borrowed, type-specific encoder that writes a single
/// row at a time (see `Encoder::encode`). Variants gated on
/// `cfg(feature = "avro_custom_types")` write Arrow-specific layouts (for
/// example raw little-endian bytes for `UInt64Fixed`), while the
/// `cfg(not(feature = "avro_custom_types"))` variants convert the value to a
/// standard Avro `int` / `long` / `float` instead. `Decimal32` and `Decimal64`
/// only exist with the `small_decimals` feature.
enum Encoder<'a> {
    // Booleans and integers whose Arrow native type matches the Avro type.
    Boolean(BooleanEncoder<'a>),
    Int(IntEncoder<'a, Int32Type>),
    Long(LongEncoder<'a, Int64Type>),
    // Timestamps whose raw i64 value is written unchanged as an Avro `long`.
    TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
    TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
    TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
    /// Seconds-precision timestamps scaled by 1000 (overflow is an error).
    #[cfg(not(feature = "avro_custom_types"))]
    TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
    // Dates and times of day.
    Date32(IntEncoder<'a, Date32Type>),
    /// `Time32(Second)` scaled by 1000 to milliseconds (overflow is an error).
    Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
    Time32Millis(IntEncoder<'a, Time32MillisecondType>),
    Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
    // Arrow `Duration` columns, written as their raw i64 value.
    DurationSeconds(LongEncoder<'a, DurationSecondType>),
    DurationMillis(LongEncoder<'a, DurationMillisecondType>),
    DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
    DurationNanos(LongEncoder<'a, DurationNanosecondType>),
    // Floating point.
    Float32(F32Encoder<'a>),
    Float64(F64Encoder<'a>),
    // Length-prefixed byte and string payloads.
    Binary(BinaryEncoder<'a, i32>),
    LargeBinary(BinaryEncoder<'a, i64>),
    Utf8(Utf8Encoder<'a>),
    Utf8Large(Utf8LargeEncoder<'a>),
    Utf8View(Utf8ViewEncoder<'a>),
    BinaryView(BinaryViewEncoder<'a>),
    // Nested types; boxed because their encoders own child encoders.
    List(Box<ListEncoder32<'a>>),
    LargeList(Box<ListEncoder64<'a>>),
    ListView(Box<ListViewEncoder32<'a>>),
    LargeListView(Box<ListViewEncoder64<'a>>),
    FixedSizeList(Box<FixedSizeListEncoder<'a>>),
    Struct(Box<StructEncoder<'a>>),
    // Fixed-width binary and UUIDs.
    Fixed(FixedEncoder<'a>),
    Uuid(UuidEncoder<'a>),
    // Arrow intervals written via the Avro `duration` logical type.
    IntervalMonthDayNanoDuration(DurationEncoder<'a, IntervalMonthDayNanoType>),
    /// Month/day/nano interval written as raw little-endian parts.
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNanoFixed(IntervalMonthDayNanoFixedEncoder<'a>),
    IntervalYearMonthDuration(DurationEncoder<'a, IntervalYearMonthType>),
    IntervalDayTimeDuration(DurationEncoder<'a, IntervalDayTimeType>),
    // Decimals of each Arrow width.
    #[cfg(feature = "small_decimals")]
    Decimal32(Decimal32Encoder<'a>),
    #[cfg(feature = "small_decimals")]
    Decimal64(Decimal64Encoder<'a>),
    Decimal128(Decimal128Encoder<'a>),
    Decimal256(Decimal256Encoder<'a>),
    // Enum (dictionary keys), map, and union.
    Enum(EnumEncoder<'a>),
    Map(Box<MapEncoder<'a>>),
    Union(Box<UnionEncoder<'a>>),
    // Run-end encoded columns, one variant per run-end index width.
    RunEncoded16(Box<RunEncodedEncoder16<'a>>),
    RunEncoded32(Box<RunEncodedEncoder32<'a>>),
    RunEncoded64(Box<RunEncodedEncoder64<'a>>),
    /// Writes nothing for each row.
    Null,
    // Arrow-specific encodings available with `avro_custom_types`.
    #[cfg(feature = "avro_custom_types")]
    Int8(Int8Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    Int16(Int16Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt8(UInt8Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt16(UInt16Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt32(UInt32Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt64Fixed(UInt64FixedEncoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    Float16Fixed(Float16FixedEncoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    Date64(LongEncoder<'a, Date64Type>),
    #[cfg(feature = "avro_custom_types")]
    Time64Nanos(LongEncoder<'a, Time64NanosecondType>),
    #[cfg(feature = "avro_custom_types")]
    Time32Secs(IntEncoder<'a, Time32SecondType>),
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(LongEncoder<'a, TimestampSecondType>),
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonthFixed(IntervalYearMonthFixedEncoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTimeFixed(IntervalDayTimeFixedEncoder<'a>),
    // Fallback conversions to standard Avro types without `avro_custom_types`.
    #[cfg(not(feature = "avro_custom_types"))]
    Int8ToInt(Int8ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Int16ToInt(Int16ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt8ToInt(UInt8ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt16ToInt(UInt16ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt32ToLong(UInt32ToLongEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt64ToLong(UInt64ToLongEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Float16ToFloat(Float16ToFloatEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Date64ToLong(Date64ToLongEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Time64NanosToMicros(Time64NanosToMicrosEncoder<'a>),
}
1362
1363impl<'a> Encoder<'a> {
1364 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1366 match self {
1367 Encoder::Boolean(e) => e.encode(out, idx),
1368 Encoder::Int(e) => e.encode(out, idx),
1369 Encoder::Long(e) => e.encode(out, idx),
1370 Encoder::TimestampMicros(e) => e.encode(out, idx),
1371 Encoder::TimestampMillis(e) => e.encode(out, idx),
1372 Encoder::TimestampNanos(e) => e.encode(out, idx),
1373 #[cfg(not(feature = "avro_custom_types"))]
1374 Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
1375 Encoder::Date32(e) => e.encode(out, idx),
1376 Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
1377 Encoder::Time32Millis(e) => e.encode(out, idx),
1378 Encoder::Time64Micros(e) => e.encode(out, idx),
1379 Encoder::DurationSeconds(e) => e.encode(out, idx),
1380 Encoder::DurationMicros(e) => e.encode(out, idx),
1381 Encoder::DurationMillis(e) => e.encode(out, idx),
1382 Encoder::DurationNanos(e) => e.encode(out, idx),
1383 Encoder::Float32(e) => e.encode(out, idx),
1384 Encoder::Float64(e) => e.encode(out, idx),
1385 Encoder::Binary(e) => e.encode(out, idx),
1386 Encoder::LargeBinary(e) => e.encode(out, idx),
1387 Encoder::Utf8(e) => e.encode(out, idx),
1388 Encoder::Utf8Large(e) => e.encode(out, idx),
1389 Encoder::Utf8View(e) => e.encode(out, idx),
1390 Encoder::BinaryView(e) => e.encode(out, idx),
1391 Encoder::List(e) => e.encode(out, idx),
1392 Encoder::LargeList(e) => e.encode(out, idx),
1393 Encoder::ListView(e) => e.encode(out, idx),
1394 Encoder::LargeListView(e) => e.encode(out, idx),
1395 Encoder::FixedSizeList(e) => e.encode(out, idx),
1396 Encoder::Struct(e) => e.encode(out, idx),
1397 Encoder::Fixed(e) => (e).encode(out, idx),
1398 Encoder::Uuid(e) => (e).encode(out, idx),
1399 Encoder::IntervalMonthDayNanoDuration(e) => e.encode(out, idx),
1400 #[cfg(feature = "avro_custom_types")]
1401 Encoder::IntervalMonthDayNanoFixed(e) => e.encode(out, idx),
1402 Encoder::IntervalYearMonthDuration(e) => e.encode(out, idx),
1403 Encoder::IntervalDayTimeDuration(e) => e.encode(out, idx),
1404 #[cfg(feature = "small_decimals")]
1405 Encoder::Decimal32(e) => (e).encode(out, idx),
1406 #[cfg(feature = "small_decimals")]
1407 Encoder::Decimal64(e) => (e).encode(out, idx),
1408 Encoder::Decimal128(e) => (e).encode(out, idx),
1409 Encoder::Decimal256(e) => (e).encode(out, idx),
1410 Encoder::Map(e) => (e).encode(out, idx),
1411 Encoder::Enum(e) => (e).encode(out, idx),
1412 Encoder::Union(e) => (e).encode(out, idx),
1413 Encoder::RunEncoded16(e) => (e).encode(out, idx),
1414 Encoder::RunEncoded32(e) => (e).encode(out, idx),
1415 Encoder::RunEncoded64(e) => (e).encode(out, idx),
1416 Encoder::Null => Ok(()),
1417 #[cfg(feature = "avro_custom_types")]
1418 Encoder::Int8(e) => e.encode(out, idx),
1419 #[cfg(feature = "avro_custom_types")]
1420 Encoder::Int16(e) => e.encode(out, idx),
1421 #[cfg(feature = "avro_custom_types")]
1422 Encoder::UInt8(e) => e.encode(out, idx),
1423 #[cfg(feature = "avro_custom_types")]
1424 Encoder::UInt16(e) => e.encode(out, idx),
1425 #[cfg(feature = "avro_custom_types")]
1426 Encoder::UInt32(e) => e.encode(out, idx),
1427 #[cfg(feature = "avro_custom_types")]
1428 Encoder::UInt64Fixed(e) => e.encode(out, idx),
1429 #[cfg(feature = "avro_custom_types")]
1430 Encoder::Float16Fixed(e) => e.encode(out, idx),
1431 #[cfg(feature = "avro_custom_types")]
1432 Encoder::Date64(e) => e.encode(out, idx),
1433 #[cfg(feature = "avro_custom_types")]
1434 Encoder::Time64Nanos(e) => e.encode(out, idx),
1435 #[cfg(feature = "avro_custom_types")]
1436 Encoder::Time32Secs(e) => e.encode(out, idx),
1437 #[cfg(feature = "avro_custom_types")]
1438 Encoder::TimestampSecs(e) => e.encode(out, idx),
1439 #[cfg(feature = "avro_custom_types")]
1440 Encoder::IntervalYearMonthFixed(e) => e.encode(out, idx),
1441 #[cfg(feature = "avro_custom_types")]
1442 Encoder::IntervalDayTimeFixed(e) => e.encode(out, idx),
1443 #[cfg(not(feature = "avro_custom_types"))]
1444 Encoder::Int8ToInt(e) => e.encode(out, idx),
1445 #[cfg(not(feature = "avro_custom_types"))]
1446 Encoder::Int16ToInt(e) => e.encode(out, idx),
1447 #[cfg(not(feature = "avro_custom_types"))]
1448 Encoder::UInt8ToInt(e) => e.encode(out, idx),
1449 #[cfg(not(feature = "avro_custom_types"))]
1450 Encoder::UInt16ToInt(e) => e.encode(out, idx),
1451 #[cfg(not(feature = "avro_custom_types"))]
1452 Encoder::UInt32ToLong(e) => e.encode(out, idx),
1453 #[cfg(not(feature = "avro_custom_types"))]
1454 Encoder::UInt64ToLong(e) => e.encode(out, idx),
1455 #[cfg(not(feature = "avro_custom_types"))]
1456 Encoder::Float16ToFloat(e) => e.encode(out, idx),
1457 #[cfg(not(feature = "avro_custom_types"))]
1458 Encoder::Date64ToLong(e) => e.encode(out, idx),
1459 #[cfg(not(feature = "avro_custom_types"))]
1460 Encoder::Time64NanosToMicros(e) => e.encode(out, idx),
1461 }
1462 }
1463}
1464
1465struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1466impl BooleanEncoder<'_> {
1467 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1468 write_bool(out, self.0.value(idx))
1469 }
1470}
1471
1472struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1474impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1475 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1476 write_int(out, self.0.value(idx))
1477 }
1478}
1479
1480struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1482impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1483 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1484 write_long(out, self.0.value(idx))
1485 }
1486}
1487
1488struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1490impl<'a> Time32SecondsToMillisEncoder<'a> {
1491 #[inline]
1492 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1493 let secs = self.0.value(idx);
1494 let millis = secs
1495 .checked_mul(1000)
1496 .ok_or_else(|| AvroError::InvalidArgument("time32(secs) * 1000 overflowed".into()))?;
1497 write_int(out, millis)
1498 }
1499}
1500
1501#[cfg(not(feature = "avro_custom_types"))]
1503struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1504#[cfg(not(feature = "avro_custom_types"))]
1505impl<'a> TimestampSecondsToMillisEncoder<'a> {
1506 #[inline]
1507 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1508 let secs = self.0.value(idx);
1509 let millis = secs.checked_mul(1000).ok_or_else(|| {
1510 AvroError::InvalidArgument("timestamp(secs) * 1000 overflowed".into())
1511 })?;
1512 write_long(out, millis)
1513 }
1514}
1515
/// Writes Arrow `Int8` values widened to an Avro `int` (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct Int8Encoder<'a>(&'a PrimitiveArray<Int8Type>);
#[cfg(feature = "avro_custom_types")]
impl Int8Encoder<'_> {
    /// Writes the value at row `idx`; the widening is lossless.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1525
/// Writes Arrow `Int16` values widened to an Avro `int` (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct Int16Encoder<'a>(&'a PrimitiveArray<Int16Type>);
#[cfg(feature = "avro_custom_types")]
impl Int16Encoder<'_> {
    /// Writes the value at row `idx`; the widening is lossless.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1535
/// Writes Arrow `UInt8` values widened to an Avro `int` (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct UInt8Encoder<'a>(&'a PrimitiveArray<UInt8Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt8Encoder<'_> {
    /// Writes the value at row `idx`; the widening is lossless.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1545
/// Writes Arrow `UInt16` values widened to an Avro `int` (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct UInt16Encoder<'a>(&'a PrimitiveArray<UInt16Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt16Encoder<'_> {
    /// Writes the value at row `idx`; the widening is lossless.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1555
/// Writes Arrow `UInt32` values widened to an Avro `long` (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct UInt32Encoder<'a>(&'a PrimitiveArray<UInt32Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt32Encoder<'_> {
    /// Writes the value at row `idx`; the widening is lossless.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i64::from(self.0.value(idx));
        write_long(out, widened)
    }
}
1565
/// Writes Arrow `UInt64` values as their 8 raw little-endian bytes, with no
/// length prefix (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct UInt64FixedEncoder<'a>(&'a PrimitiveArray<UInt64Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt64FixedEncoder<'_> {
    /// Writes the little-endian bytes of the value at row `idx`.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let bytes = self.0.value(idx).to_le_bytes();
        out.write_all(&bytes).map_err(Into::into)
    }
}
1577
/// Writes Arrow `Float16` values as their raw little-endian bytes, with no
/// length prefix (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct Float16FixedEncoder<'a>(&'a Float16Array);
#[cfg(feature = "avro_custom_types")]
impl Float16FixedEncoder<'_> {
    /// Writes the little-endian bytes of the half-precision value at row `idx`.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let bytes = self.0.value(idx).to_le_bytes();
        out.write_all(&bytes).map_err(Into::into)
    }
}
1589
/// Writes Arrow `Interval(MonthDayNano)` values as their raw parts: months,
/// then days, then nanoseconds, each little-endian (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct IntervalMonthDayNanoFixedEncoder<'a>(&'a PrimitiveArray<IntervalMonthDayNanoType>);
#[cfg(feature = "avro_custom_types")]
impl IntervalMonthDayNanoFixedEncoder<'_> {
    /// Splits the interval at row `idx` into its parts and writes them in order.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let (m, d, ns) = IntervalMonthDayNanoType::to_parts(self.0.value(idx));
        out.write_all(&m.to_le_bytes())?;
        out.write_all(&d.to_le_bytes())?;
        out.write_all(&ns.to_le_bytes())?;
        Ok(())
    }
}
1610
/// Writes Arrow `Interval(YearMonth)` values as their raw little-endian month
/// count (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct IntervalYearMonthFixedEncoder<'a>(&'a PrimitiveArray<IntervalYearMonthType>);
#[cfg(feature = "avro_custom_types")]
impl IntervalYearMonthFixedEncoder<'_> {
    /// Writes the month count stored at row `idx`.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let month_bytes = self.0.value(idx).to_le_bytes();
        out.write_all(&month_bytes)?;
        Ok(())
    }
}
1622
/// Writes Arrow `Interval(DayTime)` values as the little-endian `days`
/// followed by the little-endian `milliseconds` (with `avro_custom_types`).
#[cfg(feature = "avro_custom_types")]
struct IntervalDayTimeFixedEncoder<'a>(&'a PrimitiveArray<IntervalDayTimeType>);
#[cfg(feature = "avro_custom_types")]
impl IntervalDayTimeFixedEncoder<'_> {
    /// Writes the two components of the interval at row `idx`.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let interval = self.0.value(idx);
        let (days, millis) = (interval.days, interval.milliseconds);
        out.write_all(&days.to_le_bytes())?;
        out.write_all(&millis.to_le_bytes())?;
        Ok(())
    }
}
1635
1636#[cfg(not(feature = "avro_custom_types"))]
1638struct Int8ToIntEncoder<'a>(&'a PrimitiveArray<Int8Type>);
1639#[cfg(not(feature = "avro_custom_types"))]
1640impl Int8ToIntEncoder<'_> {
1641 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1642 write_int(out, self.0.value(idx) as i32)
1643 }
1644}
1645
1646#[cfg(not(feature = "avro_custom_types"))]
1648struct Int16ToIntEncoder<'a>(&'a PrimitiveArray<Int16Type>);
1649#[cfg(not(feature = "avro_custom_types"))]
1650impl Int16ToIntEncoder<'_> {
1651 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1652 write_int(out, self.0.value(idx) as i32)
1653 }
1654}
1655
1656#[cfg(not(feature = "avro_custom_types"))]
1658struct UInt8ToIntEncoder<'a>(&'a PrimitiveArray<UInt8Type>);
1659#[cfg(not(feature = "avro_custom_types"))]
1660impl UInt8ToIntEncoder<'_> {
1661 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1662 write_int(out, self.0.value(idx) as i32)
1663 }
1664}
1665
1666#[cfg(not(feature = "avro_custom_types"))]
1668struct UInt16ToIntEncoder<'a>(&'a PrimitiveArray<UInt16Type>);
1669#[cfg(not(feature = "avro_custom_types"))]
1670impl UInt16ToIntEncoder<'_> {
1671 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1672 write_int(out, self.0.value(idx) as i32)
1673 }
1674}
1675
1676#[cfg(not(feature = "avro_custom_types"))]
1678struct UInt32ToLongEncoder<'a>(&'a PrimitiveArray<UInt32Type>);
1679#[cfg(not(feature = "avro_custom_types"))]
1680impl UInt32ToLongEncoder<'_> {
1681 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1682 write_long(out, self.0.value(idx) as i64)
1683 }
1684}
1685
1686#[cfg(not(feature = "avro_custom_types"))]
1688struct UInt64ToLongEncoder<'a>(&'a PrimitiveArray<UInt64Type>);
1689#[cfg(not(feature = "avro_custom_types"))]
1690impl UInt64ToLongEncoder<'_> {
1691 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1692 let v = self.0.value(idx);
1693 if v > i64::MAX as u64 {
1694 return Err(AvroError::InvalidArgument(format!(
1695 "UInt64 value {v} exceeds i64::MAX; enable avro_custom_types feature for full UInt64 support",
1696 )));
1697 }
1698 write_long(out, v as i64)
1699 }
1700}
1701
1702#[cfg(not(feature = "avro_custom_types"))]
1704struct Float16ToFloatEncoder<'a>(&'a Float16Array);
1705#[cfg(not(feature = "avro_custom_types"))]
1706impl Float16ToFloatEncoder<'_> {
1707 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1708 out.write_all(&self.0.value(idx).to_f32().to_bits().to_le_bytes())?;
1709 Ok(())
1710 }
1711}
1712
1713#[cfg(not(feature = "avro_custom_types"))]
1715struct Date64ToLongEncoder<'a>(&'a PrimitiveArray<Date64Type>);
1716#[cfg(not(feature = "avro_custom_types"))]
1717impl Date64ToLongEncoder<'_> {
1718 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1719 write_long(out, self.0.value(idx))
1720 }
1721}
1722
1723#[cfg(not(feature = "avro_custom_types"))]
1725struct Time64NanosToMicrosEncoder<'a>(&'a PrimitiveArray<Time64NanosecondType>);
1726#[cfg(not(feature = "avro_custom_types"))]
1727impl Time64NanosToMicrosEncoder<'_> {
1728 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1729 let nanos = self.0.value(idx);
1730 let micros = nanos / 1000;
1731 write_long(out, micros)
1732 }
1733}
1734
1735struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1737impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1738 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1739 write_len_prefixed(out, self.0.value(idx))
1740 }
1741}
1742
1743struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1745impl BinaryViewEncoder<'_> {
1746 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1747 write_len_prefixed(out, self.0.value(idx))
1748 }
1749}
1750
1751struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1753impl Utf8ViewEncoder<'_> {
1754 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1755 write_len_prefixed(out, self.0.value(idx).as_bytes())
1756 }
1757}
1758
1759struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1760impl F32Encoder<'_> {
1761 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1762 out.write_all(&self.0.value(idx).to_bits().to_le_bytes())?;
1764 Ok(())
1765 }
1766}
1767
1768struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1769impl F64Encoder<'_> {
1770 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1771 out.write_all(&self.0.value(idx).to_bits().to_le_bytes())
1773 .map_err(Into::into)
1774 }
1775}
1776
1777struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1778
1779impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1780 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1781 write_len_prefixed(out, self.0.value(idx).as_bytes())
1782 }
1783}
1784
1785type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1786type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1787
1788enum KeyKind<'a> {
1790 Utf8(&'a GenericStringArray<i32>),
1791 LargeUtf8(&'a GenericStringArray<i64>),
1792}
1793struct MapEncoder<'a> {
1794 map: &'a MapArray,
1795 keys: KeyKind<'a>,
1796 values: FieldEncoder<'a>,
1797 keys_offset: usize,
1798 values_offset: usize,
1799}
1800
1801impl<'a> MapEncoder<'a> {
1802 fn try_new(
1803 map: &'a MapArray,
1804 values_nullability: Option<Nullability>,
1805 value_plan: &FieldPlan,
1806 ) -> Result<Self, AvroError> {
1807 let keys_arr = map.keys();
1808 let keys_kind = match keys_arr.data_type() {
1809 DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1810 DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1811 other => {
1812 return Err(AvroError::SchemaError(format!(
1813 "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1814 )));
1815 }
1816 };
1817 Ok(Self {
1818 map,
1819 keys: keys_kind,
1820 values: FieldEncoder::make_encoder(
1821 map.values().as_ref(),
1822 value_plan,
1823 values_nullability,
1824 )?,
1825 keys_offset: keys_arr.offset(),
1826 values_offset: map.values().offset(),
1827 })
1828 }
1829
1830 fn encode_map_entries<W, O>(
1831 out: &mut W,
1832 keys: &GenericStringArray<O>,
1833 keys_offset: usize,
1834 start: usize,
1835 end: usize,
1836 mut write_item: impl FnMut(&mut W, usize) -> Result<(), AvroError>,
1837 ) -> Result<(), AvroError>
1838 where
1839 W: Write + ?Sized,
1840 O: OffsetSizeTrait,
1841 {
1842 encode_blocked_range(out, start, end, |out, j| {
1843 let j_key = j.saturating_sub(keys_offset);
1844 write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1845 write_item(out, j)
1846 })
1847 }
1848
1849 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1850 let offsets = self.map.offsets();
1851 let start = offsets[idx] as usize;
1852 let end = offsets[idx + 1] as usize;
1853 let write_item = |out: &mut W, j: usize| {
1854 let j_val = j.saturating_sub(self.values_offset);
1855 self.values.encode(out, j_val)
1856 };
1857 match self.keys {
1858 KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1859 out,
1860 arr,
1861 self.keys_offset,
1862 start,
1863 end,
1864 write_item,
1865 ),
1866 KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1867 out,
1868 arr,
1869 self.keys_offset,
1870 start,
1871 end,
1872 write_item,
1873 ),
1874 }
1875 }
1876}
1877
1878struct EnumEncoder<'a> {
1885 keys: &'a PrimitiveArray<Int32Type>,
1886}
1887impl EnumEncoder<'_> {
1888 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), AvroError> {
1889 write_int(out, self.keys.value(row))
1890 }
1891}
1892
1893struct UnionEncoder<'a> {
1894 encoders: Vec<FieldEncoder<'a>>,
1895 array: &'a UnionArray,
1896 type_id_to_encoder_index: Vec<Option<usize>>,
1897}
1898
1899impl<'a> UnionEncoder<'a> {
1900 fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1901 let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1902 return Err(AvroError::SchemaError("Expected Dense UnionArray".into()));
1903 };
1904 if fields.len() != field_bindings.len() {
1905 return Err(AvroError::SchemaError(format!(
1906 "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1907 fields.len(),
1908 field_bindings.len()
1909 )));
1910 }
1911 let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
1912 let mut type_id_to_encoder_index: Vec<Option<usize>> =
1913 vec![None; (max_type_id + 1) as usize];
1914 let mut encoders = Vec::with_capacity(fields.len());
1915 for (i, (type_id, _)) in fields.iter().enumerate() {
1916 let binding = field_bindings
1917 .get(i)
1918 .ok_or_else(|| AvroError::SchemaError("Binding and field mismatch".to_string()))?;
1919 encoders.push(FieldEncoder::make_encoder(
1920 array.child(type_id).as_ref(),
1921 &binding.plan,
1922 binding.nullability,
1923 )?);
1924 type_id_to_encoder_index[type_id as usize] = Some(i);
1925 }
1926 Ok(Self {
1927 encoders,
1928 array,
1929 type_id_to_encoder_index,
1930 })
1931 }
1932
1933 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1934 let type_id = self.array.type_ids()[idx];
1939 let encoder_index = self
1940 .type_id_to_encoder_index
1941 .get(type_id as usize)
1942 .and_then(|opt| *opt)
1943 .ok_or_else(|| AvroError::SchemaError(format!("Invalid type_id {type_id}")))?;
1944 write_int(out, encoder_index as i32)?;
1945 let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
1946 AvroError::SchemaError(format!("Invalid encoder index {encoder_index}"))
1947 })?;
1948 encoder.encode(out, self.array.value_offset(idx))
1949 }
1950}
1951
1952struct StructEncoder<'a> {
1953 encoders: Vec<FieldEncoder<'a>>,
1954}
1955
1956impl<'a> StructEncoder<'a> {
1957 fn try_new(array: &'a StructArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1958 let mut encoders = Vec::with_capacity(field_bindings.len());
1959 for field_binding in field_bindings {
1960 let idx = field_binding.arrow_index;
1961 let column = array.columns().get(idx).ok_or_else(|| {
1962 AvroError::SchemaError(format!("Struct child index {idx} out of range"))
1963 })?;
1964 let encoder = FieldEncoder::make_encoder(
1965 column.as_ref(),
1966 &field_binding.plan,
1967 field_binding.nullability,
1968 )?;
1969 encoders.push(encoder);
1970 }
1971 Ok(Self { encoders })
1972 }
1973
1974 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1975 for encoder in self.encoders.iter_mut() {
1976 encoder.encode(out, idx)?;
1977 }
1978 Ok(())
1979 }
1980}
1981
1982fn encode_blocked_range<W: Write + ?Sized, F>(
1986 out: &mut W,
1987 start: usize,
1988 end: usize,
1989 mut write_item: F,
1990) -> Result<(), AvroError>
1991where
1992 F: FnMut(&mut W, usize) -> Result<(), AvroError>,
1993{
1994 let len = end.saturating_sub(start);
1995 if len == 0 {
1996 write_long(out, 0)?;
1998 return Ok(());
1999 }
2000 write_long(out, len as i64)?;
2002 for row in start..end {
2003 write_item(out, row)?;
2004 }
2005 write_long(out, 0)?;
2006 Ok(())
2007}
2008
2009struct ListEncoder<'a, O: OffsetSizeTrait> {
2010 list: &'a GenericListArray<O>,
2011 values: FieldEncoder<'a>,
2012 values_offset: usize,
2013}
2014
2015type ListEncoder32<'a> = ListEncoder<'a, i32>;
2016type ListEncoder64<'a> = ListEncoder<'a, i64>;
2017
2018impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
2019 fn try_new(
2020 list: &'a GenericListArray<O>,
2021 items_nullability: Option<Nullability>,
2022 item_plan: &FieldPlan,
2023 ) -> Result<Self, AvroError> {
2024 Ok(Self {
2025 list,
2026 values: FieldEncoder::make_encoder(
2027 list.values().as_ref(),
2028 item_plan,
2029 items_nullability,
2030 )?,
2031 values_offset: list.values().offset(),
2032 })
2033 }
2034
2035 fn encode_list_range<W: Write + ?Sized>(
2036 &mut self,
2037 out: &mut W,
2038 start: usize,
2039 end: usize,
2040 ) -> Result<(), AvroError> {
2041 encode_blocked_range(out, start, end, |out, row| {
2042 self.values
2043 .encode(out, row.saturating_sub(self.values_offset))
2044 })
2045 }
2046
2047 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2048 let offsets = self.list.offsets();
2049 let start = offsets[idx].to_usize().ok_or_else(|| {
2050 AvroError::InvalidArgument(format!("Error converting offset[{idx}] to usize"))
2051 })?;
2052 let end = offsets[idx + 1].to_usize().ok_or_else(|| {
2053 AvroError::InvalidArgument(format!("Error converting offset[{}] to usize", idx + 1))
2054 })?;
2055 self.encode_list_range(out, start, end)
2056 }
2057}
2058
2059struct ListViewEncoder<'a, O: OffsetSizeTrait> {
2061 list: &'a GenericListViewArray<O>,
2062 values: FieldEncoder<'a>,
2063 values_offset: usize,
2064}
2065type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
2066type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
2067
2068impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
2069 fn try_new(
2070 list: &'a GenericListViewArray<O>,
2071 items_nullability: Option<Nullability>,
2072 item_plan: &FieldPlan,
2073 ) -> Result<Self, AvroError> {
2074 Ok(Self {
2075 list,
2076 values: FieldEncoder::make_encoder(
2077 list.values().as_ref(),
2078 item_plan,
2079 items_nullability,
2080 )?,
2081 values_offset: list.values().offset(),
2082 })
2083 }
2084
2085 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2086 let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
2087 AvroError::InvalidArgument(format!("Error converting value_offset[{idx}] to usize"))
2088 })?;
2089 let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
2090 AvroError::InvalidArgument(format!("Error converting value_size[{idx}] to usize"))
2091 })?;
2092 let start = start + self.values_offset;
2093 let end = start + len;
2094 encode_blocked_range(out, start, end, |out, row| {
2095 self.values
2096 .encode(out, row.saturating_sub(self.values_offset))
2097 })
2098 }
2099}
2100
2101struct FixedSizeListEncoder<'a> {
2103 list: &'a FixedSizeListArray,
2104 values: FieldEncoder<'a>,
2105 values_offset: usize,
2106 elem_len: usize,
2107}
2108
2109impl<'a> FixedSizeListEncoder<'a> {
2110 fn try_new(
2111 list: &'a FixedSizeListArray,
2112 items_nullability: Option<Nullability>,
2113 item_plan: &FieldPlan,
2114 ) -> Result<Self, AvroError> {
2115 Ok(Self {
2116 list,
2117 values: FieldEncoder::make_encoder(
2118 list.values().as_ref(),
2119 item_plan,
2120 items_nullability,
2121 )?,
2122 values_offset: list.values().offset(),
2123 elem_len: list.value_length() as usize,
2124 })
2125 }
2126
2127 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2128 let rel = self.list.value_offset(idx) as usize;
2130 let start = self.values_offset + rel;
2131 let end = start + self.elem_len;
2132 encode_blocked_range(out, start, end, |out, row| {
2133 self.values
2134 .encode(out, row.saturating_sub(self.values_offset))
2135 })
2136 }
2137}
2138
2139struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
2142impl FixedEncoder<'_> {
2143 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2144 let v = self.0.value(idx); out.write_all(v)?;
2146 Ok(())
2147 }
2148}
2149
2150struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
2153impl UuidEncoder<'_> {
2154 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2155 let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
2156 buf[0] = 0x48;
2157 let v = self.0.value(idx);
2158 let u = Uuid::from_slice(v)
2159 .map_err(|e| AvroError::InvalidArgument(format!("Invalid UUID bytes: {e}")))?;
2160 let _ = u.hyphenated().encode_lower(&mut buf[1..]);
2161 out.write_all(&buf)?;
2162 Ok(())
2163 }
2164}
2165
2166#[derive(Copy, Clone)]
2167struct DurationParts {
2168 months: u32,
2169 days: u32,
2170 millis: u32,
2171}
2172trait IntervalToDurationParts: ArrowPrimitiveType {
2174 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError>;
2175}
2176impl IntervalToDurationParts for IntervalMonthDayNanoType {
2177 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
2178 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
2179 if months < 0 || days < 0 || nanos < 0 {
2180 return Err(AvroError::InvalidArgument(
2181 "Avro 'duration' cannot encode negative months/days/nanoseconds; enable `avro_custom_types` to round-trip signed Arrow intervals".into(),
2182 ));
2183 }
2184 if nanos % 1_000_000 != 0 {
2185 return Err(AvroError::InvalidArgument(
2186 "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000 (enable `avro_custom_types` to preserve nanosecond intervals)"
2187 .into(),
2188 ));
2189 }
2190 let millis = nanos / 1_000_000;
2191 if millis > u32::MAX as i64 {
2192 return Err(AvroError::InvalidArgument(
2193 "Avro 'duration' milliseconds exceed u32::MAX; enable `avro_custom_types` to preserve full Arrow Interval(MonthDayNano) range".into(),
2194 ));
2195 }
2196 Ok(DurationParts {
2197 months: months as u32,
2198 days: days as u32,
2199 millis: millis as u32,
2200 })
2201 }
2202}
2203impl IntervalToDurationParts for IntervalYearMonthType {
2204 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
2205 if native < 0 {
2206 return Err(AvroError::InvalidArgument(
2207 "Avro 'duration' cannot encode negative months; enable `avro_custom_types` to round-trip signed Arrow Interval(YearMonth)".into(),
2208 ));
2209 }
2210 Ok(DurationParts {
2211 months: native as u32,
2212 days: 0,
2213 millis: 0,
2214 })
2215 }
2216}
2217impl IntervalToDurationParts for IntervalDayTimeType {
2218 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
2219 let (days, millis) = IntervalDayTimeType::to_parts(native);
2220 if days < 0 || millis < 0 {
2221 return Err(AvroError::InvalidArgument(
2222 "Avro 'duration' cannot encode negative days or milliseconds; enable `avro_custom_types` to round-trip signed Arrow Interval(DayTime)".into(),
2223 ));
2224 }
2225 Ok(DurationParts {
2226 months: 0,
2227 days: days as u32,
2228 millis: millis as u32,
2229 })
2230 }
2231}
2232
2233struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
2236impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
2237 #[inline(always)]
2238 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2239 let parts = P::duration_parts(self.0.value(idx))?;
2240 let months = parts.months.to_le_bytes();
2241 let days = parts.days.to_le_bytes();
2242 let ms = parts.millis.to_le_bytes();
2243 let buf = [
2259 months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
2260 ms[1], ms[2], ms[3],
2261 ];
2262 out.write_all(&buf)?;
2263 Ok(())
2264 }
2265}
2266
2267trait DecimalBeBytes<const N: usize> {
2270 fn value_be_bytes(&self, idx: usize) -> [u8; N];
2271}
2272#[cfg(feature = "small_decimals")]
2273impl DecimalBeBytes<4> for Decimal32Array {
2274 fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
2275 self.value(idx).to_be_bytes()
2276 }
2277}
2278#[cfg(feature = "small_decimals")]
2279impl DecimalBeBytes<8> for Decimal64Array {
2280 fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
2281 self.value(idx).to_be_bytes()
2282 }
2283}
2284impl DecimalBeBytes<16> for Decimal128Array {
2285 fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
2286 self.value(idx).to_be_bytes()
2287 }
2288}
2289impl DecimalBeBytes<32> for Decimal256Array {
2290 fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
2291 self.value(idx).to_be_bytes()
2293 }
2294}
2295
2296struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
2302 arr: &'a A,
2303 fixed_size: Option<usize>,
2304}
2305
2306impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
2307 fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
2308 Self { arr, fixed_size }
2309 }
2310
2311 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2312 let be = self.arr.value_be_bytes(idx);
2313 match self.fixed_size {
2314 Some(n) => write_sign_extended(out, &be, n),
2315 None => write_len_prefixed(out, minimal_twos_complement(&be)),
2316 }
2317 }
2318}
2319
2320#[cfg(feature = "small_decimals")]
2321type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
2322#[cfg(feature = "small_decimals")]
2323type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
2324type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
2325type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
2326
2327struct RunEncodedEncoder<'a, R: RunEndIndexType> {
2331 ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
2332 base: usize,
2333 len: usize,
2334 values: FieldEncoder<'a>,
2335 cur_run: usize,
2337 cur_end: usize,
2339}
2340
2341type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
2342type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
2343type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
2344
2345impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
2346 fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
2347 let ends = arr.run_ends();
2348 let base = ends.get_start_physical_index();
2349 let slice = ends.values();
2350 let len = ends.len();
2351 let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
2352 Self {
2353 ends_slice: slice,
2354 base,
2355 len,
2356 values,
2357 cur_run: 0,
2358 cur_end,
2359 }
2360 }
2361
2362 #[inline(always)]
2365 fn advance_to_row(&mut self, idx: usize) -> Result<(), AvroError> {
2366 if idx < self.cur_end {
2367 return Ok(());
2368 }
2369 while self.cur_run + 1 < self.len && idx >= self.cur_end {
2371 self.cur_run += 1;
2372 self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
2373 }
2374 if idx < self.cur_end {
2375 Ok(())
2376 } else {
2377 Err(AvroError::InvalidArgument(format!(
2378 "row index {idx} out of bounds for run-ends ({} runs)",
2379 self.len
2380 )))
2381 }
2382 }
2383
2384 #[inline(always)]
2385 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2386 self.advance_to_row(idx)?;
2387 self.values.encode(out, self.cur_run)
2390 }
2391}
2392
2393#[cfg(test)]
2394mod tests {
2395 use super::*;
2396 use arrow_array::types::Int32Type;
2397 use arrow_array::{
2398 Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
2399 Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
2400 StringArray,
2401 };
2402 use arrow_buffer::Buffer;
2403 use arrow_schema::{DataType, Field, Fields, UnionFields};
2404
2405 fn zigzag_i64(v: i64) -> u64 {
2406 ((v << 1) ^ (v >> 63)) as u64
2407 }
2408
2409 fn varint(mut x: u64) -> Vec<u8> {
2410 let mut out = Vec::new();
2411 while (x & !0x7f) != 0 {
2412 out.push(((x & 0x7f) as u8) | 0x80);
2413 x >>= 7;
2414 }
2415 out.push((x & 0x7f) as u8);
2416 out
2417 }
2418
2419 fn avro_long_bytes(v: i64) -> Vec<u8> {
2420 varint(zigzag_i64(v))
2421 }
2422
2423 fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
2424 let mut out = avro_long_bytes(payload.len() as i64);
2425 out.extend_from_slice(payload);
2426 out
2427 }
2428
2429 fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
2430 let m = months.to_le_bytes();
2431 let d = days.to_le_bytes();
2432 let ms = millis.to_le_bytes();
2433 [
2434 m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
2435 ]
2436 }
2437
2438 #[cfg(feature = "avro_custom_types")]
2439 fn interval_mdn_fixed16(months: i32, days: i32, nanos: i64) -> [u8; 16] {
2440 let m = months.to_le_bytes();
2441 let d = days.to_le_bytes();
2442 let n = nanos.to_le_bytes();
2443 [
2444 m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], n[0], n[1], n[2], n[3], n[4], n[5],
2445 n[6], n[7],
2446 ]
2447 }
2448
2449 fn encode_all(
2450 array: &dyn Array,
2451 plan: &FieldPlan,
2452 nullability: Option<Nullability>,
2453 ) -> Vec<u8> {
2454 let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
2455 let mut out = Vec::new();
2456 for i in 0..array.len() {
2457 enc.encode(&mut out, i).unwrap();
2458 }
2459 out
2460 }
2461
2462 fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
2463 if actual != expected {
2464 let to_hex = |b: &[u8]| {
2465 b.iter()
2466 .map(|x| format!("{:02X}", x))
2467 .collect::<Vec<_>>()
2468 .join(" ")
2469 };
2470 panic!(
2471 "mismatch\n expected: [{}]\n actual: [{}]",
2472 to_hex(expected),
2473 to_hex(actual)
2474 );
2475 }
2476 }
2477
2478 fn row_slice<'a>(buf: &'a [u8], offsets: &[usize], row: usize) -> &'a [u8] {
2479 let start = offsets[row];
2480 let end = offsets[row + 1];
2481 &buf[start..end]
2482 }
2483
2484 #[test]
2485 fn binary_encoder() {
2486 let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
2487 let arr = BinaryArray::from_vec(values);
2488 let mut expected = Vec::new();
2489 for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
2490 expected.extend(avro_len_prefixed_bytes(payload));
2491 }
2492 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2493 assert_bytes_eq(&got, &expected);
2494 }
2495
2496 #[test]
2497 fn large_binary_encoder() {
2498 let values: Vec<&[u8]> = vec![b"xyz", b""];
2499 let arr = LargeBinaryArray::from_vec(values);
2500 let mut expected = Vec::new();
2501 for payload in [b"xyz" as &[u8], b""] {
2502 expected.extend(avro_len_prefixed_bytes(payload));
2503 }
2504 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2505 assert_bytes_eq(&got, &expected);
2506 }
2507
2508 #[test]
2509 fn utf8_encoder() {
2510 let arr = StringArray::from(vec!["", "A", "BC"]);
2511 let mut expected = Vec::new();
2512 for s in ["", "A", "BC"] {
2513 expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2514 }
2515 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2516 assert_bytes_eq(&got, &expected);
2517 }
2518
2519 #[test]
2520 fn large_utf8_encoder() {
2521 let arr = LargeStringArray::from(vec!["hello", ""]);
2522 let mut expected = Vec::new();
2523 for s in ["hello", ""] {
2524 expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2525 }
2526 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2527 assert_bytes_eq(&got, &expected);
2528 }
2529
2530 #[test]
2531 fn list_encoder_int32() {
2532 let values = Int32Array::from(vec![1, 2, 3]);
2534 let offsets = vec![0, 2, 2, 3];
2535 let list = ListArray::new(
2536 Field::new("item", DataType::Int32, true).into(),
2537 arrow_buffer::OffsetBuffer::new(offsets.into()),
2538 Arc::new(values) as ArrayRef,
2539 None,
2540 );
2541 let mut expected = Vec::new();
2543 expected.extend(avro_long_bytes(2));
2545 expected.extend(avro_long_bytes(1));
2546 expected.extend(avro_long_bytes(2));
2547 expected.extend(avro_long_bytes(0));
2548 expected.extend(avro_long_bytes(0));
2550 expected.extend(avro_long_bytes(1));
2552 expected.extend(avro_long_bytes(3));
2553 expected.extend(avro_long_bytes(0));
2554
2555 let plan = FieldPlan::List {
2556 items_nullability: None,
2557 item_plan: Box::new(FieldPlan::Scalar),
2558 };
2559 let got = encode_all(&list, &plan, None);
2560 assert_bytes_eq(&got, &expected);
2561 }
2562
2563 #[test]
2564 fn struct_encoder_two_fields() {
2565 let a = Int32Array::from(vec![1, 2]);
2567 let b = StringArray::from(vec!["x", "y"]);
2568 let fields = Fields::from(vec![
2569 Field::new("a", DataType::Int32, true),
2570 Field::new("b", DataType::Utf8, true),
2571 ]);
2572 let struct_arr = StructArray::new(
2573 fields.clone(),
2574 vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2575 None,
2576 );
2577 let plan = FieldPlan::Struct {
2578 bindings: vec![
2579 FieldBinding {
2580 arrow_index: 0,
2581 nullability: None,
2582 plan: FieldPlan::Scalar,
2583 },
2584 FieldBinding {
2585 arrow_index: 1,
2586 nullability: None,
2587 plan: FieldPlan::Scalar,
2588 },
2589 ],
2590 };
2591 let got = encode_all(&struct_arr, &plan, None);
2592 let mut expected = Vec::new();
2594 expected.extend(avro_long_bytes(1)); expected.extend(avro_len_prefixed_bytes(b"x")); expected.extend(avro_long_bytes(2)); expected.extend(avro_len_prefixed_bytes(b"y")); assert_bytes_eq(&got, &expected);
2599 }
2600
2601 #[test]
2602 fn enum_encoder_dictionary() {
2603 let dict_values = StringArray::from(vec!["A", "B", "C"]);
2605 let keys = Int32Array::from(vec![2, 0, 1]);
2606 let dict =
2607 DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
2608 let symbols = Arc::<[String]>::from(
2609 vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
2610 );
2611 let plan = FieldPlan::Enum { symbols };
2612 let got = encode_all(&dict, &plan, None);
2613 let mut expected = Vec::new();
2614 expected.extend(avro_long_bytes(2));
2615 expected.extend(avro_long_bytes(0));
2616 expected.extend(avro_long_bytes(1));
2617 assert_bytes_eq(&got, &expected);
2618 }
2619
2620 #[test]
2621 fn decimal_bytes_and_fixed() {
2622 let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
2624 .with_precision_and_scale(20, 0)
2625 .unwrap();
2626 let plan_bytes = FieldPlan::Decimal { size: None };
2628 let got_bytes = encode_all(&dec, &plan_bytes, None);
2629 let mut expected_bytes = Vec::new();
2631 expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2632 expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2633 expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2634 assert_bytes_eq(&got_bytes, &expected_bytes);
2635
2636 let plan_fixed = FieldPlan::Decimal { size: Some(16) };
2637 let got_fixed = encode_all(&dec, &plan_fixed, None);
2638 let mut expected_fixed = Vec::new();
2639 expected_fixed.extend_from_slice(&1i128.to_be_bytes());
2640 expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
2641 expected_fixed.extend_from_slice(&0i128.to_be_bytes());
2642 assert_bytes_eq(&got_fixed, &expected_fixed);
2643 }
2644
2645 #[test]
2646 fn decimal_bytes_256() {
2647 use arrow_buffer::i256;
2648 let dec = Decimal256Array::from(vec![
2650 i256::from_i128(1),
2651 i256::from_i128(-1),
2652 i256::from_i128(0),
2653 ])
2654 .with_precision_and_scale(76, 0)
2655 .unwrap();
2656 let plan_bytes = FieldPlan::Decimal { size: None };
2658 let got_bytes = encode_all(&dec, &plan_bytes, None);
2659 let mut expected_bytes = Vec::new();
2661 expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2662 expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2663 expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2664 assert_bytes_eq(&got_bytes, &expected_bytes);
2665
2666 let plan_fixed = FieldPlan::Decimal { size: Some(32) };
2668 let got_fixed = encode_all(&dec, &plan_fixed, None);
2669 let mut expected_fixed = Vec::new();
2670 expected_fixed.extend_from_slice(&i256::from_i128(1).to_be_bytes());
2671 expected_fixed.extend_from_slice(&i256::from_i128(-1).to_be_bytes());
2672 expected_fixed.extend_from_slice(&i256::from_i128(0).to_be_bytes());
2673 assert_bytes_eq(&got_fixed, &expected_fixed);
2674 }
2675
2676 #[cfg(feature = "small_decimals")]
2677 #[test]
2678 fn decimal_bytes_and_fixed_32() {
2679 let dec = Decimal32Array::from(vec![1i32, -1i32, 0i32])
2681 .with_precision_and_scale(9, 0)
2682 .unwrap();
2683 let plan_bytes = FieldPlan::Decimal { size: None };
2685 let got_bytes = encode_all(&dec, &plan_bytes, None);
2686 let mut expected_bytes = Vec::new();
2687 expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2688 expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2689 expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2690 assert_bytes_eq(&got_bytes, &expected_bytes);
2691 let plan_fixed = FieldPlan::Decimal { size: Some(4) };
2693 let got_fixed = encode_all(&dec, &plan_fixed, None);
2694 let mut expected_fixed = Vec::new();
2695 expected_fixed.extend_from_slice(&1i32.to_be_bytes());
2696 expected_fixed.extend_from_slice(&(-1i32).to_be_bytes());
2697 expected_fixed.extend_from_slice(&0i32.to_be_bytes());
2698 assert_bytes_eq(&got_fixed, &expected_fixed);
2699 }
2700
2701 #[cfg(feature = "small_decimals")]
2702 #[test]
2703 fn decimal_bytes_and_fixed_64() {
2704 let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
2706 .with_precision_and_scale(18, 0)
2707 .unwrap();
2708 let plan_bytes = FieldPlan::Decimal { size: None };
2710 let got_bytes = encode_all(&dec, &plan_bytes, None);
2711 let mut expected_bytes = Vec::new();
2712 expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2713 expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2714 expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2715 assert_bytes_eq(&got_bytes, &expected_bytes);
2716 let plan_fixed = FieldPlan::Decimal { size: Some(8) };
2718 let got_fixed = encode_all(&dec, &plan_fixed, None);
2719 let mut expected_fixed = Vec::new();
2720 expected_fixed.extend_from_slice(&1i64.to_be_bytes());
2721 expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
2722 expected_fixed.extend_from_slice(&0i64.to_be_bytes());
2723 assert_bytes_eq(&got_fixed, &expected_fixed);
2724 }
2725
2726 #[test]
2727 fn float32_and_float64_encoders() {
2728 let f32a = Float32Array::from(vec![0.0f32, -1.5f32, f32::from_bits(0x7fc00000)]); let f64a = Float64Array::from(vec![0.0f64, -2.25f64]);
2730 let mut expected32 = Vec::new();
2732 for v in [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)] {
2733 expected32.extend_from_slice(&v.to_bits().to_le_bytes());
2734 }
2735 let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
2736 assert_bytes_eq(&got32, &expected32);
2737 let mut expected64 = Vec::new();
2739 for v in [0.0f64, -2.25f64] {
2740 expected64.extend_from_slice(&v.to_bits().to_le_bytes());
2741 }
2742 let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
2743 assert_bytes_eq(&got64, &expected64);
2744 }
2745
2746 #[test]
2747 fn long_encoder_int64() {
2748 let arr = Int64Array::from(vec![0i64, 1i64, -1i64, 2i64, -2i64, i64::MIN + 1]);
2749 let mut expected = Vec::new();
2750 for v in [0, 1, -1, 2, -2, i64::MIN + 1] {
2751 expected.extend(avro_long_bytes(v));
2752 }
2753 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2754 assert_bytes_eq(&got, &expected);
2755 }
2756
2757 #[test]
2758 fn fixed_encoder_plain() {
2759 let data = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
2761 let values: Vec<Vec<u8>> = data.iter().map(|x| x.to_vec()).collect();
2762 let arr = FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap();
2763 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2764 let mut expected = Vec::new();
2765 expected.extend_from_slice(&data[0]);
2766 expected.extend_from_slice(&data[1]);
2767 assert_bytes_eq(&got, &expected);
2768 }
2769
2770 #[test]
2771 fn uuid_encoder_test() {
2772 let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
2774 let bytes = *u.as_bytes();
2775 let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
2776 let mut expected = Vec::new();
2778 expected.push(0x48);
2779 expected.extend_from_slice(u.hyphenated().to_string().as_bytes());
2780 let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
2781 assert_bytes_eq(&got, &expected);
2782 }
2783
2784 #[test]
2785 fn uuid_encoder_error() {
2786 let arr =
2788 FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
2789 .unwrap();
2790 let plan = FieldPlan::Uuid;
2791 let mut enc = FieldEncoder::make_encoder(&arr, &plan, None).unwrap();
2792 let mut out = Vec::new();
2793 let err = enc.encode(&mut out, 0).unwrap_err();
2794 match err {
2795 AvroError::InvalidArgument(msg) => {
2796 assert!(msg.contains("Invalid UUID bytes"))
2797 }
2798 other => panic!("expected InvalidArgument, got {other:?}"),
2799 }
2800 }
2801
2802 fn test_scalar_primitive_encoding<T>(
2803 non_nullable_data: &[T::Native],
2804 nullable_data: &[Option<T::Native>],
2805 ) where
2806 T: ArrowPrimitiveType,
2807 T::Native: Into<i64> + Copy,
2808 PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2809 {
2810 let plan = FieldPlan::Scalar;
2811
2812 let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2813 let got = encode_all(&array, &plan, None);
2814
2815 let mut expected = Vec::new();
2816 for &value in non_nullable_data {
2817 expected.extend(avro_long_bytes(value.into()));
2818 }
2819 assert_bytes_eq(&got, &expected);
2820
2821 let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2822 let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2823
2824 let mut expected_nullable = Vec::new();
2825 for &opt_value in nullable_data {
2826 match opt_value {
2827 Some(value) => {
2828 expected_nullable.extend(avro_long_bytes(1));
2830 expected_nullable.extend(avro_long_bytes(value.into()));
2831 }
2832 None => {
2833 expected_nullable.extend(avro_long_bytes(0));
2835 }
2836 }
2837 }
2838 assert_bytes_eq(&got_nullable, &expected_nullable);
2839 }
2840
2841 #[test]
2842 fn date32_encoder() {
2843 test_scalar_primitive_encoding::<Date32Type>(
2844 &[
2845 19345, 0, -1, ],
2849 &[Some(19345), None],
2850 );
2851 }
2852
2853 #[test]
2854 fn time32_millis_encoder() {
2855 test_scalar_primitive_encoding::<Time32MillisecondType>(
2856 &[
2857 0, 49530123, 86399999, ],
2861 &[None, Some(49530123)],
2862 );
2863 }
2864
2865 #[test]
2866 fn time64_micros_encoder() {
2867 test_scalar_primitive_encoding::<Time64MicrosecondType>(
2868 &[
2869 0, 86399999999, ],
2872 &[Some(86399999999), None],
2873 );
2874 }
2875
2876 #[test]
2877 fn timestamp_millis_encoder() {
2878 test_scalar_primitive_encoding::<TimestampMillisecondType>(
2879 &[
2880 1704067200000, 0, -123456789, ],
2884 &[None, Some(1704067200000)],
2885 );
2886 }
2887
2888 #[test]
2889 fn map_encoder_string_keys_int_values() {
2890 let keys = StringArray::from(vec!["k1", "k2"]);
2894 let values = Int32Array::from(vec![1, 2]);
2895 let entries_fields = Fields::from(vec![
2896 Field::new("key", DataType::Utf8, false),
2897 Field::new("value", DataType::Int32, true),
2898 ]);
2899 let entries = StructArray::new(
2900 entries_fields,
2901 vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2902 None,
2903 );
2904 let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
2905 let map = MapArray::new(
2906 Field::new("entries", entries.data_type().clone(), false).into(),
2907 offsets,
2908 entries,
2909 None,
2910 false,
2911 );
2912 let plan = FieldPlan::Map {
2913 values_nullability: None,
2914 value_plan: Box::new(FieldPlan::Scalar),
2915 };
2916 let got = encode_all(&map, &plan, None);
2917 let mut expected = Vec::new();
2918 expected.extend(avro_long_bytes(2));
2920 expected.extend(avro_len_prefixed_bytes(b"k1"));
2921 expected.extend(avro_long_bytes(1));
2922 expected.extend(avro_len_prefixed_bytes(b"k2"));
2923 expected.extend(avro_long_bytes(2));
2924 expected.extend(avro_long_bytes(0));
2925 expected.extend(avro_long_bytes(0));
2927 assert_bytes_eq(&got, &expected);
2928 }
2929
2930 #[test]
2931 fn union_encoder_string_int() {
2932 let strings = StringArray::from(vec!["hello", "world"]);
2933 let ints = Int32Array::from(vec![10, 20, 30]);
2934
2935 let union_fields = UnionFields::try_new(
2936 vec![0, 1],
2937 vec![
2938 Field::new("v_str", DataType::Utf8, true),
2939 Field::new("v_int", DataType::Int32, true),
2940 ],
2941 )
2942 .unwrap();
2943
2944 let type_ids = Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]);
2945 let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
2946
2947 let union_array = UnionArray::try_new(
2948 union_fields,
2949 type_ids.into(),
2950 Some(offsets.into()),
2951 vec![Arc::new(strings), Arc::new(ints)],
2952 )
2953 .unwrap();
2954
2955 let plan = FieldPlan::Union {
2956 bindings: vec![
2957 FieldBinding {
2958 arrow_index: 0,
2959 nullability: None,
2960 plan: FieldPlan::Scalar,
2961 },
2962 FieldBinding {
2963 arrow_index: 1,
2964 nullability: None,
2965 plan: FieldPlan::Scalar,
2966 },
2967 ],
2968 };
2969
2970 let got = encode_all(&union_array, &plan, None);
2971
2972 let mut expected = Vec::new();
2973 expected.extend(avro_long_bytes(0));
2974 expected.extend(avro_len_prefixed_bytes(b"hello"));
2975 expected.extend(avro_long_bytes(1));
2976 expected.extend(avro_long_bytes(10));
2977 expected.extend(avro_long_bytes(1));
2978 expected.extend(avro_long_bytes(20));
2979 expected.extend(avro_long_bytes(0));
2980 expected.extend(avro_len_prefixed_bytes(b"world"));
2981 expected.extend(avro_long_bytes(1));
2982 expected.extend(avro_long_bytes(30));
2983
2984 assert_bytes_eq(&got, &expected);
2985 }
2986
2987 #[test]
2988 fn union_encoder_null_string_int() {
2989 let nulls = NullArray::new(1);
2990 let strings = StringArray::from(vec!["hello"]);
2991 let ints = Int32Array::from(vec![10]);
2992
2993 let union_fields = UnionFields::try_new(
2994 vec![0, 1, 2],
2995 vec![
2996 Field::new("v_null", DataType::Null, true),
2997 Field::new("v_str", DataType::Utf8, true),
2998 Field::new("v_int", DataType::Int32, true),
2999 ],
3000 )
3001 .unwrap();
3002
3003 let type_ids = Buffer::from_slice_ref([0_i8, 1, 2]);
3004 let offsets = Buffer::from_slice_ref([0_i32, 0, 0]);
3008
3009 let union_array = UnionArray::try_new(
3010 union_fields,
3011 type_ids.into(),
3012 Some(offsets.into()),
3013 vec![Arc::new(nulls), Arc::new(strings), Arc::new(ints)],
3014 )
3015 .unwrap();
3016
3017 let plan = FieldPlan::Union {
3018 bindings: vec![
3019 FieldBinding {
3020 arrow_index: 0,
3021 nullability: None,
3022 plan: FieldPlan::Scalar,
3023 },
3024 FieldBinding {
3025 arrow_index: 1,
3026 nullability: None,
3027 plan: FieldPlan::Scalar,
3028 },
3029 FieldBinding {
3030 arrow_index: 2,
3031 nullability: None,
3032 plan: FieldPlan::Scalar,
3033 },
3034 ],
3035 };
3036
3037 let got = encode_all(&union_array, &plan, None);
3038
3039 let mut expected = Vec::new();
3040 expected.extend(avro_long_bytes(0));
3041 expected.extend(avro_long_bytes(1));
3042 expected.extend(avro_len_prefixed_bytes(b"hello"));
3043 expected.extend(avro_long_bytes(2));
3044 expected.extend(avro_long_bytes(10));
3045
3046 assert_bytes_eq(&got, &expected);
3047 }
3048
3049 #[test]
3050 fn list64_encoder_int32() {
3051 let values = Int32Array::from(vec![1, 2, 3]);
3053 let offsets: Vec<i64> = vec![0, 3, 3];
3054 let list = LargeListArray::new(
3055 Field::new("item", DataType::Int32, true).into(),
3056 arrow_buffer::OffsetBuffer::new(offsets.into()),
3057 Arc::new(values) as ArrayRef,
3058 None,
3059 );
3060 let plan = FieldPlan::List {
3061 items_nullability: None,
3062 item_plan: Box::new(FieldPlan::Scalar),
3063 };
3064 let got = encode_all(&list, &plan, None);
3065 let mut expected = Vec::new();
3067 expected.extend(avro_long_bytes(3));
3068 expected.extend(avro_long_bytes(1));
3069 expected.extend(avro_long_bytes(2));
3070 expected.extend(avro_long_bytes(3));
3071 expected.extend(avro_long_bytes(0));
3072 expected.extend(avro_long_bytes(0));
3073 assert_bytes_eq(&got, &expected);
3074 }
3075
3076 #[test]
3077 fn int_encoder_test() {
3078 let ints = Int32Array::from(vec![0, -1, 2]);
3079 let mut expected_i = Vec::new();
3080 for v in [0i32, -1, 2] {
3081 expected_i.extend(avro_long_bytes(v as i64));
3082 }
3083 let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
3084 assert_bytes_eq(&got_i, &expected_i);
3085 }
3086
3087 #[test]
3088 fn boolean_encoder_test() {
3089 let bools = BooleanArray::from(vec![true, false]);
3090 let mut expected_b = Vec::new();
3091 expected_b.extend_from_slice(&[1]);
3092 expected_b.extend_from_slice(&[0]);
3093 let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
3094 assert_bytes_eq(&got_b, &expected_b);
3095 }
3096
3097 #[test]
3098 #[cfg(feature = "avro_custom_types")]
3099 fn duration_encoding_seconds() {
3100 let arr: PrimitiveArray<DurationSecondType> = vec![0i64, -1, 2].into();
3101 let mut expected = Vec::new();
3102 for v in [0i64, -1, 2] {
3103 expected.extend_from_slice(&avro_long_bytes(v));
3104 }
3105 let got = encode_all(&arr, &FieldPlan::Scalar, None);
3106 assert_bytes_eq(&got, &expected);
3107 }
3108
3109 #[test]
3110 #[cfg(feature = "avro_custom_types")]
3111 fn duration_encoding_milliseconds() {
3112 let arr: PrimitiveArray<DurationMillisecondType> = vec![1i64, 0, -2].into();
3113 let mut expected = Vec::new();
3114 for v in [1i64, 0, -2] {
3115 expected.extend_from_slice(&avro_long_bytes(v));
3116 }
3117 let got = encode_all(&arr, &FieldPlan::Scalar, None);
3118 assert_bytes_eq(&got, &expected);
3119 }
3120
3121 #[test]
3122 #[cfg(feature = "avro_custom_types")]
3123 fn duration_encoding_microseconds() {
3124 let arr: PrimitiveArray<DurationMicrosecondType> = vec![5i64, -6, 7].into();
3125 let mut expected = Vec::new();
3126 for v in [5i64, -6, 7] {
3127 expected.extend_from_slice(&avro_long_bytes(v));
3128 }
3129 let got = encode_all(&arr, &FieldPlan::Scalar, None);
3130 assert_bytes_eq(&got, &expected);
3131 }
3132
3133 #[test]
3134 #[cfg(feature = "avro_custom_types")]
3135 fn duration_encoding_nanoseconds() {
3136 let arr: PrimitiveArray<DurationNanosecondType> = vec![8i64, 9, -10].into();
3137 let mut expected = Vec::new();
3138 for v in [8i64, 9, -10] {
3139 expected.extend_from_slice(&avro_long_bytes(v));
3140 }
3141 let got = encode_all(&arr, &FieldPlan::Scalar, None);
3142 assert_bytes_eq(&got, &expected);
3143 }
3144
3145 #[test]
3146 fn duration_encoder_year_month_happy_path() {
3147 let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
3148 let mut expected = Vec::new();
3149 for m in [0u32, 1u32, 25u32] {
3150 expected.extend_from_slice(&duration_fixed12(m, 0, 0));
3151 }
3152 let got = encode_all(&arr, &FieldPlan::Duration, None);
3153 assert_bytes_eq(&got, &expected);
3154 }
3155
3156 #[test]
3157 fn duration_encoder_year_month_rejects_negative() {
3158 let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
3159 let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3160 let mut out = Vec::new();
3161 let err = enc.encode(&mut out, 0).unwrap_err();
3162 match err {
3163 AvroError::InvalidArgument(msg) => {
3164 assert!(msg.contains("cannot encode negative months"))
3165 }
3166 other => panic!("expected InvalidArgument, got {other:?}"),
3167 }
3168 }
3169
3170 #[test]
3171 fn duration_encoder_day_time_happy_path() {
3172 let v0 = IntervalDayTimeType::make_value(2, 500); let v1 = IntervalDayTimeType::make_value(0, 0);
3174 let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
3175 let mut expected = Vec::new();
3176 expected.extend_from_slice(&duration_fixed12(0, 2, 500));
3177 expected.extend_from_slice(&duration_fixed12(0, 0, 0));
3178 let got = encode_all(&arr, &FieldPlan::Duration, None);
3179 assert_bytes_eq(&got, &expected);
3180 }
3181
3182 #[test]
3183 fn duration_encoder_day_time_rejects_negative() {
3184 let bad = IntervalDayTimeType::make_value(-1, 0);
3185 let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
3186 let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3187 let mut out = Vec::new();
3188 let err = enc.encode(&mut out, 0).unwrap_err();
3189 match err {
3190 AvroError::InvalidArgument(msg) => {
3191 assert!(msg.contains("cannot encode negative days"))
3192 }
3193 other => panic!("expected InvalidArgument, got {other:?}"),
3194 }
3195 }
3196
3197 #[cfg(feature = "avro_custom_types")]
3198 #[test]
3199 fn interval_month_day_nano_fixed_encoder_happy_path() {
3200 let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3); let v1 = IntervalMonthDayNanoType::make_value(-4, -5, -6);
3203 let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
3204
3205 let got = encode_all(&arr, &FieldPlan::Scalar, None);
3206 let mut expected = Vec::new();
3207 expected.extend_from_slice(&interval_mdn_fixed16(1, 2, 3));
3208 expected.extend_from_slice(&interval_mdn_fixed16(-4, -5, -6));
3209 assert_bytes_eq(&got, &expected);
3210 }
3211
3212 #[test]
3213 fn duration_encoder_month_day_nano_happy_path() {
3214 let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
3216 let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
3217 let mut expected = Vec::new();
3218 expected.extend_from_slice(&duration_fixed12(1, 2, 3));
3219 expected.extend_from_slice(&duration_fixed12(0, 0, 0));
3220 let got = encode_all(&arr, &FieldPlan::Duration, None);
3221 assert_bytes_eq(&got, &expected);
3222 }
3223
3224 #[test]
3225 fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
3226 let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
3227 let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
3228 let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3229 let mut out = Vec::new();
3230 let err = enc.encode(&mut out, 0).unwrap_err();
3231 match err {
3232 AvroError::InvalidArgument(msg) => {
3233 assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
3234 }
3235 other => panic!("expected InvalidArgument, got {other:?}"),
3236 }
3237 }
3238
3239 #[test]
3240 fn minimal_twos_complement_test() {
3241 let pos = [0x00, 0x00, 0x01];
3242 assert_eq!(minimal_twos_complement(&pos), &pos[2..]);
3243 let neg = [0xFF, 0xFF, 0x80]; assert_eq!(minimal_twos_complement(&neg), &neg[2..]);
3245 let zero = [0x00, 0x00, 0x00];
3246 assert_eq!(minimal_twos_complement(&zero), &zero[2..]);
3247 }
3248
3249 #[test]
3250 fn write_sign_extend_test() {
3251 let mut out = Vec::new();
3252 write_sign_extended(&mut out, &[0x01], 4).unwrap();
3253 assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
3254 out.clear();
3255 write_sign_extended(&mut out, &[0xFF], 4).unwrap();
3256 assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
3257 out.clear();
3258 write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
3260 assert_eq!(out, vec![0xFF, 0x80]);
3261 out.clear();
3262 let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
3264 match err {
3265 AvroError::InvalidArgument(_) => {}
3266 _ => panic!("expected InvalidArgument"),
3267 }
3268 }
3269
3270 #[test]
3271 fn duration_month_day_nano_overflow_millis() {
3272 let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
3274 let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
3275 let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
3276 let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3277 let mut out = Vec::new();
3278 let err = enc.encode(&mut out, 0).unwrap_err();
3279 match err {
3280 AvroError::InvalidArgument(msg) => assert!(msg.contains("exceed u32::MAX")),
3281 _ => panic!("expected InvalidArgument"),
3282 }
3283 }
3284
3285 #[test]
3286 fn fieldplan_decimal_precision_scale_mismatch_errors() {
3287 use crate::codec::Codec;
3289 use std::collections::HashMap;
3290 let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
3291 let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
3292 let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
3293 match err {
3294 AvroError::SchemaError(msg) => {
3295 assert!(msg.contains("Decimal precision/scale mismatch"))
3296 }
3297 _ => panic!("expected SchemaError"),
3298 }
3299 }
3300
3301 #[test]
3302 fn timestamp_micros_encoder() {
3303 test_scalar_primitive_encoding::<TimestampMicrosecondType>(
3305 &[
3306 1_704_067_200_000_000, 0, -123_456_789, ],
3310 &[None, Some(1_704_067_200_000_000)],
3311 );
3312 }
3313
3314 #[test]
3315 fn list_encoder_nullable_items_null_first() {
3316 let values = Int32Array::from(vec![Some(1), None, Some(2)]);
3318 let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
3319 let list = ListArray::new(
3320 Field::new("item", DataType::Int32, true).into(),
3321 offsets,
3322 Arc::new(values) as ArrayRef,
3323 None,
3324 );
3325
3326 let plan = FieldPlan::List {
3327 items_nullability: Some(Nullability::NullFirst),
3328 item_plan: Box::new(FieldPlan::Scalar),
3329 };
3330
3331 let mut expected = Vec::new();
3334 expected.extend(avro_long_bytes(3)); expected.extend(avro_long_bytes(1)); expected.extend(avro_long_bytes(1)); expected.extend(avro_long_bytes(0)); expected.extend(avro_long_bytes(1)); expected.extend(avro_long_bytes(2)); expected.extend(avro_long_bytes(0)); let got = encode_all(&list, &plan, None);
3343 assert_bytes_eq(&got, &expected);
3344 }
3345
3346 #[test]
3347 fn large_list_encoder_nullable_items_null_first() {
3348 let values = Int32Array::from(vec![Some(10), None]);
3350 let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
3351 let list = LargeListArray::new(
3352 Field::new("item", DataType::Int32, true).into(),
3353 offsets,
3354 Arc::new(values) as ArrayRef,
3355 None,
3356 );
3357
3358 let plan = FieldPlan::List {
3359 items_nullability: Some(Nullability::NullFirst),
3360 item_plan: Box::new(FieldPlan::Scalar),
3361 };
3362
3363 let mut expected = Vec::new();
3364 expected.extend(avro_long_bytes(2)); expected.extend(avro_long_bytes(1)); expected.extend(avro_long_bytes(10)); expected.extend(avro_long_bytes(0)); expected.extend(avro_long_bytes(0)); let got = encode_all(&list, &plan, None);
3371 assert_bytes_eq(&got, &expected);
3372 }
3373
3374 #[test]
3375 fn map_encoder_string_keys_nullable_int_values_null_first() {
3376 let keys = StringArray::from(vec!["k1", "k2"]);
3378 let values = Int32Array::from(vec![Some(7), None]);
3379
3380 let entries_fields = Fields::from(vec![
3381 Field::new("key", DataType::Utf8, false),
3382 Field::new("value", DataType::Int32, true),
3383 ]);
3384 let entries = StructArray::new(
3385 entries_fields,
3386 vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
3387 None,
3388 );
3389
3390 let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
3392 let map = MapArray::new(
3393 Field::new("entries", entries.data_type().clone(), false).into(),
3394 offsets,
3395 entries,
3396 None,
3397 false,
3398 );
3399
3400 let plan = FieldPlan::Map {
3401 values_nullability: Some(Nullability::NullFirst),
3402 value_plan: Box::new(FieldPlan::Scalar),
3403 };
3404
3405 let mut expected = Vec::new();
3411 expected.extend(avro_long_bytes(2)); expected.extend(avro_len_prefixed_bytes(b"k1")); expected.extend(avro_long_bytes(1)); expected.extend(avro_long_bytes(7)); expected.extend(avro_len_prefixed_bytes(b"k2")); expected.extend(avro_long_bytes(0)); expected.extend(avro_long_bytes(0)); let got = encode_all(&map, &plan, None);
3420 assert_bytes_eq(&got, &expected);
3421 }
3422
3423 #[test]
3424 fn time32_seconds_to_millis_encoder() {
3425 let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
3427 vec![0i32, 1, -2, 12_345].into();
3428 let got = encode_all(&arr, &FieldPlan::TimeMillisFromSecs, None);
3429 let mut expected = Vec::new();
3430 for secs in [0i32, 1, -2, 12_345] {
3431 let millis = (secs as i64) * 1000;
3432 expected.extend_from_slice(&avro_long_bytes(millis));
3433 }
3434 assert_bytes_eq(&got, &expected);
3435 }
3436
3437 #[test]
3438 fn time32_seconds_to_millis_overflow() {
3439 let overflow_secs: i32 = i32::MAX / 1000 + 1;
3441 let arr: PrimitiveArray<Time32SecondType> = vec![overflow_secs].into();
3442 let mut enc =
3443 FieldEncoder::make_encoder(&arr, &FieldPlan::TimeMillisFromSecs, None).unwrap();
3444 let mut out = Vec::new();
3445 let err = enc.encode(&mut out, 0).unwrap_err();
3446 match err {
3447 AvroError::InvalidArgument(msg) => {
3448 assert!(
3449 msg.contains("overflowed") || msg.contains("overflow"),
3450 "unexpected message: {msg}"
3451 )
3452 }
3453 other => panic!("expected InvalidArgument, got {other:?}"),
3454 }
3455 }
3456
3457 #[test]
3458 fn time32_seconds_to_millis_type_mismatch_returns_schema_error() {
3459 let arr = Int32Array::from(vec![1, 2, 3]);
3460 match FieldEncoder::make_encoder(&arr, &FieldPlan::TimeMillisFromSecs, None) {
3461 Err(AvroError::SchemaError(msg)) => {
3462 assert!(msg.contains("Time32(Second)"), "unexpected message: {msg}");
3463 assert!(msg.contains("Int32"), "unexpected message: {msg}");
3464 }
3465 Ok(_) => panic!("expected SchemaError"),
3466 Err(other) => panic!("expected SchemaError, got {other:?}"),
3467 }
3468 }
3469
3470 #[test]
3471 fn encode_rows_time32_seconds_plan_rejects_millisecond_column() {
3472 let schema = ArrowSchema::new(vec![Field::new(
3473 "t",
3474 DataType::Time32(TimeUnit::Millisecond),
3475 false,
3476 )]);
3477 let arr: PrimitiveArray<Time32MillisecondType> = vec![1i32].into();
3478 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr)]).unwrap();
3479 let encoder = RecordEncoder {
3480 columns: vec![FieldBinding {
3481 arrow_index: 0,
3482 nullability: None,
3483 plan: FieldPlan::TimeMillisFromSecs,
3484 }],
3485 prefix: None,
3486 };
3487
3488 let mut out = BytesMut::new();
3489 let mut offsets = vec![0usize];
3490 let err = encoder
3491 .encode_rows(&batch, 16, &mut out, &mut offsets)
3492 .unwrap_err();
3493 match err {
3494 AvroError::SchemaError(msg) => {
3495 assert!(msg.contains("Time32(Second)"), "unexpected message: {msg}");
3496 assert!(
3497 msg.contains("Time32(Millisecond)"),
3498 "unexpected message: {msg}"
3499 );
3500 }
3501 other => panic!("expected SchemaError, got {other:?}"),
3502 }
3503 }
3504
3505 #[cfg(not(feature = "avro_custom_types"))]
3506 #[test]
3507 fn timestamp_seconds_to_millis_encoder() {
3508 let arr: PrimitiveArray<TimestampSecondType> = vec![0i64, 1, -1, 1_234_567_890].into();
3510 let got = encode_all(&arr, &FieldPlan::Scalar, None);
3511 let mut expected = Vec::new();
3512 for secs in [0i64, 1, -1, 1_234_567_890] {
3513 let millis = secs * 1000;
3514 expected.extend_from_slice(&avro_long_bytes(millis));
3515 }
3516 assert_bytes_eq(&got, &expected);
3517 }
3518
3519 #[cfg(not(feature = "avro_custom_types"))]
3520 #[test]
3521 fn timestamp_seconds_to_millis_overflow() {
3522 let overflow_secs: i64 = i64::MAX / 1000 + 1;
3524 let arr: PrimitiveArray<TimestampSecondType> = vec![overflow_secs].into();
3525 let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
3526 let mut out = Vec::new();
3527 let err = enc.encode(&mut out, 0).unwrap_err();
3528 match err {
3529 AvroError::InvalidArgument(msg) => {
3530 assert!(
3531 msg.contains("overflowed") || msg.contains("overflow"),
3532 "unexpected message: {msg}"
3533 )
3534 }
3535 other => panic!("expected InvalidArgument, got {other:?}"),
3536 }
3537 }
3538
3539 #[test]
3540 fn timestamp_nanos_encoder() {
3541 let arr: PrimitiveArray<TimestampNanosecondType> = vec![0i64, 1, -1, 123].into();
3542 let got = encode_all(&arr, &FieldPlan::Scalar, None);
3543 let mut expected = Vec::new();
3544 for ns in [0i64, 1, -1, 123] {
3545 expected.extend_from_slice(&avro_long_bytes(ns));
3546 }
3547 assert_bytes_eq(&got, &expected);
3548 }
3549
3550 #[test]
3551 fn union_encoder_string_int_nonzero_type_ids() {
3552 let strings = StringArray::from(vec!["hello", "world"]);
3553 let ints = Int32Array::from(vec![10, 20, 30]);
3554 let union_fields = UnionFields::try_new(
3555 vec![2, 5],
3556 vec![
3557 Field::new("v_str", DataType::Utf8, true),
3558 Field::new("v_int", DataType::Int32, true),
3559 ],
3560 )
3561 .unwrap();
3562 let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
3563 let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
3564 let union_array = UnionArray::try_new(
3565 union_fields,
3566 type_ids.into(),
3567 Some(offsets.into()),
3568 vec![Arc::new(strings), Arc::new(ints)],
3569 )
3570 .unwrap();
3571 let plan = FieldPlan::Union {
3572 bindings: vec![
3573 FieldBinding {
3574 arrow_index: 0,
3575 nullability: None,
3576 plan: FieldPlan::Scalar,
3577 },
3578 FieldBinding {
3579 arrow_index: 1,
3580 nullability: None,
3581 plan: FieldPlan::Scalar,
3582 },
3583 ],
3584 };
3585 let got = encode_all(&union_array, &plan, None);
3586 let mut expected = Vec::new();
3587 expected.extend(avro_long_bytes(0));
3588 expected.extend(avro_len_prefixed_bytes(b"hello"));
3589 expected.extend(avro_long_bytes(1));
3590 expected.extend(avro_long_bytes(10));
3591 expected.extend(avro_long_bytes(1));
3592 expected.extend(avro_long_bytes(20));
3593 expected.extend(avro_long_bytes(0));
3594 expected.extend(avro_len_prefixed_bytes(b"world"));
3595 expected.extend(avro_long_bytes(1));
3596 expected.extend(avro_long_bytes(30));
3597 assert_bytes_eq(&got, &expected);
3598 }
3599
3600 #[test]
3601 fn nullable_state_with_null_buffer_and_zero_nulls() {
3602 let values = vec![1i32, 2, 3];
3603 let arr = Int32Array::from_iter_values_with_nulls(values, Some(NullBuffer::new_valid(3)));
3604 assert_eq!(arr.null_count(), 0);
3605 assert!(arr.nulls().is_some());
3606 let plan = FieldPlan::Scalar;
3607 let enc = FieldEncoder::make_encoder(&arr, &plan, Some(Nullability::NullFirst)).unwrap();
3608 match enc.null_state {
3609 NullState::NullableNoNulls { union_value_byte } => {
3610 assert_eq!(
3611 union_value_byte,
3612 union_value_branch_byte(Nullability::NullFirst, false)
3613 );
3614 }
3615 other => panic!("expected NullableNoNulls, got {other:?}"),
3616 }
3617 }
3618
3619 #[test]
3620 fn encode_rows_single_column_int32() {
3621 let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3622 let arr = Int32Array::from(vec![1, 2, 3]);
3623 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3624 let encoder = RecordEncoder {
3625 columns: vec![FieldBinding {
3626 arrow_index: 0,
3627 nullability: None,
3628 plan: FieldPlan::Scalar,
3629 }],
3630 prefix: None,
3631 };
3632 let mut out = BytesMut::new();
3633 let mut offsets: Vec<usize> = vec![0];
3634 encoder
3635 .encode_rows(&batch, 16, &mut out, &mut offsets)
3636 .unwrap();
3637 assert_eq!(offsets.len(), 4);
3638 assert_eq!(*offsets.last().unwrap(), out.len());
3639 assert_bytes_eq(row_slice(&out, &offsets, 0), &avro_long_bytes(1));
3640 assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(2));
3641 assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(3));
3642 }
3643
3644 #[test]
3645 fn encode_rows_multiple_columns() {
3646 let schema = ArrowSchema::new(vec![
3647 Field::new("a", DataType::Int32, false),
3648 Field::new("b", DataType::Utf8, false),
3649 ]);
3650 let int_arr = Int32Array::from(vec![10, 20]);
3651 let str_arr = StringArray::from(vec!["hello", "world"]);
3652 let batch = RecordBatch::try_new(
3653 Arc::new(schema.clone()),
3654 vec![Arc::new(int_arr), Arc::new(str_arr)],
3655 )
3656 .unwrap();
3657 let encoder = RecordEncoder {
3658 columns: vec![
3659 FieldBinding {
3660 arrow_index: 0,
3661 nullability: None,
3662 plan: FieldPlan::Scalar,
3663 },
3664 FieldBinding {
3665 arrow_index: 1,
3666 nullability: None,
3667 plan: FieldPlan::Scalar,
3668 },
3669 ],
3670 prefix: None,
3671 };
3672 let mut out = BytesMut::new();
3673 let mut offsets: Vec<usize> = vec![0];
3674 encoder
3675 .encode_rows(&batch, 32, &mut out, &mut offsets)
3676 .unwrap();
3677 assert_eq!(offsets.len(), 3);
3678 assert_eq!(*offsets.last().unwrap(), out.len());
3679 let mut expected_row0 = Vec::new();
3680 expected_row0.extend(avro_long_bytes(10));
3681 expected_row0.extend(avro_len_prefixed_bytes(b"hello"));
3682 assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
3683 let mut expected_row1 = Vec::new();
3684 expected_row1.extend(avro_long_bytes(20));
3685 expected_row1.extend(avro_len_prefixed_bytes(b"world"));
3686 assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
3687 }
3688
3689 #[test]
3690 fn encode_rows_with_prefix() {
3691 use crate::codec::AvroFieldBuilder;
3692 use crate::schema::AvroSchema;
3693 let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3694 let arr = Int32Array::from(vec![42]);
3695 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3696 let avro_schema = AvroSchema::try_from(&schema).unwrap();
3697 let fingerprint = avro_schema
3698 .fingerprint(crate::schema::FingerprintAlgorithm::Rabin)
3699 .unwrap();
3700 let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
3701 .build()
3702 .unwrap();
3703 let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
3704 .with_fingerprint(Some(fingerprint))
3705 .build()
3706 .unwrap();
3707 let mut out = BytesMut::new();
3708 let mut offsets: Vec<usize> = vec![0];
3709 encoder
3710 .encode_rows(&batch, 32, &mut out, &mut offsets)
3711 .unwrap();
3712 assert_eq!(offsets.len(), 2);
3713 let row0 = row_slice(&out, &offsets, 0);
3714 assert!(row0.len() > 10, "Row should contain prefix + encoded value");
3715 assert_eq!(row0[0], 0xC3);
3716 assert_eq!(row0[1], 0x01);
3717 }
3718
3719 #[test]
3720 fn encode_rows_empty_batch() {
3721 let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3722 let arr = Int32Array::from(Vec::<i32>::new());
3723 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3724 let encoder = RecordEncoder {
3725 columns: vec![FieldBinding {
3726 arrow_index: 0,
3727 nullability: None,
3728 plan: FieldPlan::Scalar,
3729 }],
3730 prefix: None,
3731 };
3732 let mut out = BytesMut::new();
3733 let mut offsets: Vec<usize> = vec![0];
3734 encoder
3735 .encode_rows(&batch, 16, &mut out, &mut offsets)
3736 .unwrap();
3737 assert_eq!(offsets, vec![0]);
3738 assert!(out.is_empty());
3739 }
3740
3741 #[test]
3742 fn encode_rows_matches_encode_output() {
3743 let schema = ArrowSchema::new(vec![
3744 Field::new("a", DataType::Int64, false),
3745 Field::new("b", DataType::Float64, false),
3746 ]);
3747 let int_arr = Int64Array::from(vec![100i64, 200, 300]);
3748 let float_arr = Float64Array::from(vec![1.5, 2.5, 3.5]);
3749 let batch = RecordBatch::try_new(
3750 Arc::new(schema.clone()),
3751 vec![Arc::new(int_arr), Arc::new(float_arr)],
3752 )
3753 .unwrap();
3754 let encoder = RecordEncoder {
3755 columns: vec![
3756 FieldBinding {
3757 arrow_index: 0,
3758 nullability: None,
3759 plan: FieldPlan::Scalar,
3760 },
3761 FieldBinding {
3762 arrow_index: 1,
3763 nullability: None,
3764 plan: FieldPlan::Scalar,
3765 },
3766 ],
3767 prefix: None,
3768 };
3769 let mut stream_buf = Vec::new();
3770 encoder.encode(&mut stream_buf, &batch).unwrap();
3771 let mut out = BytesMut::new();
3772 let mut offsets: Vec<usize> = vec![0];
3773 encoder
3774 .encode_rows(&batch, 32, &mut out, &mut offsets)
3775 .unwrap();
3776 assert_eq!(offsets.len(), 1 + batch.num_rows());
3777 assert_bytes_eq(&out[..], &stream_buf);
3778 }
3779
3780 #[test]
3781 fn encode_rows_appends_to_existing_buffer() {
3782 let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3783 let arr = Int32Array::from(vec![5, 6]);
3784 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3785 let encoder = RecordEncoder {
3786 columns: vec![FieldBinding {
3787 arrow_index: 0,
3788 nullability: None,
3789 plan: FieldPlan::Scalar,
3790 }],
3791 prefix: None,
3792 };
3793 let mut out = BytesMut::new();
3794 out.extend_from_slice(&[0xAA, 0xBB]);
3795 let mut offsets: Vec<usize> = vec![0, out.len()];
3796 encoder
3797 .encode_rows(&batch, 16, &mut out, &mut offsets)
3798 .unwrap();
3799 assert_eq!(offsets.len(), 4);
3800 assert_eq!(*offsets.last().unwrap(), out.len());
3801 assert_bytes_eq(row_slice(&out, &offsets, 0), &[0xAA, 0xBB]);
3802 assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(5));
3803 assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(6));
3804 }
3805
3806 #[test]
3807 fn encode_rows_nullable_column() {
3808 let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, true)]);
3809 let arr = Int32Array::from(vec![Some(1), None, Some(3)]);
3810 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3811 let encoder = RecordEncoder {
3812 columns: vec![FieldBinding {
3813 arrow_index: 0,
3814 nullability: Some(Nullability::NullFirst),
3815 plan: FieldPlan::Scalar,
3816 }],
3817 prefix: None,
3818 };
3819 let mut out = BytesMut::new();
3820 let mut offsets: Vec<usize> = vec![0];
3821 encoder
3822 .encode_rows(&batch, 16, &mut out, &mut offsets)
3823 .unwrap();
3824 assert_eq!(offsets.len(), 4);
3825 let mut expected_row0 = Vec::new();
3826 expected_row0.extend(avro_long_bytes(1)); expected_row0.extend(avro_long_bytes(1)); assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
3829 let expected_row1 = avro_long_bytes(0); assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
3831 let mut expected_row2 = Vec::new();
3832 expected_row2.extend(avro_long_bytes(1)); expected_row2.extend(avro_long_bytes(3)); assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2);
3835 }
3836
3837 #[test]
3838 fn encode_prefix_write_error() {
3839 use crate::codec::AvroFieldBuilder;
3840 use crate::schema::{AvroSchema, FingerprintAlgorithm};
3841 use std::io;
3842
3843 struct FailWriter {
3844 failed: bool,
3845 }
3846
3847 impl io::Write for FailWriter {
3848 fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
3849 if !self.failed {
3850 self.failed = true;
3851 Err(io::Error::other("fail write"))
3852 } else {
3853 Ok(0)
3854 }
3855 }
3856
3857 fn flush(&mut self) -> io::Result<()> {
3858 Ok(())
3859 }
3860 }
3861
3862 let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3863 let arr = Int32Array::from(vec![42]);
3864 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3865 let avro_schema = AvroSchema::try_from(&schema).unwrap();
3866 let fingerprint = avro_schema
3867 .fingerprint(FingerprintAlgorithm::Rabin)
3868 .unwrap();
3869 let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
3870 .build()
3871 .unwrap();
3872 let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
3873 .with_fingerprint(Some(fingerprint))
3874 .build()
3875 .unwrap();
3876
3877 let mut writer = FailWriter { failed: false };
3878 let err = encoder.encode(&mut writer, &batch).unwrap_err();
3879 let msg = format!("{err}");
3880 assert!(msg.contains("write prefix"), "unexpected error: {msg}");
3881 }
3882}