1use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::errors::AvroError;
22use crate::schema::{Fingerprint, Nullability, Prefix};
23use arrow_array::cast::AsArray;
24use arrow_array::types::{
25 ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
26 DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
27 Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
28 Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
29 TimestampMillisecondType,
30};
31use arrow_array::types::{
32 RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
33};
34use arrow_array::{
35 Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
36 FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
37 GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
38 ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
39 StringViewArray, StructArray, UnionArray,
40};
41#[cfg(feature = "small_decimals")]
42use arrow_array::{Decimal32Array, Decimal64Array};
43use arrow_buffer::{ArrowNativeType, NullBuffer};
44use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode};
45use bytes::{BufMut, BytesMut};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
/// Runs `$body` once for every row index in `0..$n`, binding the index to `$row`.
///
/// When `$prefix` is `Some(bytes)`, `bytes` is written to `$out` before each row's body;
/// a failed prefix write is returned from the enclosing function (via `?`) as
/// `AvroError::IoError`. When `$prefix` is `None`, only the body runs.
macro_rules! for_rows_with_prefix {
    ($n:expr, $prefix:expr, $out:ident, |$row:ident| $body:block) => {{
        // The prefix check is done once, outside the row loop.
        if let Some(pfx) = $prefix {
            for $row in 0..$n {
                $out.write_all(pfx)
                    .map_err(|e| AvroError::IoError(format!("write prefix: {e}"), e))?;
                $body
            }
        } else {
            for $row in 0..$n {
                $body
            }
        }
    }};
}
68
69#[inline]
73pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), AvroError> {
74 let mut zz = ((value << 1) ^ (value >> 63)) as u64;
75 let mut buf = [0u8; 10];
77 let mut i = 0;
78 while (zz & !0x7F) != 0 {
79 buf[i] = ((zz & 0x7F) as u8) | 0x80;
80 i += 1;
81 zz >>= 7;
82 }
83 buf[i] = (zz & 0x7F) as u8;
84 i += 1;
85 out.write_all(&buf[..i])
86 .map_err(|e| AvroError::IoError(format!("write long: {e}"), e))
87}
88
89#[inline]
90fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), AvroError> {
91 write_long(out, value as i64)
92}
93
94#[inline]
95fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
96 write_long(out, bytes.len() as i64)?;
97 out.write_all(bytes)
98 .map_err(|e| AvroError::IoError(format!("write bytes: {e}"), e))
99}
100
101#[inline]
102fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), AvroError> {
103 out.write_all(&[if v { 1 } else { 0 }])
104 .map_err(|e| AvroError::IoError(format!("write bool: {e}"), e))
105}
106
/// Returns the shortest suffix of the big-endian two's-complement integer `be` that still
/// represents the same value.
///
/// Leading bytes that merely repeat the sign (`0x00` for non-negative, `0xFF` for negative)
/// are dropped, but one of them is kept whenever the next byte's top bit would otherwise
/// flip the sign. An empty slice is returned unchanged; a slice made up entirely of sign
/// bytes collapses to its last byte.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&first) = be.first() else {
        return be;
    };
    let sign_byte: u8 = if first & 0x80 != 0 { 0xFF } else { 0x00 };
    // Length of the leading run of pure sign-extension bytes.
    let run = be.iter().take_while(|&&b| b == sign_byte).count();
    match be.get(run) {
        // Every byte is a sign byte: a single one carries the value (0 or -1).
        None => &be[be.len() - 1..],
        // Nothing redundant at the front.
        Some(_) if run == 0 => be,
        Some(&next) => {
            // If the first significant byte already has the right top bit, the whole run
            // can go; otherwise one sign byte must stay to preserve the sign.
            let start = if (next ^ sign_byte) & 0x80 == 0 {
                run
            } else {
                run - 1
            };
            &be[start..]
        }
    }
}
138
139#[inline]
151fn write_sign_extended<W: Write + ?Sized>(
152 out: &mut W,
153 src_be: &[u8],
154 n: usize,
155) -> Result<(), AvroError> {
156 let len = src_be.len();
157 if len == n {
158 out.write_all(src_be)?;
159 return Ok(());
160 }
161 let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
162 0xFF
163 } else {
164 0x00
165 };
166 if len > n {
167 let extra = len - n;
168 if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
169 return Ok(());
170 }
171 if src_be[..extra].iter().any(|&b| b != sign_byte)
174 || ((src_be[extra] ^ sign_byte) & 0x80) != 0
175 {
176 return Err(AvroError::InvalidArgument(format!(
177 "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
178 )));
179 }
180 return out
181 .write_all(&src_be[extra..])
182 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e));
183 }
184 let pad_len = n - len;
186 const ZPAD: [u8; 64] = [0x00; 64];
188 const FPAD: [u8; 64] = [0xFF; 64];
189 let pad = if sign_byte == 0x00 {
190 &ZPAD[..]
191 } else {
192 &FPAD[..]
193 };
194 let mut rem = pad_len;
197 while rem >= pad.len() {
198 out.write_all(pad)
199 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
200 rem -= pad.len();
201 }
202 if rem > 0 {
203 out.write_all(&pad[..rem])
204 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
205 }
206 out.write_all(src_be)
207 .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))
208}
209
210fn write_optional_index<W: Write + ?Sized>(
216 out: &mut W,
217 is_null: bool,
218 null_order: Nullability,
219) -> Result<(), AvroError> {
220 let byte = union_value_branch_byte(null_order, is_null);
221 out.write_all(&[byte])
222 .map_err(|e| AvroError::IoError(format!("write union branch: {e}"), e))
223}
224
/// How a `FieldEncoder` handles the union branch marker written in front of a value.
///
/// The state is chosen once per array in `FieldEncoder::make_encoder` and consulted for
/// every row in `FieldEncoder::encode`.
#[derive(Debug, Clone)]
enum NullState<'a> {
    /// The site is not nullable: no branch marker is written.
    NonNullable,
    /// The site is nullable but the array contains no nulls: the same precomputed
    /// branch byte is written before every value.
    NullableNoNulls {
        /// Branch byte for the non-null branch, computed when the encoder is built.
        union_value_byte: u8,
    },
    /// The site is nullable and the array contains nulls: validity is checked per row.
    Nullable {
        /// Validity buffer of the array being encoded.
        nulls: &'a NullBuffer,
        /// Determines which branch byte stands for null and which for a value.
        null_order: Nullability,
    },
}
236
/// Encoder for a single Arrow array: a value encoder plus the null handling for its site.
///
/// `encode` first deals with the union branch marker according to `null_state`, then
/// delegates the value itself to `encoder`.
pub(crate) struct FieldEncoder<'a> {
    /// Writes the value at a given row index.
    encoder: Encoder<'a>,
    /// Decides whether and which union branch marker precedes the value.
    null_state: NullState<'a>,
}
244
impl<'a> FieldEncoder<'a> {
    /// Builds the encoder for one Arrow `array` according to `plan`.
    ///
    /// `plan` selects the encoder family (scalar, struct, list, decimal, uuid, map, enum,
    /// union, run-end encoded). Within a family the concrete encoder is chosen from
    /// `array.data_type()` and the array is downcast to the matching concrete array type.
    ///
    /// `nullability` is `None` for a non-nullable site; `Some(order)` means a union branch
    /// marker is written before every value, with `order` deciding which byte means null.
    ///
    /// # Errors
    /// Returns `AvroError::NYI`, `AvroError::InvalidArgument` or `AvroError::SchemaError`
    /// when the Arrow type is not supported at this site or a downcast fails. Errors from
    /// building nested encoders are propagated.
    fn make_encoder(
        array: &'a dyn Array,
        plan: &FieldPlan,
        nullability: Option<Nullability>,
    ) -> Result<Self, AvroError> {
        let encoder = match plan {
            // Scalar sites: the Arrow data type alone picks the encoder.
            FieldPlan::Scalar => match array.data_type() {
                DataType::Null => Encoder::Null,
                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
                DataType::Utf8 => {
                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
                }
                DataType::LargeUtf8 => {
                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
                }
                DataType::Utf8View => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<StringViewArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected StringViewArray".into()))?;
                    Encoder::Utf8View(Utf8ViewEncoder(arr))
                }
                DataType::BinaryView => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<BinaryViewArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected BinaryViewArray".into()))?;
                    Encoder::BinaryView(BinaryViewEncoder(arr))
                }
                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
                DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
                // Date64 is rejected rather than converted; the message tells the caller
                // which casts to apply beforehand.
                DataType::Date64 => {
                    return Err(AvroError::NYI(
                        "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
                            .into(),
                    ));
                }
                // Second-resolution times go through a dedicated seconds-to-millis encoder.
                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
                ),
                DataType::Time32(TimeUnit::Millisecond) => {
                    Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
                }
                // Unit/width combinations that are not valid Arrow types are reported as
                // invalid arguments instead of falling into the generic "not supported" arm.
                DataType::Time32(TimeUnit::Microsecond) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
                            .into(),
                    ));
                }
                DataType::Time32(TimeUnit::Nanosecond) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
                    array.as_primitive::<Time64MicrosecondType>(),
                )),
                DataType::Time64(TimeUnit::Nanosecond) => {
                    return Err(AvroError::NYI(
                        "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Millisecond) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Second) => {
                    return Err(AvroError::InvalidArgument(
                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
                            .into(),
                    ));
                }
                DataType::Float32 => {
                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
                }
                DataType::Float64 => {
                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
                }
                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
                DataType::LargeBinary => {
                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
                }
                DataType::FixedSizeBinary(_len) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeBinaryArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
                        })?;
                    Encoder::Fixed(FixedEncoder(arr))
                }
                // The timestamp's timezone component is ignored here; only the unit matters.
                DataType::Timestamp(unit, _) => match unit {
                    TimeUnit::Second => {
                        Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
                            array.as_primitive::<TimestampSecondType>(),
                        ))
                    }
                    TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
                        array.as_primitive::<TimestampMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
                        array.as_primitive::<TimestampMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
                        array.as_primitive::<TimestampNanosecondType>(),
                    )),
                },
                // All three interval units share `DurationEncoder`, wrapped in a
                // unit-specific `Encoder` variant.
                DataType::Interval(unit) => match unit {
                    IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
                        array.as_primitive::<IntervalMonthDayNanoType>(),
                    )),
                    IntervalUnit::YearMonth => Encoder::IntervalYearMonth(DurationEncoder(
                        array.as_primitive::<IntervalYearMonthType>(),
                    )),
                    IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
                        array.as_primitive::<IntervalDayTimeType>(),
                    )),
                },
                // Arrow durations are encoded with `LongEncoder` for every unit.
                DataType::Duration(tu) => match tu {
                    TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
                        array.as_primitive::<DurationSecondType>(),
                    )),
                    TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
                        array.as_primitive::<DurationMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
                        array.as_primitive::<DurationMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
                        array.as_primitive::<DurationNanosecondType>(),
                    )),
                },
                other => {
                    return Err(AvroError::NYI(format!(
                        "Avro scalar type not yet supported: {other:?}"
                    )));
                }
            },
            FieldPlan::Struct { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .ok_or_else(|| AvroError::SchemaError("Expected StructArray".into()))?;
                Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
            }
            // A list plan accepts any of Arrow's five list layouts; each gets its own
            // encoder type but the same item nullability and item plan.
            FieldPlan::List {
                items_nullability,
                item_plan,
            } => match array.data_type() {
                DataType::List(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected ListArray".into()))?;
                    Encoder::List(Box::new(ListEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeList(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected LargeListArray".into()))?;
                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::ListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListViewArray>()
                        .ok_or_else(|| AvroError::SchemaError("Expected ListViewArray".into()))?;
                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListViewArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected LargeListViewArray".into())
                        })?;
                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::FixedSizeList(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeListArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected FixedSizeListArray".into())
                        })?;
                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                    )));
                }
            },
            // The const generic on `DecimalEncoder` equals the byte width of the Arrow
            // decimal's native value (4, 8, 16 or 32); `size` comes from the plan.
            FieldPlan::Decimal { size } => match array.data_type() {
                #[cfg(feature = "small_decimals")]
                DataType::Decimal32(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal32Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal32Array".into()))?;
                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
                }
                #[cfg(feature = "small_decimals")]
                DataType::Decimal64(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal64Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal64Array".into()))?;
                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
                }
                DataType::Decimal128(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal128Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal128Array".into()))?;
                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
                }
                DataType::Decimal256(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal256Array>()
                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal256Array".into()))?;
                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
                }
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
                    )));
                }
            },
            FieldPlan::Uuid => {
                let arr = array
                    .as_any()
                    .downcast_ref::<FixedSizeBinaryArray>()
                    .ok_or_else(|| {
                        AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
                    })?;
                Encoder::Uuid(UuidEncoder(arr))
            }
            FieldPlan::Map {
                values_nullability,
                value_plan,
            } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<MapArray>()
                    .ok_or_else(|| AvroError::SchemaError("Expected MapArray".into()))?;
                Encoder::Map(Box::new(MapEncoder::try_new(
                    arr,
                    *values_nullability,
                    value_plan.as_ref(),
                )?))
            }
            FieldPlan::Enum { symbols } => match array.data_type() {
                DataType::Dictionary(key_dt, value_dt) => {
                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
                        return Err(AvroError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    let dict = array
                        .as_any()
                        .downcast_ref::<DictionaryArray<Int32Type>>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Expected DictionaryArray<Int32>".into())
                        })?;
                    let values = dict
                        .values()
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            AvroError::SchemaError("Dictionary values must be Utf8".into())
                        })?;
                    // The dictionary must hold exactly the schema's symbols, in the same
                    // order; only then are the keys handed to the encoder unchanged.
                    if values.len() != symbols.len() {
                        return Err(AvroError::SchemaError(format!(
                            "Enum symbol length {} != dictionary size {}",
                            symbols.len(),
                            values.len()
                        )));
                    }
                    for i in 0..values.len() {
                        if values.value(i) != symbols[i].as_str() {
                            return Err(AvroError::SchemaError(format!(
                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
                                symbols[i],
                                values.value(i)
                            )));
                        }
                    }
                    let keys = dict.keys();
                    Encoder::Enum(EnumEncoder { keys })
                }
                other => {
                    return Err(AvroError::SchemaError(format!(
                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
                    )));
                }
            },
            FieldPlan::Union { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .ok_or_else(|| AvroError::SchemaError("Expected UnionArray".into()))?;

                Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
            }
            FieldPlan::RunEndEncoded {
                values_nullability,
                value_plan,
            } => {
                // The run-ends index type is not known statically, so each supported
                // width is tried in turn. The nested encoder is built over the run
                // array's values with the plan's value nullability.
                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, AvroError> {
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
                            Int16Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
                            Int32Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
                            Int64Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    Err(AvroError::SchemaError(
                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
                            .into(),
                    ))
                };
                build(array)?
            }
        };
        // Decide the null handling once per array so `encode` does the minimum per row.
        let null_state = match nullability {
            None => NullState::NonNullable,
            Some(null_order) => match array.nulls() {
                Some(nulls) if array.null_count() > 0 => NullState::Nullable { nulls, null_order },
                // Nullable site, but no nulls present: precompute the value-branch byte.
                _ => NullState::NullableNoNulls {
                    union_value_byte: union_value_branch_byte(null_order, false),
                },
            },
        };
        Ok(Self {
            encoder,
            null_state,
        })
    }

    /// Encodes the value at row `idx` into `out`, preceded by the union branch marker
    /// when the site is nullable.
    ///
    /// For a null row only the null branch marker is written and the value encoder is
    /// not invoked.
    ///
    /// # Errors
    /// Returns `AvroError::IoError` if writing a branch marker fails; errors from the
    /// value encoder are propagated.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        match &self.null_state {
            NullState::NonNullable => {}
            NullState::NullableNoNulls { union_value_byte } => out
                .write_all(&[*union_value_byte])
                .map_err(|e| AvroError::IoError(format!("write union value branch: {e}"), e))?,
            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
                // Null row: the marker is the whole encoding, so return early.
                return write_optional_index(out, true, *null_order); }
            NullState::Nullable { null_order, .. } => {
                write_optional_index(out, false, *null_order)?;
            }
        }
        self.encoder.encode(out, idx)
    }
}
661
662fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
663 let nulls_first = null_order == Nullability::default();
664 if nulls_first == is_null { 0x00 } else { 0x02 }
665}
666
/// Per-site encoding plan, derived from the Avro and Arrow schemas by `FieldPlan::build`
/// and later turned into a concrete encoder by `FieldEncoder::make_encoder`.
///
/// A plan holds no array data, so it can be built once and applied to many batches.
#[derive(Debug, Clone)]
enum FieldPlan {
    /// Leaf site whose encoder is picked purely from the Arrow data type.
    Scalar,
    /// Record site: one binding per Avro field, pointing at an Arrow struct child.
    Struct { bindings: Vec<FieldBinding> },
    /// Array site; applies to every Arrow list layout accepted by `make_encoder`.
    List {
        /// Nullability of the list items (`None` when items are not nullable).
        items_nullability: Option<Nullability>,
        /// Plan for the list items.
        item_plan: Box<FieldPlan>,
    },
    /// Decimal site. `size` is taken from the Avro decimal codec and forwarded to the
    /// decimal encoder — presumably the fixed byte width, with `None` meaning
    /// variable-length bytes; confirm against `DecimalEncoder`.
    Decimal { size: Option<usize> },
    /// UUID site backed by a `FixedSizeBinary` array.
    Uuid,
    /// Map site.
    Map {
        /// Nullability of the map values (`None` when values are not nullable).
        values_nullability: Option<Nullability>,
        /// Plan for the map values.
        value_plan: Box<FieldPlan>,
    },
    /// Enum site; `symbols` are the Avro enum symbols the Arrow dictionary must match.
    Enum { symbols: Arc<[String]> },
    /// Union site: one binding per branch, in branch order.
    Union { bindings: Vec<FieldBinding> },
    /// Site whose Arrow array is run-end encoded.
    RunEndEncoded {
        /// Nullability applied to the run array's values.
        values_nullability: Option<Nullability>,
        /// Plan for the run array's values.
        value_plan: Box<FieldPlan>,
    },
}
701
/// Links one Avro field (or union branch) to its Arrow counterpart and encoding plan.
#[derive(Debug, Clone)]
struct FieldBinding {
    /// Position of the Arrow child: a column index for top-level fields, a struct child
    /// index for record fields, or the branch position for unions.
    arrow_index: usize,
    /// Nullability of the Avro type at this site; `None` when it is not nullable.
    nullability: Option<Nullability>,
    /// How values at this site are encoded.
    plan: FieldPlan,
}
711
/// Builder that resolves an Avro root record against an Arrow schema and produces a
/// `RecordEncoder`.
#[derive(Debug)]
pub(crate) struct RecordEncoderBuilder<'a> {
    /// Root Avro field; its codec must be a record/struct for `build` to succeed.
    avro_root: &'a AvroField,
    /// Arrow schema whose fields are matched to the Avro fields by name.
    arrow_schema: &'a ArrowSchema,
    /// Optional schema fingerprint; when set, `build` turns it into a per-row prefix.
    fingerprint: Option<Fingerprint>,
}
719
720impl<'a> RecordEncoderBuilder<'a> {
721 pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
723 Self {
724 avro_root,
725 arrow_schema,
726 fingerprint: None,
727 }
728 }
729
730 pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
731 self.fingerprint = fingerprint;
732 self
733 }
734
735 pub(crate) fn build(self) -> Result<RecordEncoder, AvroError> {
738 let avro_root_dt = self.avro_root.data_type();
739 let Codec::Struct(root_fields) = avro_root_dt.codec() else {
740 return Err(AvroError::SchemaError(
741 "Top-level Avro schema must be a record/struct".into(),
742 ));
743 };
744 let mut columns = Vec::with_capacity(root_fields.len());
745 for root_field in root_fields.as_ref() {
746 let name = root_field.name();
747 let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
748 AvroError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
749 })?;
750 columns.push(FieldBinding {
751 arrow_index,
752 nullability: root_field.data_type().nullability(),
753 plan: FieldPlan::build(
754 root_field.data_type(),
755 self.arrow_schema.field(arrow_index),
756 )?,
757 });
758 }
759 Ok(RecordEncoder {
760 columns,
761 prefix: self.fingerprint.map(|fp| fp.make_prefix()),
762 })
763 }
764}
765
/// Row-wise encoder for record batches, produced by `RecordEncoderBuilder::build`.
///
/// It stores only the per-column plans and the optional prefix; the array-bound encoders
/// are created anew for every batch.
#[derive(Debug, Clone)]
pub(crate) struct RecordEncoder {
    /// One binding per Avro root field, in Avro field order.
    columns: Vec<FieldBinding>,
    /// Bytes written before every row when a fingerprint was configured.
    prefix: Option<Prefix>,
}
777
778impl RecordEncoder {
779 fn prepare_for_batch<'a>(
780 &'a self,
781 batch: &'a RecordBatch,
782 ) -> Result<Vec<FieldEncoder<'a>>, AvroError> {
783 let arrays = batch.columns();
784 let mut out = Vec::with_capacity(self.columns.len());
785 for col_plan in self.columns.iter() {
786 let arrow_index = col_plan.arrow_index;
787 let array = arrays.get(arrow_index).ok_or_else(|| {
788 AvroError::SchemaError(format!("Column index {arrow_index} out of range"))
789 })?;
790 #[cfg(not(feature = "avro_custom_types"))]
791 let site_nullability = match &col_plan.plan {
792 FieldPlan::RunEndEncoded { .. } => None,
793 _ => col_plan.nullability,
794 };
795 #[cfg(feature = "avro_custom_types")]
796 let site_nullability = col_plan.nullability;
797 out.push(FieldEncoder::make_encoder(
798 array.as_ref(),
799 &col_plan.plan,
800 site_nullability,
801 )?);
802 }
803 Ok(out)
804 }
805
806 pub(crate) fn encode<W: Write>(
810 &self,
811 out: &mut W,
812 batch: &RecordBatch,
813 ) -> Result<(), AvroError> {
814 let mut column_encoders = self.prepare_for_batch(batch)?;
815 let n = batch.num_rows();
816 let prefix = self.prefix.as_ref().map(|p| p.as_slice());
817 for_rows_with_prefix!(n, prefix, out, |row| {
818 for enc in column_encoders.iter_mut() {
819 enc.encode(out, row)?;
820 }
821 });
822 Ok(())
823 }
824
825 pub(crate) fn encode_rows(
834 &self,
835 batch: &RecordBatch,
836 row_capacity: usize,
837 out: &mut BytesMut,
838 offsets: &mut Vec<usize>,
839 ) -> Result<(), AvroError> {
840 let out_len = out.len();
841 if offsets.first() != Some(&0) || offsets.last() != Some(&out_len) {
842 return Err(AvroError::General(
843 "encode_rows requires offsets to start with 0 and end at out.len()".to_string(),
844 ));
845 }
846 let n = batch.num_rows();
847 if n == 0 {
848 return Ok(());
849 }
850 if offsets.len().checked_add(n).is_none() {
851 return Err(AvroError::General(
852 "encode_rows cannot append offsets: too many rows".to_string(),
853 ));
854 }
855 let mut column_encoders = self.prepare_for_batch(batch)?;
856 offsets.reserve(n);
857 let prefix_bytes = self.prefix.as_ref().map(|p| p.as_slice());
858 let prefix_len = prefix_bytes.map_or(0, |p| p.len());
859 let per_row_hint = row_capacity.max(prefix_len);
860 if let Some(additional) = n
861 .checked_mul(per_row_hint)
862 .filter(|&a| out_len.checked_add(a).is_some())
863 {
864 out.reserve(additional);
865 }
866 let start_out_len = out.len();
867 let start_offsets_len = offsets.len();
868 let res = (|| -> Result<(), AvroError> {
869 let mut w = out.writer();
870 if let [enc0] = column_encoders.as_mut_slice() {
871 for_rows_with_prefix!(n, prefix_bytes, w, |row| {
872 enc0.encode(&mut w, row)?;
873 offsets.push(w.get_ref().len());
874 });
875 } else {
876 for_rows_with_prefix!(n, prefix_bytes, w, |row| {
877 for enc in column_encoders.iter_mut() {
878 enc.encode(&mut w, row)?;
879 }
880 offsets.push(w.get_ref().len());
881 });
882 }
883 Ok(())
884 })();
885 if res.is_err() {
886 out.truncate(start_out_len);
887 offsets.truncate(start_offsets_len);
888 } else {
889 debug_assert_eq!(
890 *offsets.last().unwrap(),
891 out.len(),
892 "encode_rows: offsets/out length mismatch after successful encode"
893 );
894 }
895 res
896 }
897}
898
899fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
900 fields.iter().position(|f| f.name() == name)
901}
902
903fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
904 find_struct_child_index(fields, "value")
906 .or_else(|| find_struct_child_index(fields, "values"))
907 .or_else(|| if fields.len() == 2 { Some(1) } else { None })
908}
909
impl FieldPlan {
    /// Derives the encoding plan for one site from its Avro type and the matching Arrow
    /// field, validating that the two are compatible.
    ///
    /// The checks run in this order:
    /// 1. Without the `avro_custom_types` feature, an Arrow `RunEndEncoded` field is
    ///    planned from the Arrow side, using the Avro type (or the first non-null branch
    ///    of an Avro union) for the run values.
    /// 2. A `FixedSizeBinary` field marked as UUID (by extension type name or by
    ///    `logicalType` metadata) becomes `FieldPlan::Uuid`.
    /// 3. Otherwise the Avro codec decides; codecs without a dedicated arm become
    ///    `FieldPlan::Scalar`.
    ///
    /// # Errors
    /// Returns `AvroError::SchemaError` when the Arrow type does not match the Avro codec,
    /// `AvroError::InvalidArgument` for a UUID field that is not 16 bytes wide, and
    /// `AvroError::NYI` for sparse unions. Errors from nested plans are propagated.
    fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, AvroError> {
        #[cfg(not(feature = "avro_custom_types"))]
        if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
            let values_nullability = avro_dt.nullability();
            // For a union, plan the values against the first branch that is not null.
            let value_site_dt: &AvroDataType = match avro_dt.codec() {
                Codec::Union(branches, _, _) => branches
                    .iter()
                    .find(|b| !matches!(b.codec(), Codec::Null))
                    .ok_or_else(|| {
                        AvroError::SchemaError(
                            "Avro union at RunEndEncoded site has no non-null branch".into(),
                        )
                    })?,
                _ => avro_dt,
            };
            return Ok(FieldPlan::RunEndEncoded {
                values_nullability,
                value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
            });
        }
        if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
            // Extension-type detection is only compiled in with the
            // `canonical_extension_types` feature; otherwise only metadata is consulted.
            let ext_is_uuid = {
                #[cfg(feature = "canonical_extension_types")]
                {
                    matches!(
                        arrow_field.extension_type_name(),
                        Some("arrow.uuid") | Some("uuid")
                    )
                }
                #[cfg(not(feature = "canonical_extension_types"))]
                {
                    false
                }
            };
            let md_is_uuid = arrow_field
                .metadata()
                .get("logicalType")
                .map(|s| s.as_str())
                == Some("uuid");
            if ext_is_uuid || md_is_uuid {
                if *len != 16 {
                    return Err(AvroError::InvalidArgument(
                        "logicalType=uuid requires FixedSizeBinary(16)".into(),
                    ));
                }
                return Ok(FieldPlan::Uuid);
            }
        }
        match avro_dt.codec() {
            Codec::Struct(avro_fields) => {
                let fields = match arrow_field.data_type() {
                    DataType::Struct(struct_fields) => struct_fields,
                    other => {
                        return Err(AvroError::SchemaError(format!(
                            "Avro struct maps to Arrow Struct, found: {other:?}"
                        )));
                    }
                };
                // Avro fields are matched to Arrow struct children by name, so the two
                // sides may list them in different orders.
                let mut bindings = Vec::with_capacity(avro_fields.len());
                for avro_field in avro_fields.iter() {
                    let name = avro_field.name().to_string();
                    let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
                        AvroError::SchemaError(format!(
                            "Struct field '{name}' not present in Arrow field '{}'",
                            arrow_field.name()
                        ))
                    })?;
                    bindings.push(FieldBinding {
                        arrow_index: idx,
                        nullability: avro_field.data_type().nullability(),
                        plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
                    });
                }
                Ok(FieldPlan::Struct { bindings })
            }
            // Every Arrow list layout yields the same `List` plan; the concrete layout is
            // resolved later when the encoder is created.
            Codec::List(items_dt) => match arrow_field.data_type() {
                DataType::List(field_ref)
                | DataType::LargeList(field_ref)
                | DataType::ListView(field_ref)
                | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
                    items_nullability: items_dt.nullability(),
                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
                }),
                DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
                    items_nullability: items_dt.nullability(),
                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
                }),
                other => Err(AvroError::SchemaError(format!(
                    "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                ))),
            },
            Codec::Map(values_dt) => {
                let entries_field = match arrow_field.data_type() {
                    DataType::Map(entries, _sorted) => entries.as_ref(),
                    other => {
                        return Err(AvroError::SchemaError(format!(
                            "Avro map maps to Arrow DataType::Map, found: {other:?}"
                        )));
                    }
                };
                let entries_struct_fields = match entries_field.data_type() {
                    DataType::Struct(fs) => fs,
                    other => {
                        return Err(AvroError::SchemaError(format!(
                            "Arrow Map entries must be Struct, found: {other:?}"
                        )));
                    }
                };
                // Only the value child is planned here; the key child is not inspected.
                let value_idx =
                    find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
                        AvroError::SchemaError("Map entries struct missing value field".into())
                    })?;
                let value_field = entries_struct_fields[value_idx].as_ref();
                let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
                Ok(FieldPlan::Map {
                    values_nullability: values_dt.nullability(),
                    value_plan: Box::new(value_plan),
                })
            }
            Codec::Enum(symbols) => match arrow_field.data_type() {
                DataType::Dictionary(key_dt, value_dt) => {
                    if **key_dt != DataType::Int32 {
                        return Err(AvroError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    if **value_dt != DataType::Utf8 {
                        return Err(AvroError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    Ok(FieldPlan::Enum {
                        symbols: symbols.clone(),
                    })
                }
                other => Err(AvroError::SchemaError(format!(
                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
                ))),
            },
            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
                // `ap`/`as_` are the Arrow-side precision and scale.
                let (ap, as_) = match arrow_field.data_type() {
                    #[cfg(feature = "small_decimals")]
                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
                    #[cfg(feature = "small_decimals")]
                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
                    other => {
                        return Err(AvroError::SchemaError(format!(
                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
                            arrow_field.name()
                        )));
                    }
                };
                // A missing Avro scale is treated as 0; precision and scale must both
                // agree with the Arrow type.
                let sc = scale_opt.unwrap_or(0) as i32; if ap != *precision || as_ != sc {
                    return Err(AvroError::SchemaError(format!(
                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
                        arrow_field.name()
                    )));
                }
                Ok(FieldPlan::Decimal {
                    size: *fixed_size_opt,
                })
            }
            // NOTE(review): all three Arrow interval units are accepted here, although the
            // error message below names only Interval(MonthDayNano).
            Codec::Interval => match arrow_field.data_type() {
                DataType::Interval(
                    IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
                ) => Ok(FieldPlan::Scalar),
                other => Err(AvroError::SchemaError(format!(
                    "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
                ))),
            },
            Codec::Union(avro_branches, _, UnionMode::Dense) => {
                let arrow_union_fields = match arrow_field.data_type() {
                    DataType::Union(fields, UnionMode::Dense) => fields,
                    DataType::Union(_, UnionMode::Sparse) => {
                        return Err(AvroError::NYI(
                            "Sparse Arrow unions are not yet supported".to_string(),
                        ));
                    }
                    other => {
                        return Err(AvroError::SchemaError(format!(
                            "Avro union maps to Arrow Union, found: {other:?}"
                        )));
                    }
                };
                if avro_branches.len() != arrow_union_fields.len() {
                    return Err(AvroError::SchemaError(format!(
                        "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
                        avro_branches.len(),
                        arrow_union_fields.len(),
                        arrow_field.name()
                    )));
                }
                // Branches are paired positionally: the i-th Avro branch goes with the
                // i-th Arrow union field, and `arrow_index` records that position.
                let bindings = avro_branches
                    .iter()
                    .zip(arrow_union_fields.iter())
                    .enumerate()
                    .map(|(i, (avro_branch, (_, arrow_child_field)))| {
                        Ok(FieldBinding {
                            arrow_index: i,
                            nullability: avro_branch.nullability(),
                            plan: FieldPlan::build(avro_branch, arrow_child_field)?,
                        })
                    })
                    .collect::<Result<Vec<_>, AvroError>>()?;
                Ok(FieldPlan::Union { bindings })
            }
            Codec::Union(_, _, UnionMode::Sparse) => Err(AvroError::NYI(
                "Sparse Arrow unions are not yet supported".to_string(),
            )),
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values_dt, _width_code) => {
                let values_field = match arrow_field.data_type() {
                    DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
                    other => {
                        return Err(AvroError::SchemaError(format!(
                            "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
                        )));
                    }
                };
                Ok(FieldPlan::RunEndEncoded {
                    values_nullability: values_dt.nullability(),
                    value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
                })
            }
            // Every remaining codec is a leaf; the concrete encoder is chosen from the
            // Arrow data type when the encoder is created.
            _ => Ok(FieldPlan::Scalar),
        }
    }
}
1144
1145enum Encoder<'a> {
1146 Boolean(BooleanEncoder<'a>),
1147 Int(IntEncoder<'a, Int32Type>),
1148 Long(LongEncoder<'a, Int64Type>),
1149 TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
1150 TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
1151 TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
1152 TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
1153 Date32(IntEncoder<'a, Date32Type>),
1154 Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
1155 Time32Millis(IntEncoder<'a, Time32MillisecondType>),
1156 Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
1157 DurationSeconds(LongEncoder<'a, DurationSecondType>),
1158 DurationMillis(LongEncoder<'a, DurationMillisecondType>),
1159 DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
1160 DurationNanos(LongEncoder<'a, DurationNanosecondType>),
1161 Float32(F32Encoder<'a>),
1162 Float64(F64Encoder<'a>),
1163 Binary(BinaryEncoder<'a, i32>),
1164 LargeBinary(BinaryEncoder<'a, i64>),
1165 Utf8(Utf8Encoder<'a>),
1166 Utf8Large(Utf8LargeEncoder<'a>),
1167 Utf8View(Utf8ViewEncoder<'a>),
1168 BinaryView(BinaryViewEncoder<'a>),
1169 List(Box<ListEncoder32<'a>>),
1170 LargeList(Box<ListEncoder64<'a>>),
1171 ListView(Box<ListViewEncoder32<'a>>),
1172 LargeListView(Box<ListViewEncoder64<'a>>),
1173 FixedSizeList(Box<FixedSizeListEncoder<'a>>),
1174 Struct(Box<StructEncoder<'a>>),
1175 Fixed(FixedEncoder<'a>),
1177 Uuid(UuidEncoder<'a>),
1179 IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
1181 IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
1183 IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
1185 #[cfg(feature = "small_decimals")]
1186 Decimal32(Decimal32Encoder<'a>),
1187 #[cfg(feature = "small_decimals")]
1188 Decimal64(Decimal64Encoder<'a>),
1189 Decimal128(Decimal128Encoder<'a>),
1190 Decimal256(Decimal256Encoder<'a>),
1191 Enum(EnumEncoder<'a>),
1193 Map(Box<MapEncoder<'a>>),
1194 Union(Box<UnionEncoder<'a>>),
1195 RunEncoded16(Box<RunEncodedEncoder16<'a>>),
1197 RunEncoded32(Box<RunEncodedEncoder32<'a>>),
1198 RunEncoded64(Box<RunEncodedEncoder64<'a>>),
1199 Null,
1200}
1201
1202impl<'a> Encoder<'a> {
1203 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1205 match self {
1206 Encoder::Boolean(e) => e.encode(out, idx),
1207 Encoder::Int(e) => e.encode(out, idx),
1208 Encoder::Long(e) => e.encode(out, idx),
1209 Encoder::TimestampMicros(e) => e.encode(out, idx),
1210 Encoder::TimestampMillis(e) => e.encode(out, idx),
1211 Encoder::TimestampNanos(e) => e.encode(out, idx),
1212 Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
1213 Encoder::Date32(e) => e.encode(out, idx),
1214 Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
1215 Encoder::Time32Millis(e) => e.encode(out, idx),
1216 Encoder::Time64Micros(e) => e.encode(out, idx),
1217 Encoder::DurationSeconds(e) => e.encode(out, idx),
1218 Encoder::DurationMicros(e) => e.encode(out, idx),
1219 Encoder::DurationMillis(e) => e.encode(out, idx),
1220 Encoder::DurationNanos(e) => e.encode(out, idx),
1221 Encoder::Float32(e) => e.encode(out, idx),
1222 Encoder::Float64(e) => e.encode(out, idx),
1223 Encoder::Binary(e) => e.encode(out, idx),
1224 Encoder::LargeBinary(e) => e.encode(out, idx),
1225 Encoder::Utf8(e) => e.encode(out, idx),
1226 Encoder::Utf8Large(e) => e.encode(out, idx),
1227 Encoder::Utf8View(e) => e.encode(out, idx),
1228 Encoder::BinaryView(e) => e.encode(out, idx),
1229 Encoder::List(e) => e.encode(out, idx),
1230 Encoder::LargeList(e) => e.encode(out, idx),
1231 Encoder::ListView(e) => e.encode(out, idx),
1232 Encoder::LargeListView(e) => e.encode(out, idx),
1233 Encoder::FixedSizeList(e) => e.encode(out, idx),
1234 Encoder::Struct(e) => e.encode(out, idx),
1235 Encoder::Fixed(e) => (e).encode(out, idx),
1236 Encoder::Uuid(e) => (e).encode(out, idx),
1237 Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
1238 Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
1239 Encoder::IntervalDayTime(e) => (e).encode(out, idx),
1240 #[cfg(feature = "small_decimals")]
1241 Encoder::Decimal32(e) => (e).encode(out, idx),
1242 #[cfg(feature = "small_decimals")]
1243 Encoder::Decimal64(e) => (e).encode(out, idx),
1244 Encoder::Decimal128(e) => (e).encode(out, idx),
1245 Encoder::Decimal256(e) => (e).encode(out, idx),
1246 Encoder::Map(e) => (e).encode(out, idx),
1247 Encoder::Enum(e) => (e).encode(out, idx),
1248 Encoder::Union(e) => (e).encode(out, idx),
1249 Encoder::RunEncoded16(e) => (e).encode(out, idx),
1250 Encoder::RunEncoded32(e) => (e).encode(out, idx),
1251 Encoder::RunEncoded64(e) => (e).encode(out, idx),
1252 Encoder::Null => Ok(()),
1253 }
1254 }
1255}
1256
1257struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1258impl BooleanEncoder<'_> {
1259 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1260 write_bool(out, self.0.value(idx))
1261 }
1262}
1263
1264struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1266impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1267 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1268 write_int(out, self.0.value(idx))
1269 }
1270}
1271
1272struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1274impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1275 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1276 write_long(out, self.0.value(idx))
1277 }
1278}
1279
1280struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1282impl<'a> Time32SecondsToMillisEncoder<'a> {
1283 #[inline]
1284 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1285 let secs = self.0.value(idx);
1286 let millis = secs
1287 .checked_mul(1000)
1288 .ok_or_else(|| AvroError::InvalidArgument("time32(secs) * 1000 overflowed".into()))?;
1289 write_int(out, millis)
1290 }
1291}
1292
1293struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1295impl<'a> TimestampSecondsToMillisEncoder<'a> {
1296 #[inline]
1297 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1298 let secs = self.0.value(idx);
1299 let millis = secs.checked_mul(1000).ok_or_else(|| {
1300 AvroError::InvalidArgument("timestamp(secs) * 1000 overflowed".into())
1301 })?;
1302 write_long(out, millis)
1303 }
1304}
1305
1306struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1308impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1309 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1310 write_len_prefixed(out, self.0.value(idx))
1311 }
1312}
1313
1314struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1316impl BinaryViewEncoder<'_> {
1317 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1318 write_len_prefixed(out, self.0.value(idx))
1319 }
1320}
1321
1322struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1324impl Utf8ViewEncoder<'_> {
1325 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1326 write_len_prefixed(out, self.0.value(idx).as_bytes())
1327 }
1328}
1329
1330struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1331impl F32Encoder<'_> {
1332 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1333 let bits = self.0.value(idx).to_bits();
1335 out.write_all(&bits.to_le_bytes())?;
1336 Ok(())
1337 }
1338}
1339
1340struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1341impl F64Encoder<'_> {
1342 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1343 let bits = self.0.value(idx).to_bits();
1345 out.write_all(&bits.to_le_bytes()).map_err(Into::into)
1346 }
1347}
1348
1349struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1350
1351impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1352 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1353 write_len_prefixed(out, self.0.value(idx).as_bytes())
1354 }
1355}
1356
1357type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1358type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1359
1360enum KeyKind<'a> {
1362 Utf8(&'a GenericStringArray<i32>),
1363 LargeUtf8(&'a GenericStringArray<i64>),
1364}
1365struct MapEncoder<'a> {
1366 map: &'a MapArray,
1367 keys: KeyKind<'a>,
1368 values: FieldEncoder<'a>,
1369 keys_offset: usize,
1370 values_offset: usize,
1371}
1372
1373impl<'a> MapEncoder<'a> {
1374 fn try_new(
1375 map: &'a MapArray,
1376 values_nullability: Option<Nullability>,
1377 value_plan: &FieldPlan,
1378 ) -> Result<Self, AvroError> {
1379 let keys_arr = map.keys();
1380 let keys_kind = match keys_arr.data_type() {
1381 DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1382 DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1383 other => {
1384 return Err(AvroError::SchemaError(format!(
1385 "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1386 )));
1387 }
1388 };
1389 Ok(Self {
1390 map,
1391 keys: keys_kind,
1392 values: FieldEncoder::make_encoder(
1393 map.values().as_ref(),
1394 value_plan,
1395 values_nullability,
1396 )?,
1397 keys_offset: keys_arr.offset(),
1398 values_offset: map.values().offset(),
1399 })
1400 }
1401
1402 fn encode_map_entries<W, O>(
1403 out: &mut W,
1404 keys: &GenericStringArray<O>,
1405 keys_offset: usize,
1406 start: usize,
1407 end: usize,
1408 mut write_item: impl FnMut(&mut W, usize) -> Result<(), AvroError>,
1409 ) -> Result<(), AvroError>
1410 where
1411 W: Write + ?Sized,
1412 O: OffsetSizeTrait,
1413 {
1414 encode_blocked_range(out, start, end, |out, j| {
1415 let j_key = j.saturating_sub(keys_offset);
1416 write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1417 write_item(out, j)
1418 })
1419 }
1420
1421 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1422 let offsets = self.map.offsets();
1423 let start = offsets[idx] as usize;
1424 let end = offsets[idx + 1] as usize;
1425 let write_item = |out: &mut W, j: usize| {
1426 let j_val = j.saturating_sub(self.values_offset);
1427 self.values.encode(out, j_val)
1428 };
1429 match self.keys {
1430 KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1431 out,
1432 arr,
1433 self.keys_offset,
1434 start,
1435 end,
1436 write_item,
1437 ),
1438 KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1439 out,
1440 arr,
1441 self.keys_offset,
1442 start,
1443 end,
1444 write_item,
1445 ),
1446 }
1447 }
1448}
1449
1450struct EnumEncoder<'a> {
1457 keys: &'a PrimitiveArray<Int32Type>,
1458}
1459impl EnumEncoder<'_> {
1460 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), AvroError> {
1461 write_int(out, self.keys.value(row))
1462 }
1463}
1464
1465struct UnionEncoder<'a> {
1466 encoders: Vec<FieldEncoder<'a>>,
1467 array: &'a UnionArray,
1468 type_id_to_encoder_index: Vec<Option<usize>>,
1469}
1470
1471impl<'a> UnionEncoder<'a> {
1472 fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1473 let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1474 return Err(AvroError::SchemaError("Expected Dense UnionArray".into()));
1475 };
1476 if fields.len() != field_bindings.len() {
1477 return Err(AvroError::SchemaError(format!(
1478 "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1479 fields.len(),
1480 field_bindings.len()
1481 )));
1482 }
1483 let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
1484 let mut type_id_to_encoder_index: Vec<Option<usize>> =
1485 vec![None; (max_type_id + 1) as usize];
1486 let mut encoders = Vec::with_capacity(fields.len());
1487 for (i, (type_id, _)) in fields.iter().enumerate() {
1488 let binding = field_bindings
1489 .get(i)
1490 .ok_or_else(|| AvroError::SchemaError("Binding and field mismatch".to_string()))?;
1491 encoders.push(FieldEncoder::make_encoder(
1492 array.child(type_id).as_ref(),
1493 &binding.plan,
1494 binding.nullability,
1495 )?);
1496 type_id_to_encoder_index[type_id as usize] = Some(i);
1497 }
1498 Ok(Self {
1499 encoders,
1500 array,
1501 type_id_to_encoder_index,
1502 })
1503 }
1504
1505 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1506 let type_id = self.array.type_ids()[idx];
1511 let encoder_index = self
1512 .type_id_to_encoder_index
1513 .get(type_id as usize)
1514 .and_then(|opt| *opt)
1515 .ok_or_else(|| AvroError::SchemaError(format!("Invalid type_id {type_id}")))?;
1516 write_int(out, encoder_index as i32)?;
1517 let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
1518 AvroError::SchemaError(format!("Invalid encoder index {encoder_index}"))
1519 })?;
1520 encoder.encode(out, self.array.value_offset(idx))
1521 }
1522}
1523
1524struct StructEncoder<'a> {
1525 encoders: Vec<FieldEncoder<'a>>,
1526}
1527
1528impl<'a> StructEncoder<'a> {
1529 fn try_new(array: &'a StructArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1530 let mut encoders = Vec::with_capacity(field_bindings.len());
1531 for field_binding in field_bindings {
1532 let idx = field_binding.arrow_index;
1533 let column = array.columns().get(idx).ok_or_else(|| {
1534 AvroError::SchemaError(format!("Struct child index {idx} out of range"))
1535 })?;
1536 let encoder = FieldEncoder::make_encoder(
1537 column.as_ref(),
1538 &field_binding.plan,
1539 field_binding.nullability,
1540 )?;
1541 encoders.push(encoder);
1542 }
1543 Ok(Self { encoders })
1544 }
1545
1546 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1547 for encoder in self.encoders.iter_mut() {
1548 encoder.encode(out, idx)?;
1549 }
1550 Ok(())
1551 }
1552}
1553
1554fn encode_blocked_range<W: Write + ?Sized, F>(
1558 out: &mut W,
1559 start: usize,
1560 end: usize,
1561 mut write_item: F,
1562) -> Result<(), AvroError>
1563where
1564 F: FnMut(&mut W, usize) -> Result<(), AvroError>,
1565{
1566 let len = end.saturating_sub(start);
1567 if len == 0 {
1568 write_long(out, 0)?;
1570 return Ok(());
1571 }
1572 write_long(out, len as i64)?;
1574 for row in start..end {
1575 write_item(out, row)?;
1576 }
1577 write_long(out, 0)?;
1578 Ok(())
1579}
1580
1581struct ListEncoder<'a, O: OffsetSizeTrait> {
1582 list: &'a GenericListArray<O>,
1583 values: FieldEncoder<'a>,
1584 values_offset: usize,
1585}
1586
1587type ListEncoder32<'a> = ListEncoder<'a, i32>;
1588type ListEncoder64<'a> = ListEncoder<'a, i64>;
1589
1590impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1591 fn try_new(
1592 list: &'a GenericListArray<O>,
1593 items_nullability: Option<Nullability>,
1594 item_plan: &FieldPlan,
1595 ) -> Result<Self, AvroError> {
1596 Ok(Self {
1597 list,
1598 values: FieldEncoder::make_encoder(
1599 list.values().as_ref(),
1600 item_plan,
1601 items_nullability,
1602 )?,
1603 values_offset: list.values().offset(),
1604 })
1605 }
1606
1607 fn encode_list_range<W: Write + ?Sized>(
1608 &mut self,
1609 out: &mut W,
1610 start: usize,
1611 end: usize,
1612 ) -> Result<(), AvroError> {
1613 encode_blocked_range(out, start, end, |out, row| {
1614 self.values
1615 .encode(out, row.saturating_sub(self.values_offset))
1616 })
1617 }
1618
1619 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1620 let offsets = self.list.offsets();
1621 let start = offsets[idx].to_usize().ok_or_else(|| {
1622 AvroError::InvalidArgument(format!("Error converting offset[{idx}] to usize"))
1623 })?;
1624 let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1625 AvroError::InvalidArgument(format!("Error converting offset[{}] to usize", idx + 1))
1626 })?;
1627 self.encode_list_range(out, start, end)
1628 }
1629}
1630
1631struct ListViewEncoder<'a, O: OffsetSizeTrait> {
1633 list: &'a GenericListViewArray<O>,
1634 values: FieldEncoder<'a>,
1635 values_offset: usize,
1636}
1637type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
1638type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
1639
1640impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
1641 fn try_new(
1642 list: &'a GenericListViewArray<O>,
1643 items_nullability: Option<Nullability>,
1644 item_plan: &FieldPlan,
1645 ) -> Result<Self, AvroError> {
1646 Ok(Self {
1647 list,
1648 values: FieldEncoder::make_encoder(
1649 list.values().as_ref(),
1650 item_plan,
1651 items_nullability,
1652 )?,
1653 values_offset: list.values().offset(),
1654 })
1655 }
1656
1657 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1658 let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
1659 AvroError::InvalidArgument(format!("Error converting value_offset[{idx}] to usize"))
1660 })?;
1661 let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
1662 AvroError::InvalidArgument(format!("Error converting value_size[{idx}] to usize"))
1663 })?;
1664 let start = start + self.values_offset;
1665 let end = start + len;
1666 encode_blocked_range(out, start, end, |out, row| {
1667 self.values
1668 .encode(out, row.saturating_sub(self.values_offset))
1669 })
1670 }
1671}
1672
1673struct FixedSizeListEncoder<'a> {
1675 list: &'a FixedSizeListArray,
1676 values: FieldEncoder<'a>,
1677 values_offset: usize,
1678 elem_len: usize,
1679}
1680
1681impl<'a> FixedSizeListEncoder<'a> {
1682 fn try_new(
1683 list: &'a FixedSizeListArray,
1684 items_nullability: Option<Nullability>,
1685 item_plan: &FieldPlan,
1686 ) -> Result<Self, AvroError> {
1687 Ok(Self {
1688 list,
1689 values: FieldEncoder::make_encoder(
1690 list.values().as_ref(),
1691 item_plan,
1692 items_nullability,
1693 )?,
1694 values_offset: list.values().offset(),
1695 elem_len: list.value_length() as usize,
1696 })
1697 }
1698
1699 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1700 let rel = self.list.value_offset(idx) as usize;
1702 let start = self.values_offset + rel;
1703 let end = start + self.elem_len;
1704 encode_blocked_range(out, start, end, |out, row| {
1705 self.values
1706 .encode(out, row.saturating_sub(self.values_offset))
1707 })
1708 }
1709}
1710
1711struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1714impl FixedEncoder<'_> {
1715 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1716 let v = self.0.value(idx); out.write_all(v)?;
1718 Ok(())
1719 }
1720}
1721
1722struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1725impl UuidEncoder<'_> {
1726 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1727 let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1728 buf[0] = 0x48;
1729 let v = self.0.value(idx);
1730 let u = Uuid::from_slice(v)
1731 .map_err(|e| AvroError::InvalidArgument(format!("Invalid UUID bytes: {e}")))?;
1732 let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1733 out.write_all(&buf)?;
1734 Ok(())
1735 }
1736}
1737
/// The three unsigned components written by `DurationEncoder`.
#[derive(Copy, Clone)]
struct DurationParts {
    /// Number of months.
    months: u32,
    /// Number of days.
    days: u32,
    /// Number of milliseconds.
    millis: u32,
}
1744trait IntervalToDurationParts: ArrowPrimitiveType {
1746 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError>;
1747}
1748impl IntervalToDurationParts for IntervalMonthDayNanoType {
1749 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
1750 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1751 if months < 0 || days < 0 || nanos < 0 {
1752 return Err(AvroError::InvalidArgument(
1753 "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1754 ));
1755 }
1756 if nanos % 1_000_000 != 0 {
1757 return Err(AvroError::InvalidArgument(
1758 "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1759 .into(),
1760 ));
1761 }
1762 let millis = nanos / 1_000_000;
1763 if millis > u32::MAX as i64 {
1764 return Err(AvroError::InvalidArgument(
1765 "Avro 'duration' milliseconds exceed u32::MAX".into(),
1766 ));
1767 }
1768 Ok(DurationParts {
1769 months: months as u32,
1770 days: days as u32,
1771 millis: millis as u32,
1772 })
1773 }
1774}
1775impl IntervalToDurationParts for IntervalYearMonthType {
1776 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
1777 if native < 0 {
1778 return Err(AvroError::InvalidArgument(
1779 "Avro 'duration' cannot encode negative months".into(),
1780 ));
1781 }
1782 Ok(DurationParts {
1783 months: native as u32,
1784 days: 0,
1785 millis: 0,
1786 })
1787 }
1788}
1789impl IntervalToDurationParts for IntervalDayTimeType {
1790 fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
1791 let (days, millis) = IntervalDayTimeType::to_parts(native);
1792 if days < 0 || millis < 0 {
1793 return Err(AvroError::InvalidArgument(
1794 "Avro 'duration' cannot encode negative days or milliseconds".into(),
1795 ));
1796 }
1797 Ok(DurationParts {
1798 months: 0,
1799 days: days as u32,
1800 millis: millis as u32,
1801 })
1802 }
1803}
1804
1805struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1808impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1809 #[inline(always)]
1810 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1811 let parts = P::duration_parts(self.0.value(idx))?;
1812 let months = parts.months.to_le_bytes();
1813 let days = parts.days.to_le_bytes();
1814 let ms = parts.millis.to_le_bytes();
1815 let buf = [
1831 months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1832 ms[1], ms[2], ms[3],
1833 ];
1834 out.write_all(&buf)?;
1835 Ok(())
1836 }
1837}
1838
1839trait DecimalBeBytes<const N: usize> {
1842 fn value_be_bytes(&self, idx: usize) -> [u8; N];
1843}
1844#[cfg(feature = "small_decimals")]
1845impl DecimalBeBytes<4> for Decimal32Array {
1846 fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1847 self.value(idx).to_be_bytes()
1848 }
1849}
1850#[cfg(feature = "small_decimals")]
1851impl DecimalBeBytes<8> for Decimal64Array {
1852 fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1853 self.value(idx).to_be_bytes()
1854 }
1855}
1856impl DecimalBeBytes<16> for Decimal128Array {
1857 fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1858 self.value(idx).to_be_bytes()
1859 }
1860}
1861impl DecimalBeBytes<32> for Decimal256Array {
1862 fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1863 self.value(idx).to_be_bytes()
1865 }
1866}
1867
1868struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1874 arr: &'a A,
1875 fixed_size: Option<usize>,
1876}
1877
1878impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1879 fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1880 Self { arr, fixed_size }
1881 }
1882
1883 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1884 let be = self.arr.value_be_bytes(idx);
1885 match self.fixed_size {
1886 Some(n) => write_sign_extended(out, &be, n),
1887 None => write_len_prefixed(out, minimal_twos_complement(&be)),
1888 }
1889 }
1890}
1891
1892#[cfg(feature = "small_decimals")]
1893type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1894#[cfg(feature = "small_decimals")]
1895type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1896type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1897type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1898
1899struct RunEncodedEncoder<'a, R: RunEndIndexType> {
1903 ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
1904 base: usize,
1905 len: usize,
1906 values: FieldEncoder<'a>,
1907 cur_run: usize,
1909 cur_end: usize,
1911}
1912
1913type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
1914type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
1915type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
1916
1917impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
1918 fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
1919 let ends = arr.run_ends();
1920 let base = ends.get_start_physical_index();
1921 let slice = ends.values();
1922 let len = ends.len();
1923 let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
1924 Self {
1925 ends_slice: slice,
1926 base,
1927 len,
1928 values,
1929 cur_run: 0,
1930 cur_end,
1931 }
1932 }
1933
1934 #[inline(always)]
1937 fn advance_to_row(&mut self, idx: usize) -> Result<(), AvroError> {
1938 if idx < self.cur_end {
1939 return Ok(());
1940 }
1941 while self.cur_run + 1 < self.len && idx >= self.cur_end {
1943 self.cur_run += 1;
1944 self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
1945 }
1946 if idx < self.cur_end {
1947 Ok(())
1948 } else {
1949 Err(AvroError::InvalidArgument(format!(
1950 "row index {idx} out of bounds for run-ends ({} runs)",
1951 self.len
1952 )))
1953 }
1954 }
1955
1956 #[inline(always)]
1957 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1958 self.advance_to_row(idx)?;
1959 self.values.encode(out, self.cur_run)
1962 }
1963}
1964
1965#[cfg(test)]
1966mod tests {
1967 use super::*;
1968 use arrow_array::types::Int32Type;
1969 use arrow_array::{
1970 Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
1971 Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
1972 StringArray,
1973 };
1974 use arrow_buffer::Buffer;
1975 use arrow_schema::{DataType, Field, Fields, UnionFields};
1976
1977 fn zigzag_i64(v: i64) -> u64 {
1978 ((v << 1) ^ (v >> 63)) as u64
1979 }
1980
1981 fn varint(mut x: u64) -> Vec<u8> {
1982 let mut out = Vec::new();
1983 while (x & !0x7f) != 0 {
1984 out.push(((x & 0x7f) as u8) | 0x80);
1985 x >>= 7;
1986 }
1987 out.push((x & 0x7f) as u8);
1988 out
1989 }
1990
1991 fn avro_long_bytes(v: i64) -> Vec<u8> {
1992 varint(zigzag_i64(v))
1993 }
1994
1995 fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
1996 let mut out = avro_long_bytes(payload.len() as i64);
1997 out.extend_from_slice(payload);
1998 out
1999 }
2000
2001 fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
2002 let m = months.to_le_bytes();
2003 let d = days.to_le_bytes();
2004 let ms = millis.to_le_bytes();
2005 [
2006 m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
2007 ]
2008 }
2009
2010 fn encode_all(
2011 array: &dyn Array,
2012 plan: &FieldPlan,
2013 nullability: Option<Nullability>,
2014 ) -> Vec<u8> {
2015 let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
2016 let mut out = Vec::new();
2017 for i in 0..array.len() {
2018 enc.encode(&mut out, i).unwrap();
2019 }
2020 out
2021 }
2022
2023 fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
2024 if actual != expected {
2025 let to_hex = |b: &[u8]| {
2026 b.iter()
2027 .map(|x| format!("{:02X}", x))
2028 .collect::<Vec<_>>()
2029 .join(" ")
2030 };
2031 panic!(
2032 "mismatch\n expected: [{}]\n actual: [{}]",
2033 to_hex(expected),
2034 to_hex(actual)
2035 );
2036 }
2037 }
2038
2039 fn row_slice<'a>(buf: &'a [u8], offsets: &[usize], row: usize) -> &'a [u8] {
2040 let start = offsets[row];
2041 let end = offsets[row + 1];
2042 &buf[start..end]
2043 }
2044
2045 #[test]
2046 fn binary_encoder() {
2047 let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
2048 let arr = BinaryArray::from_vec(values);
2049 let mut expected = Vec::new();
2050 for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
2051 expected.extend(avro_len_prefixed_bytes(payload));
2052 }
2053 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2054 assert_bytes_eq(&got, &expected);
2055 }
2056
2057 #[test]
2058 fn large_binary_encoder() {
2059 let values: Vec<&[u8]> = vec![b"xyz", b""];
2060 let arr = LargeBinaryArray::from_vec(values);
2061 let mut expected = Vec::new();
2062 for payload in [b"xyz" as &[u8], b""] {
2063 expected.extend(avro_len_prefixed_bytes(payload));
2064 }
2065 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2066 assert_bytes_eq(&got, &expected);
2067 }
2068
2069 #[test]
2070 fn utf8_encoder() {
2071 let arr = StringArray::from(vec!["", "A", "BC"]);
2072 let mut expected = Vec::new();
2073 for s in ["", "A", "BC"] {
2074 expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2075 }
2076 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2077 assert_bytes_eq(&got, &expected);
2078 }
2079
2080 #[test]
2081 fn large_utf8_encoder() {
2082 let arr = LargeStringArray::from(vec!["hello", ""]);
2083 let mut expected = Vec::new();
2084 for s in ["hello", ""] {
2085 expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2086 }
2087 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2088 assert_bytes_eq(&got, &expected);
2089 }
2090
2091 #[test]
2092 fn list_encoder_int32() {
2093 let values = Int32Array::from(vec![1, 2, 3]);
2095 let offsets = vec![0, 2, 2, 3];
2096 let list = ListArray::new(
2097 Field::new("item", DataType::Int32, true).into(),
2098 arrow_buffer::OffsetBuffer::new(offsets.into()),
2099 Arc::new(values) as ArrayRef,
2100 None,
2101 );
2102 let mut expected = Vec::new();
2104 expected.extend(avro_long_bytes(2));
2106 expected.extend(avro_long_bytes(1));
2107 expected.extend(avro_long_bytes(2));
2108 expected.extend(avro_long_bytes(0));
2109 expected.extend(avro_long_bytes(0));
2111 expected.extend(avro_long_bytes(1));
2113 expected.extend(avro_long_bytes(3));
2114 expected.extend(avro_long_bytes(0));
2115
2116 let plan = FieldPlan::List {
2117 items_nullability: None,
2118 item_plan: Box::new(FieldPlan::Scalar),
2119 };
2120 let got = encode_all(&list, &plan, None);
2121 assert_bytes_eq(&got, &expected);
2122 }
2123
2124 #[test]
2125 fn struct_encoder_two_fields() {
2126 let a = Int32Array::from(vec![1, 2]);
2128 let b = StringArray::from(vec!["x", "y"]);
2129 let fields = Fields::from(vec![
2130 Field::new("a", DataType::Int32, true),
2131 Field::new("b", DataType::Utf8, true),
2132 ]);
2133 let struct_arr = StructArray::new(
2134 fields.clone(),
2135 vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2136 None,
2137 );
2138 let plan = FieldPlan::Struct {
2139 bindings: vec![
2140 FieldBinding {
2141 arrow_index: 0,
2142 nullability: None,
2143 plan: FieldPlan::Scalar,
2144 },
2145 FieldBinding {
2146 arrow_index: 1,
2147 nullability: None,
2148 plan: FieldPlan::Scalar,
2149 },
2150 ],
2151 };
2152 let got = encode_all(&struct_arr, &plan, None);
2153 let mut expected = Vec::new();
2155 expected.extend(avro_long_bytes(1)); expected.extend(avro_len_prefixed_bytes(b"x")); expected.extend(avro_long_bytes(2)); expected.extend(avro_len_prefixed_bytes(b"y")); assert_bytes_eq(&got, &expected);
2160 }
2161
/// Encodes an `Int32`-keyed string dictionary under an enum plan; the expected
/// output is one long per row carrying that row's key (2, 0, 1).
#[test]
fn enum_encoder_dictionary() {
    let keys = Int32Array::from(vec![2, 0, 1]);
    let dictionary_values: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", "C"]));
    let dict = DictionaryArray::<Int32Type>::try_new(keys, dictionary_values).unwrap();
    // Enum symbols are listed in the same order as the dictionary values.
    let symbols: Arc<[String]> = ["A", "B", "C"]
        .iter()
        .map(|s| s.to_string())
        .collect::<Vec<_>>()
        .into();
    let plan = FieldPlan::Enum { symbols };
    let got = encode_all(&dict, &plan, None);

    let mut expected = Vec::new();
    for index in [2, 0, 1] {
        expected.extend(avro_long_bytes(index));
    }
    assert_bytes_eq(&got, &expected);
}
2180
/// Encodes `Decimal128` values 1, -1 and 0 both as variable-length bytes
/// (`size: None`) and as a 16-byte fixed (`size: Some(16)`).
#[test]
fn decimal_bytes_and_fixed() {
    let unscaled = [1i128, -1, 0];
    let dec = Decimal128Array::from(unscaled.to_vec())
        .with_precision_and_scale(20, 0)
        .unwrap();

    // Bytes form: each value is a length-prefixed single byte (0x01, 0xFF, 0x00).
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let mut expected_bytes = Vec::new();
    for byte in [0x01u8, 0xFF, 0x00] {
        expected_bytes.extend(avro_len_prefixed_bytes(&[byte]));
    }
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed form: the full 16-byte big-endian representation, with no length prefix.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(16) }, None);
    let mut expected_fixed = Vec::new();
    for v in unscaled {
        expected_fixed.extend_from_slice(&v.to_be_bytes());
    }
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
2205
/// Encodes `Decimal256` values 1, -1 and 0 both as variable-length bytes
/// (`size: None`) and as a 32-byte fixed (`size: Some(32)`).
#[test]
fn decimal_bytes_256() {
    use arrow_buffer::i256;
    let unscaled = [1i128, -1, 0].map(i256::from_i128);
    let dec = Decimal256Array::from(unscaled.to_vec())
        .with_precision_and_scale(76, 0)
        .unwrap();

    // Bytes form: each value is a length-prefixed single byte (0x01, 0xFF, 0x00).
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let mut expected_bytes = Vec::new();
    for byte in [0x01u8, 0xFF, 0x00] {
        expected_bytes.extend(avro_len_prefixed_bytes(&[byte]));
    }
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed form: the full 32-byte big-endian representation, with no length prefix.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(32) }, None);
    let mut expected_fixed = Vec::new();
    for v in unscaled {
        expected_fixed.extend_from_slice(&v.to_be_bytes());
    }
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
2236
/// Encodes `Decimal32` values 1, -1 and 0 both as variable-length bytes
/// (`size: None`) and as a 4-byte fixed (`size: Some(4)`).
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_32() {
    let unscaled = [1i32, -1, 0];
    let dec = Decimal32Array::from(unscaled.to_vec())
        .with_precision_and_scale(9, 0)
        .unwrap();

    // Bytes form: each value is a length-prefixed single byte (0x01, 0xFF, 0x00).
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let mut expected_bytes = Vec::new();
    for byte in [0x01u8, 0xFF, 0x00] {
        expected_bytes.extend(avro_len_prefixed_bytes(&[byte]));
    }
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed form: the full 4-byte big-endian representation, with no length prefix.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(4) }, None);
    let mut expected_fixed = Vec::new();
    for v in unscaled {
        expected_fixed.extend_from_slice(&v.to_be_bytes());
    }
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
2261
/// Encodes `Decimal64` values 1, -1 and 0 both as variable-length bytes
/// (`size: None`) and as an 8-byte fixed (`size: Some(8)`).
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_64() {
    let unscaled = [1i64, -1, 0];
    let dec = Decimal64Array::from(unscaled.to_vec())
        .with_precision_and_scale(18, 0)
        .unwrap();

    // Bytes form: each value is a length-prefixed single byte (0x01, 0xFF, 0x00).
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let mut expected_bytes = Vec::new();
    for byte in [0x01u8, 0xFF, 0x00] {
        expected_bytes.extend(avro_len_prefixed_bytes(&[byte]));
    }
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed form: the full 8-byte big-endian representation, with no length prefix.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(8) }, None);
    let mut expected_fixed = Vec::new();
    for v in unscaled {
        expected_fixed.extend_from_slice(&v.to_be_bytes());
    }
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
2286
/// Floats and doubles are expected as the little-endian bytes of their raw bit
/// pattern; the `f32` inputs include a NaN built from bits `0x7fc00000`.
#[test]
fn float32_and_float64_encoders() {
    let f32_inputs = [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)];
    let f64_inputs = [0.0f64, -2.25f64];
    let f32a = Float32Array::from(f32_inputs.to_vec());
    let f64a = Float64Array::from(f64_inputs.to_vec());

    let expected32: Vec<u8> = f32_inputs
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got32, &expected32);

    let expected64: Vec<u8> = f64_inputs
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got64, &expected64);
}
2306
/// `Int64` values, including negatives and `i64::MIN + 1`, are expected as
/// Avro longs.
#[test]
fn long_encoder_int64() {
    let inputs = [0i64, 1, -1, 2, -2, i64::MIN + 1];
    let arr = Int64Array::from(inputs.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = inputs.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    assert_bytes_eq(&got, &expected);
}
2317
/// A `FixedSizeBinary(4)` column under a scalar plan is expected as the raw
/// bytes of each row, concatenated, with no length prefix.
#[test]
fn fixed_encoder_plain() {
    let rows: [[u8; 4]; 2] = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
    let arr =
        FixedSizeBinaryArray::try_from_iter(rows.iter().map(|row| row.to_vec())).unwrap();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = rows.concat();
    assert_bytes_eq(&got, &expected);
}
2330
/// A 16-byte fixed value under the UUID plan is expected as a length-prefixed
/// hyphenated UUID string.
#[test]
fn uuid_encoder_test() {
    let uuid = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
    let raw = uuid.as_bytes().to_vec();
    let arr_ok = FixedSizeBinaryArray::try_from_iter(std::iter::once(raw)).unwrap();
    let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);

    let text = uuid.hyphenated().to_string();
    // 0x48 is the zig-zag encoding of 36, the length of the hyphenated text.
    let mut expected = vec![0x48];
    expected.extend_from_slice(text.as_bytes());
    assert_bytes_eq(&got, &expected);
}
2344
/// Encoding a 10-byte fixed value under the UUID plan must fail with
/// `InvalidArgument` mentioning "Invalid UUID bytes".
#[test]
fn uuid_encoder_error() {
    // A single row that is 10 bytes wide instead of the 16 a UUID needs.
    let ten_bytes = arrow_buffer::Buffer::from(vec![0u8; 10]);
    let arr = FixedSizeBinaryArray::try_new(10, ten_bytes, None).unwrap();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Uuid, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        AvroError::InvalidArgument(msg) => {
            assert!(msg.contains("Invalid UUID bytes"))
        }
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
2362
2363 fn test_scalar_primitive_encoding<T>(
2364 non_nullable_data: &[T::Native],
2365 nullable_data: &[Option<T::Native>],
2366 ) where
2367 T: ArrowPrimitiveType,
2368 T::Native: Into<i64> + Copy,
2369 PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2370 {
2371 let plan = FieldPlan::Scalar;
2372
2373 let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2374 let got = encode_all(&array, &plan, None);
2375
2376 let mut expected = Vec::new();
2377 for &value in non_nullable_data {
2378 expected.extend(avro_long_bytes(value.into()));
2379 }
2380 assert_bytes_eq(&got, &expected);
2381
2382 let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2383 let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2384
2385 let mut expected_nullable = Vec::new();
2386 for &opt_value in nullable_data {
2387 match opt_value {
2388 Some(value) => {
2389 expected_nullable.extend(avro_long_bytes(1));
2391 expected_nullable.extend(avro_long_bytes(value.into()));
2392 }
2393 None => {
2394 expected_nullable.extend(avro_long_bytes(0));
2396 }
2397 }
2398 }
2399 assert_bytes_eq(&got_nullable, &expected_nullable);
2400 }
2401
/// Runs the shared primitive driver over `Date32` values, including zero and a
/// negative value.
#[test]
fn date32_encoder() {
    let dense: [i32; 3] = [19345, 0, -1];
    let sparse: [Option<i32>; 2] = [Some(19345), None];
    test_scalar_primitive_encoding::<Date32Type>(&dense, &sparse);
}
2413
/// Runs the shared primitive driver over `Time32(Millisecond)` values from 0 up
/// to 86_399_999.
#[test]
fn time32_millis_encoder() {
    let dense: [i32; 3] = [0, 49530123, 86399999];
    let sparse: [Option<i32>; 2] = [None, Some(49530123)];
    test_scalar_primitive_encoding::<Time32MillisecondType>(&dense, &sparse);
}
2425
/// Runs the shared primitive driver over `Time64(Microsecond)` values 0 and
/// 86_399_999_999.
#[test]
fn time64_micros_encoder() {
    let dense: [i64; 2] = [0, 86399999999];
    let sparse: [Option<i64>; 2] = [Some(86399999999), None];
    test_scalar_primitive_encoding::<Time64MicrosecondType>(&dense, &sparse);
}
2436
/// Runs the shared primitive driver over `Timestamp(Millisecond)` values,
/// including zero and a negative value.
#[test]
fn timestamp_millis_encoder() {
    let dense: [i64; 3] = [1704067200000, 0, -123456789];
    let sparse: [Option<i64>; 2] = [None, Some(1704067200000)];
    test_scalar_primitive_encoding::<TimestampMillisecondType>(&dense, &sparse);
}
2448
/// Encodes a two-row `Map<Utf8, Int32>`: row 0 holds `{k1: 1, k2: 2}` and
/// row 1 is empty.
#[test]
fn map_encoder_string_keys_int_values() {
    let key_col = StringArray::from(vec!["k1", "k2"]);
    let value_col = Int32Array::from(vec![1, 2]);
    let entries = StructArray::new(
        Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ]),
        vec![Arc::new(key_col) as ArrayRef, Arc::new(value_col) as ArrayRef],
        None,
    );
    let entries_field = Arc::new(Field::new("entries", entries.data_type().clone(), false));
    // Offsets [0, 2, 2]: both entries belong to row 0; row 1 has none.
    let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
    let map = MapArray::new(entries_field, offsets, entries, None, false);
    let plan = FieldPlan::Map {
        values_nullability: None,
        value_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&map, &plan, None);

    let mut expected = Vec::new();
    // Row 0: entry count, then each key/value pair, then the zero terminator.
    expected.extend(avro_long_bytes(2));
    for (key, value) in [(b"k1", 1), (b"k2", 2)] {
        expected.extend(avro_len_prefixed_bytes(key));
        expected.extend(avro_long_bytes(value));
    }
    expected.extend(avro_long_bytes(0));
    // Row 1: empty map, terminator only.
    expected.extend(avro_long_bytes(0));
    assert_bytes_eq(&got, &expected);
}
2490
/// Encodes a five-row union with offsets over `Utf8` (type id 0) and `Int32`
/// (type id 1); each row is expected as a branch index followed by the value.
#[test]
fn union_encoder_string_int() {
    let str_child = StringArray::from(vec!["hello", "world"]);
    let int_child = Int32Array::from(vec![10, 20, 30]);
    let union_fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    )
    .unwrap();

    // Rows select: str[0], int[0], int[1], str[1], int[2].
    let type_ids = Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]);
    let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
    let union_array = UnionArray::try_new(
        union_fields,
        type_ids.into(),
        Some(offsets.into()),
        vec![Arc::new(str_child) as ArrayRef, Arc::new(int_child) as ArrayRef],
    )
    .unwrap();

    // One non-nullable scalar binding per union child.
    let bindings = (0..2)
        .map(|arrow_index| FieldBinding {
            arrow_index,
            nullability: None,
            plan: FieldPlan::Scalar,
        })
        .collect();
    let plan = FieldPlan::Union { bindings };
    let got = encode_all(&union_array, &plan, None);

    // Each pair below is (branch index, encoded value) for one row.
    let expected = [
        avro_long_bytes(0),
        avro_len_prefixed_bytes(b"hello"),
        avro_long_bytes(1),
        avro_long_bytes(10),
        avro_long_bytes(1),
        avro_long_bytes(20),
        avro_long_bytes(0),
        avro_len_prefixed_bytes(b"world"),
        avro_long_bytes(1),
        avro_long_bytes(30),
    ]
    .concat();
    assert_bytes_eq(&got, &expected);
}
2547
/// Encodes a three-row union over `Null`, `Utf8` and `Int32`; the null row is
/// expected as its branch index alone, with no payload.
#[test]
fn union_encoder_null_string_int() {
    let null_child = NullArray::new(1);
    let str_child = StringArray::from(vec!["hello"]);
    let int_child = Int32Array::from(vec![10]);
    let union_fields = UnionFields::try_new(
        vec![0, 1, 2],
        vec![
            Field::new("v_null", DataType::Null, true),
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    )
    .unwrap();

    // One row per child, each pointing at that child's only element.
    let type_ids = Buffer::from_slice_ref([0_i8, 1, 2]);
    let offsets = Buffer::from_slice_ref([0_i32, 0, 0]);
    let union_array = UnionArray::try_new(
        union_fields,
        type_ids.into(),
        Some(offsets.into()),
        vec![
            Arc::new(null_child) as ArrayRef,
            Arc::new(str_child) as ArrayRef,
            Arc::new(int_child) as ArrayRef,
        ],
    )
    .unwrap();

    // One non-nullable scalar binding per union child.
    let bindings = (0..3)
        .map(|arrow_index| FieldBinding {
            arrow_index,
            nullability: None,
            plan: FieldPlan::Scalar,
        })
        .collect();
    let plan = FieldPlan::Union { bindings };
    let got = encode_all(&union_array, &plan, None);

    let expected = [
        // Row 0: null branch, nothing after the index.
        avro_long_bytes(0),
        // Row 1: string branch.
        avro_long_bytes(1),
        avro_len_prefixed_bytes(b"hello"),
        // Row 2: int branch.
        avro_long_bytes(2),
        avro_long_bytes(10),
    ]
    .concat();
    assert_bytes_eq(&got, &expected);
}
2609
/// Encodes a `LargeList<Int32>` (64-bit offsets) with rows `[1, 2, 3]` and `[]`.
#[test]
fn list64_encoder_int32() {
    let items = Int32Array::from(vec![1, 2, 3]);
    // Offsets [0, 3, 3]: row 0 takes all three items; row 1 is empty.
    let row_offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 3, 3].into());
    let list = LargeListArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        row_offsets,
        Arc::new(items) as ArrayRef,
        None,
    );
    let plan = FieldPlan::List {
        items_nullability: None,
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);

    // Row 0: count 3, items 1, 2, 3, terminator 0. Row 1: terminator 0 only.
    let mut expected = Vec::new();
    for v in [3, 1, 2, 3, 0, 0] {
        expected.extend(avro_long_bytes(v));
    }
    assert_bytes_eq(&got, &expected);
}
2636
/// `Int32` values are expected to match the Avro long encoding of the same
/// value widened to `i64`.
#[test]
fn int_encoder_test() {
    let inputs = [0i32, -1, 2];
    let ints = Int32Array::from(inputs.to_vec());
    let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
    let expected_i: Vec<u8> = inputs
        .iter()
        .flat_map(|&v| avro_long_bytes(v as i64))
        .collect();
    assert_bytes_eq(&got_i, &expected_i);
}
2647
/// Booleans are expected as one byte each: `1` for true and `0` for false.
#[test]
fn boolean_encoder_test() {
    let bools = BooleanArray::from(vec![true, false]);
    let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
    let expected_b: Vec<u8> = vec![1, 0];
    assert_bytes_eq(&got_b, &expected_b);
}
2657
/// With `avro_custom_types`, `Duration(Second)` values are expected as plain
/// Avro longs.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_seconds() {
    let inputs = [0i64, -1, 2];
    let arr = PrimitiveArray::<DurationSecondType>::from(inputs.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = inputs.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    assert_bytes_eq(&got, &expected);
}
2669
/// With `avro_custom_types`, `Duration(Millisecond)` values are expected as
/// plain Avro longs.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_milliseconds() {
    let inputs = [1i64, 0, -2];
    let arr = PrimitiveArray::<DurationMillisecondType>::from(inputs.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = inputs.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    assert_bytes_eq(&got, &expected);
}
2681
/// With `avro_custom_types`, `Duration(Microsecond)` values are expected as
/// plain Avro longs.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_microseconds() {
    let inputs = [5i64, -6, 7];
    let arr = PrimitiveArray::<DurationMicrosecondType>::from(inputs.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = inputs.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    assert_bytes_eq(&got, &expected);
}
2693
/// With `avro_custom_types`, `Duration(Nanosecond)` values are expected as
/// plain Avro longs.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_nanoseconds() {
    let inputs = [8i64, 9, -10];
    let arr = PrimitiveArray::<DurationNanosecondType>::from(inputs.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = inputs.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    assert_bytes_eq(&got, &expected);
}
2705
/// Non-negative `Interval(YearMonth)` values are expected as a 12-byte duration
/// with only the months component set.
#[test]
fn duration_encoder_year_month_happy_path() {
    let arr = PrimitiveArray::<IntervalYearMonthType>::from(vec![0i32, 1i32, 25i32]);
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for months in [0u32, 1u32, 25u32] {
        // Days and milliseconds stay zero for a year-month interval.
        expected.extend_from_slice(&duration_fixed12(months, 0, 0));
    }
    assert_bytes_eq(&got, &expected);
}
2716
/// A negative `Interval(YearMonth)` must fail with `InvalidArgument`
/// mentioning negative months.
#[test]
fn duration_encoder_year_month_rejects_negative() {
    let arr = PrimitiveArray::<IntervalYearMonthType>::from(vec![-1i32]);
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        AvroError::InvalidArgument(msg) => {
            assert!(msg.contains("cannot encode negative months"))
        }
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
2730
/// Non-negative `Interval(DayTime)` values are expected as a 12-byte duration
/// with zero months and the given days and milliseconds.
#[test]
fn duration_encoder_day_time_happy_path() {
    let arr = PrimitiveArray::<IntervalDayTimeType>::from(vec![
        IntervalDayTimeType::make_value(2, 500),
        IntervalDayTimeType::make_value(0, 0),
    ]);
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected = [duration_fixed12(0, 2, 500), duration_fixed12(0, 0, 0)].concat();
    assert_bytes_eq(&got, &expected);
}
2742
/// An `Interval(DayTime)` with negative days must fail with `InvalidArgument`
/// mentioning negative days.
#[test]
fn duration_encoder_day_time_rejects_negative() {
    let negative_days = IntervalDayTimeType::make_value(-1, 0);
    let arr = PrimitiveArray::<IntervalDayTimeType>::from(vec![negative_days]);
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        AvroError::InvalidArgument(msg) => {
            assert!(msg.contains("cannot encode negative days"))
        }
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
2757
/// `Interval(MonthDayNano)` values whose nanoseconds are whole milliseconds are
/// expected as a 12-byte duration; 3_000_000 ns becomes 3 ms.
#[test]
fn duration_encoder_month_day_nano_happy_path() {
    let arr = PrimitiveArray::<IntervalMonthDayNanoType>::from(vec![
        IntervalMonthDayNanoType::make_value(1, 2, 3_000_000),
        IntervalMonthDayNanoType::make_value(0, 0, 0),
    ]);
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected = [duration_fixed12(1, 2, 3), duration_fixed12(0, 0, 0)].concat();
    assert_bytes_eq(&got, &expected);
}
2769
/// An `Interval(MonthDayNano)` of a single nanosecond is not a whole number of
/// milliseconds and must fail with `InvalidArgument`.
#[test]
fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
    let one_nano = IntervalMonthDayNanoType::make_value(0, 0, 1);
    let arr = PrimitiveArray::<IntervalMonthDayNanoType>::from(vec![one_nano]);
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        AvroError::InvalidArgument(msg) => {
            // Either wording of the message is accepted.
            assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
        }
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
2784
/// For each 3-byte big-endian input (positive, negative, zero),
/// `minimal_twos_complement` is expected to return only the last byte.
#[test]
fn minimal_twos_complement_test() {
    let cases: [[u8; 3]; 3] = [
        [0x00, 0x00, 0x01],
        [0xFF, 0xFF, 0x80],
        [0x00, 0x00, 0x00],
    ];
    for input in &cases {
        assert_eq!(minimal_twos_complement(input), &input[2..]);
    }
}
2794
/// Exercises `write_sign_extended`: padding a positive and a negative value out
/// to four bytes, trimming redundant sign bytes to fit two bytes, and rejecting
/// a value that cannot fit the requested width.
#[test]
fn write_sign_extend_test() {
    let mut out = Vec::new();
    // Positive input is left-padded with 0x00.
    write_sign_extended(&mut out, &[0x01], 4).unwrap();
    assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
    out.clear();
    // Negative input is left-padded with 0xFF.
    write_sign_extended(&mut out, &[0xFF], 4).unwrap();
    assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
    out.clear();
    // A leading redundant sign byte is dropped to fit the narrower width.
    write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
    assert_eq!(out, vec![0xFF, 0x80]);
    out.clear();
    // 0x0100 cannot fit in one byte.
    let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
    match err {
        AvroError::InvalidArgument(_) => {}
        // Report the actual error so a failure can be diagnosed from the output.
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
2815
/// An `Interval(MonthDayNano)` whose millisecond count is `u32::MAX + 1` must
/// fail with `InvalidArgument` mentioning "exceed u32::MAX".
#[test]
fn duration_month_day_nano_overflow_millis() {
    // One millisecond past u32::MAX, in nanoseconds; done in i64 so no cast is needed.
    let nanos = (i64::from(u32::MAX) + 1) * 1_000_000;
    let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
    let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    let err = enc.encode(&mut out, 0).unwrap_err();
    match err {
        AvroError::InvalidArgument(msg) => assert!(msg.contains("exceed u32::MAX")),
        // Report the actual error so a failure can be diagnosed from the output.
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
2830
/// Building a plan for an Arrow `Decimal128(12, 2)` field against an Avro
/// decimal codec of precision 10 and scale 2 must fail with `SchemaError`.
#[test]
fn fieldplan_decimal_precision_scale_mismatch_errors() {
    use crate::codec::Codec;
    use std::collections::HashMap;
    let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
    let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
    let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
    match err {
        AvroError::SchemaError(msg) => {
            assert!(msg.contains("Decimal precision/scale mismatch"))
        }
        // Report the actual error so a failure can be diagnosed from the output.
        other => panic!("expected SchemaError, got {other:?}"),
    }
}
2846
/// Runs the shared primitive driver over `Timestamp(Microsecond)` values,
/// including zero and a negative value.
#[test]
fn timestamp_micros_encoder() {
    let dense: [i64; 3] = [1_704_067_200_000_000, 0, -123_456_789];
    let sparse: [Option<i64>; 2] = [None, Some(1_704_067_200_000_000)];
    test_scalar_primitive_encoding::<TimestampMicrosecondType>(&dense, &sparse);
}
2859
/// Encodes a single-row `List<Int32>` whose items are `[1, null, 2]` with
/// null-first item nullability.
#[test]
fn list_encoder_nullable_items_null_first() {
    let items = [Some(1), None, Some(2)];
    let values = Int32Array::from(items.to_vec());
    let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
    let list = ListArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        offsets,
        Arc::new(values) as ArrayRef,
        None,
    );
    let plan = FieldPlan::List {
        items_nullability: Some(Nullability::NullFirst),
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);

    let mut expected = Vec::new();
    // Block count for the three items.
    expected.extend(avro_long_bytes(3));
    for item in items {
        match item {
            // Present item: branch 1, then the value.
            Some(v) => {
                expected.extend(avro_long_bytes(1));
                expected.extend(avro_long_bytes(i64::from(v)));
            }
            // Null item: branch 0 alone.
            None => expected.extend(avro_long_bytes(0)),
        }
    }
    // Zero count terminates the list.
    expected.extend(avro_long_bytes(0));
    assert_bytes_eq(&got, &expected);
}
2891
/// Encodes a single-row `LargeList<Int32>` whose items are `[10, null]` with
/// null-first item nullability.
#[test]
fn large_list_encoder_nullable_items_null_first() {
    let items = [Some(10), None];
    let values = Int32Array::from(items.to_vec());
    let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
    let list = LargeListArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        offsets,
        Arc::new(values) as ArrayRef,
        None,
    );
    let plan = FieldPlan::List {
        items_nullability: Some(Nullability::NullFirst),
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);

    let mut expected = Vec::new();
    // Block count for the two items.
    expected.extend(avro_long_bytes(2));
    for item in items {
        match item {
            // Present item: branch 1, then the value.
            Some(v) => {
                expected.extend(avro_long_bytes(1));
                expected.extend(avro_long_bytes(i64::from(v)));
            }
            // Null item: branch 0 alone.
            None => expected.extend(avro_long_bytes(0)),
        }
    }
    // Zero count terminates the list.
    expected.extend(avro_long_bytes(0));
    assert_bytes_eq(&got, &expected);
}
2919
/// Encodes a single-row `Map<Utf8, Int32>` holding `{k1: 7, k2: null}` with
/// null-first value nullability.
#[test]
fn map_encoder_string_keys_nullable_int_values_null_first() {
    let key_col = StringArray::from(vec!["k1", "k2"]);
    let value_col = Int32Array::from(vec![Some(7), None]);
    let entries = StructArray::new(
        Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ]),
        vec![Arc::new(key_col) as ArrayRef, Arc::new(value_col) as ArrayRef],
        None,
    );
    let entries_field = Arc::new(Field::new("entries", entries.data_type().clone(), false));
    let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
    let map = MapArray::new(entries_field, offsets, entries, None, false);
    let plan = FieldPlan::Map {
        values_nullability: Some(Nullability::NullFirst),
        value_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&map, &plan, None);

    let mut expected = Vec::new();
    // Block count for the two entries.
    expected.extend(avro_long_bytes(2));
    for (key, value) in [(b"k1", Some(7)), (b"k2", None)] {
        expected.extend(avro_len_prefixed_bytes(key));
        match value {
            // Present value: branch 1, then the value.
            Some(v) => {
                expected.extend(avro_long_bytes(1));
                expected.extend(avro_long_bytes(v));
            }
            // Null value: branch 0 alone.
            None => expected.extend(avro_long_bytes(0)),
        }
    }
    // Zero count terminates the map.
    expected.extend(avro_long_bytes(0));
    assert_bytes_eq(&got, &expected);
}
2968
/// `Time32(Second)` values are expected to be scaled to milliseconds (×1000)
/// and written as Avro longs.
#[test]
fn time32_seconds_to_millis_encoder() {
    let seconds = [0i32, 1, -2, 12_345];
    let arr = PrimitiveArray::<Time32SecondType>::from(seconds.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = seconds
        .iter()
        .flat_map(|&secs| avro_long_bytes(i64::from(secs) * 1000))
        .collect();
    assert_bytes_eq(&got, &expected);
}
2982
/// A `Time32(Second)` value just above `i32::MAX / 1000` must fail with an
/// `InvalidArgument` whose message mentions overflow.
#[test]
fn time32_seconds_to_millis_overflow() {
    // Smallest second count whose millisecond value no longer fits in i32.
    let too_many_secs: i32 = i32::MAX / 1000 + 1;
    let arr = PrimitiveArray::<Time32SecondType>::from(vec![too_many_secs]);
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        AvroError::InvalidArgument(msg) => {
            assert!(
                msg.contains("overflowed") || msg.contains("overflow"),
                "unexpected message: {msg}"
            )
        }
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
3001
/// `Timestamp(Second)` values are expected to be scaled to milliseconds (×1000)
/// and written as Avro longs.
#[test]
fn timestamp_seconds_to_millis_encoder() {
    let seconds = [0i64, 1, -1, 1_234_567_890];
    let arr = PrimitiveArray::<TimestampSecondType>::from(seconds.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = seconds
        .iter()
        .flat_map(|&secs| avro_long_bytes(secs * 1000))
        .collect();
    assert_bytes_eq(&got, &expected);
}
3014
/// A `Timestamp(Second)` value just above `i64::MAX / 1000` must fail with an
/// `InvalidArgument` whose message mentions overflow.
#[test]
fn timestamp_seconds_to_millis_overflow() {
    // Smallest second count whose millisecond value no longer fits in i64.
    let too_many_secs: i64 = i64::MAX / 1000 + 1;
    let arr = PrimitiveArray::<TimestampSecondType>::from(vec![too_many_secs]);
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        AvroError::InvalidArgument(msg) => {
            assert!(
                msg.contains("overflowed") || msg.contains("overflow"),
                "unexpected message: {msg}"
            )
        }
        other => panic!("expected InvalidArgument, got {other:?}"),
    }
}
3033
/// `Timestamp(Nanosecond)` values are expected as Avro longs with no scaling.
#[test]
fn timestamp_nanos_encoder() {
    let nanos = [0i64, 1, -1, 123];
    let arr = PrimitiveArray::<TimestampNanosecondType>::from(nanos.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = nanos.iter().flat_map(|&ns| avro_long_bytes(ns)).collect();
    assert_bytes_eq(&got, &expected);
}
3044
/// Encodes a five-row union over `Utf8` and `Int32` whose Arrow type ids are 2
/// and 5; the expected branch indices are still 0 and 1.
#[test]
fn union_encoder_string_int_nonzero_type_ids() {
    let str_child = StringArray::from(vec!["hello", "world"]);
    let int_child = Int32Array::from(vec![10, 20, 30]);
    let union_fields = UnionFields::try_new(
        vec![2, 5],
        vec![
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    )
    .unwrap();

    // Rows select: str[0], int[0], int[1], str[1], int[2].
    let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
    let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
    let union_array = UnionArray::try_new(
        union_fields,
        type_ids.into(),
        Some(offsets.into()),
        vec![Arc::new(str_child) as ArrayRef, Arc::new(int_child) as ArrayRef],
    )
    .unwrap();

    // One non-nullable scalar binding per union child.
    let bindings = (0..2)
        .map(|arrow_index| FieldBinding {
            arrow_index,
            nullability: None,
            plan: FieldPlan::Scalar,
        })
        .collect();
    let plan = FieldPlan::Union { bindings };
    let got = encode_all(&union_array, &plan, None);

    // Each pair below is (branch index, encoded value) for one row.
    let expected = [
        avro_long_bytes(0),
        avro_len_prefixed_bytes(b"hello"),
        avro_long_bytes(1),
        avro_long_bytes(10),
        avro_long_bytes(1),
        avro_long_bytes(20),
        avro_long_bytes(0),
        avro_len_prefixed_bytes(b"world"),
        avro_long_bytes(1),
        avro_long_bytes(30),
    ]
    .concat();
    assert_bytes_eq(&got, &expected);
}
3094
/// An array that carries a null buffer but has zero nulls, encoded with
/// null-first nullability, must get the `NullableNoNulls` state with the
/// matching value-branch byte.
#[test]
fn nullable_state_with_null_buffer_and_zero_nulls() {
    // An all-valid null buffer: present, but with no nulls in it.
    let all_valid = NullBuffer::new_valid(3);
    let arr = Int32Array::from_iter_values_with_nulls(vec![1i32, 2, 3], Some(all_valid));
    assert_eq!(arr.null_count(), 0);
    assert!(arr.nulls().is_some());

    let enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, Some(Nullability::NullFirst))
        .unwrap();
    match enc.null_state {
        NullState::NullableNoNulls { union_value_byte } => {
            let want = union_value_branch_byte(Nullability::NullFirst, false);
            assert_eq!(union_value_byte, want);
        }
        other => panic!("expected NullableNoNulls, got {other:?}"),
    }
}
3113
/// `encode_rows` on a single non-nullable `Int32` column without a prefix must
/// append one offset per row and write each row as the Avro long of its value.
#[test]
fn encode_rows_single_column_int32() {
    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let arr = Int32Array::from(vec![1, 2, 3]);
    // The schema is not used again, so it is moved into the Arc instead of cloned.
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr)]).unwrap();
    let encoder = RecordEncoder {
        columns: vec![FieldBinding {
            arrow_index: 0,
            nullability: None,
            plan: FieldPlan::Scalar,
        }],
        prefix: None,
    };
    let mut out = BytesMut::new();
    // Offsets start with the leading 0; the encoder appends one end offset per row.
    let mut offsets: Vec<usize> = vec![0];
    encoder
        .encode_rows(&batch, 16, &mut out, &mut offsets)
        .unwrap();
    assert_eq!(offsets.len(), 4);
    assert_eq!(*offsets.last().unwrap(), out.len());
    for (row, value) in (1i64..=3).enumerate() {
        assert_bytes_eq(row_slice(&out, &offsets, row), &avro_long_bytes(value));
    }
}
3138
/// `encode_rows` on an `Int32` + `Utf8` batch without a prefix must write each
/// row as the int followed by the length-prefixed string.
#[test]
fn encode_rows_multiple_columns() {
    let schema = ArrowSchema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let int_arr = Int32Array::from(vec![10, 20]);
    let str_arr = StringArray::from(vec!["hello", "world"]);
    // The schema is not used again, so it is moved into the Arc instead of cloned.
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(int_arr), Arc::new(str_arr)],
    )
    .unwrap();
    let encoder = RecordEncoder {
        columns: vec![
            FieldBinding {
                arrow_index: 0,
                nullability: None,
                plan: FieldPlan::Scalar,
            },
            FieldBinding {
                arrow_index: 1,
                nullability: None,
                plan: FieldPlan::Scalar,
            },
        ],
        prefix: None,
    };
    let mut out = BytesMut::new();
    // Offsets start with the leading 0; the encoder appends one end offset per row.
    let mut offsets: Vec<usize> = vec![0];
    encoder
        .encode_rows(&batch, 32, &mut out, &mut offsets)
        .unwrap();
    assert_eq!(offsets.len(), 3);
    assert_eq!(*offsets.last().unwrap(), out.len());

    let mut expected_row0 = Vec::new();
    expected_row0.extend(avro_long_bytes(10));
    expected_row0.extend(avro_len_prefixed_bytes(b"hello"));
    assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);

    let mut expected_row1 = Vec::new();
    expected_row1.extend(avro_long_bytes(20));
    expected_row1.extend(avro_len_prefixed_bytes(b"world"));
    assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
}
3183
/// Encodes one `Int32` row through an encoder built with a Rabin fingerprint
/// and checks the full row layout: the marker bytes `0xC3 0x01`, a header of
/// `PREFIX_LEN` bytes in total, and then exactly the encoded value.
///
/// The previous version only asserted `row0.len() > 10`, so corrupted bytes
/// after the header or a stale final offset would not have been detected.
#[test]
fn encode_rows_with_prefix() {
    use crate::codec::AvroFieldBuilder;
    use crate::schema::{AvroSchema, FingerprintAlgorithm};

    // Two marker bytes followed by the 8-byte (64-bit) Rabin fingerprint.
    const PREFIX_LEN: usize = 10;

    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let arr = Int32Array::from(vec![42]);
    // `schema` is borrowed again below, so the batch gets its own copy.
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
    let avro_schema = AvroSchema::try_from(&schema).unwrap();
    let fingerprint = avro_schema
        .fingerprint(FingerprintAlgorithm::Rabin)
        .unwrap();
    let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
        .build()
        .unwrap();
    let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
        .with_fingerprint(Some(fingerprint))
        .build()
        .unwrap();
    let mut out = BytesMut::new();
    let mut offsets: Vec<usize> = vec![0];
    encoder
        .encode_rows(&batch, 32, &mut out, &mut offsets)
        .unwrap();
    assert_eq!(offsets.len(), 2);
    // Same invariant the sibling `encode_rows_*` tests check.
    assert_eq!(*offsets.last().unwrap(), out.len());
    let row0 = row_slice(&out, &offsets, 0);
    assert!(
        row0.len() > PREFIX_LEN,
        "Row should contain prefix + encoded value"
    );
    assert_eq!(row0[0], 0xC3);
    assert_eq!(row0[1], 0x01);
    // Everything after the prefix must be the value itself, nothing more.
    assert_bytes_eq(&row0[PREFIX_LEN..], &avro_long_bytes(42));
}
3213
/// Encoding a zero-row batch must leave both the output buffer and the
/// caller-provided offsets exactly as they were passed in.
#[test]
fn encode_rows_empty_batch() {
    let fields = vec![Field::new("x", DataType::Int32, false)];
    let no_values = Int32Array::from(Vec::<i32>::new());
    let batch = RecordBatch::try_new(
        Arc::new(ArrowSchema::new(fields)),
        vec![Arc::new(no_values)],
    )
    .unwrap();
    let binding = FieldBinding {
        arrow_index: 0,
        nullability: None,
        plan: FieldPlan::Scalar,
    };
    let encoder = RecordEncoder {
        columns: vec![binding],
        prefix: None,
    };
    let mut buf = BytesMut::new();
    let mut row_ends: Vec<usize> = vec![0];
    encoder
        .encode_rows(&batch, 16, &mut buf, &mut row_ends)
        .unwrap();
    // Only the seeded leading offset remains and no bytes were written.
    assert_eq!(row_ends, [0usize]);
    assert_eq!(buf.len(), 0);
}
3235
/// Cross-checks the two encoding paths: the bytes `encode_rows` appends for a
/// batch must equal what `encode` writes to a stream for the same batch.
#[test]
fn encode_rows_matches_encode_output() {
    let fields = vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Float64, false),
    ];
    let longs = Int64Array::from(vec![100i64, 200, 300]);
    let doubles = Float64Array::from(vec![1.5, 2.5, 3.5]);
    let batch = RecordBatch::try_new(
        Arc::new(ArrowSchema::new(fields)),
        vec![Arc::new(longs), Arc::new(doubles)],
    )
    .unwrap();
    // One non-nullable scalar binding per Arrow column, in column order.
    let columns = (0..2)
        .map(|arrow_index| FieldBinding {
            arrow_index,
            nullability: None,
            plan: FieldPlan::Scalar,
        })
        .collect();
    let encoder = RecordEncoder {
        columns,
        prefix: None,
    };

    // Reference output from the streaming path.
    let mut streamed: Vec<u8> = Vec::new();
    encoder.encode(&mut streamed, &batch).unwrap();

    // Output from the row-oriented path.
    let mut buf = BytesMut::new();
    let mut row_ends: Vec<usize> = vec![0];
    encoder
        .encode_rows(&batch, 32, &mut buf, &mut row_ends)
        .unwrap();

    assert_eq!(row_ends.len(), batch.num_rows() + 1);
    assert_bytes_eq(&buf[..], streamed.as_slice());
}
3274
/// `encode_rows` must append after whatever the buffer and offsets already
/// hold: the pre-seeded two-byte entry stays intact and the new rows follow.
#[test]
fn encode_rows_appends_to_existing_buffer() {
    let fields = vec![Field::new("x", DataType::Int32, false)];
    let column = Int32Array::from(vec![5, 6]);
    let batch =
        RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), vec![Arc::new(column)]).unwrap();
    let binding = FieldBinding {
        arrow_index: 0,
        nullability: None,
        plan: FieldPlan::Scalar,
    };
    let encoder = RecordEncoder {
        columns: vec![binding],
        prefix: None,
    };

    // Seed the buffer with bytes that look like one previously encoded row.
    let existing: [u8; 2] = [0xAA, 0xBB];
    let mut buf = BytesMut::new();
    buf.extend_from_slice(&existing);
    let mut row_ends: Vec<usize> = vec![0, buf.len()];

    encoder
        .encode_rows(&batch, 16, &mut buf, &mut row_ends)
        .unwrap();

    // Two seeded offsets plus one per newly encoded row.
    assert_eq!(row_ends.len(), 4);
    assert_eq!(*row_ends.last().unwrap(), buf.len());
    assert_bytes_eq(row_slice(&buf, &row_ends, 0), &existing);
    // New rows start at index 1, right after the seeded entry.
    for (row, &value) in (1..).zip([5, 6].iter()) {
        assert_bytes_eq(row_slice(&buf, &row_ends, row), &avro_long_bytes(value));
    }
}
3300
/// Encodes a nullable `Int32` column bound with `Nullability::NullFirst` and
/// checks every row: a present value is `avro_long_bytes(1)` followed by the
/// value's bytes, while a null row is only `avro_long_bytes(0)`.
///
/// Also asserts that the final offset equals the buffer length, the same
/// invariant the sibling `encode_rows_*` tests check and this one omitted.
#[test]
fn encode_rows_nullable_column() {
    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, true)]);
    let arr = Int32Array::from(vec![Some(1), None, Some(3)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr)]).unwrap();
    let encoder = RecordEncoder {
        columns: vec![FieldBinding {
            arrow_index: 0,
            nullability: Some(Nullability::NullFirst),
            plan: FieldPlan::Scalar,
        }],
        prefix: None,
    };
    let mut out = BytesMut::new();
    let mut offsets: Vec<usize> = vec![0];
    encoder
        .encode_rows(&batch, 16, &mut out, &mut offsets)
        .unwrap();
    assert_eq!(offsets.len(), 4);
    assert_eq!(*offsets.last().unwrap(), out.len());
    // Mirrors the input column: `Some(v)` is a present value, `None` a null row.
    for (row, &cell) in [Some(1), None, Some(3)].iter().enumerate() {
        let mut expected: Vec<u8> = Vec::new();
        match cell {
            Some(value) => {
                // Branch marker for "value present", then the value itself.
                expected.extend(avro_long_bytes(1));
                expected.extend(avro_long_bytes(value));
            }
            // Branch marker for null; no payload follows.
            None => expected.extend(avro_long_bytes(0)),
        }
        assert_bytes_eq(row_slice(&out, &offsets, row), &expected);
    }
}
3331
/// Checks that a writer failure on the first write of a prefixed encode
/// surfaces from `encode` as an error whose message contains "write prefix".
#[test]
fn encode_prefix_write_error() {
    use crate::codec::AvroFieldBuilder;
    use crate::schema::{AvroSchema, FingerprintAlgorithm};
    use std::io;

    /// Writer whose first `write` call errors; every later call reports
    /// zero bytes written.
    struct FailWriter {
        failed: bool,
    }

    impl io::Write for FailWriter {
        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
            // Set the flag and branch on the value it held before this call.
            let already_failed = std::mem::replace(&mut self.failed, true);
            if already_failed {
                Ok(0)
            } else {
                Err(io::Error::other("fail write"))
            }
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let column = Int32Array::from(vec![42]);
    // `schema` is borrowed again below, so the batch gets its own copy.
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column)]).unwrap();

    let avro_schema = AvroSchema::try_from(&schema).unwrap();
    let fingerprint = avro_schema
        .fingerprint(FingerprintAlgorithm::Rabin)
        .unwrap();
    let root_schema = avro_schema.schema().unwrap();
    let avro_root = AvroFieldBuilder::new(&root_schema).build().unwrap();
    // A fingerprint makes the encoder emit a prefix before each row.
    let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
        .with_fingerprint(Some(fingerprint))
        .build()
        .unwrap();

    let mut sink = FailWriter { failed: false };
    let failure = encoder.encode(&mut sink, &batch).unwrap_err();
    let msg = failure.to_string();
    assert!(msg.contains("write prefix"), "unexpected error: {msg}");
}
3377}