1use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::schema::{Fingerprint, Nullability, Prefix};
22use arrow_array::cast::AsArray;
23use arrow_array::types::{
24 ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
25 DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
26 Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
27 Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
28 TimestampMillisecondType,
29};
30use arrow_array::types::{
31 RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
32};
33use arrow_array::{
34 Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
35 FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
36 GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
37 ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
38 StringViewArray, StructArray, UnionArray,
39};
40#[cfg(feature = "small_decimals")]
41use arrow_array::{Decimal32Array, Decimal64Array};
42use arrow_buffer::{ArrowNativeType, NullBuffer};
43use arrow_schema::{
44 ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode,
45};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
50#[inline]
54pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), ArrowError> {
55 let mut zz = ((value << 1) ^ (value >> 63)) as u64;
56 let mut buf = [0u8; 10];
58 let mut i = 0;
59 while (zz & !0x7F) != 0 {
60 buf[i] = ((zz & 0x7F) as u8) | 0x80;
61 i += 1;
62 zz >>= 7;
63 }
64 buf[i] = (zz & 0x7F) as u8;
65 i += 1;
66 out.write_all(&buf[..i])
67 .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
68}
69
70#[inline]
71fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), ArrowError> {
72 write_long(out, value as i64)
73}
74
75#[inline]
76fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
77 write_long(out, bytes.len() as i64)?;
78 out.write_all(bytes)
79 .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
80}
81
82#[inline]
83fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> {
84 out.write_all(&[if v { 1 } else { 0 }])
85 .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
86}
87
/// Returns the shortest suffix of the big-endian two's-complement integer `be` that still
/// represents the same value.
///
/// Leading sign-extension bytes (`0x00` for non-negative, `0xFF` for negative) are dropped,
/// but one is kept when the next byte's top bit would otherwise flip the sign. An all-sign
/// input collapses to its last byte, and an empty input is returned unchanged.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&first) = be.first() else {
        return be;
    };
    let sign = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
    let run = be.iter().take_while(|&&b| b == sign).count();
    match run {
        0 => be,
        r if r == be.len() => &be[r - 1..],
        r => {
            // If the first significant byte's high bit disagrees with the sign, one sign
            // byte must stay to preserve the value's sign.
            let keep_one_sign_byte = ((be[r] ^ sign) & 0x80) != 0;
            let start = if keep_one_sign_byte { r - 1 } else { r };
            &be[start..]
        }
    }
}
119
120#[inline]
132fn write_sign_extended<W: Write + ?Sized>(
133 out: &mut W,
134 src_be: &[u8],
135 n: usize,
136) -> Result<(), ArrowError> {
137 let len = src_be.len();
138 if len == n {
139 return out
140 .write_all(src_be)
141 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
142 }
143 let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
144 0xFF
145 } else {
146 0x00
147 };
148 if len > n {
149 let extra = len - n;
150 if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
151 return Ok(());
152 }
153 if src_be[..extra].iter().any(|&b| b != sign_byte)
156 || ((src_be[extra] ^ sign_byte) & 0x80) != 0
157 {
158 return Err(ArrowError::InvalidArgumentError(format!(
159 "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
160 )));
161 }
162 return out
163 .write_all(&src_be[extra..])
164 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
165 }
166 let pad_len = n - len;
168 const ZPAD: [u8; 64] = [0x00; 64];
170 const FPAD: [u8; 64] = [0xFF; 64];
171 let pad = if sign_byte == 0x00 {
172 &ZPAD[..]
173 } else {
174 &FPAD[..]
175 };
176 let mut rem = pad_len;
179 while rem >= pad.len() {
180 out.write_all(pad)
181 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
182 rem -= pad.len();
183 }
184 if rem > 0 {
185 out.write_all(&pad[..rem])
186 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
187 }
188 out.write_all(src_be)
189 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
190}
191
192fn write_optional_index<W: Write + ?Sized>(
198 out: &mut W,
199 is_null: bool,
200 null_order: Nullability,
201) -> Result<(), ArrowError> {
202 let byte = union_value_branch_byte(null_order, is_null);
203 out.write_all(&[byte])
204 .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
205}
206
207#[derive(Debug, Clone)]
208enum NullState<'a> {
209 NonNullable,
210 NullableNoNulls {
211 union_value_byte: u8,
212 },
213 Nullable {
214 nulls: &'a NullBuffer,
215 null_order: Nullability,
216 },
217}
218
219pub(crate) struct FieldEncoder<'a> {
223 encoder: Encoder<'a>,
224 null_state: NullState<'a>,
225}
226
227impl<'a> FieldEncoder<'a> {
228 fn make_encoder(
229 array: &'a dyn Array,
230 plan: &FieldPlan,
231 nullability: Option<Nullability>,
232 ) -> Result<Self, ArrowError> {
233 let encoder = match plan {
234 FieldPlan::Scalar => match array.data_type() {
235 DataType::Null => Encoder::Null,
236 DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
237 DataType::Utf8 => {
238 Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
239 }
240 DataType::LargeUtf8 => {
241 Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
242 }
243 DataType::Utf8View => {
244 let arr = array
245 .as_any()
246 .downcast_ref::<StringViewArray>()
247 .ok_or_else(|| {
248 ArrowError::SchemaError("Expected StringViewArray".into())
249 })?;
250 Encoder::Utf8View(Utf8ViewEncoder(arr))
251 }
252 DataType::BinaryView => {
253 let arr = array
254 .as_any()
255 .downcast_ref::<BinaryViewArray>()
256 .ok_or_else(|| {
257 ArrowError::SchemaError("Expected BinaryViewArray".into())
258 })?;
259 Encoder::BinaryView(BinaryViewEncoder(arr))
260 }
261 DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
262 DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
263 DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
264 DataType::Date64 => {
265 return Err(ArrowError::NotYetImplemented(
266 "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
267 .into(),
268 ));
269 }
270 DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
271 Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
272 ),
273 DataType::Time32(TimeUnit::Millisecond) => {
274 Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
275 }
276 DataType::Time32(TimeUnit::Microsecond) => {
277 return Err(ArrowError::InvalidArgumentError(
278 "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
279 .into(),
280 ));
281 }
282 DataType::Time32(TimeUnit::Nanosecond) => {
283 return Err(ArrowError::InvalidArgumentError(
284 "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
285 .into(),
286 ));
287 }
288 DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
289 array.as_primitive::<Time64MicrosecondType>(),
290 )),
291 DataType::Time64(TimeUnit::Nanosecond) => {
292 return Err(ArrowError::NotYetImplemented(
293 "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
294 .into(),
295 ));
296 }
297 DataType::Time64(TimeUnit::Millisecond) => {
298 return Err(ArrowError::InvalidArgumentError(
299 "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
300 .into(),
301 ));
302 }
303 DataType::Time64(TimeUnit::Second) => {
304 return Err(ArrowError::InvalidArgumentError(
305 "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
306 .into(),
307 ));
308 }
309 DataType::Float32 => {
310 Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
311 }
312 DataType::Float64 => {
313 Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
314 }
315 DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
316 DataType::LargeBinary => {
317 Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
318 }
319 DataType::FixedSizeBinary(_len) => {
320 let arr = array
321 .as_any()
322 .downcast_ref::<FixedSizeBinaryArray>()
323 .ok_or_else(|| {
324 ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
325 })?;
326 Encoder::Fixed(FixedEncoder(arr))
327 }
328 DataType::Timestamp(unit, _) => match unit {
329 TimeUnit::Second => {
330 Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
331 array.as_primitive::<TimestampSecondType>(),
332 ))
333 }
334 TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
335 array.as_primitive::<TimestampMillisecondType>(),
336 )),
337 TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
338 array.as_primitive::<TimestampMicrosecondType>(),
339 )),
340 TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
341 array.as_primitive::<TimestampNanosecondType>(),
342 )),
343 },
344 DataType::Interval(unit) => match unit {
345 IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
346 array.as_primitive::<IntervalMonthDayNanoType>(),
347 )),
348 IntervalUnit::YearMonth => Encoder::IntervalYearMonth(DurationEncoder(
349 array.as_primitive::<IntervalYearMonthType>(),
350 )),
351 IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
352 array.as_primitive::<IntervalDayTimeType>(),
353 )),
354 },
355 DataType::Duration(tu) => match tu {
356 TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
357 array.as_primitive::<DurationSecondType>(),
358 )),
359 TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
360 array.as_primitive::<DurationMillisecondType>(),
361 )),
362 TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
363 array.as_primitive::<DurationMicrosecondType>(),
364 )),
365 TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
366 array.as_primitive::<DurationNanosecondType>(),
367 )),
368 },
369 other => {
370 return Err(ArrowError::NotYetImplemented(format!(
371 "Avro scalar type not yet supported: {other:?}"
372 )));
373 }
374 },
375 FieldPlan::Struct { bindings } => {
376 let arr = array
377 .as_any()
378 .downcast_ref::<StructArray>()
379 .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
380 Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
381 }
382 FieldPlan::List {
383 items_nullability,
384 item_plan,
385 } => match array.data_type() {
386 DataType::List(_) => {
387 let arr = array
388 .as_any()
389 .downcast_ref::<ListArray>()
390 .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
391 Encoder::List(Box::new(ListEncoder32::try_new(
392 arr,
393 *items_nullability,
394 item_plan.as_ref(),
395 )?))
396 }
397 DataType::LargeList(_) => {
398 let arr = array
399 .as_any()
400 .downcast_ref::<LargeListArray>()
401 .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
402 Encoder::LargeList(Box::new(ListEncoder64::try_new(
403 arr,
404 *items_nullability,
405 item_plan.as_ref(),
406 )?))
407 }
408 DataType::ListView(_) => {
409 let arr = array
410 .as_any()
411 .downcast_ref::<ListViewArray>()
412 .ok_or_else(|| ArrowError::SchemaError("Expected ListViewArray".into()))?;
413 Encoder::ListView(Box::new(ListViewEncoder32::try_new(
414 arr,
415 *items_nullability,
416 item_plan.as_ref(),
417 )?))
418 }
419 DataType::LargeListView(_) => {
420 let arr = array
421 .as_any()
422 .downcast_ref::<LargeListViewArray>()
423 .ok_or_else(|| {
424 ArrowError::SchemaError("Expected LargeListViewArray".into())
425 })?;
426 Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
427 arr,
428 *items_nullability,
429 item_plan.as_ref(),
430 )?))
431 }
432 DataType::FixedSizeList(_, _) => {
433 let arr = array
434 .as_any()
435 .downcast_ref::<FixedSizeListArray>()
436 .ok_or_else(|| {
437 ArrowError::SchemaError("Expected FixedSizeListArray".into())
438 })?;
439 Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
440 arr,
441 *items_nullability,
442 item_plan.as_ref(),
443 )?))
444 }
445 other => {
446 return Err(ArrowError::SchemaError(format!(
447 "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
448 )));
449 }
450 },
451 FieldPlan::Decimal { size } => match array.data_type() {
452 #[cfg(feature = "small_decimals")]
453 DataType::Decimal32(_, _) => {
454 let arr = array
455 .as_any()
456 .downcast_ref::<Decimal32Array>()
457 .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
458 Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
459 }
460 #[cfg(feature = "small_decimals")]
461 DataType::Decimal64(_, _) => {
462 let arr = array
463 .as_any()
464 .downcast_ref::<Decimal64Array>()
465 .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
466 Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
467 }
468 DataType::Decimal128(_, _) => {
469 let arr = array
470 .as_any()
471 .downcast_ref::<Decimal128Array>()
472 .ok_or_else(|| {
473 ArrowError::SchemaError("Expected Decimal128Array".into())
474 })?;
475 Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
476 }
477 DataType::Decimal256(_, _) => {
478 let arr = array
479 .as_any()
480 .downcast_ref::<Decimal256Array>()
481 .ok_or_else(|| {
482 ArrowError::SchemaError("Expected Decimal256Array".into())
483 })?;
484 Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
485 }
486 other => {
487 return Err(ArrowError::SchemaError(format!(
488 "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
489 )));
490 }
491 },
492 FieldPlan::Uuid => {
493 let arr = array
494 .as_any()
495 .downcast_ref::<FixedSizeBinaryArray>()
496 .ok_or_else(|| {
497 ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
498 })?;
499 Encoder::Uuid(UuidEncoder(arr))
500 }
501 FieldPlan::Map {
502 values_nullability,
503 value_plan,
504 } => {
505 let arr = array
506 .as_any()
507 .downcast_ref::<MapArray>()
508 .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
509 Encoder::Map(Box::new(MapEncoder::try_new(
510 arr,
511 *values_nullability,
512 value_plan.as_ref(),
513 )?))
514 }
515 FieldPlan::Enum { symbols } => match array.data_type() {
516 DataType::Dictionary(key_dt, value_dt) => {
517 if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
518 return Err(ArrowError::SchemaError(
519 "Avro enum requires Dictionary<Int32, Utf8>".into(),
520 ));
521 }
522 let dict = array
523 .as_any()
524 .downcast_ref::<DictionaryArray<Int32Type>>()
525 .ok_or_else(|| {
526 ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
527 })?;
528 let values = dict
529 .values()
530 .as_any()
531 .downcast_ref::<StringArray>()
532 .ok_or_else(|| {
533 ArrowError::SchemaError("Dictionary values must be Utf8".into())
534 })?;
535 if values.len() != symbols.len() {
536 return Err(ArrowError::SchemaError(format!(
537 "Enum symbol length {} != dictionary size {}",
538 symbols.len(),
539 values.len()
540 )));
541 }
542 for i in 0..values.len() {
543 if values.value(i) != symbols[i].as_str() {
544 return Err(ArrowError::SchemaError(format!(
545 "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
546 symbols[i],
547 values.value(i)
548 )));
549 }
550 }
551 let keys = dict.keys();
552 Encoder::Enum(EnumEncoder { keys })
553 }
554 other => {
555 return Err(ArrowError::SchemaError(format!(
556 "Avro enum site requires DataType::Dictionary, found: {other:?}"
557 )));
558 }
559 },
560 FieldPlan::Union { bindings } => {
561 let arr = array
562 .as_any()
563 .downcast_ref::<UnionArray>()
564 .ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?;
565 Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
566 }
567 FieldPlan::RunEndEncoded {
568 values_nullability,
569 value_plan,
570 } => {
571 let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
573 if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
574 return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
575 Int16Type,
576 >::new(
577 arr,
578 FieldEncoder::make_encoder(
579 arr.values().as_ref(),
580 value_plan.as_ref(),
581 *values_nullability,
582 )?,
583 ))));
584 }
585 if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
586 return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
587 Int32Type,
588 >::new(
589 arr,
590 FieldEncoder::make_encoder(
591 arr.values().as_ref(),
592 value_plan.as_ref(),
593 *values_nullability,
594 )?,
595 ))));
596 }
597 if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
598 return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
599 Int64Type,
600 >::new(
601 arr,
602 FieldEncoder::make_encoder(
603 arr.values().as_ref(),
604 value_plan.as_ref(),
605 *values_nullability,
606 )?,
607 ))));
608 }
609 Err(ArrowError::SchemaError(
610 "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
611 .into(),
612 ))
613 };
614 build(array)?
615 }
616 };
617 let null_state = match nullability {
619 None => NullState::NonNullable,
620 Some(null_order) => {
621 match array.nulls() {
622 Some(nulls) if array.null_count() > 0 => {
623 NullState::Nullable { nulls, null_order }
624 }
625 _ => NullState::NullableNoNulls {
626 union_value_byte: union_value_branch_byte(null_order, false),
628 },
629 }
630 }
631 };
632 Ok(Self {
633 encoder,
634 null_state,
635 })
636 }
637
638 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
639 match &self.null_state {
640 NullState::NonNullable => {}
641 NullState::NullableNoNulls { union_value_byte } => out
642 .write_all(&[*union_value_byte])
643 .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?,
644 NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
645 return write_optional_index(out, true, *null_order); }
647 NullState::Nullable { null_order, .. } => {
648 write_optional_index(out, false, *null_order)?;
649 }
650 }
651 self.encoder.encode(out, idx)
652 }
653}
654
655fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
656 let nulls_first = null_order == Nullability::default();
657 if nulls_first == is_null { 0x00 } else { 0x02 }
658}
659
660#[derive(Debug, Clone)]
663enum FieldPlan {
664 Scalar,
666 Struct { bindings: Vec<FieldBinding> },
668 List {
670 items_nullability: Option<Nullability>,
671 item_plan: Box<FieldPlan>,
672 },
673 Decimal { size: Option<usize> },
675 Uuid,
677 Map {
679 values_nullability: Option<Nullability>,
680 value_plan: Box<FieldPlan>,
681 },
682 Enum { symbols: Arc<[String]> },
685 Union { bindings: Vec<FieldBinding> },
687 RunEndEncoded {
690 values_nullability: Option<Nullability>,
691 value_plan: Box<FieldPlan>,
692 },
693}
694
695#[derive(Debug, Clone)]
696struct FieldBinding {
697 arrow_index: usize,
699 nullability: Option<Nullability>,
701 plan: FieldPlan,
703}
704
705#[derive(Debug)]
707pub(crate) struct RecordEncoderBuilder<'a> {
708 avro_root: &'a AvroField,
709 arrow_schema: &'a ArrowSchema,
710 fingerprint: Option<Fingerprint>,
711}
712
713impl<'a> RecordEncoderBuilder<'a> {
714 pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
716 Self {
717 avro_root,
718 arrow_schema,
719 fingerprint: None,
720 }
721 }
722
723 pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
724 self.fingerprint = fingerprint;
725 self
726 }
727
728 pub(crate) fn build(self) -> Result<RecordEncoder, ArrowError> {
731 let avro_root_dt = self.avro_root.data_type();
732 let Codec::Struct(root_fields) = avro_root_dt.codec() else {
733 return Err(ArrowError::SchemaError(
734 "Top-level Avro schema must be a record/struct".into(),
735 ));
736 };
737 let mut columns = Vec::with_capacity(root_fields.len());
738 for root_field in root_fields.as_ref() {
739 let name = root_field.name();
740 let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
741 ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
742 })?;
743 columns.push(FieldBinding {
744 arrow_index,
745 nullability: root_field.data_type().nullability(),
746 plan: FieldPlan::build(
747 root_field.data_type(),
748 self.arrow_schema.field(arrow_index),
749 )?,
750 });
751 }
752 Ok(RecordEncoder {
753 columns,
754 prefix: self.fingerprint.map(|fp| fp.make_prefix()),
755 })
756 }
757}
758
759#[derive(Debug, Clone)]
765pub(crate) struct RecordEncoder {
766 columns: Vec<FieldBinding>,
767 prefix: Option<Prefix>,
769}
770
771impl RecordEncoder {
772 fn prepare_for_batch<'a>(
773 &'a self,
774 batch: &'a RecordBatch,
775 ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
776 let arrays = batch.columns();
777 let mut out = Vec::with_capacity(self.columns.len());
778 for col_plan in self.columns.iter() {
779 let arrow_index = col_plan.arrow_index;
780 let array = arrays.get(arrow_index).ok_or_else(|| {
781 ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
782 })?;
783 #[cfg(not(feature = "avro_custom_types"))]
784 let site_nullability = match &col_plan.plan {
785 FieldPlan::RunEndEncoded { .. } => None,
786 _ => col_plan.nullability,
787 };
788 #[cfg(feature = "avro_custom_types")]
789 let site_nullability = col_plan.nullability;
790 out.push(FieldEncoder::make_encoder(
791 array.as_ref(),
792 &col_plan.plan,
793 site_nullability,
794 )?);
795 }
796 Ok(out)
797 }
798
799 pub(crate) fn encode<W: Write>(
803 &self,
804 out: &mut W,
805 batch: &RecordBatch,
806 ) -> Result<(), ArrowError> {
807 let mut column_encoders = self.prepare_for_batch(batch)?;
808 let n = batch.num_rows();
809 match self.prefix {
810 Some(prefix) => {
811 for row in 0..n {
812 out.write_all(prefix.as_slice())
813 .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
814 for enc in column_encoders.iter_mut() {
815 enc.encode(out, row)?;
816 }
817 }
818 }
819 None => {
820 for row in 0..n {
821 for enc in column_encoders.iter_mut() {
822 enc.encode(out, row)?;
823 }
824 }
825 }
826 }
827 Ok(())
828 }
829}
830
831fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
832 fields.iter().position(|f| f.name() == name)
833}
834
835fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
836 find_struct_child_index(fields, "value")
838 .or_else(|| find_struct_child_index(fields, "values"))
839 .or_else(|| if fields.len() == 2 { Some(1) } else { None })
840}
841
842impl FieldPlan {
843 fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
844 #[cfg(not(feature = "avro_custom_types"))]
845 if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
846 let values_nullability = avro_dt.nullability();
847 let value_site_dt: &AvroDataType = match avro_dt.codec() {
848 Codec::Union(branches, _, _) => branches
849 .iter()
850 .find(|b| !matches!(b.codec(), Codec::Null))
851 .ok_or_else(|| {
852 ArrowError::SchemaError(
853 "Avro union at RunEndEncoded site has no non-null branch".into(),
854 )
855 })?,
856 _ => avro_dt,
857 };
858 return Ok(FieldPlan::RunEndEncoded {
859 values_nullability,
860 value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
861 });
862 }
863 if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
864 let ext_is_uuid = {
866 #[cfg(feature = "canonical_extension_types")]
867 {
868 matches!(
869 arrow_field.extension_type_name(),
870 Some("arrow.uuid") | Some("uuid")
871 )
872 }
873 #[cfg(not(feature = "canonical_extension_types"))]
874 {
875 false
876 }
877 };
878 let md_is_uuid = arrow_field
879 .metadata()
880 .get("logicalType")
881 .map(|s| s.as_str())
882 == Some("uuid");
883 if ext_is_uuid || md_is_uuid {
884 if *len != 16 {
885 return Err(ArrowError::InvalidArgumentError(
886 "logicalType=uuid requires FixedSizeBinary(16)".into(),
887 ));
888 }
889 return Ok(FieldPlan::Uuid);
890 }
891 }
892 match avro_dt.codec() {
893 Codec::Struct(avro_fields) => {
894 let fields = match arrow_field.data_type() {
895 DataType::Struct(struct_fields) => struct_fields,
896 other => {
897 return Err(ArrowError::SchemaError(format!(
898 "Avro struct maps to Arrow Struct, found: {other:?}"
899 )));
900 }
901 };
902 let mut bindings = Vec::with_capacity(avro_fields.len());
903 for avro_field in avro_fields.iter() {
904 let name = avro_field.name().to_string();
905 let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
906 ArrowError::SchemaError(format!(
907 "Struct field '{name}' not present in Arrow field '{}'",
908 arrow_field.name()
909 ))
910 })?;
911 bindings.push(FieldBinding {
912 arrow_index: idx,
913 nullability: avro_field.data_type().nullability(),
914 plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
915 });
916 }
917 Ok(FieldPlan::Struct { bindings })
918 }
919 Codec::List(items_dt) => match arrow_field.data_type() {
920 DataType::List(field_ref)
921 | DataType::LargeList(field_ref)
922 | DataType::ListView(field_ref)
923 | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
924 items_nullability: items_dt.nullability(),
925 item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
926 }),
927 DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
928 items_nullability: items_dt.nullability(),
929 item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
930 }),
931 other => Err(ArrowError::SchemaError(format!(
932 "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
933 ))),
934 },
935 Codec::Map(values_dt) => {
936 let entries_field = match arrow_field.data_type() {
937 DataType::Map(entries, _sorted) => entries.as_ref(),
938 other => {
939 return Err(ArrowError::SchemaError(format!(
940 "Avro map maps to Arrow DataType::Map, found: {other:?}"
941 )));
942 }
943 };
944 let entries_struct_fields = match entries_field.data_type() {
945 DataType::Struct(fs) => fs,
946 other => {
947 return Err(ArrowError::SchemaError(format!(
948 "Arrow Map entries must be Struct, found: {other:?}"
949 )));
950 }
951 };
952 let value_idx =
953 find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
954 ArrowError::SchemaError("Map entries struct missing value field".into())
955 })?;
956 let value_field = entries_struct_fields[value_idx].as_ref();
957 let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
958 Ok(FieldPlan::Map {
959 values_nullability: values_dt.nullability(),
960 value_plan: Box::new(value_plan),
961 })
962 }
963 Codec::Enum(symbols) => match arrow_field.data_type() {
964 DataType::Dictionary(key_dt, value_dt) => {
965 if **key_dt != DataType::Int32 {
966 return Err(ArrowError::SchemaError(
967 "Avro enum requires Dictionary<Int32, Utf8>".into(),
968 ));
969 }
970 if **value_dt != DataType::Utf8 {
971 return Err(ArrowError::SchemaError(
972 "Avro enum requires Dictionary<Int32, Utf8>".into(),
973 ));
974 }
975 Ok(FieldPlan::Enum {
976 symbols: symbols.clone(),
977 })
978 }
979 other => Err(ArrowError::SchemaError(format!(
980 "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
981 ))),
982 },
983 Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
985 let (ap, as_) = match arrow_field.data_type() {
986 #[cfg(feature = "small_decimals")]
987 DataType::Decimal32(p, s) => (*p as usize, *s as i32),
988 #[cfg(feature = "small_decimals")]
989 DataType::Decimal64(p, s) => (*p as usize, *s as i32),
990 DataType::Decimal128(p, s) => (*p as usize, *s as i32),
991 DataType::Decimal256(p, s) => (*p as usize, *s as i32),
992 other => {
993 return Err(ArrowError::SchemaError(format!(
994 "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
995 arrow_field.name()
996 )));
997 }
998 };
999 let sc = scale_opt.unwrap_or(0) as i32; if ap != *precision || as_ != sc {
1001 return Err(ArrowError::SchemaError(format!(
1002 "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
1003 arrow_field.name()
1004 )));
1005 }
1006 Ok(FieldPlan::Decimal {
1007 size: *fixed_size_opt,
1008 })
1009 }
1010 Codec::Interval => match arrow_field.data_type() {
1011 DataType::Interval(
1012 IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
1013 ) => Ok(FieldPlan::Scalar),
1014 other => Err(ArrowError::SchemaError(format!(
1015 "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
1016 ))),
1017 },
1018 Codec::Union(avro_branches, _, UnionMode::Dense) => {
1019 let arrow_union_fields = match arrow_field.data_type() {
1020 DataType::Union(fields, UnionMode::Dense) => fields,
1021 DataType::Union(_, UnionMode::Sparse) => {
1022 return Err(ArrowError::NotYetImplemented(
1023 "Sparse Arrow unions are not yet supported".to_string(),
1024 ));
1025 }
1026 other => {
1027 return Err(ArrowError::SchemaError(format!(
1028 "Avro union maps to Arrow Union, found: {other:?}"
1029 )));
1030 }
1031 };
1032 if avro_branches.len() != arrow_union_fields.len() {
1033 return Err(ArrowError::SchemaError(format!(
1034 "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
1035 avro_branches.len(),
1036 arrow_union_fields.len(),
1037 arrow_field.name()
1038 )));
1039 }
1040 let bindings = avro_branches
1041 .iter()
1042 .zip(arrow_union_fields.iter())
1043 .enumerate()
1044 .map(|(i, (avro_branch, (_, arrow_child_field)))| {
1045 Ok(FieldBinding {
1046 arrow_index: i,
1047 nullability: avro_branch.nullability(),
1048 plan: FieldPlan::build(avro_branch, arrow_child_field)?,
1049 })
1050 })
1051 .collect::<Result<Vec<_>, ArrowError>>()?;
1052 Ok(FieldPlan::Union { bindings })
1053 }
1054 Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented(
1055 "Sparse Arrow unions are not yet supported".to_string(),
1056 )),
1057 #[cfg(feature = "avro_custom_types")]
1058 Codec::RunEndEncoded(values_dt, _width_code) => {
1059 let values_field = match arrow_field.data_type() {
1060 DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
1061 other => {
1062 return Err(ArrowError::SchemaError(format!(
1063 "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
1064 )));
1065 }
1066 };
1067 Ok(FieldPlan::RunEndEncoded {
1068 values_nullability: values_dt.nullability(),
1069 value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
1070 })
1071 }
1072 _ => Ok(FieldPlan::Scalar),
1073 }
1074 }
1075}
1076
1077enum Encoder<'a> {
1078 Boolean(BooleanEncoder<'a>),
1079 Int(IntEncoder<'a, Int32Type>),
1080 Long(LongEncoder<'a, Int64Type>),
1081 TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
1082 TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
1083 TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
1084 TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
1085 Date32(IntEncoder<'a, Date32Type>),
1086 Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
1087 Time32Millis(IntEncoder<'a, Time32MillisecondType>),
1088 Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
1089 DurationSeconds(LongEncoder<'a, DurationSecondType>),
1090 DurationMillis(LongEncoder<'a, DurationMillisecondType>),
1091 DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
1092 DurationNanos(LongEncoder<'a, DurationNanosecondType>),
1093 Float32(F32Encoder<'a>),
1094 Float64(F64Encoder<'a>),
1095 Binary(BinaryEncoder<'a, i32>),
1096 LargeBinary(BinaryEncoder<'a, i64>),
1097 Utf8(Utf8Encoder<'a>),
1098 Utf8Large(Utf8LargeEncoder<'a>),
1099 Utf8View(Utf8ViewEncoder<'a>),
1100 BinaryView(BinaryViewEncoder<'a>),
1101 List(Box<ListEncoder32<'a>>),
1102 LargeList(Box<ListEncoder64<'a>>),
1103 ListView(Box<ListViewEncoder32<'a>>),
1104 LargeListView(Box<ListViewEncoder64<'a>>),
1105 FixedSizeList(Box<FixedSizeListEncoder<'a>>),
1106 Struct(Box<StructEncoder<'a>>),
1107 Fixed(FixedEncoder<'a>),
1109 Uuid(UuidEncoder<'a>),
1111 IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
1113 IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
1115 IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
1117 #[cfg(feature = "small_decimals")]
1118 Decimal32(Decimal32Encoder<'a>),
1119 #[cfg(feature = "small_decimals")]
1120 Decimal64(Decimal64Encoder<'a>),
1121 Decimal128(Decimal128Encoder<'a>),
1122 Decimal256(Decimal256Encoder<'a>),
1123 Enum(EnumEncoder<'a>),
1125 Map(Box<MapEncoder<'a>>),
1126 Union(Box<UnionEncoder<'a>>),
1127 RunEncoded16(Box<RunEncodedEncoder16<'a>>),
1129 RunEncoded32(Box<RunEncodedEncoder32<'a>>),
1130 RunEncoded64(Box<RunEncodedEncoder64<'a>>),
1131 Null,
1132}
1133
1134impl<'a> Encoder<'a> {
1135 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1137 match self {
1138 Encoder::Boolean(e) => e.encode(out, idx),
1139 Encoder::Int(e) => e.encode(out, idx),
1140 Encoder::Long(e) => e.encode(out, idx),
1141 Encoder::TimestampMicros(e) => e.encode(out, idx),
1142 Encoder::TimestampMillis(e) => e.encode(out, idx),
1143 Encoder::TimestampNanos(e) => e.encode(out, idx),
1144 Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
1145 Encoder::Date32(e) => e.encode(out, idx),
1146 Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
1147 Encoder::Time32Millis(e) => e.encode(out, idx),
1148 Encoder::Time64Micros(e) => e.encode(out, idx),
1149 Encoder::DurationSeconds(e) => e.encode(out, idx),
1150 Encoder::DurationMicros(e) => e.encode(out, idx),
1151 Encoder::DurationMillis(e) => e.encode(out, idx),
1152 Encoder::DurationNanos(e) => e.encode(out, idx),
1153 Encoder::Float32(e) => e.encode(out, idx),
1154 Encoder::Float64(e) => e.encode(out, idx),
1155 Encoder::Binary(e) => e.encode(out, idx),
1156 Encoder::LargeBinary(e) => e.encode(out, idx),
1157 Encoder::Utf8(e) => e.encode(out, idx),
1158 Encoder::Utf8Large(e) => e.encode(out, idx),
1159 Encoder::Utf8View(e) => e.encode(out, idx),
1160 Encoder::BinaryView(e) => e.encode(out, idx),
1161 Encoder::List(e) => e.encode(out, idx),
1162 Encoder::LargeList(e) => e.encode(out, idx),
1163 Encoder::ListView(e) => e.encode(out, idx),
1164 Encoder::LargeListView(e) => e.encode(out, idx),
1165 Encoder::FixedSizeList(e) => e.encode(out, idx),
1166 Encoder::Struct(e) => e.encode(out, idx),
1167 Encoder::Fixed(e) => (e).encode(out, idx),
1168 Encoder::Uuid(e) => (e).encode(out, idx),
1169 Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
1170 Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
1171 Encoder::IntervalDayTime(e) => (e).encode(out, idx),
1172 #[cfg(feature = "small_decimals")]
1173 Encoder::Decimal32(e) => (e).encode(out, idx),
1174 #[cfg(feature = "small_decimals")]
1175 Encoder::Decimal64(e) => (e).encode(out, idx),
1176 Encoder::Decimal128(e) => (e).encode(out, idx),
1177 Encoder::Decimal256(e) => (e).encode(out, idx),
1178 Encoder::Map(e) => (e).encode(out, idx),
1179 Encoder::Enum(e) => (e).encode(out, idx),
1180 Encoder::Union(e) => (e).encode(out, idx),
1181 Encoder::RunEncoded16(e) => (e).encode(out, idx),
1182 Encoder::RunEncoded32(e) => (e).encode(out, idx),
1183 Encoder::RunEncoded64(e) => (e).encode(out, idx),
1184 Encoder::Null => Ok(()),
1185 }
1186 }
1187}
1188
1189struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1190impl BooleanEncoder<'_> {
1191 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1192 write_bool(out, self.0.value(idx))
1193 }
1194}
1195
1196struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1198impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1199 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1200 write_int(out, self.0.value(idx))
1201 }
1202}
1203
1204struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1206impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1207 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1208 write_long(out, self.0.value(idx))
1209 }
1210}
1211
1212struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1214impl<'a> Time32SecondsToMillisEncoder<'a> {
1215 #[inline]
1216 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1217 let secs = self.0.value(idx);
1218 let millis = secs.checked_mul(1000).ok_or_else(|| {
1219 ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into())
1220 })?;
1221 write_int(out, millis)
1222 }
1223}
1224
1225struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1227impl<'a> TimestampSecondsToMillisEncoder<'a> {
1228 #[inline]
1229 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1230 let secs = self.0.value(idx);
1231 let millis = secs.checked_mul(1000).ok_or_else(|| {
1232 ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
1233 })?;
1234 write_long(out, millis)
1235 }
1236}
1237
1238struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1240impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1241 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1242 write_len_prefixed(out, self.0.value(idx))
1243 }
1244}
1245
1246struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1248impl BinaryViewEncoder<'_> {
1249 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1250 write_len_prefixed(out, self.0.value(idx))
1251 }
1252}
1253
1254struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1256impl Utf8ViewEncoder<'_> {
1257 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1258 write_len_prefixed(out, self.0.value(idx).as_bytes())
1259 }
1260}
1261
1262struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1263impl F32Encoder<'_> {
1264 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1265 let bits = self.0.value(idx).to_bits();
1267 out.write_all(&bits.to_le_bytes())
1268 .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
1269 }
1270}
1271
1272struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1273impl F64Encoder<'_> {
1274 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1275 let bits = self.0.value(idx).to_bits();
1277 out.write_all(&bits.to_le_bytes())
1278 .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
1279 }
1280}
1281
1282struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1283
1284impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1285 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1286 write_len_prefixed(out, self.0.value(idx).as_bytes())
1287 }
1288}
1289
1290type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1291type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1292
1293enum KeyKind<'a> {
1295 Utf8(&'a GenericStringArray<i32>),
1296 LargeUtf8(&'a GenericStringArray<i64>),
1297}
1298struct MapEncoder<'a> {
1299 map: &'a MapArray,
1300 keys: KeyKind<'a>,
1301 values: FieldEncoder<'a>,
1302 keys_offset: usize,
1303 values_offset: usize,
1304}
1305
1306impl<'a> MapEncoder<'a> {
1307 fn try_new(
1308 map: &'a MapArray,
1309 values_nullability: Option<Nullability>,
1310 value_plan: &FieldPlan,
1311 ) -> Result<Self, ArrowError> {
1312 let keys_arr = map.keys();
1313 let keys_kind = match keys_arr.data_type() {
1314 DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1315 DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1316 other => {
1317 return Err(ArrowError::SchemaError(format!(
1318 "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1319 )));
1320 }
1321 };
1322 Ok(Self {
1323 map,
1324 keys: keys_kind,
1325 values: FieldEncoder::make_encoder(
1326 map.values().as_ref(),
1327 value_plan,
1328 values_nullability,
1329 )?,
1330 keys_offset: keys_arr.offset(),
1331 values_offset: map.values().offset(),
1332 })
1333 }
1334
1335 fn encode_map_entries<W, O>(
1336 out: &mut W,
1337 keys: &GenericStringArray<O>,
1338 keys_offset: usize,
1339 start: usize,
1340 end: usize,
1341 mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>,
1342 ) -> Result<(), ArrowError>
1343 where
1344 W: Write + ?Sized,
1345 O: OffsetSizeTrait,
1346 {
1347 encode_blocked_range(out, start, end, |out, j| {
1348 let j_key = j.saturating_sub(keys_offset);
1349 write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1350 write_item(out, j)
1351 })
1352 }
1353
1354 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1355 let offsets = self.map.offsets();
1356 let start = offsets[idx] as usize;
1357 let end = offsets[idx + 1] as usize;
1358 let write_item = |out: &mut W, j: usize| {
1359 let j_val = j.saturating_sub(self.values_offset);
1360 self.values.encode(out, j_val)
1361 };
1362 match self.keys {
1363 KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1364 out,
1365 arr,
1366 self.keys_offset,
1367 start,
1368 end,
1369 write_item,
1370 ),
1371 KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1372 out,
1373 arr,
1374 self.keys_offset,
1375 start,
1376 end,
1377 write_item,
1378 ),
1379 }
1380 }
1381}
1382
1383struct EnumEncoder<'a> {
1390 keys: &'a PrimitiveArray<Int32Type>,
1391}
1392impl EnumEncoder<'_> {
1393 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> {
1394 write_int(out, self.keys.value(row))
1395 }
1396}
1397
1398struct UnionEncoder<'a> {
1399 encoders: Vec<FieldEncoder<'a>>,
1400 array: &'a UnionArray,
1401 type_id_to_encoder_index: Vec<Option<usize>>,
1402}
1403
1404impl<'a> UnionEncoder<'a> {
1405 fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, ArrowError> {
1406 let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1407 return Err(ArrowError::SchemaError("Expected Dense UnionArray".into()));
1408 };
1409 if fields.len() != field_bindings.len() {
1410 return Err(ArrowError::SchemaError(format!(
1411 "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1412 fields.len(),
1413 field_bindings.len()
1414 )));
1415 }
1416 let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
1417 let mut type_id_to_encoder_index: Vec<Option<usize>> =
1418 vec![None; (max_type_id + 1) as usize];
1419 let mut encoders = Vec::with_capacity(fields.len());
1420 for (i, (type_id, _)) in fields.iter().enumerate() {
1421 let binding = field_bindings
1422 .get(i)
1423 .ok_or_else(|| ArrowError::SchemaError("Binding and field mismatch".to_string()))?;
1424 encoders.push(FieldEncoder::make_encoder(
1425 array.child(type_id).as_ref(),
1426 &binding.plan,
1427 binding.nullability,
1428 )?);
1429 type_id_to_encoder_index[type_id as usize] = Some(i);
1430 }
1431 Ok(Self {
1432 encoders,
1433 array,
1434 type_id_to_encoder_index,
1435 })
1436 }
1437
1438 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1439 let type_id = self.array.type_ids()[idx];
1444 let encoder_index = self
1445 .type_id_to_encoder_index
1446 .get(type_id as usize)
1447 .and_then(|opt| *opt)
1448 .ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id {type_id}")))?;
1449 write_int(out, encoder_index as i32)?;
1450 let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
1451 ArrowError::SchemaError(format!("Invalid encoder index {encoder_index}"))
1452 })?;
1453 encoder.encode(out, self.array.value_offset(idx))
1454 }
1455}
1456
1457struct StructEncoder<'a> {
1458 encoders: Vec<FieldEncoder<'a>>,
1459}
1460
1461impl<'a> StructEncoder<'a> {
1462 fn try_new(
1463 array: &'a StructArray,
1464 field_bindings: &[FieldBinding],
1465 ) -> Result<Self, ArrowError> {
1466 let mut encoders = Vec::with_capacity(field_bindings.len());
1467 for field_binding in field_bindings {
1468 let idx = field_binding.arrow_index;
1469 let column = array.columns().get(idx).ok_or_else(|| {
1470 ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1471 })?;
1472 let encoder = FieldEncoder::make_encoder(
1473 column.as_ref(),
1474 &field_binding.plan,
1475 field_binding.nullability,
1476 )?;
1477 encoders.push(encoder);
1478 }
1479 Ok(Self { encoders })
1480 }
1481
1482 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1483 for encoder in self.encoders.iter_mut() {
1484 encoder.encode(out, idx)?;
1485 }
1486 Ok(())
1487 }
1488}
1489
1490fn encode_blocked_range<W: Write + ?Sized, F>(
1494 out: &mut W,
1495 start: usize,
1496 end: usize,
1497 mut write_item: F,
1498) -> Result<(), ArrowError>
1499where
1500 F: FnMut(&mut W, usize) -> Result<(), ArrowError>,
1501{
1502 let len = end.saturating_sub(start);
1503 if len == 0 {
1504 write_long(out, 0)?;
1506 return Ok(());
1507 }
1508 write_long(out, len as i64)?;
1510 for row in start..end {
1511 write_item(out, row)?;
1512 }
1513 write_long(out, 0)?;
1514 Ok(())
1515}
1516
1517struct ListEncoder<'a, O: OffsetSizeTrait> {
1518 list: &'a GenericListArray<O>,
1519 values: FieldEncoder<'a>,
1520 values_offset: usize,
1521}
1522
1523type ListEncoder32<'a> = ListEncoder<'a, i32>;
1524type ListEncoder64<'a> = ListEncoder<'a, i64>;
1525
1526impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1527 fn try_new(
1528 list: &'a GenericListArray<O>,
1529 items_nullability: Option<Nullability>,
1530 item_plan: &FieldPlan,
1531 ) -> Result<Self, ArrowError> {
1532 Ok(Self {
1533 list,
1534 values: FieldEncoder::make_encoder(
1535 list.values().as_ref(),
1536 item_plan,
1537 items_nullability,
1538 )?,
1539 values_offset: list.values().offset(),
1540 })
1541 }
1542
1543 fn encode_list_range<W: Write + ?Sized>(
1544 &mut self,
1545 out: &mut W,
1546 start: usize,
1547 end: usize,
1548 ) -> Result<(), ArrowError> {
1549 encode_blocked_range(out, start, end, |out, row| {
1550 self.values
1551 .encode(out, row.saturating_sub(self.values_offset))
1552 })
1553 }
1554
1555 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1556 let offsets = self.list.offsets();
1557 let start = offsets[idx].to_usize().ok_or_else(|| {
1558 ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize"))
1559 })?;
1560 let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1561 ArrowError::InvalidArgumentError(format!(
1562 "Error converting offset[{}] to usize",
1563 idx + 1
1564 ))
1565 })?;
1566 self.encode_list_range(out, start, end)
1567 }
1568}
1569
1570struct ListViewEncoder<'a, O: OffsetSizeTrait> {
1572 list: &'a GenericListViewArray<O>,
1573 values: FieldEncoder<'a>,
1574 values_offset: usize,
1575}
1576type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
1577type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
1578
1579impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
1580 fn try_new(
1581 list: &'a GenericListViewArray<O>,
1582 items_nullability: Option<Nullability>,
1583 item_plan: &FieldPlan,
1584 ) -> Result<Self, ArrowError> {
1585 Ok(Self {
1586 list,
1587 values: FieldEncoder::make_encoder(
1588 list.values().as_ref(),
1589 item_plan,
1590 items_nullability,
1591 )?,
1592 values_offset: list.values().offset(),
1593 })
1594 }
1595
1596 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1597 let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
1598 ArrowError::InvalidArgumentError(format!(
1599 "Error converting value_offset[{idx}] to usize"
1600 ))
1601 })?;
1602 let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
1603 ArrowError::InvalidArgumentError(format!("Error converting value_size[{idx}] to usize"))
1604 })?;
1605 let start = start + self.values_offset;
1606 let end = start + len;
1607 encode_blocked_range(out, start, end, |out, row| {
1608 self.values
1609 .encode(out, row.saturating_sub(self.values_offset))
1610 })
1611 }
1612}
1613
1614struct FixedSizeListEncoder<'a> {
1616 list: &'a FixedSizeListArray,
1617 values: FieldEncoder<'a>,
1618 values_offset: usize,
1619 elem_len: usize,
1620}
1621
1622impl<'a> FixedSizeListEncoder<'a> {
1623 fn try_new(
1624 list: &'a FixedSizeListArray,
1625 items_nullability: Option<Nullability>,
1626 item_plan: &FieldPlan,
1627 ) -> Result<Self, ArrowError> {
1628 Ok(Self {
1629 list,
1630 values: FieldEncoder::make_encoder(
1631 list.values().as_ref(),
1632 item_plan,
1633 items_nullability,
1634 )?,
1635 values_offset: list.values().offset(),
1636 elem_len: list.value_length() as usize,
1637 })
1638 }
1639
1640 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1641 let rel = self.list.value_offset(idx) as usize;
1643 let start = self.values_offset + rel;
1644 let end = start + self.elem_len;
1645 encode_blocked_range(out, start, end, |out, row| {
1646 self.values
1647 .encode(out, row.saturating_sub(self.values_offset))
1648 })
1649 }
1650}
1651
1652struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1655impl FixedEncoder<'_> {
1656 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1657 let v = self.0.value(idx); out.write_all(v)
1659 .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e))
1660 }
1661}
1662
1663struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1666impl UuidEncoder<'_> {
1667 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1668 let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1669 buf[0] = 0x48;
1670 let v = self.0.value(idx);
1671 let u = Uuid::from_slice(v)
1672 .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
1673 let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1674 out.write_all(&buf)
1675 .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
1676 }
1677}
1678
1679#[derive(Copy, Clone)]
1680struct DurationParts {
1681 months: u32,
1682 days: u32,
1683 millis: u32,
1684}
1685trait IntervalToDurationParts: ArrowPrimitiveType {
1687 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>;
1688}
1689impl IntervalToDurationParts for IntervalMonthDayNanoType {
1690 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1691 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1692 if months < 0 || days < 0 || nanos < 0 {
1693 return Err(ArrowError::InvalidArgumentError(
1694 "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1695 ));
1696 }
1697 if nanos % 1_000_000 != 0 {
1698 return Err(ArrowError::InvalidArgumentError(
1699 "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1700 .into(),
1701 ));
1702 }
1703 let millis = nanos / 1_000_000;
1704 if millis > u32::MAX as i64 {
1705 return Err(ArrowError::InvalidArgumentError(
1706 "Avro 'duration' milliseconds exceed u32::MAX".into(),
1707 ));
1708 }
1709 Ok(DurationParts {
1710 months: months as u32,
1711 days: days as u32,
1712 millis: millis as u32,
1713 })
1714 }
1715}
1716impl IntervalToDurationParts for IntervalYearMonthType {
1717 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1718 if native < 0 {
1719 return Err(ArrowError::InvalidArgumentError(
1720 "Avro 'duration' cannot encode negative months".into(),
1721 ));
1722 }
1723 Ok(DurationParts {
1724 months: native as u32,
1725 days: 0,
1726 millis: 0,
1727 })
1728 }
1729}
1730impl IntervalToDurationParts for IntervalDayTimeType {
1731 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1732 let (days, millis) = IntervalDayTimeType::to_parts(native);
1733 if days < 0 || millis < 0 {
1734 return Err(ArrowError::InvalidArgumentError(
1735 "Avro 'duration' cannot encode negative days or milliseconds".into(),
1736 ));
1737 }
1738 Ok(DurationParts {
1739 months: 0,
1740 days: days as u32,
1741 millis: millis as u32,
1742 })
1743 }
1744}
1745
1746struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1749impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1750 #[inline(always)]
1751 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1752 let parts = P::duration_parts(self.0.value(idx))?;
1753 let months = parts.months.to_le_bytes();
1754 let days = parts.days.to_le_bytes();
1755 let ms = parts.millis.to_le_bytes();
1756 let buf = [
1772 months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1773 ms[1], ms[2], ms[3],
1774 ];
1775 out.write_all(&buf)
1776 .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
1777 }
1778}
1779
1780trait DecimalBeBytes<const N: usize> {
1783 fn value_be_bytes(&self, idx: usize) -> [u8; N];
1784}
1785#[cfg(feature = "small_decimals")]
1786impl DecimalBeBytes<4> for Decimal32Array {
1787 fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1788 self.value(idx).to_be_bytes()
1789 }
1790}
1791#[cfg(feature = "small_decimals")]
1792impl DecimalBeBytes<8> for Decimal64Array {
1793 fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1794 self.value(idx).to_be_bytes()
1795 }
1796}
1797impl DecimalBeBytes<16> for Decimal128Array {
1798 fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1799 self.value(idx).to_be_bytes()
1800 }
1801}
1802impl DecimalBeBytes<32> for Decimal256Array {
1803 fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1804 self.value(idx).to_be_bytes()
1806 }
1807}
1808
1809struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1815 arr: &'a A,
1816 fixed_size: Option<usize>,
1817}
1818
1819impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1820 fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1821 Self { arr, fixed_size }
1822 }
1823
1824 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1825 let be = self.arr.value_be_bytes(idx);
1826 match self.fixed_size {
1827 Some(n) => write_sign_extended(out, &be, n),
1828 None => write_len_prefixed(out, minimal_twos_complement(&be)),
1829 }
1830 }
1831}
1832
1833#[cfg(feature = "small_decimals")]
1834type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1835#[cfg(feature = "small_decimals")]
1836type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1837type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1838type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1839
1840struct RunEncodedEncoder<'a, R: RunEndIndexType> {
1844 ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
1845 base: usize,
1846 len: usize,
1847 values: FieldEncoder<'a>,
1848 cur_run: usize,
1850 cur_end: usize,
1852}
1853
1854type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
1855type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
1856type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
1857
1858impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
1859 fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
1860 let ends = arr.run_ends();
1861 let base = ends.get_start_physical_index();
1862 let slice = ends.values();
1863 let len = ends.len();
1864 let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
1865 Self {
1866 ends_slice: slice,
1867 base,
1868 len,
1869 values,
1870 cur_run: 0,
1871 cur_end,
1872 }
1873 }
1874
1875 #[inline(always)]
1878 fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> {
1879 if idx < self.cur_end {
1880 return Ok(());
1881 }
1882 while self.cur_run + 1 < self.len && idx >= self.cur_end {
1884 self.cur_run += 1;
1885 self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
1886 }
1887 if idx < self.cur_end {
1888 Ok(())
1889 } else {
1890 Err(ArrowError::InvalidArgumentError(format!(
1891 "row index {idx} out of bounds for run-ends ({} runs)",
1892 self.len
1893 )))
1894 }
1895 }
1896
1897 #[inline(always)]
1898 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1899 self.advance_to_row(idx)?;
1900 self.values.encode(out, self.cur_run)
1903 }
1904}
1905
1906#[cfg(test)]
1907mod tests {
1908 use super::*;
1909 use arrow_array::types::Int32Type;
1910 use arrow_array::{
1911 Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
1912 Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
1913 StringArray,
1914 };
1915 use arrow_buffer::Buffer;
1916 use arrow_schema::{DataType, Field, Fields, UnionFields};
1917
1918 fn zigzag_i64(v: i64) -> u64 {
1919 ((v << 1) ^ (v >> 63)) as u64
1920 }
1921
1922 fn varint(mut x: u64) -> Vec<u8> {
1923 let mut out = Vec::new();
1924 while (x & !0x7f) != 0 {
1925 out.push(((x & 0x7f) as u8) | 0x80);
1926 x >>= 7;
1927 }
1928 out.push((x & 0x7f) as u8);
1929 out
1930 }
1931
1932 fn avro_long_bytes(v: i64) -> Vec<u8> {
1933 varint(zigzag_i64(v))
1934 }
1935
1936 fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
1937 let mut out = avro_long_bytes(payload.len() as i64);
1938 out.extend_from_slice(payload);
1939 out
1940 }
1941
1942 fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
1943 let m = months.to_le_bytes();
1944 let d = days.to_le_bytes();
1945 let ms = millis.to_le_bytes();
1946 [
1947 m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
1948 ]
1949 }
1950
1951 fn encode_all(
1952 array: &dyn Array,
1953 plan: &FieldPlan,
1954 nullability: Option<Nullability>,
1955 ) -> Vec<u8> {
1956 let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
1957 let mut out = Vec::new();
1958 for i in 0..array.len() {
1959 enc.encode(&mut out, i).unwrap();
1960 }
1961 out
1962 }
1963
1964 fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
1965 if actual != expected {
1966 let to_hex = |b: &[u8]| {
1967 b.iter()
1968 .map(|x| format!("{:02X}", x))
1969 .collect::<Vec<_>>()
1970 .join(" ")
1971 };
1972 panic!(
1973 "mismatch\n expected: [{}]\n actual: [{}]",
1974 to_hex(expected),
1975 to_hex(actual)
1976 );
1977 }
1978 }
1979
1980 #[test]
1981 fn binary_encoder() {
1982 let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
1983 let arr = BinaryArray::from_vec(values);
1984 let mut expected = Vec::new();
1985 for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
1986 expected.extend(avro_len_prefixed_bytes(payload));
1987 }
1988 let got = encode_all(&arr, &FieldPlan::Scalar, None);
1989 assert_bytes_eq(&got, &expected);
1990 }
1991
1992 #[test]
1993 fn large_binary_encoder() {
1994 let values: Vec<&[u8]> = vec![b"xyz", b""];
1995 let arr = LargeBinaryArray::from_vec(values);
1996 let mut expected = Vec::new();
1997 for payload in [b"xyz" as &[u8], b""] {
1998 expected.extend(avro_len_prefixed_bytes(payload));
1999 }
2000 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2001 assert_bytes_eq(&got, &expected);
2002 }
2003
2004 #[test]
2005 fn utf8_encoder() {
2006 let arr = StringArray::from(vec!["", "A", "BC"]);
2007 let mut expected = Vec::new();
2008 for s in ["", "A", "BC"] {
2009 expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2010 }
2011 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2012 assert_bytes_eq(&got, &expected);
2013 }
2014
2015 #[test]
2016 fn large_utf8_encoder() {
2017 let arr = LargeStringArray::from(vec!["hello", ""]);
2018 let mut expected = Vec::new();
2019 for s in ["hello", ""] {
2020 expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2021 }
2022 let got = encode_all(&arr, &FieldPlan::Scalar, None);
2023 assert_bytes_eq(&got, &expected);
2024 }
2025
2026 #[test]
2027 fn list_encoder_int32() {
2028 let values = Int32Array::from(vec![1, 2, 3]);
2030 let offsets = vec![0, 2, 2, 3];
2031 let list = ListArray::new(
2032 Field::new("item", DataType::Int32, true).into(),
2033 arrow_buffer::OffsetBuffer::new(offsets.into()),
2034 Arc::new(values) as ArrayRef,
2035 None,
2036 );
2037 let mut expected = Vec::new();
2039 expected.extend(avro_long_bytes(2));
2041 expected.extend(avro_long_bytes(1));
2042 expected.extend(avro_long_bytes(2));
2043 expected.extend(avro_long_bytes(0));
2044 expected.extend(avro_long_bytes(0));
2046 expected.extend(avro_long_bytes(1));
2048 expected.extend(avro_long_bytes(3));
2049 expected.extend(avro_long_bytes(0));
2050
2051 let plan = FieldPlan::List {
2052 items_nullability: None,
2053 item_plan: Box::new(FieldPlan::Scalar),
2054 };
2055 let got = encode_all(&list, &plan, None);
2056 assert_bytes_eq(&got, &expected);
2057 }
2058
2059 #[test]
2060 fn struct_encoder_two_fields() {
2061 let a = Int32Array::from(vec![1, 2]);
2063 let b = StringArray::from(vec!["x", "y"]);
2064 let fields = Fields::from(vec![
2065 Field::new("a", DataType::Int32, true),
2066 Field::new("b", DataType::Utf8, true),
2067 ]);
2068 let struct_arr = StructArray::new(
2069 fields.clone(),
2070 vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2071 None,
2072 );
2073 let plan = FieldPlan::Struct {
2074 bindings: vec![
2075 FieldBinding {
2076 arrow_index: 0,
2077 nullability: None,
2078 plan: FieldPlan::Scalar,
2079 },
2080 FieldBinding {
2081 arrow_index: 1,
2082 nullability: None,
2083 plan: FieldPlan::Scalar,
2084 },
2085 ],
2086 };
2087 let got = encode_all(&struct_arr, &plan, None);
2088 let mut expected = Vec::new();
2090 expected.extend(avro_long_bytes(1)); expected.extend(avro_len_prefixed_bytes(b"x")); expected.extend(avro_long_bytes(2)); expected.extend(avro_len_prefixed_bytes(b"y")); assert_bytes_eq(&got, &expected);
2095 }
2096
2097 #[test]
2098 fn enum_encoder_dictionary() {
2099 let dict_values = StringArray::from(vec!["A", "B", "C"]);
2101 let keys = Int32Array::from(vec![2, 0, 1]);
2102 let dict =
2103 DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
2104 let symbols = Arc::<[String]>::from(
2105 vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
2106 );
2107 let plan = FieldPlan::Enum { symbols };
2108 let got = encode_all(&dict, &plan, None);
2109 let mut expected = Vec::new();
2110 expected.extend(avro_long_bytes(2));
2111 expected.extend(avro_long_bytes(0));
2112 expected.extend(avro_long_bytes(1));
2113 assert_bytes_eq(&got, &expected);
2114 }
2115
2116 #[test]
2117 fn decimal_bytes_and_fixed() {
2118 let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
2120 .with_precision_and_scale(20, 0)
2121 .unwrap();
2122 let plan_bytes = FieldPlan::Decimal { size: None };
2124 let got_bytes = encode_all(&dec, &plan_bytes, None);
2125 let mut expected_bytes = Vec::new();
2127 expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2128 expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2129 expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2130 assert_bytes_eq(&got_bytes, &expected_bytes);
2131
2132 let plan_fixed = FieldPlan::Decimal { size: Some(16) };
2133 let got_fixed = encode_all(&dec, &plan_fixed, None);
2134 let mut expected_fixed = Vec::new();
2135 expected_fixed.extend_from_slice(&1i128.to_be_bytes());
2136 expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
2137 expected_fixed.extend_from_slice(&0i128.to_be_bytes());
2138 assert_bytes_eq(&got_fixed, &expected_fixed);
2139 }
2140
#[test]
fn decimal_bytes_256() {
    use arrow_buffer::i256;
    let unscaled: Vec<i256> = [1i128, -1, 0].into_iter().map(i256::from_i128).collect();
    let dec = Decimal256Array::from(unscaled.clone())
        .with_precision_and_scale(76, 0)
        .unwrap();

    // `bytes` representation: minimal big-endian two's complement, length-prefixed.
    let plan_bytes = FieldPlan::Decimal { size: None };
    let got_bytes = encode_all(&dec, &plan_bytes, None);
    let mut expected_bytes = Vec::new();
    for minimal in [[0x01u8], [0xFF], [0x00]] {
        expected_bytes.extend(avro_len_prefixed_bytes(&minimal));
    }
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // `fixed(32)` representation: the full 32-byte big-endian value with no prefix.
    let plan_fixed = FieldPlan::Decimal { size: Some(32) };
    let got_fixed = encode_all(&dec, &plan_fixed, None);
    let expected_fixed: Vec<u8> = unscaled.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
2171
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_32() {
    let unscaled = [1i32, -1, 0];
    let dec = Decimal32Array::from(unscaled.to_vec())
        .with_precision_and_scale(9, 0)
        .unwrap();

    // `bytes` representation: minimal big-endian two's complement, length-prefixed.
    let plan_bytes = FieldPlan::Decimal { size: None };
    let got_bytes = encode_all(&dec, &plan_bytes, None);
    let mut expected_bytes = Vec::new();
    for minimal in [[0x01u8], [0xFF], [0x00]] {
        expected_bytes.extend(avro_len_prefixed_bytes(&minimal));
    }
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // `fixed(4)` representation: the full 4-byte big-endian value with no prefix.
    let plan_fixed = FieldPlan::Decimal { size: Some(4) };
    let got_fixed = encode_all(&dec, &plan_fixed, None);
    let expected_fixed: Vec<u8> = unscaled.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
2196
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_64() {
    let unscaled = [1i64, -1, 0];
    let dec = Decimal64Array::from(unscaled.to_vec())
        .with_precision_and_scale(18, 0)
        .unwrap();

    // `bytes` representation: minimal big-endian two's complement, length-prefixed.
    let plan_bytes = FieldPlan::Decimal { size: None };
    let got_bytes = encode_all(&dec, &plan_bytes, None);
    let mut expected_bytes = Vec::new();
    for minimal in [[0x01u8], [0xFF], [0x00]] {
        expected_bytes.extend(avro_len_prefixed_bytes(&minimal));
    }
    assert_bytes_eq(&got_bytes, &expected_bytes);

    // `fixed(8)` representation: the full 8-byte big-endian value with no prefix.
    let plan_fixed = FieldPlan::Decimal { size: Some(8) };
    let got_fixed = encode_all(&dec, &plan_fixed, None);
    let expected_fixed: Vec<u8> = unscaled.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &expected_fixed);
}
2221
#[test]
fn float32_and_float64_encoders() {
    // Expected output is each value's IEEE-754 bit pattern in little-endian order.
    // The NaN (bits 0x7fc00000) checks that its exact bits are written unchanged.
    let f32_values = [0.0f32, -1.5, f32::from_bits(0x7fc00000)];
    let f64_values = [0.0f64, -2.25];

    let f32a = Float32Array::from(f32_values.to_vec());
    let expected32: Vec<u8> = f32_values
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got32, &expected32);

    let f64a = Float64Array::from(f64_values.to_vec());
    let expected64: Vec<u8> = f64_values
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got64, &expected64);
}
2241
#[test]
fn long_encoder_int64() {
    // Zero, small values of both signs, and a value one above i64::MIN.
    let samples = [0i64, 1, -1, 2, -2, i64::MIN + 1];
    let arr = Int64Array::from(samples.to_vec());
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for sample in samples {
        expected.extend(avro_long_bytes(sample));
    }
    assert_bytes_eq(&got, &expected);
}
2252
#[test]
fn fixed_encoder_plain() {
    // Fixed-size values are expected back as their raw bytes, concatenated with no prefix.
    let data: [[u8; 4]; 2] = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
    let arr = FixedSizeBinaryArray::try_from_iter(data.iter().map(|row| row.to_vec())).unwrap();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected = data.concat();
    assert_bytes_eq(&got, &expected);
}
2265
#[test]
fn uuid_encoder_test() {
    // A 16-byte FixedSizeBinary value encoded with the UUID plan is expected to come out as
    // the length-prefixed hyphenated text form. The prefix is built with the shared helper
    // instead of hard-coding the zig-zag byte for the 36-character length (0x48).
    let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
    let bytes = *u.as_bytes();
    let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
    let text = u.hyphenated().to_string();
    let mut expected = Vec::new();
    expected.extend(avro_len_prefixed_bytes(text.as_bytes()));
    let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
    assert_bytes_eq(&got, &expected);
}
2279
#[test]
fn uuid_encoder_error() {
    // A 10-byte fixed value is not a valid UUID, so encoding row 0 must fail.
    let ten_zero_bytes = arrow_buffer::Buffer::from(vec![0u8; 10]);
    let arr = FixedSizeBinaryArray::try_new(10, ten_zero_bytes, None).unwrap();
    let plan = FieldPlan::Uuid;
    let mut enc = FieldEncoder::make_encoder(&arr, &plan, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => assert!(msg.contains("Invalid UUID bytes")),
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2297
2298 fn test_scalar_primitive_encoding<T>(
2299 non_nullable_data: &[T::Native],
2300 nullable_data: &[Option<T::Native>],
2301 ) where
2302 T: ArrowPrimitiveType,
2303 T::Native: Into<i64> + Copy,
2304 PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2305 {
2306 let plan = FieldPlan::Scalar;
2307
2308 let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2309 let got = encode_all(&array, &plan, None);
2310
2311 let mut expected = Vec::new();
2312 for &value in non_nullable_data {
2313 expected.extend(avro_long_bytes(value.into()));
2314 }
2315 assert_bytes_eq(&got, &expected);
2316
2317 let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2318 let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2319
2320 let mut expected_nullable = Vec::new();
2321 for &opt_value in nullable_data {
2322 match opt_value {
2323 Some(value) => {
2324 expected_nullable.extend(avro_long_bytes(1));
2326 expected_nullable.extend(avro_long_bytes(value.into()));
2327 }
2328 None => {
2329 expected_nullable.extend(avro_long_bytes(0));
2331 }
2332 }
2333 }
2334 assert_bytes_eq(&got_nullable, &expected_nullable);
2335 }
2336
#[test]
fn date32_encoder() {
    // Days since the Unix epoch: a date in late 2022, the epoch itself, and the day before.
    let days = [19345, 0, -1];
    let nullable_days = [Some(19345), None];
    test_scalar_primitive_encoding::<Date32Type>(&days, &nullable_days);
}
2348
#[test]
fn time32_millis_encoder() {
    // Milliseconds since midnight: 00:00:00.000, 13:45:30.123, 23:59:59.999.
    let millis = [0, 49530123, 86399999];
    let nullable_millis = [None, Some(49530123)];
    test_scalar_primitive_encoding::<Time32MillisecondType>(&millis, &nullable_millis);
}
2360
#[test]
fn time64_micros_encoder() {
    // Microseconds since midnight: midnight and the last microsecond of the day.
    let micros = [0, 86399999999];
    let nullable_micros = [Some(86399999999), None];
    test_scalar_primitive_encoding::<Time64MicrosecondType>(&micros, &nullable_micros);
}
2371
#[test]
fn timestamp_millis_encoder() {
    // Epoch milliseconds: 2024-01-01T00:00:00Z, the epoch, and a pre-epoch instant.
    let millis = [1704067200000, 0, -123456789];
    let nullable_millis = [None, Some(1704067200000)];
    test_scalar_primitive_encoding::<TimestampMillisecondType>(&millis, &nullable_millis);
}
2383
#[test]
fn map_encoder_string_keys_int_values() {
    // Two map rows over one entries struct: row 0 = {k1: 1, k2: 2}, row 1 = {}.
    let entries = StructArray::new(
        Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ]),
        vec![
            Arc::new(StringArray::from(vec!["k1", "k2"])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
        ],
        None,
    );
    let entries_field = Field::new("entries", entries.data_type().clone(), false);
    let map = MapArray::new(
        entries_field.into(),
        arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into()),
        entries,
        None,
        false,
    );
    let plan = FieldPlan::Map {
        values_nullability: None,
        value_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&map, &plan, None);

    let mut expected = Vec::new();
    // Row 0: one block of two key/value pairs, then the zero-count terminator.
    expected.extend(avro_long_bytes(2));
    expected.extend(avro_len_prefixed_bytes(b"k1"));
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_len_prefixed_bytes(b"k2"));
    expected.extend(avro_long_bytes(2));
    expected.extend(avro_long_bytes(0));
    // Row 1: an empty map is written as the terminator alone.
    expected.extend(avro_long_bytes(0));
    assert_bytes_eq(&got, &expected);
}
2425
#[test]
fn union_encoder_string_int() {
    // Dense union, child 0 = Utf8 and child 1 = Int32. Rows resolve to:
    //   "hello", 10, 20, "world", 30
    let union_fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    )
    .unwrap();
    let children = vec![
        Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef,
        Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
    ];
    let union_array = UnionArray::try_new(
        union_fields,
        Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]).into(),
        Some(Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]).into()),
        children,
    )
    .unwrap();

    let plan = FieldPlan::Union {
        bindings: (0..2)
            .map(|arrow_index| FieldBinding {
                arrow_index,
                nullability: None,
                plan: FieldPlan::Scalar,
            })
            .collect(),
    };
    let got = encode_all(&union_array, &plan, None);

    // Each row: the Avro branch index followed by that branch's value.
    let mut expected = Vec::new();
    expected.extend(avro_long_bytes(0));
    expected.extend(avro_len_prefixed_bytes(b"hello"));
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_long_bytes(10));
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_long_bytes(20));
    expected.extend(avro_long_bytes(0));
    expected.extend(avro_len_prefixed_bytes(b"world"));
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_long_bytes(30));
    assert_bytes_eq(&got, &expected);
}
2482
#[test]
fn union_encoder_null_string_int() {
    // Dense union with a Null child first. Rows resolve to: null, "hello", 10.
    let union_fields = UnionFields::try_new(
        vec![0, 1, 2],
        vec![
            Field::new("v_null", DataType::Null, true),
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    )
    .unwrap();
    let children = vec![
        Arc::new(NullArray::new(1)) as ArrayRef,
        Arc::new(StringArray::from(vec!["hello"])) as ArrayRef,
        Arc::new(Int32Array::from(vec![10])) as ArrayRef,
    ];
    let union_array = UnionArray::try_new(
        union_fields,
        Buffer::from_slice_ref([0_i8, 1, 2]).into(),
        Some(Buffer::from_slice_ref([0_i32, 0, 0]).into()),
        children,
    )
    .unwrap();

    let plan = FieldPlan::Union {
        bindings: (0..3)
            .map(|arrow_index| FieldBinding {
                arrow_index,
                nullability: None,
                plan: FieldPlan::Scalar,
            })
            .collect(),
    };
    let got = encode_all(&union_array, &plan, None);

    let mut expected = Vec::new();
    // Null branch: the index alone, with no payload.
    expected.extend(avro_long_bytes(0));
    // String branch.
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_len_prefixed_bytes(b"hello"));
    // Int branch.
    expected.extend(avro_long_bytes(2));
    expected.extend(avro_long_bytes(10));
    assert_bytes_eq(&got, &expected);
}
2544
#[test]
fn list64_encoder_int32() {
    // LargeList (i64 offsets) with two rows: [1, 2, 3] and [].
    let list = LargeListArray::new(
        Field::new("item", DataType::Int32, true).into(),
        arrow_buffer::OffsetBuffer::new(vec![0i64, 3, 3].into()),
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        None,
    );
    let plan = FieldPlan::List {
        items_nullability: None,
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);

    let mut expected = Vec::new();
    // Row 0: block count 3, the three items, then the zero-count terminator.
    for word in [3i64, 1, 2, 3, 0] {
        expected.extend(avro_long_bytes(word));
    }
    // Row 1: an empty array is written as the terminator alone.
    expected.extend(avro_long_bytes(0));
    assert_bytes_eq(&got, &expected);
}
2571
#[test]
fn int_encoder_test() {
    // Avro `int` shares the zig-zag varint encoding used for `long`.
    let samples = [0i32, -1, 2];
    let ints = Int32Array::from(samples.to_vec());
    let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
    let mut expected_i = Vec::new();
    for sample in samples {
        expected_i.extend(avro_long_bytes(i64::from(sample)));
    }
    assert_bytes_eq(&got_i, &expected_i);
}
2582
#[test]
fn boolean_encoder_test() {
    // Each boolean is expected as a single byte: 1 for true, 0 for false.
    let bools = BooleanArray::from(vec![true, false]);
    let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
    let expected_b: Vec<u8> = vec![1, 0];
    assert_bytes_eq(&got_b, &expected_b);
}
2592
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_seconds() {
    // Each duration is expected as its raw i64 count written as an Avro long.
    let counts = [0i64, -1, 2];
    let arr: PrimitiveArray<DurationSecondType> = counts.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for count in counts {
        expected.extend(avro_long_bytes(count));
    }
    assert_bytes_eq(&got, &expected);
}
2604
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_milliseconds() {
    // Each duration is expected as its raw i64 count written as an Avro long.
    let counts = [1i64, 0, -2];
    let arr: PrimitiveArray<DurationMillisecondType> = counts.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for count in counts {
        expected.extend(avro_long_bytes(count));
    }
    assert_bytes_eq(&got, &expected);
}
2616
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_microseconds() {
    // Each duration is expected as its raw i64 count written as an Avro long.
    let counts = [5i64, -6, 7];
    let arr: PrimitiveArray<DurationMicrosecondType> = counts.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for count in counts {
        expected.extend(avro_long_bytes(count));
    }
    assert_bytes_eq(&got, &expected);
}
2628
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_nanoseconds() {
    // Each duration is expected as its raw i64 count written as an Avro long.
    let counts = [8i64, 9, -10];
    let arr: PrimitiveArray<DurationNanosecondType> = counts.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for count in counts {
        expected.extend(avro_long_bytes(count));
    }
    assert_bytes_eq(&got, &expected);
}
2640
#[test]
fn duration_encoder_year_month_happy_path() {
    // A year-month interval carries only months; days and millis must be zero.
    let months = [0i32, 1, 25];
    let arr: PrimitiveArray<IntervalYearMonthType> = months.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for m in months {
        expected.extend_from_slice(&duration_fixed12(m as u32, 0, 0));
    }
    assert_bytes_eq(&got, &expected);
}
2651
#[test]
fn duration_encoder_year_month_rejects_negative() {
    // The fixed(12) duration layout cannot hold a negative month count.
    let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("cannot encode negative months"))
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2665
#[test]
fn duration_encoder_day_time_happy_path() {
    // Day-time intervals map to (months = 0, days, millis).
    let rows = [(2, 500), (0, 0)];
    let arr: PrimitiveArray<IntervalDayTimeType> = rows
        .iter()
        .map(|&(days, millis)| IntervalDayTimeType::make_value(days, millis))
        .collect::<Vec<_>>()
        .into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    expected.extend_from_slice(&duration_fixed12(0, 2, 500));
    expected.extend_from_slice(&duration_fixed12(0, 0, 0));
    assert_bytes_eq(&got, &expected);
}
2677
#[test]
fn duration_encoder_day_time_rejects_negative() {
    // The fixed(12) duration layout cannot hold a negative day count.
    let arr: PrimitiveArray<IntervalDayTimeType> =
        vec![IntervalDayTimeType::make_value(-1, 0)].into();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("cannot encode negative days"))
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2692
#[test]
fn duration_encoder_month_day_nano_happy_path() {
    // Nanoseconds are expected to be reduced to milliseconds: 3_000_000 ns -> 3 ms.
    let with_time = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000);
    let all_zero = IntervalMonthDayNanoType::make_value(0, 0, 0);
    let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![with_time, all_zero].into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    expected.extend_from_slice(&duration_fixed12(1, 2, 3));
    expected.extend_from_slice(&duration_fixed12(0, 0, 0));
    assert_bytes_eq(&got, &expected);
}
2704
#[test]
fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
    // 1 ns is not a whole number of milliseconds, so it cannot fit the millis slot.
    let arr: PrimitiveArray<IntervalMonthDayNanoType> =
        vec![IntervalMonthDayNanoType::make_value(0, 0, 1)].into();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2719
#[test]
fn minimal_twos_complement_test() {
    // Each input has two redundant sign-extension bytes; only the last byte should remain.
    let cases = [[0x00u8, 0x00, 0x01], [0xFF, 0xFF, 0x80], [0x00, 0x00, 0x00]];
    for input in &cases {
        assert_eq!(minimal_twos_complement(input), &input[2..]);
    }
}
2729
#[test]
fn write_sign_extend_test() {
    let mut out = Vec::new();
    // A positive value is padded with 0x00 up to the requested width.
    write_sign_extended(&mut out, &[0x01], 4).unwrap();
    assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
    out.clear();
    // A negative value is padded with 0xFF.
    write_sign_extended(&mut out, &[0xFF], 4).unwrap();
    assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
    out.clear();
    // Redundant leading 0xFF sign bytes may be dropped to fit a narrower width.
    write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
    assert_eq!(out, vec![0xFF, 0x80]);
    out.clear();
    // Narrowing [0x01, 0x00] to one byte would drop a significant byte, so it must fail.
    // Report the actual error on mismatch instead of discarding it.
    match write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err() {
        ArrowError::InvalidArgumentError(_) => {}
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2750
#[test]
fn duration_month_day_nano_overflow_millis() {
    // (u32::MAX + 1) whole milliseconds expressed in nanoseconds: one millisecond more
    // than a u32 millis field can hold.
    let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
    let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
    let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    // On mismatch, surface the actual error/message instead of a bare panic.
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("exceed u32::MAX"), "unexpected message: {msg}")
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2765
#[test]
fn fieldplan_decimal_precision_scale_mismatch_errors() {
    use crate::codec::Codec;
    use std::collections::HashMap;
    // Arrow declares Decimal128(12, 2) while the Avro codec is built as
    // Decimal(10, Some(2), None); building the plan must reject the mismatch.
    let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
    let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
    // On mismatch, surface the actual error/message instead of a bare panic.
    match FieldPlan::build(&avro_dt, &arrow_field).unwrap_err() {
        ArrowError::SchemaError(msg) => assert!(
            msg.contains("Decimal precision/scale mismatch"),
            "unexpected message: {msg}"
        ),
        other => panic!("expected SchemaError, got {other:?}"),
    }
}
2781
#[test]
fn timestamp_micros_encoder() {
    // Epoch microseconds: 2024-01-01T00:00:00Z, the epoch, and a pre-epoch instant.
    let micros = [1_704_067_200_000_000, 0, -123_456_789];
    let nullable_micros = [None, Some(1_704_067_200_000_000)];
    test_scalar_primitive_encoding::<TimestampMicrosecondType>(&micros, &nullable_micros);
}
2794
#[test]
fn list_encoder_nullable_items_null_first() {
    // A single list row [1, null, 2] whose items are written as a null-first union.
    let list = ListArray::new(
        Field::new("item", DataType::Int32, true).into(),
        arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into()),
        Arc::new(Int32Array::from(vec![Some(1), None, Some(2)])) as ArrayRef,
        None,
    );
    let plan = FieldPlan::List {
        items_nullability: Some(Nullability::NullFirst),
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);

    let mut expected = Vec::new();
    expected.extend(avro_long_bytes(3)); // block count
    expected.extend(avro_long_bytes(1)); // item 0: value branch
    expected.extend(avro_long_bytes(1)); //         value 1
    expected.extend(avro_long_bytes(0)); // item 1: null branch
    expected.extend(avro_long_bytes(1)); // item 2: value branch
    expected.extend(avro_long_bytes(2)); //         value 2
    expected.extend(avro_long_bytes(0)); // end-of-array terminator
    assert_bytes_eq(&got, &expected);
}
2826
#[test]
fn large_list_encoder_nullable_items_null_first() {
    // A single LargeList row [10, null] whose items are written as a null-first union.
    let list = LargeListArray::new(
        Field::new("item", DataType::Int32, true).into(),
        arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into()),
        Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef,
        None,
    );
    let plan = FieldPlan::List {
        items_nullability: Some(Nullability::NullFirst),
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);

    let mut expected = Vec::new();
    expected.extend(avro_long_bytes(2)); // block count
    expected.extend(avro_long_bytes(1)); // item 0: value branch
    expected.extend(avro_long_bytes(10)); //         value 10
    expected.extend(avro_long_bytes(0)); // item 1: null branch
    expected.extend(avro_long_bytes(0)); // end-of-array terminator
    assert_bytes_eq(&got, &expected);
}
2854
#[test]
fn map_encoder_string_keys_nullable_int_values_null_first() {
    // A single map row {k1: 7, k2: null} whose values are written as a null-first union.
    let entries = StructArray::new(
        Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ]),
        vec![
            Arc::new(StringArray::from(vec!["k1", "k2"])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(7), None])) as ArrayRef,
        ],
        None,
    );
    let entries_field = Field::new("entries", entries.data_type().clone(), false);
    let map = MapArray::new(
        entries_field.into(),
        arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into()),
        entries,
        None,
        false,
    );
    let plan = FieldPlan::Map {
        values_nullability: Some(Nullability::NullFirst),
        value_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&map, &plan, None);

    let mut expected = Vec::new();
    expected.extend(avro_long_bytes(2)); // block count
    expected.extend(avro_len_prefixed_bytes(b"k1"));
    expected.extend(avro_long_bytes(1)); // value branch
    expected.extend(avro_long_bytes(7));
    expected.extend(avro_len_prefixed_bytes(b"k2"));
    expected.extend(avro_long_bytes(0)); // null branch
    expected.extend(avro_long_bytes(0)); // end-of-map terminator
    assert_bytes_eq(&got, &expected);
}
2903
#[test]
fn time32_seconds_to_millis_encoder() {
    // Time32(Second) values are expected to be scaled to milliseconds (x1000) on write.
    // Uses the in-scope `PrimitiveArray`/`Time32SecondType` names, like the overflow test,
    // instead of fully-qualified paths.
    let secs = [0i32, 1, -2, 12_345];
    let arr: PrimitiveArray<Time32SecondType> = secs.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for s in secs {
        let millis = i64::from(s) * 1000;
        expected.extend_from_slice(&avro_long_bytes(millis));
    }
    assert_bytes_eq(&got, &expected);
}
2917
#[test]
fn time32_seconds_to_millis_overflow() {
    // The smallest second count whose millisecond value (secs * 1000) exceeds i32::MAX.
    let overflow_secs: i32 = i32::MAX / 1000 + 1;
    let arr: PrimitiveArray<Time32SecondType> = vec![overflow_secs].into();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            // "overflow" also matches "overflowed", so a single check covers both.
            assert!(msg.contains("overflow"), "unexpected message: {msg}")
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2936
#[test]
fn timestamp_seconds_to_millis_encoder() {
    // Timestamp(Second) values are expected to be scaled to milliseconds (x1000) on write.
    let secs = [0i64, 1, -1, 1_234_567_890];
    let arr: PrimitiveArray<TimestampSecondType> = secs.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let expected: Vec<u8> = secs
        .iter()
        .flat_map(|&s| avro_long_bytes(s * 1000))
        .collect();
    assert_bytes_eq(&got, &expected);
}
2949
#[test]
fn timestamp_seconds_to_millis_overflow() {
    // The smallest second count whose millisecond value (secs * 1000) exceeds i64::MAX.
    let overflow_secs: i64 = i64::MAX / 1000 + 1;
    let arr: PrimitiveArray<TimestampSecondType> = vec![overflow_secs].into();
    let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            // "overflow" also matches "overflowed", so a single check covers both.
            assert!(msg.contains("overflow"), "unexpected message: {msg}")
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2968
#[test]
fn timestamp_nanos_encoder() {
    // Nanosecond timestamps are expected to be written unchanged as Avro longs.
    let nanos = [0i64, 1, -1, 123];
    let arr: PrimitiveArray<TimestampNanosecondType> = nanos.to_vec().into();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    let mut expected = Vec::new();
    for ns in nanos {
        expected.extend(avro_long_bytes(ns));
    }
    assert_bytes_eq(&got, &expected);
}
2979
#[test]
fn union_encoder_string_int_nonzero_type_ids() {
    // Arrow type ids 2 and 5 are expected to be written as Avro branches 0 and 1
    // (the bound field positions), not as the raw type ids.
    let union_fields = UnionFields::try_new(
        vec![2, 5],
        vec![
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    )
    .unwrap();
    let children = vec![
        Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef,
        Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
    ];
    let union_array = UnionArray::try_new(
        union_fields,
        Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]).into(),
        Some(Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]).into()),
        children,
    )
    .unwrap();

    let plan = FieldPlan::Union {
        bindings: (0..2)
            .map(|arrow_index| FieldBinding {
                arrow_index,
                nullability: None,
                plan: FieldPlan::Scalar,
            })
            .collect(),
    };
    let got = encode_all(&union_array, &plan, None);

    let mut expected = Vec::new();
    expected.extend(avro_long_bytes(0));
    expected.extend(avro_len_prefixed_bytes(b"hello"));
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_long_bytes(10));
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_long_bytes(20));
    expected.extend(avro_long_bytes(0));
    expected.extend(avro_len_prefixed_bytes(b"world"));
    expected.extend(avro_long_bytes(1));
    expected.extend(avro_long_bytes(30));
    assert_bytes_eq(&got, &expected);
}
3029
#[test]
fn nullable_state_with_null_buffer_and_zero_nulls() {
    // A column that carries an all-valid null buffer (present, but zero nulls) is expected
    // to put the encoder in the NullableNoNulls state with the value-branch byte cached.
    let arr = Int32Array::from_iter_values_with_nulls(
        vec![1i32, 2, 3],
        Some(NullBuffer::new_valid(3)),
    );
    assert_eq!(arr.null_count(), 0);
    assert!(arr.nulls().is_some());
    let plan = FieldPlan::Scalar;
    let enc = FieldEncoder::make_encoder(&arr, &plan, Some(Nullability::NullFirst)).unwrap();
    let NullState::NullableNoNulls { union_value_byte } = enc.null_state else {
        panic!("expected NullableNoNulls, got {:?}", enc.null_state);
    };
    assert_eq!(
        union_value_byte,
        union_value_branch_byte(Nullability::NullFirst, false)
    );
}
3048}