1use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::schema::{Fingerprint, Nullability, Prefix};
22use arrow_array::cast::AsArray;
23use arrow_array::types::{
24 ArrowPrimitiveType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
25 DurationSecondType, Float32Type, Float64Type, Int32Type, Int64Type, IntervalDayTimeType,
26 IntervalMonthDayNanoType, IntervalYearMonthType, TimestampMicrosecondType,
27};
28use arrow_array::{
29 Array, Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray,
30 GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, MapArray,
31 OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
32};
33#[cfg(feature = "small_decimals")]
34use arrow_array::{Decimal32Array, Decimal64Array};
35use arrow_buffer::NullBuffer;
36use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit};
37use serde::Serialize;
38use std::io::Write;
39use std::sync::Arc;
40use uuid::Uuid;
41
42#[inline]
46pub fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), ArrowError> {
47 let mut zz = ((value << 1) ^ (value >> 63)) as u64;
48 let mut buf = [0u8; 10];
50 let mut i = 0;
51 while (zz & !0x7F) != 0 {
52 buf[i] = ((zz & 0x7F) as u8) | 0x80;
53 i += 1;
54 zz >>= 7;
55 }
56 buf[i] = (zz & 0x7F) as u8;
57 i += 1;
58 out.write_all(&buf[..i])
59 .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
60}
61
62#[inline]
63fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), ArrowError> {
64 write_long(out, value as i64)
65}
66
67#[inline]
68fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
69 write_long(out, bytes.len() as i64)?;
70 out.write_all(bytes)
71 .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
72}
73
74#[inline]
75fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> {
76 out.write_all(&[if v { 1 } else { 0 }])
77 .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
78}
79
/// Returns the shortest suffix of the big-endian two's-complement number `be`
/// that still represents the same value.
///
/// Leading bytes that are pure sign extension (`0x00` for non-negative,
/// `0xFF` for negative values) are dropped, keeping one of them whenever the
/// next byte's top bit would otherwise flip the sign. An empty input is
/// returned unchanged; an input made entirely of sign bytes collapses to its
/// last byte.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&first) = be.first() else {
        return be;
    };
    let sign_byte: u8 = if first & 0x80 != 0 { 0xFF } else { 0x00 };
    // Length of the leading run of sign-extension bytes.
    let run = be.iter().take_while(|&&b| b == sign_byte).count();
    match be.get(run) {
        // Every byte is sign extension: a single byte carries the value.
        None => &be[be.len() - 1..],
        // No redundant prefix at all.
        Some(_) if run == 0 => be,
        // The next byte already has the right sign bit: drop the whole run.
        Some(&next) if (next ^ sign_byte) & 0x80 == 0 => &be[run..],
        // Otherwise keep one sign byte so the sign is preserved.
        Some(_) => &be[run - 1..],
    }
}
111
112#[inline]
124fn write_sign_extended<W: Write + ?Sized>(
125 out: &mut W,
126 src_be: &[u8],
127 n: usize,
128) -> Result<(), ArrowError> {
129 let len = src_be.len();
130 if len == n {
131 return out
132 .write_all(src_be)
133 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
134 }
135 let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
136 0xFF
137 } else {
138 0x00
139 };
140 if len > n {
141 let extra = len - n;
142 if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
143 return Ok(());
144 }
145 if src_be[..extra].iter().any(|&b| b != sign_byte)
148 || ((src_be[extra] ^ sign_byte) & 0x80) != 0
149 {
150 return Err(ArrowError::InvalidArgumentError(format!(
151 "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
152 )));
153 }
154 return out
155 .write_all(&src_be[extra..])
156 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
157 }
158 let pad_len = n - len;
160 const ZPAD: [u8; 64] = [0x00; 64];
162 const FPAD: [u8; 64] = [0xFF; 64];
163 let pad = if sign_byte == 0x00 {
164 &ZPAD[..]
165 } else {
166 &FPAD[..]
167 };
168 let mut rem = pad_len;
171 while rem >= pad.len() {
172 out.write_all(pad)
173 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
174 rem -= pad.len();
175 }
176 if rem > 0 {
177 out.write_all(&pad[..rem])
178 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
179 }
180 out.write_all(src_be)
181 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
182}
183
184fn write_optional_index<W: Write + ?Sized>(
190 out: &mut W,
191 is_null: bool,
192 null_order: Nullability,
193) -> Result<(), ArrowError> {
194 let byte = union_value_branch_byte(null_order, is_null);
195 out.write_all(&[byte])
196 .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
197}
198
199#[derive(Debug, Clone)]
200enum NullState {
201 NonNullable,
202 NullableNoNulls {
203 union_value_byte: u8,
204 },
205 Nullable {
206 nulls: NullBuffer,
207 null_order: Nullability,
208 },
209}
210
211pub struct FieldEncoder<'a> {
215 encoder: Encoder<'a>,
216 null_state: NullState,
217}
218
219impl<'a> FieldEncoder<'a> {
220 fn make_encoder(
221 array: &'a dyn Array,
222 field: &Field,
223 plan: &FieldPlan,
224 nullability: Option<Nullability>,
225 ) -> Result<Self, ArrowError> {
226 let encoder = match plan {
227 FieldPlan::Scalar => match array.data_type() {
228 DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
229 DataType::Utf8 => {
230 Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
231 }
232 DataType::LargeUtf8 => {
233 Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
234 }
235 DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
236 DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
237 DataType::Float32 => {
238 Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
239 }
240 DataType::Float64 => {
241 Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
242 }
243 DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
244 DataType::LargeBinary => {
245 Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
246 }
247 DataType::FixedSizeBinary(len) => {
248 let arr = array
249 .as_any()
250 .downcast_ref::<FixedSizeBinaryArray>()
251 .ok_or_else(|| {
252 ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
253 })?;
254 Encoder::Fixed(FixedEncoder(arr))
255 }
256 DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder(
257 array.as_primitive::<TimestampMicrosecondType>(),
258 )),
259 DataType::Interval(unit) => match unit {
260 IntervalUnit::MonthDayNano => {
261 Encoder::IntervalMonthDayNano(DurationEncoder(
262 array.as_primitive::<IntervalMonthDayNanoType>(),
263 ))
264 }
265 IntervalUnit::YearMonth => {
266 Encoder::IntervalYearMonth(DurationEncoder(
267 array.as_primitive::<IntervalYearMonthType>(),
268 ))
269 }
270 IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
271 array.as_primitive::<IntervalDayTimeType>(),
272 )),
273 }
274 DataType::Duration(tu) => {
275 match tu {
276 TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
277 array.as_primitive::<DurationSecondType>(),
278 )),
279 TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
280 array.as_primitive::<DurationMillisecondType>(),
281 )),
282 TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
283 array.as_primitive::<DurationMicrosecondType>(),
284 )),
285 TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
286 array.as_primitive::<DurationNanosecondType>(),
287 )),
288 }
289 }
290 other => {
291 return Err(ArrowError::NotYetImplemented(format!(
292 "Avro scalar type not yet supported: {other:?}"
293 )));
294 }
295 },
296 FieldPlan::Struct { encoders } => {
297 let arr = array
298 .as_any()
299 .downcast_ref::<StructArray>()
300 .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
301 Encoder::Struct(Box::new(StructEncoder::try_new(arr, encoders)?))
302 }
303 FieldPlan::List {
304 items_nullability,
305 item_plan,
306 } => match array.data_type() {
307 DataType::List(_) => {
308 let arr = array
309 .as_any()
310 .downcast_ref::<ListArray>()
311 .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
312 Encoder::List(Box::new(ListEncoder32::try_new(
313 arr,
314 *items_nullability,
315 item_plan.as_ref(),
316 )?))
317 }
318 DataType::LargeList(_) => {
319 let arr = array
320 .as_any()
321 .downcast_ref::<LargeListArray>()
322 .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
323 Encoder::LargeList(Box::new(ListEncoder64::try_new(
324 arr,
325 *items_nullability,
326 item_plan.as_ref(),
327 )?))
328 }
329 other => {
330 return Err(ArrowError::SchemaError(format!(
331 "Avro array site requires Arrow List/LargeList, found: {other:?}"
332 )))
333 }
334 },
335 FieldPlan::Decimal {size} => match array.data_type() {
336 #[cfg(feature = "small_decimals")]
337 DataType::Decimal32(_,_) => {
338 let arr = array
339 .as_any()
340 .downcast_ref::<Decimal32Array>()
341 .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
342 Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
343 }
344 #[cfg(feature = "small_decimals")]
345 DataType::Decimal64(_,_) => {
346 let arr = array
347 .as_any()
348 .downcast_ref::<Decimal64Array>()
349 .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
350 Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
351 }
352 DataType::Decimal128(_,_) => {
353 let arr = array
354 .as_any()
355 .downcast_ref::<Decimal128Array>()
356 .ok_or_else(|| ArrowError::SchemaError("Expected Decimal128Array".into()))?;
357 Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
358 }
359 DataType::Decimal256(_,_) => {
360 let arr = array
361 .as_any()
362 .downcast_ref::<Decimal256Array>()
363 .ok_or_else(|| ArrowError::SchemaError("Expected Decimal256Array".into()))?;
364 Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
365 }
366 other => {
367 return Err(ArrowError::SchemaError(format!(
368 "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
369 )))
370 }
371 },
372 FieldPlan::Uuid => {
373 let arr = array
374 .as_any()
375 .downcast_ref::<FixedSizeBinaryArray>()
376 .ok_or_else(|| ArrowError::SchemaError("Expected FixedSizeBinaryArray".into()))?;
377 Encoder::Uuid(UuidEncoder(arr))
378 }
379 FieldPlan::Map { values_nullability,
380 value_plan } => {
381 let arr = array
382 .as_any()
383 .downcast_ref::<MapArray>()
384 .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
385 Encoder::Map(Box::new(MapEncoder::try_new(arr, *values_nullability, value_plan.as_ref())?))
386 }
387 FieldPlan::Enum { symbols} => match array.data_type() {
388 DataType::Dictionary(key_dt, value_dt) => {
389 if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
390 return Err(ArrowError::SchemaError(
391 "Avro enum requires Dictionary<Int32, Utf8>".into(),
392 ));
393 }
394 let dict = array
395 .as_any()
396 .downcast_ref::<DictionaryArray<Int32Type>>()
397 .ok_or_else(|| {
398 ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
399 })?;
400
401 let values = dict
402 .values()
403 .as_any()
404 .downcast_ref::<StringArray>()
405 .ok_or_else(|| {
406 ArrowError::SchemaError("Dictionary values must be Utf8".into())
407 })?;
408 if values.len() != symbols.len() {
409 return Err(ArrowError::SchemaError(format!(
410 "Enum symbol length {} != dictionary size {}",
411 symbols.len(),
412 values.len()
413 )));
414 }
415 for i in 0..values.len() {
416 if values.value(i) != symbols[i].as_str() {
417 return Err(ArrowError::SchemaError(format!(
418 "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
419 symbols[i],
420 values.value(i)
421 )));
422 }
423 }
424 let keys = dict.keys();
425 Encoder::Enum(EnumEncoder { keys })
426 }
427 other => {
428 return Err(ArrowError::SchemaError(format!(
429 "Avro enum site requires DataType::Dictionary, found: {other:?}"
430 )))
431 }
432 }
433 other => {
434 return Err(ArrowError::NotYetImplemented(format!(
435 "Avro writer: {other:?} not yet supported",
436 )));
437 }
438 };
439 let null_state = match (nullability, array.null_count() > 0) {
441 (None, false) => NullState::NonNullable,
442 (None, true) => {
443 return Err(ArrowError::InvalidArgumentError(format!(
444 "Avro site '{}' is non-nullable, but array contains nulls",
445 field.name()
446 )));
447 }
448 (Some(order), false) => {
449 NullState::NullableNoNulls {
451 union_value_byte: union_value_branch_byte(order, false),
452 }
453 }
454 (Some(null_order), true) => {
455 let Some(nulls) = array.nulls().cloned() else {
456 return Err(ArrowError::InvalidArgumentError(format!(
457 "Array for Avro site '{}' reports nulls but has no null buffer",
458 field.name()
459 )));
460 };
461 NullState::Nullable { nulls, null_order }
462 }
463 };
464 Ok(Self {
465 encoder,
466 null_state,
467 })
468 }
469
470 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
471 match &self.null_state {
472 NullState::NonNullable => {}
473 NullState::NullableNoNulls { union_value_byte } => out
474 .write_all(&[*union_value_byte])
475 .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?,
476 NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
477 return write_optional_index(out, true, *null_order); }
479 NullState::Nullable { null_order, .. } => {
480 write_optional_index(out, false, *null_order)?;
481 }
482 }
483 self.encoder.encode(out, idx)
484 }
485}
486
487fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
488 let nulls_first = null_order == Nullability::default();
489 if nulls_first == is_null {
490 0x00
491 } else {
492 0x02
493 }
494}
495
496#[derive(Debug, Clone)]
499enum FieldPlan {
500 Scalar,
502 Struct { encoders: Vec<FieldBinding> },
504 List {
506 items_nullability: Option<Nullability>,
507 item_plan: Box<FieldPlan>,
508 },
509 Decimal { size: Option<usize> },
511 Uuid,
513 Map {
515 values_nullability: Option<Nullability>,
516 value_plan: Box<FieldPlan>,
517 },
518 Enum { symbols: Arc<[String]> },
521}
522
523#[derive(Debug, Clone)]
524struct FieldBinding {
525 arrow_index: usize,
527 nullability: Option<Nullability>,
529 plan: FieldPlan,
531}
532
533#[derive(Debug)]
535pub struct RecordEncoderBuilder<'a> {
536 avro_root: &'a AvroField,
537 arrow_schema: &'a ArrowSchema,
538 fingerprint: Option<Fingerprint>,
539}
540
541impl<'a> RecordEncoderBuilder<'a> {
542 pub fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
544 Self {
545 avro_root,
546 arrow_schema,
547 fingerprint: None,
548 }
549 }
550
551 pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
552 self.fingerprint = fingerprint;
553 self
554 }
555
556 pub fn build(self) -> Result<RecordEncoder, ArrowError> {
559 let avro_root_dt = self.avro_root.data_type();
560 let Codec::Struct(root_fields) = avro_root_dt.codec() else {
561 return Err(ArrowError::SchemaError(
562 "Top-level Avro schema must be a record/struct".into(),
563 ));
564 };
565 let mut columns = Vec::with_capacity(root_fields.len());
566 for root_field in root_fields.as_ref() {
567 let name = root_field.name();
568 let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
569 ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
570 })?;
571 columns.push(FieldBinding {
572 arrow_index,
573 nullability: root_field.data_type().nullability(),
574 plan: FieldPlan::build(
575 root_field.data_type(),
576 self.arrow_schema.field(arrow_index),
577 )?,
578 });
579 }
580 Ok(RecordEncoder {
581 columns,
582 prefix: self.fingerprint.map(|fp| fp.make_prefix()),
583 })
584 }
585}
586
587#[derive(Debug, Clone)]
593pub struct RecordEncoder {
594 columns: Vec<FieldBinding>,
595 prefix: Option<Prefix>,
597}
598
599impl RecordEncoder {
600 fn prepare_for_batch<'a>(
601 &'a self,
602 batch: &'a RecordBatch,
603 ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
604 let schema_binding = batch.schema();
605 let fields = schema_binding.fields();
606 let arrays = batch.columns();
607 let mut out = Vec::with_capacity(self.columns.len());
608 for col_plan in self.columns.iter() {
609 let arrow_index = col_plan.arrow_index;
610 let array = arrays.get(arrow_index).ok_or_else(|| {
611 ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
612 })?;
613 let field = fields[arrow_index].as_ref();
614 let encoder = prepare_value_site_encoder(
615 array.as_ref(),
616 field,
617 col_plan.nullability,
618 &col_plan.plan,
619 )?;
620 out.push(encoder);
621 }
622 Ok(out)
623 }
624
625 pub fn encode<W: Write>(&self, out: &mut W, batch: &RecordBatch) -> Result<(), ArrowError> {
629 let mut column_encoders = self.prepare_for_batch(batch)?;
630 let n = batch.num_rows();
631 match self.prefix {
632 Some(prefix) => {
633 for row in 0..n {
634 out.write_all(prefix.as_slice())
635 .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
636 for enc in column_encoders.iter_mut() {
637 enc.encode(out, row)?;
638 }
639 }
640 }
641 None => {
642 for row in 0..n {
643 for enc in column_encoders.iter_mut() {
644 enc.encode(out, row)?;
645 }
646 }
647 }
648 }
649 Ok(())
650 }
651}
652
653fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
654 fields.iter().position(|f| f.name() == name)
655}
656
657fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
658 find_struct_child_index(fields, "value")
660 .or_else(|| find_struct_child_index(fields, "values"))
661 .or_else(|| if fields.len() == 2 { Some(1) } else { None })
662}
663
664impl FieldPlan {
665 fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
666 if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
667 let ext_is_uuid = {
669 #[cfg(feature = "canonical_extension_types")]
670 {
671 matches!(
672 arrow_field.extension_type_name(),
673 Some("arrow.uuid") | Some("uuid")
674 )
675 }
676 #[cfg(not(feature = "canonical_extension_types"))]
677 {
678 false
679 }
680 };
681 let md_is_uuid = arrow_field
682 .metadata()
683 .get("logicalType")
684 .map(|s| s.as_str())
685 == Some("uuid");
686 if ext_is_uuid || md_is_uuid {
687 if *len != 16 {
688 return Err(ArrowError::InvalidArgumentError(
689 "logicalType=uuid requires FixedSizeBinary(16)".into(),
690 ));
691 }
692 return Ok(FieldPlan::Uuid);
693 }
694 }
695 match avro_dt.codec() {
696 Codec::Struct(avro_fields) => {
697 let fields = match arrow_field.data_type() {
698 DataType::Struct(struct_fields) => struct_fields,
699 other => {
700 return Err(ArrowError::SchemaError(format!(
701 "Avro struct maps to Arrow Struct, found: {other:?}"
702 )))
703 }
704 };
705 let mut encoders = Vec::with_capacity(avro_fields.len());
706 for avro_field in avro_fields.iter() {
707 let name = avro_field.name().to_string();
708 let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
709 ArrowError::SchemaError(format!(
710 "Struct field '{name}' not present in Arrow field '{}'",
711 arrow_field.name()
712 ))
713 })?;
714 encoders.push(FieldBinding {
715 arrow_index: idx,
716 nullability: avro_field.data_type().nullability(),
717 plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
718 });
719 }
720 Ok(FieldPlan::Struct { encoders })
721 }
722 Codec::List(items_dt) => match arrow_field.data_type() {
723 DataType::List(field_ref) => Ok(FieldPlan::List {
724 items_nullability: items_dt.nullability(),
725 item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
726 }),
727 DataType::LargeList(field_ref) => Ok(FieldPlan::List {
728 items_nullability: items_dt.nullability(),
729 item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
730 }),
731 other => Err(ArrowError::SchemaError(format!(
732 "Avro array maps to Arrow List/LargeList, found: {other:?}"
733 ))),
734 },
735 Codec::Map(values_dt) => {
736 let entries_field = match arrow_field.data_type() {
737 DataType::Map(entries, _sorted) => entries.as_ref(),
738 other => {
739 return Err(ArrowError::SchemaError(format!(
740 "Avro map maps to Arrow DataType::Map, found: {other:?}"
741 )))
742 }
743 };
744 let entries_struct_fields = match entries_field.data_type() {
745 DataType::Struct(fs) => fs,
746 other => {
747 return Err(ArrowError::SchemaError(format!(
748 "Arrow Map entries must be Struct, found: {other:?}"
749 )))
750 }
751 };
752 let value_idx =
753 find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
754 ArrowError::SchemaError("Map entries struct missing value field".into())
755 })?;
756 let value_field = entries_struct_fields[value_idx].as_ref();
757 let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
758 Ok(FieldPlan::Map {
759 values_nullability: values_dt.nullability(),
760 value_plan: Box::new(value_plan),
761 })
762 }
763 Codec::Enum(symbols) => match arrow_field.data_type() {
764 DataType::Dictionary(key_dt, value_dt) => {
765 if **key_dt != DataType::Int32 {
766 return Err(ArrowError::SchemaError(
767 "Avro enum requires Dictionary<Int32, Utf8>".into(),
768 ));
769 }
770 if **value_dt != DataType::Utf8 {
771 return Err(ArrowError::SchemaError(
772 "Avro enum requires Dictionary<Int32, Utf8>".into(),
773 ));
774 }
775 Ok(FieldPlan::Enum {
776 symbols: symbols.clone(),
777 })
778 }
779 other => Err(ArrowError::SchemaError(format!(
780 "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
781 ))),
782 },
783 Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
785 let (ap, as_) = match arrow_field.data_type() {
786 #[cfg(feature = "small_decimals")]
787 DataType::Decimal32(p, s) => (*p as usize, *s as i32),
788 #[cfg(feature = "small_decimals")]
789 DataType::Decimal64(p, s) => (*p as usize, *s as i32),
790 DataType::Decimal128(p, s) => (*p as usize, *s as i32),
791 DataType::Decimal256(p, s) => (*p as usize, *s as i32),
792 other => {
793 return Err(ArrowError::SchemaError(format!(
794 "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
795 arrow_field.name()
796 )))
797 }
798 };
799 let sc = scale_opt.unwrap_or(0) as i32; if ap != *precision || as_ != sc {
801 return Err(ArrowError::SchemaError(format!(
802 "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
803 arrow_field.name()
804 )));
805 }
806 Ok(FieldPlan::Decimal {
807 size: *fixed_size_opt,
808 })
809 }
810 Codec::Interval => match arrow_field.data_type() {
811 DataType::Interval(IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime
812 ) => Ok(FieldPlan::Scalar),
813 other => Err(ArrowError::SchemaError(format!(
814 "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
815 ))),
816 }
817 _ => Ok(FieldPlan::Scalar),
818 }
819 }
820}
821
822enum Encoder<'a> {
823 Boolean(BooleanEncoder<'a>),
824 Int(IntEncoder<'a, Int32Type>),
825 Long(LongEncoder<'a, Int64Type>),
826 Timestamp(LongEncoder<'a, TimestampMicrosecondType>),
827 DurationSeconds(LongEncoder<'a, DurationSecondType>),
828 DurationMillis(LongEncoder<'a, DurationMillisecondType>),
829 DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
830 DurationNanos(LongEncoder<'a, DurationNanosecondType>),
831 Float32(F32Encoder<'a>),
832 Float64(F64Encoder<'a>),
833 Binary(BinaryEncoder<'a, i32>),
834 LargeBinary(BinaryEncoder<'a, i64>),
835 Utf8(Utf8Encoder<'a>),
836 Utf8Large(Utf8LargeEncoder<'a>),
837 List(Box<ListEncoder32<'a>>),
838 LargeList(Box<ListEncoder64<'a>>),
839 Struct(Box<StructEncoder<'a>>),
840 Fixed(FixedEncoder<'a>),
842 Uuid(UuidEncoder<'a>),
844 IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
846 IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
848 IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
850 #[cfg(feature = "small_decimals")]
851 Decimal32(Decimal32Encoder<'a>),
852 #[cfg(feature = "small_decimals")]
853 Decimal64(Decimal64Encoder<'a>),
854 Decimal128(Decimal128Encoder<'a>),
855 Decimal256(Decimal256Encoder<'a>),
856 Enum(EnumEncoder<'a>),
858 Map(Box<MapEncoder<'a>>),
859}
860
861impl<'a> Encoder<'a> {
862 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
864 match self {
865 Encoder::Boolean(e) => e.encode(out, idx),
866 Encoder::Int(e) => e.encode(out, idx),
867 Encoder::Long(e) => e.encode(out, idx),
868 Encoder::Timestamp(e) => e.encode(out, idx),
869 Encoder::DurationSeconds(e) => e.encode(out, idx),
870 Encoder::DurationMicros(e) => e.encode(out, idx),
871 Encoder::DurationMillis(e) => e.encode(out, idx),
872 Encoder::DurationNanos(e) => e.encode(out, idx),
873 Encoder::Float32(e) => e.encode(out, idx),
874 Encoder::Float64(e) => e.encode(out, idx),
875 Encoder::Binary(e) => e.encode(out, idx),
876 Encoder::LargeBinary(e) => e.encode(out, idx),
877 Encoder::Utf8(e) => e.encode(out, idx),
878 Encoder::Utf8Large(e) => e.encode(out, idx),
879 Encoder::List(e) => e.encode(out, idx),
880 Encoder::LargeList(e) => e.encode(out, idx),
881 Encoder::Struct(e) => e.encode(out, idx),
882 Encoder::Fixed(e) => (e).encode(out, idx),
883 Encoder::Uuid(e) => (e).encode(out, idx),
884 Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
885 Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
886 Encoder::IntervalDayTime(e) => (e).encode(out, idx),
887 #[cfg(feature = "small_decimals")]
888 Encoder::Decimal32(e) => (e).encode(out, idx),
889 #[cfg(feature = "small_decimals")]
890 Encoder::Decimal64(e) => (e).encode(out, idx),
891 Encoder::Decimal128(e) => (e).encode(out, idx),
892 Encoder::Decimal256(e) => (e).encode(out, idx),
893 Encoder::Map(e) => (e).encode(out, idx),
894 Encoder::Enum(e) => (e).encode(out, idx),
895 }
896 }
897}
898
899struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
900impl BooleanEncoder<'_> {
901 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
902 write_bool(out, self.0.value(idx))
903 }
904}
905
906struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
908impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
909 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
910 write_int(out, self.0.value(idx))
911 }
912}
913
914struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
916impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
917 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
918 write_long(out, self.0.value(idx))
919 }
920}
921
922struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
924impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
925 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
926 write_len_prefixed(out, self.0.value(idx))
927 }
928}
929
930struct F32Encoder<'a>(&'a arrow_array::Float32Array);
931impl F32Encoder<'_> {
932 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
933 let bits = self.0.value(idx).to_bits();
935 out.write_all(&bits.to_le_bytes())
936 .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
937 }
938}
939
940struct F64Encoder<'a>(&'a arrow_array::Float64Array);
941impl F64Encoder<'_> {
942 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
943 let bits = self.0.value(idx).to_bits();
945 out.write_all(&bits.to_le_bytes())
946 .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
947 }
948}
949
950struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
951
952impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
953 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
954 write_len_prefixed(out, self.0.value(idx).as_bytes())
955 }
956}
957
958type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
959type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
960
961enum KeyKind<'a> {
963 Utf8(&'a GenericStringArray<i32>),
964 LargeUtf8(&'a GenericStringArray<i64>),
965}
966struct MapEncoder<'a> {
967 map: &'a MapArray,
968 keys: KeyKind<'a>,
969 values: FieldEncoder<'a>,
970 keys_offset: usize,
971 values_offset: usize,
972}
973
974impl<'a> MapEncoder<'a> {
975 fn try_new(
976 map: &'a MapArray,
977 values_nullability: Option<Nullability>,
978 value_plan: &FieldPlan,
979 ) -> Result<Self, ArrowError> {
980 let keys_arr = map.keys();
981 let keys_kind = match keys_arr.data_type() {
982 DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
983 DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
984 other => {
985 return Err(ArrowError::SchemaError(format!(
986 "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
987 )))
988 }
989 };
990
991 let entries_struct_fields = match map.data_type() {
992 DataType::Map(entries, _) => match entries.data_type() {
993 DataType::Struct(fs) => fs,
994 other => {
995 return Err(ArrowError::SchemaError(format!(
996 "Arrow Map entries must be Struct, found: {other:?}"
997 )))
998 }
999 },
1000 _ => {
1001 return Err(ArrowError::SchemaError(
1002 "Expected MapArray with DataType::Map".into(),
1003 ))
1004 }
1005 };
1006
1007 let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
1008 ArrowError::SchemaError("Map entries struct missing value field".into())
1009 })?;
1010 let value_field = entries_struct_fields[v_idx].as_ref();
1011
1012 let values_enc = prepare_value_site_encoder(
1013 map.values().as_ref(),
1014 value_field,
1015 values_nullability,
1016 value_plan,
1017 )?;
1018
1019 Ok(Self {
1020 map,
1021 keys: keys_kind,
1022 values: values_enc,
1023 keys_offset: keys_arr.offset(),
1024 values_offset: map.values().offset(),
1025 })
1026 }
1027
1028 fn encode_map_entries<W, O>(
1029 out: &mut W,
1030 keys: &GenericStringArray<O>,
1031 keys_offset: usize,
1032 start: usize,
1033 end: usize,
1034 mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>,
1035 ) -> Result<(), ArrowError>
1036 where
1037 W: Write + ?Sized,
1038 O: OffsetSizeTrait,
1039 {
1040 encode_blocked_range(out, start, end, |out, j| {
1041 let j_key = j.saturating_sub(keys_offset);
1042 write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1043 write_item(out, j)
1044 })
1045 }
1046
1047 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1048 let offsets = self.map.offsets();
1049 let start = offsets[idx] as usize;
1050 let end = offsets[idx + 1] as usize;
1051
1052 let mut write_item = |out: &mut W, j: usize| {
1053 let j_val = j.saturating_sub(self.values_offset);
1054 self.values.encode(out, j_val)
1055 };
1056
1057 match self.keys {
1058 KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1059 out,
1060 arr,
1061 self.keys_offset,
1062 start,
1063 end,
1064 write_item,
1065 ),
1066 KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1067 out,
1068 arr,
1069 self.keys_offset,
1070 start,
1071 end,
1072 write_item,
1073 ),
1074 }
1075 }
1076}
1077
1078struct EnumEncoder<'a> {
1085 keys: &'a PrimitiveArray<Int32Type>,
1086}
1087impl EnumEncoder<'_> {
1088 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> {
1089 write_int(out, self.keys.value(row))
1090 }
1091}
1092
1093struct StructEncoder<'a> {
1094 encoders: Vec<FieldEncoder<'a>>,
1095}
1096
1097impl<'a> StructEncoder<'a> {
1098 fn try_new(
1099 array: &'a StructArray,
1100 field_bindings: &[FieldBinding],
1101 ) -> Result<Self, ArrowError> {
1102 let DataType::Struct(fields) = array.data_type() else {
1103 return Err(ArrowError::SchemaError("Expected Struct".into()));
1104 };
1105 let mut encoders = Vec::with_capacity(field_bindings.len());
1106 for field_binding in field_bindings {
1107 let idx = field_binding.arrow_index;
1108 let column = array.columns().get(idx).ok_or_else(|| {
1109 ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1110 })?;
1111 let field = fields.get(idx).ok_or_else(|| {
1112 ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1113 })?;
1114 let encoder = prepare_value_site_encoder(
1115 column.as_ref(),
1116 field,
1117 field_binding.nullability,
1118 &field_binding.plan,
1119 )?;
1120 encoders.push(encoder);
1121 }
1122 Ok(Self { encoders })
1123 }
1124
1125 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1126 for encoder in self.encoders.iter_mut() {
1127 encoder.encode(out, idx)?;
1128 }
1129 Ok(())
1130 }
1131}
1132
1133fn encode_blocked_range<W: Write + ?Sized, F>(
1137 out: &mut W,
1138 start: usize,
1139 end: usize,
1140 mut write_item: F,
1141) -> Result<(), ArrowError>
1142where
1143 F: FnMut(&mut W, usize) -> Result<(), ArrowError>,
1144{
1145 let len = end.saturating_sub(start);
1146 if len == 0 {
1147 write_long(out, 0)?;
1149 return Ok(());
1150 }
1151 write_long(out, len as i64)?;
1153 for row in start..end {
1154 write_item(out, row)?;
1155 }
1156 write_long(out, 0)?;
1157 Ok(())
1158}
1159
1160struct ListEncoder<'a, O: OffsetSizeTrait> {
1161 list: &'a GenericListArray<O>,
1162 values: FieldEncoder<'a>,
1163 values_offset: usize,
1164}
1165
1166type ListEncoder32<'a> = ListEncoder<'a, i32>;
1167type ListEncoder64<'a> = ListEncoder<'a, i64>;
1168
1169impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1170 fn try_new(
1171 list: &'a GenericListArray<O>,
1172 items_nullability: Option<Nullability>,
1173 item_plan: &FieldPlan,
1174 ) -> Result<Self, ArrowError> {
1175 let child_field = match list.data_type() {
1176 DataType::List(field) => field.as_ref(),
1177 DataType::LargeList(field) => field.as_ref(),
1178 _ => {
1179 return Err(ArrowError::SchemaError(
1180 "Expected List or LargeList for ListEncoder".into(),
1181 ))
1182 }
1183 };
1184 let values_enc = prepare_value_site_encoder(
1185 list.values().as_ref(),
1186 child_field,
1187 items_nullability,
1188 item_plan,
1189 )?;
1190 Ok(Self {
1191 list,
1192 values: values_enc,
1193 values_offset: list.values().offset(),
1194 })
1195 }
1196
1197 fn encode_list_range<W: Write + ?Sized>(
1198 &mut self,
1199 out: &mut W,
1200 start: usize,
1201 end: usize,
1202 ) -> Result<(), ArrowError> {
1203 encode_blocked_range(out, start, end, |out, row| {
1204 self.values
1205 .encode(out, row.saturating_sub(self.values_offset))
1206 })
1207 }
1208
1209 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1210 let offsets = self.list.offsets();
1211 let start = offsets[idx].to_usize().ok_or_else(|| {
1212 ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize"))
1213 })?;
1214 let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1215 ArrowError::InvalidArgumentError(format!(
1216 "Error converting offset[{}] to usize",
1217 idx + 1
1218 ))
1219 })?;
1220 self.encode_list_range(out, start, end)
1221 }
1222}
1223
1224fn prepare_value_site_encoder<'a>(
1225 values_array: &'a dyn Array,
1226 value_field: &Field,
1227 nullability: Option<Nullability>,
1228 plan: &FieldPlan,
1229) -> Result<FieldEncoder<'a>, ArrowError> {
1230 FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
1232}
1233
1234struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1237impl FixedEncoder<'_> {
1238 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1239 let v = self.0.value(idx); out.write_all(v)
1241 .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e))
1242 }
1243}
1244
1245struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1248impl UuidEncoder<'_> {
1249 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1250 let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1251 buf[0] = 0x48;
1252 let v = self.0.value(idx);
1253 let u = Uuid::from_slice(v)
1254 .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
1255 let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1256 out.write_all(&buf)
1257 .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
1258 }
1259}
1260
1261#[derive(Copy, Clone)]
1262struct DurationParts {
1263 months: u32,
1264 days: u32,
1265 millis: u32,
1266}
1267trait IntervalToDurationParts: ArrowPrimitiveType {
1269 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>;
1270}
1271impl IntervalToDurationParts for IntervalMonthDayNanoType {
1272 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1273 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1274 if months < 0 || days < 0 || nanos < 0 {
1275 return Err(ArrowError::InvalidArgumentError(
1276 "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1277 ));
1278 }
1279 if nanos % 1_000_000 != 0 {
1280 return Err(ArrowError::InvalidArgumentError(
1281 "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1282 .into(),
1283 ));
1284 }
1285 let millis = nanos / 1_000_000;
1286 if millis > u32::MAX as i64 {
1287 return Err(ArrowError::InvalidArgumentError(
1288 "Avro 'duration' milliseconds exceed u32::MAX".into(),
1289 ));
1290 }
1291 Ok(DurationParts {
1292 months: months as u32,
1293 days: days as u32,
1294 millis: millis as u32,
1295 })
1296 }
1297}
1298impl IntervalToDurationParts for IntervalYearMonthType {
1299 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1300 if native < 0 {
1301 return Err(ArrowError::InvalidArgumentError(
1302 "Avro 'duration' cannot encode negative months".into(),
1303 ));
1304 }
1305 Ok(DurationParts {
1306 months: native as u32,
1307 days: 0,
1308 millis: 0,
1309 })
1310 }
1311}
1312impl IntervalToDurationParts for IntervalDayTimeType {
1313 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1314 let (days, millis) = IntervalDayTimeType::to_parts(native);
1315 if days < 0 || millis < 0 {
1316 return Err(ArrowError::InvalidArgumentError(
1317 "Avro 'duration' cannot encode negative days or milliseconds".into(),
1318 ));
1319 }
1320 Ok(DurationParts {
1321 months: 0,
1322 days: days as u32,
1323 millis: millis as u32,
1324 })
1325 }
1326}
1327struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1330impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1331 #[inline(always)]
1332 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1333 let parts = P::duration_parts(self.0.value(idx))?;
1334 let months = parts.months.to_le_bytes();
1335 let days = parts.days.to_le_bytes();
1336 let ms = parts.millis.to_le_bytes();
1337 let buf = [
1353 months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1354 ms[1], ms[2], ms[3],
1355 ];
1356 out.write_all(&buf)
1357 .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
1358 }
1359}
1360
1361trait DecimalBeBytes<const N: usize> {
1364 fn value_be_bytes(&self, idx: usize) -> [u8; N];
1365}
1366#[cfg(feature = "small_decimals")]
1367impl DecimalBeBytes<4> for Decimal32Array {
1368 fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1369 self.value(idx).to_be_bytes()
1370 }
1371}
1372#[cfg(feature = "small_decimals")]
1373impl DecimalBeBytes<8> for Decimal64Array {
1374 fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1375 self.value(idx).to_be_bytes()
1376 }
1377}
1378impl DecimalBeBytes<16> for Decimal128Array {
1379 fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1380 self.value(idx).to_be_bytes()
1381 }
1382}
1383impl DecimalBeBytes<32> for Decimal256Array {
1384 fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1385 self.value(idx).to_be_bytes()
1387 }
1388}
1389
1390struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1396 arr: &'a A,
1397 fixed_size: Option<usize>,
1398}
1399
1400impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1401 fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1402 Self { arr, fixed_size }
1403 }
1404
1405 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1406 let be = self.arr.value_be_bytes(idx);
1407 match self.fixed_size {
1408 Some(n) => write_sign_extended(out, &be, n),
1409 None => write_len_prefixed(out, minimal_twos_complement(&be)),
1410 }
1411 }
1412}
1413
1414#[cfg(feature = "small_decimals")]
1415type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1416#[cfg(feature = "small_decimals")]
1417type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1418type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1419type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1420
1421#[cfg(test)]
1422mod tests {
1423 use super::*;
1424 use arrow_array::types::Int32Type;
1425 use arrow_array::{
1426 Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
1427 Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, StringArray,
1428 TimestampMicrosecondArray,
1429 };
1430 use arrow_schema::{DataType, Field, Fields};
1431
/// Reference zig-zag mapping of a signed 64-bit value to an unsigned one
/// (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...).
fn zigzag_i64(v: i64) -> u64 {
    // Arithmetic shift: all ones for negative input, all zeros otherwise.
    let sign_mask = v >> 63;
    let doubled = v << 1;
    (doubled ^ sign_mask) as u64
}
1435
/// Reference variable-length encoding: 7 bits per byte, least-significant group
/// first, with the high bit set on every byte except the last.
fn varint(mut x: u64) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let low = (x & 0x7f) as u8;
        x >>= 7;
        if x == 0 {
            // Final group: no continuation bit.
            out.push(low);
            return out;
        }
        out.push(low | 0x80);
    }
}
1445
1446 fn avro_long_bytes(v: i64) -> Vec<u8> {
1447 varint(zigzag_i64(v))
1448 }
1449
1450 fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
1451 let mut out = avro_long_bytes(payload.len() as i64);
1452 out.extend_from_slice(payload);
1453 out
1454 }
1455
/// Expected 12-byte duration: months, days and millis as little-endian `u32`s.
fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
    let mut out = [0u8; 12];
    for (slot, part) in out.chunks_exact_mut(4).zip([months, days, millis]) {
        slot.copy_from_slice(&part.to_le_bytes());
    }
    out
}
1464
1465 fn encode_all(
1466 array: &dyn Array,
1467 plan: &FieldPlan,
1468 nullability: Option<Nullability>,
1469 ) -> Vec<u8> {
1470 let field = Field::new("f", array.data_type().clone(), true);
1471 let mut enc = FieldEncoder::make_encoder(array, &field, plan, nullability).unwrap();
1472 let mut out = Vec::new();
1473 for i in 0..array.len() {
1474 enc.encode(&mut out, i).unwrap();
1475 }
1476 out
1477 }
1478
/// Panics with a hex dump of both slices when `actual` differs from `expected`.
fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
    /// Renders bytes as space-separated, two-digit uppercase hex.
    fn to_hex(bytes: &[u8]) -> String {
        let parts: Vec<String> = bytes.iter().map(|b| format!("{:02X}", b)).collect();
        parts.join(" ")
    }

    if actual == expected {
        return;
    }
    panic!(
        "mismatch\n expected: [{}]\n actual: [{}]",
        to_hex(expected),
        to_hex(actual)
    );
}
1494
#[test]
fn binary_encoder() {
    // Binary values are written as a length prefix followed by the raw bytes.
    let payloads: [&[u8]; 3] = [b"", b"ab", b"\x00\xFF"];
    let arr = BinaryArray::from_vec(payloads.to_vec());
    let expected: Vec<u8> = payloads
        .iter()
        .flat_map(|payload| avro_len_prefixed_bytes(payload))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1506
#[test]
fn large_binary_encoder() {
    // Same wire format as `Binary`, exercised through the 64-bit offset array.
    let payloads: [&[u8]; 2] = [b"xyz", b""];
    let arr = LargeBinaryArray::from_vec(payloads.to_vec());
    let expected: Vec<u8> = payloads
        .iter()
        .flat_map(|payload| avro_len_prefixed_bytes(payload))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1518
#[test]
fn utf8_encoder() {
    // Strings are written as a length prefix followed by their UTF-8 bytes.
    let strings = ["", "A", "BC"];
    let arr = StringArray::from(strings.to_vec());
    let expected: Vec<u8> = strings
        .iter()
        .flat_map(|s| avro_len_prefixed_bytes(s.as_bytes()))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1529
#[test]
fn large_utf8_encoder() {
    // Same wire format as `Utf8`, exercised through the 64-bit offset array.
    let strings = ["hello", ""];
    let arr = LargeStringArray::from(strings.to_vec());
    let expected: Vec<u8> = strings
        .iter()
        .flat_map(|s| avro_len_prefixed_bytes(s.as_bytes()))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1540
#[test]
fn list_encoder_int32() {
    // Three rows: [1, 2], [] and [3].
    let values = Int32Array::from(vec![1, 2, 3]);
    let offsets = vec![0i32, 2, 2, 3];
    let list = ListArray::new(
        Field::new("item", DataType::Int32, true).into(),
        arrow_buffer::OffsetBuffer::new(offsets.into()),
        Arc::new(values) as ArrayRef,
        None,
    );
    // Row 0: count 2, items 1 and 2, terminator 0.
    // Row 1: terminator 0 only.
    // Row 2: count 1, item 3, terminator 0.
    let expected: Vec<u8> = [2i64, 1, 2, 0, 0, 1, 3, 0]
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();

    let plan = FieldPlan::List {
        items_nullability: None,
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);
    assert_bytes_eq(&got, &expected);
}
1573
#[test]
fn struct_encoder_two_fields() {
    // Two rows of { a: Int32, b: Utf8 }: (1, "x") and (2, "y").
    let a = Int32Array::from(vec![1, 2]);
    let b = StringArray::from(vec!["x", "y"]);
    let fields = Fields::from(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]);
    let struct_arr = StructArray::new(
        fields,
        vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
        None,
    );
    // One scalar binding per child, in column order.
    let plan = FieldPlan::Struct {
        encoders: (0..2)
            .map(|arrow_index| FieldBinding {
                arrow_index,
                nullability: None,
                plan: FieldPlan::Scalar,
            })
            .collect(),
    };
    let got = encode_all(&struct_arr, &plan, None);

    // Each row is its fields back to back: the int, then the string.
    let mut expected = Vec::new();
    for (number, text) in [(1i64, "x"), (2, "y")] {
        expected.extend(avro_long_bytes(number));
        expected.extend(avro_len_prefixed_bytes(text.as_bytes()));
    }
    assert_bytes_eq(&got, &expected);
}
1611
#[test]
fn enum_encoder_dictionary() {
    // Dictionary keys [2, 0, 1] over the symbols A, B, C.
    let names = ["A", "B", "C"];
    let dict_values = StringArray::from(names.to_vec());
    let keys = Int32Array::from(vec![2, 0, 1]);
    let dict =
        DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
    let symbols: Arc<[String]> = names.iter().map(|name| name.to_string()).collect();
    let plan = FieldPlan::Enum { symbols };
    let got = encode_all(&dict, &plan, None);

    // The enum is written as its key, encoded like an int.
    let expected: Vec<u8> = [2i64, 0, 1]
        .iter()
        .flat_map(|&key| avro_long_bytes(key))
        .collect();
    assert_bytes_eq(&got, &expected);
}
1630
#[test]
fn decimal_bytes_and_fixed() {
    let raw = [1i128, -1, 0];
    let dec = Decimal128Array::from(raw.to_vec())
        .with_precision_and_scale(20, 0)
        .unwrap();

    // Bytes mode: each value shrinks to its minimal one-byte two's-complement form.
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let expected_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|&b| avro_len_prefixed_bytes(&[b]))
        .collect();
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed(16) mode: the full 16 big-endian bytes of each value.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(16) }, None);
    let expected_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
1655
#[test]
fn decimal_bytes_256() {
    use arrow_buffer::i256;
    let raw = [1i128, -1, 0].map(i256::from_i128);
    let dec = Decimal256Array::from(raw.to_vec())
        .with_precision_and_scale(76, 0)
        .unwrap();

    // Bytes mode: each value shrinks to its minimal one-byte two's-complement form.
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let expected_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|&b| avro_len_prefixed_bytes(&[b]))
        .collect();
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed(32) mode: the full 32 big-endian bytes of each value.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(32) }, None);
    let expected_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
1686
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_32() {
    let raw = [1i32, -1, 0];
    let dec = Decimal32Array::from(raw.to_vec())
        .with_precision_and_scale(9, 0)
        .unwrap();

    // Bytes mode: each value shrinks to its minimal one-byte two's-complement form.
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let expected_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|&b| avro_len_prefixed_bytes(&[b]))
        .collect();
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed(4) mode: the full 4 big-endian bytes of each value.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(4) }, None);
    let expected_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
1711
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_64() {
    let raw = [1i64, -1, 0];
    let dec = Decimal64Array::from(raw.to_vec())
        .with_precision_and_scale(18, 0)
        .unwrap();

    // Bytes mode: each value shrinks to its minimal one-byte two's-complement form.
    let got_bytes = encode_all(&dec, &FieldPlan::Decimal { size: None }, None);
    let expected_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|&b| avro_len_prefixed_bytes(&[b]))
        .collect();
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // Fixed(8) mode: the full 8 big-endian bytes of each value.
    let got_fixed = encode_all(&dec, &FieldPlan::Decimal { size: Some(8) }, None);
    let expected_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
1736
#[test]
fn float32_and_float64_encoders() {
    // Floats are expected as their IEEE-754 bit pattern in little-endian order;
    // the third f32 value is a NaN bit pattern.
    let f32_values = [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)];
    let f64_values = [0.0f64, -2.25f64];
    let f32a = Float32Array::from(f32_values.to_vec());
    let f64a = Float64Array::from(f64_values.to_vec());

    let expected32: Vec<u8> = f32_values
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got32, &expected32);

    let expected64: Vec<u8> = f64_values
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got64, &expected64);
}
1756
#[test]
fn long_encoder_int64() {
    // Small values of both signs plus a value near the bottom of the i64 range.
    let values = [0i64, 1, -1, 2, -2, i64::MIN + 1];
    let arr = Int64Array::from(values.to_vec());
    let expected: Vec<u8> = values.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1767
#[test]
fn fixed_encoder_plain() {
    // Fixed-size binary values are written verbatim, one after another.
    let data: [[u8; 4]; 2] = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
    let arr = FixedSizeBinaryArray::try_from_iter(data.iter().map(|row| row.to_vec())).unwrap();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected = data.concat();
    assert_bytes_eq(&got, &expected);
}
1780
#[test]
fn uuid_encoder_test() {
    let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
    let arr_ok =
        FixedSizeBinaryArray::try_from_iter(std::iter::once(u.as_bytes().to_vec())).unwrap();

    // Length byte 0x48 (the long 36) followed by the 36-character hyphenated text.
    let text = u.hyphenated().to_string();
    let mut expected = vec![0x48u8];
    expected.extend_from_slice(text.as_bytes());

    let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
    assert_bytes_eq(&got, &expected);
}
1794
#[test]
fn uuid_encoder_error() {
    // A 10-byte fixed value is not a valid UUID; encoding the row must fail.
    let zeros = arrow_buffer::Buffer::from(vec![0u8; 10]);
    let arr = FixedSizeBinaryArray::try_new(10, zeros, None).unwrap();
    let field = Field::new("f", arr.data_type().clone(), true);
    let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Uuid, None).unwrap();

    let mut out = Vec::new();
    let err = enc.encode(&mut out, 0).unwrap_err();
    if let ArrowError::InvalidArgumentError(msg) = &err {
        assert!(msg.contains("Invalid UUID bytes"))
    } else {
        panic!("expected InvalidArgumentError, got {err:?}")
    }
}
1814
#[test]
fn map_encoder_string_keys_int_values() {
    // Two rows: {"k1": 1, "k2": 2} and an empty map.
    let keys = StringArray::from(vec!["k1", "k2"]);
    let values = Int32Array::from(vec![1, 2]);
    let entries = StructArray::new(
        Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ]),
        vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
        None,
    );
    let entries_field = Field::new("entries", entries.data_type().clone(), false);
    let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
    let map = MapArray::new(entries_field.into(), offsets, entries, None, false);
    let plan = FieldPlan::Map {
        values_nullability: None,
        value_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&map, &plan, None);

    // Row 0: count 2, then key/value pairs, then terminator 0.
    let mut expected = avro_long_bytes(2);
    for (key, value) in [("k1", 1i64), ("k2", 2)] {
        expected.extend(avro_len_prefixed_bytes(key.as_bytes()));
        expected.extend(avro_long_bytes(value));
    }
    expected.extend(avro_long_bytes(0));
    // Row 1: the empty map is just the terminator.
    expected.extend(avro_long_bytes(0));
    assert_bytes_eq(&got, &expected);
}
1856
#[test]
fn list64_encoder_int32() {
    // Two rows with 64-bit offsets: [1, 2, 3] and [].
    let values = Int32Array::from(vec![1, 2, 3]);
    let offsets: Vec<i64> = vec![0, 3, 3];
    let list = LargeListArray::new(
        Field::new("item", DataType::Int32, true).into(),
        arrow_buffer::OffsetBuffer::new(offsets.into()),
        Arc::new(values) as ArrayRef,
        None,
    );
    let plan = FieldPlan::List {
        items_nullability: None,
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);

    // Row 0: count 3, items 1..=3, terminator 0. Row 1: terminator 0 only.
    let expected: Vec<u8> = [3i64, 1, 2, 3, 0, 0]
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();
    assert_bytes_eq(&got, &expected);
}
1883
#[test]
fn int_encoder_test() {
    // Ints are expected to use the same encoding as the equivalent long.
    let values = [0i32, -1, 2];
    let ints = Int32Array::from(values.to_vec());
    let expected_i: Vec<u8> = values
        .iter()
        .flat_map(|&v| avro_long_bytes(v as i64))
        .collect();
    let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got_i, &expected_i);
}
1894
#[test]
fn boolean_encoder_test() {
    // One byte per boolean: 1 for true, 0 for false.
    let bools = BooleanArray::from(vec![true, false]);
    let expected_b = [1u8, 0u8];
    let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got_b, &expected_b);
}
1904
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_seconds() {
    // Second-resolution durations are expected to be written as plain longs.
    let values = [0i64, -1, 2];
    let arr: PrimitiveArray<DurationSecondType> = values.to_vec().into();
    let expected: Vec<u8> = values.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1916
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_milliseconds() {
    // Millisecond-resolution durations are expected to be written as plain longs.
    let values = [1i64, 0, -2];
    let arr: PrimitiveArray<DurationMillisecondType> = values.to_vec().into();
    let expected: Vec<u8> = values.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1928
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_microseconds() {
    // Microsecond-resolution durations are expected to be written as plain longs.
    let values = [5i64, -6, 7];
    let arr: PrimitiveArray<DurationMicrosecondType> = values.to_vec().into();
    let expected: Vec<u8> = values.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1940
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_nanoseconds() {
    // Nanosecond-resolution durations are expected to be written as plain longs.
    let values = [8i64, 9, -10];
    let arr: PrimitiveArray<DurationNanosecondType> = values.to_vec().into();
    let expected: Vec<u8> = values.iter().flat_map(|&v| avro_long_bytes(v)).collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1952
#[test]
fn duration_encoder_year_month_happy_path() {
    // Year-month intervals fill only the months slot of the 12-byte duration.
    let months = [0i32, 1, 25];
    let arr: PrimitiveArray<IntervalYearMonthType> = months.to_vec().into();
    let expected: Vec<u8> = months
        .iter()
        .flat_map(|&m| duration_fixed12(m as u32, 0, 0))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1963
#[test]
fn duration_encoder_year_month_rejects_negative() {
    // A negative month count cannot be written as an unsigned duration component.
    let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::YearMonth), true);
    let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();

    let mut out = Vec::new();
    let err = enc.encode(&mut out, 0).unwrap_err();
    if let ArrowError::InvalidArgumentError(msg) = &err {
        assert!(msg.contains("cannot encode negative months"))
    } else {
        panic!("expected InvalidArgumentError, got {err:?}")
    }
}
1978
#[test]
fn duration_encoder_day_time_happy_path() {
    // (days, millis) pairs; the months slot is always zero for this interval type.
    let parts = [(2i32, 500i32), (0, 0)];
    let natives: Vec<_> = parts
        .iter()
        .map(|&(days, millis)| IntervalDayTimeType::make_value(days, millis))
        .collect();
    let arr: PrimitiveArray<IntervalDayTimeType> = natives.into();

    let expected: Vec<u8> = parts
        .iter()
        .flat_map(|&(days, millis)| duration_fixed12(0, days as u32, millis as u32))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
1990
#[test]
fn duration_encoder_day_time_rejects_negative() {
    // A negative day count must be rejected when the row is encoded.
    let negative_days = IntervalDayTimeType::make_value(-1, 0);
    let arr: PrimitiveArray<IntervalDayTimeType> = vec![negative_days].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::DayTime), true);
    let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();

    let mut out = Vec::new();
    let err = enc.encode(&mut out, 0).unwrap_err();
    if let ArrowError::InvalidArgumentError(msg) = &err {
        assert!(msg.contains("cannot encode negative days"))
    } else {
        panic!("expected InvalidArgumentError, got {err:?}")
    }
}
2006
#[test]
fn duration_encoder_month_day_nano_happy_path() {
    // 3_000_000 ns is exactly 3 ms, so the first value maps to (1, 2, 3).
    let natives = vec![
        IntervalMonthDayNanoType::make_value(1, 2, 3_000_000),
        IntervalMonthDayNanoType::make_value(0, 0, 0),
    ];
    let arr: PrimitiveArray<IntervalMonthDayNanoType> = natives.into();

    let expected = [duration_fixed12(1, 2, 3), duration_fixed12(0, 0, 0)].concat();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
2018
#[test]
fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
    // One nanosecond is not a whole number of milliseconds.
    let one_nano = IntervalMonthDayNanoType::make_value(0, 0, 1);
    let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![one_nano].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
    let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();

    let mut out = Vec::new();
    let err = enc.encode(&mut out, 0).unwrap_err();
    if let ArrowError::InvalidArgumentError(msg) = &err {
        assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
    } else {
        panic!("expected InvalidArgumentError, got {err:?}")
    }
}
2034
#[test]
fn minimal_twos_complement_test() {
    // Positive, negative (0x80 is the minimal negative form) and zero inputs:
    // in each case only the last byte is expected to remain.
    let cases: [[u8; 3]; 3] = [
        [0x00, 0x00, 0x01],
        [0xFF, 0xFF, 0x80],
        [0x00, 0x00, 0x00],
    ];
    for bytes in cases {
        assert_eq!(minimal_twos_complement(&bytes), &bytes[2..]);
    }
}
2044
#[test]
fn write_sign_extend_test() {
    // (input, target width, expected output): positive padding, negative padding,
    // and truncation of redundant sign bytes.
    let cases: [(&[u8], usize, &[u8]); 3] = [
        (&[0x01], 4, &[0x00, 0x00, 0x00, 0x01]),
        (&[0xFF], 4, &[0xFF, 0xFF, 0xFF, 0xFF]),
        (&[0xFF, 0xFF, 0x80], 2, &[0xFF, 0x80]),
    ];
    for (input, width, want) in cases {
        let mut out = Vec::new();
        write_sign_extended(&mut out, input, width).unwrap();
        assert_eq!(out.as_slice(), want);
    }

    // Dropping a significant byte must be rejected.
    let mut out = Vec::new();
    let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
    assert!(
        matches!(err, ArrowError::InvalidArgumentError(_)),
        "expected InvalidArgumentError"
    );
}
2065
#[test]
fn duration_month_day_nano_overflow_millis() {
    // One millisecond more than `u32::MAX`, expressed in nanoseconds.
    let too_many_millis = u64::from(u32::MAX) + 1;
    let nanos = (too_many_millis * 1_000_000) as i64;
    let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
    let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
    let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();

    let mut out = Vec::new();
    let err = enc.encode(&mut out, 0).unwrap_err();
    if let ArrowError::InvalidArgumentError(msg) = &err {
        assert!(msg.contains("exceed u32::MAX"))
    } else {
        panic!("expected InvalidArgumentError")
    }
}
2081
#[test]
fn fieldplan_decimal_precision_scale_mismatch_errors() {
    use crate::codec::Codec;
    use std::collections::HashMap;

    // The Arrow field has precision 12, the Avro decimal has precision 10.
    let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
    let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);

    let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
    if let ArrowError::SchemaError(msg) = &err {
        assert!(msg.contains("Decimal precision/scale mismatch"))
    } else {
        panic!("expected SchemaError")
    }
}
2097}