1use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::schema::{Fingerprint, Nullability, Prefix};
22use arrow_array::cast::AsArray;
23use arrow_array::types::{
24 ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
25 DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
26 Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
27 Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
28 TimestampMillisecondType,
29};
30use arrow_array::types::{
31 RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
32};
33use arrow_array::{
34 Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
35 FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
36 GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
37 ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
38 StringViewArray, StructArray, UnionArray,
39};
40#[cfg(feature = "small_decimals")]
41use arrow_array::{Decimal32Array, Decimal64Array};
42use arrow_buffer::{ArrowNativeType, NullBuffer};
43use arrow_schema::{
44 ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode,
45};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
50#[inline]
54pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), ArrowError> {
55 let mut zz = ((value << 1) ^ (value >> 63)) as u64;
56 let mut buf = [0u8; 10];
58 let mut i = 0;
59 while (zz & !0x7F) != 0 {
60 buf[i] = ((zz & 0x7F) as u8) | 0x80;
61 i += 1;
62 zz >>= 7;
63 }
64 buf[i] = (zz & 0x7F) as u8;
65 i += 1;
66 out.write_all(&buf[..i])
67 .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
68}
69
70#[inline]
71fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), ArrowError> {
72 write_long(out, value as i64)
73}
74
75#[inline]
76fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
77 write_long(out, bytes.len() as i64)?;
78 out.write_all(bytes)
79 .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
80}
81
82#[inline]
83fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> {
84 out.write_all(&[if v { 1 } else { 0 }])
85 .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
86}
87
/// Returns the shortest suffix of the big-endian two's-complement integer `be`
/// that still represents the same value.
///
/// Leading bytes that are pure sign extension (`0x00` for non-negative, `0xFF`
/// for negative values) are dropped, but one of them is kept whenever the next
/// byte's top bit would otherwise flip the sign. An empty input is returned
/// unchanged, and an input consisting only of sign bytes collapses to its last
/// byte.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&first) = be.first() else {
        return be;
    };
    let negative = (first & 0x80) != 0;
    let sign_byte: u8 = if negative { 0xFF } else { 0x00 };
    // Count the leading bytes that carry nothing but sign extension.
    let run = be.iter().take_while(|&&b| b == sign_byte).count();
    match run {
        0 => be,
        r if r == be.len() => &be[be.len() - 1..],
        r => {
            // If the first significant byte already has the right sign bit, all
            // `r` sign bytes are redundant; otherwise one must stay.
            let sign_preserved = ((be[r] & 0x80) != 0) == negative;
            let start = if sign_preserved { r } else { r - 1 };
            &be[start..]
        }
    }
}
119
120#[inline]
132fn write_sign_extended<W: Write + ?Sized>(
133 out: &mut W,
134 src_be: &[u8],
135 n: usize,
136) -> Result<(), ArrowError> {
137 let len = src_be.len();
138 if len == n {
139 return out
140 .write_all(src_be)
141 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
142 }
143 let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
144 0xFF
145 } else {
146 0x00
147 };
148 if len > n {
149 let extra = len - n;
150 if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
151 return Ok(());
152 }
153 if src_be[..extra].iter().any(|&b| b != sign_byte)
156 || ((src_be[extra] ^ sign_byte) & 0x80) != 0
157 {
158 return Err(ArrowError::InvalidArgumentError(format!(
159 "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
160 )));
161 }
162 return out
163 .write_all(&src_be[extra..])
164 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
165 }
166 let pad_len = n - len;
168 const ZPAD: [u8; 64] = [0x00; 64];
170 const FPAD: [u8; 64] = [0xFF; 64];
171 let pad = if sign_byte == 0x00 {
172 &ZPAD[..]
173 } else {
174 &FPAD[..]
175 };
176 let mut rem = pad_len;
179 while rem >= pad.len() {
180 out.write_all(pad)
181 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
182 rem -= pad.len();
183 }
184 if rem > 0 {
185 out.write_all(&pad[..rem])
186 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
187 }
188 out.write_all(src_be)
189 .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
190}
191
192fn write_optional_index<W: Write + ?Sized>(
198 out: &mut W,
199 is_null: bool,
200 null_order: Nullability,
201) -> Result<(), ArrowError> {
202 let byte = union_value_branch_byte(null_order, is_null);
203 out.write_all(&[byte])
204 .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
205}
206
/// Null-handling strategy for one value site, decided once per array when the
/// encoder is built and consulted on every row.
#[derive(Debug, Clone)]
enum NullState {
    /// The site is not nullable and the array has no nulls: nothing is written
    /// before the value.
    NonNullable,
    /// The site is nullable but the array contains no nulls, so every row uses
    /// the same "value" branch marker.
    NullableNoNulls {
        /// Precomputed branch byte written before each value.
        union_value_byte: u8,
    },
    /// The site is nullable and the array contains nulls: the branch marker is
    /// chosen per row from the validity buffer.
    Nullable {
        /// Validity information cloned from the array.
        nulls: NullBuffer,
        /// Position of the null branch within the two-branch union.
        null_order: Nullability,
    },
}
218
/// Encoder for a single value site (a column, struct child, list item, ...),
/// combining the value encoder with the site's null-handling strategy.
pub(crate) struct FieldEncoder<'a> {
    /// Type-specific encoder borrowing the underlying Arrow array.
    encoder: Encoder<'a>,
    /// How (and whether) a union branch marker is written before each value.
    null_state: NullState,
}
226
impl<'a> FieldEncoder<'a> {
    /// Builds the encoder for one value site.
    ///
    /// The concrete [`Encoder`] variant is selected from `plan` together with
    /// the array's Arrow data type; the null-handling state is then derived
    /// from `nullability` (the Avro site's declared nullability) and whether
    /// the array actually contains nulls.
    ///
    /// `field` is only used to name the site in error messages.
    ///
    /// # Errors
    /// * `ArrowError::SchemaError` when the array does not have the Arrow type
    ///   the plan requires (failed downcast, wrong dictionary types, enum
    ///   symbols that differ from the dictionary values, ...).
    /// * `ArrowError::NotYetImplemented` for Arrow types without a supported
    ///   Avro mapping.
    /// * `ArrowError::InvalidArgumentError` for invalid time-unit combinations
    ///   or when a non-nullable site receives an array containing nulls.
    fn make_encoder(
        array: &'a dyn Array,
        field: &Field,
        plan: &FieldPlan,
        nullability: Option<Nullability>,
    ) -> Result<Self, ArrowError> {
        let encoder = match plan {
            // Scalars are dispatched purely on the Arrow data type.
            FieldPlan::Scalar => match array.data_type() {
                DataType::Null => Encoder::Null,
                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
                DataType::Utf8 => {
                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
                }
                DataType::LargeUtf8 => {
                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
                }
                DataType::Utf8View => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<StringViewArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected StringViewArray".into())
                        })?;
                    Encoder::Utf8View(Utf8ViewEncoder(arr))
                }
                DataType::BinaryView => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<BinaryViewArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected BinaryViewArray".into())
                        })?;
                    Encoder::BinaryView(BinaryViewEncoder(arr))
                }
                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
                DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
                DataType::Date64 => {
                    return Err(ArrowError::NotYetImplemented(
                        "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
                            .into(),
                    ));
                }
                // Second-resolution times go through a dedicated seconds-to-millis encoder.
                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
                ),
                DataType::Time32(TimeUnit::Millisecond) => {
                    Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
                }
                DataType::Time32(TimeUnit::Microsecond) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
                            .into(),
                    ));
                }
                DataType::Time32(TimeUnit::Nanosecond) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
                    array.as_primitive::<Time64MicrosecondType>(),
                )),
                DataType::Time64(TimeUnit::Nanosecond) => {
                    return Err(ArrowError::NotYetImplemented(
                        "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Millisecond) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Second) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
                            .into(),
                    ));
                }
                DataType::Float32 => {
                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
                }
                DataType::Float64 => {
                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
                }
                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
                DataType::LargeBinary => {
                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
                }
                DataType::FixedSizeBinary(_len) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeBinaryArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
                        })?;
                    Encoder::Fixed(FixedEncoder(arr))
                }
                // The timestamp's time zone (second tuple element) does not affect encoder choice.
                DataType::Timestamp(unit, _) => match unit {
                    TimeUnit::Second => {
                        Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
                            array.as_primitive::<TimestampSecondType>(),
                        ))
                    }
                    TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
                        array.as_primitive::<TimestampMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
                        array.as_primitive::<TimestampMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
                        array.as_primitive::<TimestampNanosecondType>(),
                    )),
                },
                DataType::Interval(unit) => match unit {
                    IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
                        array.as_primitive::<IntervalMonthDayNanoType>(),
                    )),
                    IntervalUnit::YearMonth => Encoder::IntervalYearMonth(DurationEncoder(
                        array.as_primitive::<IntervalYearMonthType>(),
                    )),
                    IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
                        array.as_primitive::<IntervalDayTimeType>(),
                    )),
                },
                DataType::Duration(tu) => match tu {
                    TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
                        array.as_primitive::<DurationSecondType>(),
                    )),
                    TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
                        array.as_primitive::<DurationMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
                        array.as_primitive::<DurationMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
                        array.as_primitive::<DurationNanosecondType>(),
                    )),
                },
                other => {
                    return Err(ArrowError::NotYetImplemented(format!(
                        "Avro scalar type not yet supported: {other:?}"
                    )));
                }
            },
            FieldPlan::Struct { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
                Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
            }
            // One Avro "array" plan covers every Arrow list flavour; the concrete
            // encoder is picked from the array's own data type.
            FieldPlan::List {
                items_nullability,
                item_plan,
            } => match array.data_type() {
                DataType::List(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListArray>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
                    Encoder::List(Box::new(ListEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeList(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListArray>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::ListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListViewArray>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected ListViewArray".into()))?;
                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListViewArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected LargeListViewArray".into())
                        })?;
                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::FixedSizeList(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeListArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected FixedSizeListArray".into())
                        })?;
                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                other => {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                    )));
                }
            },
            // The const generic on `DecimalEncoder` is the byte width of the
            // Arrow decimal's native representation.
            FieldPlan::Decimal { size } => match array.data_type() {
                #[cfg(feature = "small_decimals")]
                DataType::Decimal32(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal32Array>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
                }
                #[cfg(feature = "small_decimals")]
                DataType::Decimal64(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal64Array>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
                }
                DataType::Decimal128(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal128Array>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected Decimal128Array".into())
                        })?;
                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
                }
                DataType::Decimal256(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal256Array>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected Decimal256Array".into())
                        })?;
                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
                }
                other => {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
                    )));
                }
            },
            FieldPlan::Uuid => {
                let arr = array
                    .as_any()
                    .downcast_ref::<FixedSizeBinaryArray>()
                    .ok_or_else(|| {
                        ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
                    })?;
                Encoder::Uuid(UuidEncoder(arr))
            }
            FieldPlan::Map {
                values_nullability,
                value_plan,
            } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<MapArray>()
                    .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
                Encoder::Map(Box::new(MapEncoder::try_new(
                    arr,
                    *values_nullability,
                    value_plan.as_ref(),
                )?))
            }
            FieldPlan::Enum { symbols } => match array.data_type() {
                DataType::Dictionary(key_dt, value_dt) => {
                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
                        return Err(ArrowError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    let dict = array
                        .as_any()
                        .downcast_ref::<DictionaryArray<Int32Type>>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
                        })?;
                    let values = dict
                        .values()
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Dictionary values must be Utf8".into())
                        })?;
                    // The dictionary must list exactly the schema's symbols, in the
                    // same order, because only the dictionary keys are handed to
                    // the enum encoder.
                    if values.len() != symbols.len() {
                        return Err(ArrowError::SchemaError(format!(
                            "Enum symbol length {} != dictionary size {}",
                            symbols.len(),
                            values.len()
                        )));
                    }
                    for i in 0..values.len() {
                        if values.value(i) != symbols[i].as_str() {
                            return Err(ArrowError::SchemaError(format!(
                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
                                symbols[i],
                                values.value(i)
                            )));
                        }
                    }
                    let keys = dict.keys();
                    Encoder::Enum(EnumEncoder { keys })
                }
                other => {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
                    )));
                }
            },
            FieldPlan::Union { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?;

                Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
            }
            FieldPlan::RunEndEncoded {
                values_nullability,
                value_plan,
            } => {
                let dt = array.data_type();
                let values_field = match dt {
                    DataType::RunEndEncoded(_re_field, v_field) => v_field.as_ref(),
                    other => {
                        return Err(ArrowError::SchemaError(format!(
                            "Avro RunEndEncoded site requires Arrow DataType::RunEndEncoded, found: {other:?}"
                        )));
                    }
                };
                // Try each supported run-end index width in turn; the encoder for
                // the run values is built through `prepare_value_site_encoder`.
                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
                        let values_enc = prepare_value_site_encoder(
                            arr.values().as_ref(),
                            values_field,
                            *values_nullability,
                            value_plan.as_ref(),
                        )?;
                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
                            Int16Type,
                        >::new(
                            arr, values_enc
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
                        let values_enc = prepare_value_site_encoder(
                            arr.values().as_ref(),
                            values_field,
                            *values_nullability,
                            value_plan.as_ref(),
                        )?;
                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
                            Int32Type,
                        >::new(
                            arr, values_enc
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
                        let values_enc = prepare_value_site_encoder(
                            arr.values().as_ref(),
                            values_field,
                            *values_nullability,
                            value_plan.as_ref(),
                        )?;
                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
                            Int64Type,
                        >::new(
                            arr, values_enc
                        ))));
                    }
                    Err(ArrowError::SchemaError(
                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
                            .into(),
                    ))
                };
                build(array)?
            }
        };
        // Combine the declared nullability with the data actually present so the
        // per-row path can be as cheap as possible.
        let null_state = match (nullability, array.null_count() > 0) {
            (None, false) => NullState::NonNullable,
            (None, true) => {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "Avro site '{}' is non-nullable, but array contains nulls",
                    field.name()
                )));
            }
            (Some(order), false) => {
                // No nulls in this array: the "value" branch byte is constant.
                NullState::NullableNoNulls {
                    union_value_byte: union_value_branch_byte(order, false),
                }
            }
            (Some(null_order), true) => {
                let Some(nulls) = array.nulls().cloned() else {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Array for Avro site '{}' reports nulls but has no null buffer",
                        field.name()
                    )));
                };
                NullState::Nullable { nulls, null_order }
            }
        };
        Ok(Self {
            encoder,
            null_state,
        })
    }

    /// Encodes the value at row `idx` into `out`.
    ///
    /// For nullable sites a one-byte union branch marker is written first; if
    /// the row is null, only that marker is written and the value encoder is
    /// not invoked.
    ///
    /// # Errors
    /// Returns `ArrowError::IoError` if writing the branch marker fails, or
    /// whatever error the underlying value encoder reports.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
        match &self.null_state {
            NullState::NonNullable => {}
            NullState::NullableNoNulls { union_value_byte } => out
                .write_all(&[*union_value_byte])
                .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?,
            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
                // Null row: the branch marker is the entire encoding.
                return write_optional_index(out, true, *null_order);
            }
            NullState::Nullable { null_order, .. } => {
                write_optional_index(out, false, *null_order)?;
            }
        }
        self.encoder.encode(out, idx)
    }
}
678
679fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
680 let nulls_first = null_order == Nullability::default();
681 if nulls_first == is_null { 0x00 } else { 0x02 }
682}
683
/// Array-independent description of how one Avro value site maps onto an Arrow
/// field. A plan is computed once from the schemas (see `FieldPlan::build`) and
/// then used to build a concrete encoder for each array.
#[derive(Debug, Clone)]
enum FieldPlan {
    /// Any site whose encoder is chosen purely from the Arrow data type
    /// (primitives, strings, binary, temporal types, intervals, ...).
    Scalar,
    /// Avro record mapped to an Arrow struct; one binding per Avro field.
    Struct { bindings: Vec<FieldBinding> },
    /// Avro array mapped to an Arrow list-like type.
    List {
        /// Nullability of the Avro item type (`None` when items are not nullable).
        items_nullability: Option<Nullability>,
        /// Plan for the list items.
        item_plan: Box<FieldPlan>,
    },
    /// Avro decimal mapped to an Arrow decimal. `size` is the fixed size taken
    /// from the Avro codec, if any.
    Decimal { size: Option<usize> },
    /// Arrow `FixedSizeBinary(16)` field marked as a UUID.
    Uuid,
    /// Avro map mapped to an Arrow map.
    Map {
        /// Nullability of the Avro map value type.
        values_nullability: Option<Nullability>,
        /// Plan for the map values.
        value_plan: Box<FieldPlan>,
    },
    /// Avro enum mapped to an Arrow `Dictionary<Int32, Utf8>`; `symbols` are the
    /// enum symbols from the Avro schema.
    Enum { symbols: Arc<[String]> },
    /// Avro union mapped to an Arrow dense union; one binding per branch.
    Union { bindings: Vec<FieldBinding> },
    /// Arrow run-end-encoded field.
    RunEndEncoded {
        /// Nullability applied to the run values.
        values_nullability: Option<Nullability>,
        /// Plan for the run values.
        value_plan: Box<FieldPlan>,
    },
}
718
/// Binds one Avro field (or union branch) to its Arrow counterpart.
#[derive(Debug, Clone)]
struct FieldBinding {
    /// Position of the matching child on the Arrow side: a column index for
    /// top-level fields, a child index for struct fields, or the branch position
    /// for unions.
    arrow_index: usize,
    /// Nullability declared by the Avro type at this site (`None` if not nullable).
    nullability: Option<Nullability>,
    /// Encoding plan for the bound value.
    plan: FieldPlan,
}
728
/// Builder that validates an Avro root record against an Arrow schema and
/// produces a reusable [`RecordEncoder`].
#[derive(Debug)]
pub(crate) struct RecordEncoderBuilder<'a> {
    /// Avro root field; its codec must be a record/struct.
    avro_root: &'a AvroField,
    /// Arrow schema whose columns are matched to the Avro fields by name.
    arrow_schema: &'a ArrowSchema,
    /// Optional schema fingerprint; when present, a prefix derived from it is
    /// written before every encoded row.
    fingerprint: Option<Fingerprint>,
}
736
737impl<'a> RecordEncoderBuilder<'a> {
738 pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
740 Self {
741 avro_root,
742 arrow_schema,
743 fingerprint: None,
744 }
745 }
746
747 pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
748 self.fingerprint = fingerprint;
749 self
750 }
751
752 pub(crate) fn build(self) -> Result<RecordEncoder, ArrowError> {
755 let avro_root_dt = self.avro_root.data_type();
756 let Codec::Struct(root_fields) = avro_root_dt.codec() else {
757 return Err(ArrowError::SchemaError(
758 "Top-level Avro schema must be a record/struct".into(),
759 ));
760 };
761 let mut columns = Vec::with_capacity(root_fields.len());
762 for root_field in root_fields.as_ref() {
763 let name = root_field.name();
764 let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
765 ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
766 })?;
767 columns.push(FieldBinding {
768 arrow_index,
769 nullability: root_field.data_type().nullability(),
770 plan: FieldPlan::build(
771 root_field.data_type(),
772 self.arrow_schema.field(arrow_index),
773 )?,
774 });
775 }
776 Ok(RecordEncoder {
777 columns,
778 prefix: self.fingerprint.map(|fp| fp.make_prefix()),
779 })
780 }
781}
782
/// Schema-level encoder for record batches: holds the per-column plans and the
/// optional per-row prefix, independent of any particular batch.
#[derive(Debug, Clone)]
pub(crate) struct RecordEncoder {
    /// One binding per field of the Avro root record, in Avro field order.
    columns: Vec<FieldBinding>,
    /// Bytes written before each row when a fingerprint was configured.
    prefix: Option<Prefix>,
}
794
795impl RecordEncoder {
796 fn prepare_for_batch<'a>(
797 &'a self,
798 batch: &'a RecordBatch,
799 ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
800 let schema_binding = batch.schema();
801 let fields = schema_binding.fields();
802 let arrays = batch.columns();
803 let mut out = Vec::with_capacity(self.columns.len());
804 for col_plan in self.columns.iter() {
805 let arrow_index = col_plan.arrow_index;
806 let array = arrays.get(arrow_index).ok_or_else(|| {
807 ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
808 })?;
809 let field = fields[arrow_index].as_ref();
810 #[cfg(not(feature = "avro_custom_types"))]
811 let site_nullability = match &col_plan.plan {
812 FieldPlan::RunEndEncoded { .. } => None,
813 _ => col_plan.nullability,
814 };
815 #[cfg(feature = "avro_custom_types")]
816 let site_nullability = col_plan.nullability;
817 let encoder = prepare_value_site_encoder(
818 array.as_ref(),
819 field,
820 site_nullability,
821 &col_plan.plan,
822 )?;
823 out.push(encoder);
824 }
825 Ok(out)
826 }
827
828 pub(crate) fn encode<W: Write>(
832 &self,
833 out: &mut W,
834 batch: &RecordBatch,
835 ) -> Result<(), ArrowError> {
836 let mut column_encoders = self.prepare_for_batch(batch)?;
837 let n = batch.num_rows();
838 match self.prefix {
839 Some(prefix) => {
840 for row in 0..n {
841 out.write_all(prefix.as_slice())
842 .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
843 for enc in column_encoders.iter_mut() {
844 enc.encode(out, row)?;
845 }
846 }
847 }
848 None => {
849 for row in 0..n {
850 for enc in column_encoders.iter_mut() {
851 enc.encode(out, row)?;
852 }
853 }
854 }
855 }
856 Ok(())
857 }
858}
859
860fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
861 fields.iter().position(|f| f.name() == name)
862}
863
864fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
865 find_struct_child_index(fields, "value")
867 .or_else(|| find_struct_child_index(fields, "values"))
868 .or_else(|| if fields.len() == 2 { Some(1) } else { None })
869}
870
impl FieldPlan {
    /// Computes the encoding plan for one Avro type / Arrow field pair,
    /// recursing into nested types.
    ///
    /// Two Arrow-side checks run before the Avro codec is examined:
    /// run-end-encoded fields (only when the `avro_custom_types` feature is
    /// disabled) and `FixedSizeBinary` fields flagged as UUIDs.
    ///
    /// # Errors
    /// * `ArrowError::SchemaError` when the Arrow type does not match what the
    ///   Avro codec requires (wrong container type, missing struct child,
    ///   decimal precision/scale mismatch, union branch count mismatch, ...).
    /// * `ArrowError::NotYetImplemented` for sparse unions.
    /// * `ArrowError::InvalidArgumentError` when a UUID-flagged field is not
    ///   `FixedSizeBinary(16)`.
    fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
        // Without custom Avro types the run-end encoding is planned from the
        // Arrow side only: the value plan is built from the Avro type itself, or
        // from its first non-null branch when that type is a union.
        #[cfg(not(feature = "avro_custom_types"))]
        if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
            let values_nullability = avro_dt.nullability();
            let value_site_dt: &AvroDataType = match avro_dt.codec() {
                Codec::Union(branches, _, _) => branches
                    .iter()
                    .find(|b| !matches!(b.codec(), Codec::Null))
                    .ok_or_else(|| {
                        ArrowError::SchemaError(
                            "Avro union at RunEndEncoded site has no non-null branch".into(),
                        )
                    })?,
                _ => avro_dt,
            };
            return Ok(FieldPlan::RunEndEncoded {
                values_nullability,
                value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
            });
        }
        // A FixedSizeBinary field is a UUID if either its extension type name
        // (only checked with `canonical_extension_types`) or its `logicalType`
        // metadata entry says so.
        if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
            let ext_is_uuid = {
                #[cfg(feature = "canonical_extension_types")]
                {
                    matches!(
                        arrow_field.extension_type_name(),
                        Some("arrow.uuid") | Some("uuid")
                    )
                }
                #[cfg(not(feature = "canonical_extension_types"))]
                {
                    false
                }
            };
            let md_is_uuid = arrow_field
                .metadata()
                .get("logicalType")
                .map(|s| s.as_str())
                == Some("uuid");
            if ext_is_uuid || md_is_uuid {
                if *len != 16 {
                    return Err(ArrowError::InvalidArgumentError(
                        "logicalType=uuid requires FixedSizeBinary(16)".into(),
                    ));
                }
                return Ok(FieldPlan::Uuid);
            }
        }
        match avro_dt.codec() {
            Codec::Struct(avro_fields) => {
                let fields = match arrow_field.data_type() {
                    DataType::Struct(struct_fields) => struct_fields,
                    other => {
                        return Err(ArrowError::SchemaError(format!(
                            "Avro struct maps to Arrow Struct, found: {other:?}"
                        )));
                    }
                };
                let mut bindings = Vec::with_capacity(avro_fields.len());
                for avro_field in avro_fields.iter() {
                    // Struct children are matched by name; every Avro field must
                    // have an Arrow child.
                    let name = avro_field.name().to_string();
                    let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
                        ArrowError::SchemaError(format!(
                            "Struct field '{name}' not present in Arrow field '{}'",
                            arrow_field.name()
                        ))
                    })?;
                    bindings.push(FieldBinding {
                        arrow_index: idx,
                        nullability: avro_field.data_type().nullability(),
                        plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
                    });
                }
                Ok(FieldPlan::Struct { bindings })
            }
            Codec::List(items_dt) => match arrow_field.data_type() {
                DataType::List(field_ref)
                | DataType::LargeList(field_ref)
                | DataType::ListView(field_ref)
                | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
                    items_nullability: items_dt.nullability(),
                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
                }),
                DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
                    items_nullability: items_dt.nullability(),
                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
                }),
                other => Err(ArrowError::SchemaError(format!(
                    "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                ))),
            },
            Codec::Map(values_dt) => {
                let entries_field = match arrow_field.data_type() {
                    DataType::Map(entries, _sorted) => entries.as_ref(),
                    other => {
                        return Err(ArrowError::SchemaError(format!(
                            "Avro map maps to Arrow DataType::Map, found: {other:?}"
                        )));
                    }
                };
                let entries_struct_fields = match entries_field.data_type() {
                    DataType::Struct(fs) => fs,
                    other => {
                        return Err(ArrowError::SchemaError(format!(
                            "Arrow Map entries must be Struct, found: {other:?}"
                        )));
                    }
                };
                let value_idx =
                    find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
                        ArrowError::SchemaError("Map entries struct missing value field".into())
                    })?;
                let value_field = entries_struct_fields[value_idx].as_ref();
                let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
                Ok(FieldPlan::Map {
                    values_nullability: values_dt.nullability(),
                    value_plan: Box::new(value_plan),
                })
            }
            Codec::Enum(symbols) => match arrow_field.data_type() {
                DataType::Dictionary(key_dt, value_dt) => {
                    if **key_dt != DataType::Int32 {
                        return Err(ArrowError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    if **value_dt != DataType::Utf8 {
                        return Err(ArrowError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    Ok(FieldPlan::Enum {
                        symbols: symbols.clone(),
                    })
                }
                other => Err(ArrowError::SchemaError(format!(
                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
                ))),
            },
            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
                // Arrow precision and scale, normalised to the types used for
                // the comparison below.
                let (ap, as_) = match arrow_field.data_type() {
                    #[cfg(feature = "small_decimals")]
                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
                    #[cfg(feature = "small_decimals")]
                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
                    other => {
                        return Err(ArrowError::SchemaError(format!(
                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
                            arrow_field.name()
                        )));
                    }
                };
                // A missing Avro scale is treated as 0.
                let sc = scale_opt.unwrap_or(0) as i32;
                if ap != *precision || as_ != sc {
                    return Err(ArrowError::SchemaError(format!(
                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
                        arrow_field.name()
                    )));
                }
                Ok(FieldPlan::Decimal {
                    size: *fixed_size_opt,
                })
            }
            // NOTE(review): all three interval units are accepted here, but the
            // error text below names only Interval(MonthDayNano) — confirm which
            // one is intended before changing either.
            Codec::Interval => match arrow_field.data_type() {
                DataType::Interval(
                    IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
                ) => Ok(FieldPlan::Scalar),
                other => Err(ArrowError::SchemaError(format!(
                    "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
                ))),
            },
            Codec::Union(avro_branches, _, UnionMode::Dense) => {
                let arrow_union_fields = match arrow_field.data_type() {
                    DataType::Union(fields, UnionMode::Dense) => fields,
                    DataType::Union(_, UnionMode::Sparse) => {
                        return Err(ArrowError::NotYetImplemented(
                            "Sparse Arrow unions are not yet supported".to_string(),
                        ));
                    }
                    other => {
                        return Err(ArrowError::SchemaError(format!(
                            "Avro union maps to Arrow Union, found: {other:?}"
                        )));
                    }
                };
                if avro_branches.len() != arrow_union_fields.len() {
                    return Err(ArrowError::SchemaError(format!(
                        "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
                        avro_branches.len(),
                        arrow_union_fields.len(),
                        arrow_field.name()
                    )));
                }
                // Branches are paired positionally; `arrow_index` records the
                // branch position, and the Arrow type id in the tuple is ignored.
                let bindings = avro_branches
                    .iter()
                    .zip(arrow_union_fields.iter())
                    .enumerate()
                    .map(|(i, (avro_branch, (_, arrow_child_field)))| {
                        Ok(FieldBinding {
                            arrow_index: i,
                            nullability: avro_branch.nullability(),
                            plan: FieldPlan::build(avro_branch, arrow_child_field)?,
                        })
                    })
                    .collect::<Result<Vec<_>, ArrowError>>()?;
                Ok(FieldPlan::Union { bindings })
            }
            Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented(
                "Sparse Arrow unions are not yet supported".to_string(),
            )),
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values_dt, _width_code) => {
                let values_field = match arrow_field.data_type() {
                    DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
                    other => {
                        return Err(ArrowError::SchemaError(format!(
                            "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
                        )));
                    }
                };
                Ok(FieldPlan::RunEndEncoded {
                    values_nullability: values_dt.nullability(),
                    value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
                })
            }
            // Every other codec is handled by Arrow data type when the encoder
            // is built.
            _ => Ok(FieldPlan::Scalar),
        }
    }
}
1105
/// Row-level value encoder: one variant per concrete encoder type defined in this file.
///
/// `Encoder::encode` dispatches on the variant and forwards `(out, idx)` to the wrapped
/// encoder. Encoders for nested or composite values (lists, structs, maps, unions,
/// run-end encoded arrays) are boxed; the scalar encoders are stored inline.
enum Encoder<'a> {
    Boolean(BooleanEncoder<'a>),
    Int(IntEncoder<'a, Int32Type>),
    Long(LongEncoder<'a, Int64Type>),
    TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
    TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
    TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
    /// Second-resolution timestamps, multiplied by 1000 before being written.
    TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
    Date32(IntEncoder<'a, Date32Type>),
    /// Second-resolution time-of-day values, multiplied by 1000 before being written.
    Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
    Time32Millis(IntEncoder<'a, Time32MillisecondType>),
    Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
    DurationSeconds(LongEncoder<'a, DurationSecondType>),
    DurationMillis(LongEncoder<'a, DurationMillisecondType>),
    DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
    DurationNanos(LongEncoder<'a, DurationNanosecondType>),
    Float32(F32Encoder<'a>),
    Float64(F64Encoder<'a>),
    Binary(BinaryEncoder<'a, i32>),
    LargeBinary(BinaryEncoder<'a, i64>),
    Utf8(Utf8Encoder<'a>),
    Utf8Large(Utf8LargeEncoder<'a>),
    Utf8View(Utf8ViewEncoder<'a>),
    BinaryView(BinaryViewEncoder<'a>),
    List(Box<ListEncoder32<'a>>),
    LargeList(Box<ListEncoder64<'a>>),
    ListView(Box<ListViewEncoder32<'a>>),
    LargeListView(Box<ListViewEncoder64<'a>>),
    FixedSizeList(Box<FixedSizeListEncoder<'a>>),
    Struct(Box<StructEncoder<'a>>),
    /// Raw fixed-size binary values, written without a length prefix.
    Fixed(FixedEncoder<'a>),
    /// 16-byte values written as length-prefixed hyphenated lowercase UUID text.
    Uuid(UuidEncoder<'a>),
    // The three interval flavours share `DurationEncoder`, which writes 12 bytes
    // (months, days, millis as little-endian u32).
    IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
    IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
    IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
    #[cfg(feature = "small_decimals")]
    Decimal32(Decimal32Encoder<'a>),
    #[cfg(feature = "small_decimals")]
    Decimal64(Decimal64Encoder<'a>),
    Decimal128(Decimal128Encoder<'a>),
    Decimal256(Decimal256Encoder<'a>),
    /// Writes the Int32 key of each row as an Avro int.
    Enum(EnumEncoder<'a>),
    Map(Box<MapEncoder<'a>>),
    Union(Box<UnionEncoder<'a>>),
    // Run-end encoded arrays, one variant per run-end index width.
    RunEncoded16(Box<RunEncodedEncoder16<'a>>),
    RunEncoded32(Box<RunEncodedEncoder32<'a>>),
    RunEncoded64(Box<RunEncodedEncoder64<'a>>),
    /// Produces no bytes for any row.
    Null,
}
1162
1163impl<'a> Encoder<'a> {
1164 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1166 match self {
1167 Encoder::Boolean(e) => e.encode(out, idx),
1168 Encoder::Int(e) => e.encode(out, idx),
1169 Encoder::Long(e) => e.encode(out, idx),
1170 Encoder::TimestampMicros(e) => e.encode(out, idx),
1171 Encoder::TimestampMillis(e) => e.encode(out, idx),
1172 Encoder::TimestampNanos(e) => e.encode(out, idx),
1173 Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
1174 Encoder::Date32(e) => e.encode(out, idx),
1175 Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
1176 Encoder::Time32Millis(e) => e.encode(out, idx),
1177 Encoder::Time64Micros(e) => e.encode(out, idx),
1178 Encoder::DurationSeconds(e) => e.encode(out, idx),
1179 Encoder::DurationMicros(e) => e.encode(out, idx),
1180 Encoder::DurationMillis(e) => e.encode(out, idx),
1181 Encoder::DurationNanos(e) => e.encode(out, idx),
1182 Encoder::Float32(e) => e.encode(out, idx),
1183 Encoder::Float64(e) => e.encode(out, idx),
1184 Encoder::Binary(e) => e.encode(out, idx),
1185 Encoder::LargeBinary(e) => e.encode(out, idx),
1186 Encoder::Utf8(e) => e.encode(out, idx),
1187 Encoder::Utf8Large(e) => e.encode(out, idx),
1188 Encoder::Utf8View(e) => e.encode(out, idx),
1189 Encoder::BinaryView(e) => e.encode(out, idx),
1190 Encoder::List(e) => e.encode(out, idx),
1191 Encoder::LargeList(e) => e.encode(out, idx),
1192 Encoder::ListView(e) => e.encode(out, idx),
1193 Encoder::LargeListView(e) => e.encode(out, idx),
1194 Encoder::FixedSizeList(e) => e.encode(out, idx),
1195 Encoder::Struct(e) => e.encode(out, idx),
1196 Encoder::Fixed(e) => (e).encode(out, idx),
1197 Encoder::Uuid(e) => (e).encode(out, idx),
1198 Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
1199 Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
1200 Encoder::IntervalDayTime(e) => (e).encode(out, idx),
1201 #[cfg(feature = "small_decimals")]
1202 Encoder::Decimal32(e) => (e).encode(out, idx),
1203 #[cfg(feature = "small_decimals")]
1204 Encoder::Decimal64(e) => (e).encode(out, idx),
1205 Encoder::Decimal128(e) => (e).encode(out, idx),
1206 Encoder::Decimal256(e) => (e).encode(out, idx),
1207 Encoder::Map(e) => (e).encode(out, idx),
1208 Encoder::Enum(e) => (e).encode(out, idx),
1209 Encoder::Union(e) => (e).encode(out, idx),
1210 Encoder::RunEncoded16(e) => (e).encode(out, idx),
1211 Encoder::RunEncoded32(e) => (e).encode(out, idx),
1212 Encoder::RunEncoded64(e) => (e).encode(out, idx),
1213 Encoder::Null => Ok(()),
1214 }
1215 }
1216}
1217
1218struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1219impl BooleanEncoder<'_> {
1220 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1221 write_bool(out, self.0.value(idx))
1222 }
1223}
1224
1225struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1227impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1228 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1229 write_int(out, self.0.value(idx))
1230 }
1231}
1232
1233struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1235impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1236 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1237 write_long(out, self.0.value(idx))
1238 }
1239}
1240
1241struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1243impl<'a> Time32SecondsToMillisEncoder<'a> {
1244 #[inline]
1245 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1246 let secs = self.0.value(idx);
1247 let millis = secs.checked_mul(1000).ok_or_else(|| {
1248 ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into())
1249 })?;
1250 write_int(out, millis)
1251 }
1252}
1253
1254struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1256impl<'a> TimestampSecondsToMillisEncoder<'a> {
1257 #[inline]
1258 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1259 let secs = self.0.value(idx);
1260 let millis = secs.checked_mul(1000).ok_or_else(|| {
1261 ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
1262 })?;
1263 write_long(out, millis)
1264 }
1265}
1266
1267struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1269impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1270 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1271 write_len_prefixed(out, self.0.value(idx))
1272 }
1273}
1274
1275struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1277impl BinaryViewEncoder<'_> {
1278 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1279 write_len_prefixed(out, self.0.value(idx))
1280 }
1281}
1282
1283struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1285impl Utf8ViewEncoder<'_> {
1286 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1287 write_len_prefixed(out, self.0.value(idx).as_bytes())
1288 }
1289}
1290
1291struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1292impl F32Encoder<'_> {
1293 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1294 let bits = self.0.value(idx).to_bits();
1296 out.write_all(&bits.to_le_bytes())
1297 .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
1298 }
1299}
1300
1301struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1302impl F64Encoder<'_> {
1303 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1304 let bits = self.0.value(idx).to_bits();
1306 out.write_all(&bits.to_le_bytes())
1307 .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
1308 }
1309}
1310
1311struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1312
1313impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1314 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1315 write_len_prefixed(out, self.0.value(idx).as_bytes())
1316 }
1317}
1318
1319type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1320type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1321
/// Borrowed, already-downcast key column of a map; `MapEncoder` only accepts
/// `Utf8` or `LargeUtf8` keys.
enum KeyKind<'a> {
    /// Keys stored with 32-bit offsets.
    Utf8(&'a GenericStringArray<i32>),
    /// Keys stored with 64-bit offsets.
    LargeUtf8(&'a GenericStringArray<i64>),
}
1327struct MapEncoder<'a> {
1328 map: &'a MapArray,
1329 keys: KeyKind<'a>,
1330 values: FieldEncoder<'a>,
1331 keys_offset: usize,
1332 values_offset: usize,
1333}
1334
1335impl<'a> MapEncoder<'a> {
1336 fn try_new(
1337 map: &'a MapArray,
1338 values_nullability: Option<Nullability>,
1339 value_plan: &FieldPlan,
1340 ) -> Result<Self, ArrowError> {
1341 let keys_arr = map.keys();
1342 let keys_kind = match keys_arr.data_type() {
1343 DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1344 DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1345 other => {
1346 return Err(ArrowError::SchemaError(format!(
1347 "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1348 )));
1349 }
1350 };
1351
1352 let entries_struct_fields = match map.data_type() {
1353 DataType::Map(entries, _) => match entries.data_type() {
1354 DataType::Struct(fs) => fs,
1355 other => {
1356 return Err(ArrowError::SchemaError(format!(
1357 "Arrow Map entries must be Struct, found: {other:?}"
1358 )));
1359 }
1360 },
1361 _ => {
1362 return Err(ArrowError::SchemaError(
1363 "Expected MapArray with DataType::Map".into(),
1364 ));
1365 }
1366 };
1367
1368 let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
1369 ArrowError::SchemaError("Map entries struct missing value field".into())
1370 })?;
1371 let value_field = entries_struct_fields[v_idx].as_ref();
1372
1373 let values_enc = prepare_value_site_encoder(
1374 map.values().as_ref(),
1375 value_field,
1376 values_nullability,
1377 value_plan,
1378 )?;
1379
1380 Ok(Self {
1381 map,
1382 keys: keys_kind,
1383 values: values_enc,
1384 keys_offset: keys_arr.offset(),
1385 values_offset: map.values().offset(),
1386 })
1387 }
1388
1389 fn encode_map_entries<W, O>(
1390 out: &mut W,
1391 keys: &GenericStringArray<O>,
1392 keys_offset: usize,
1393 start: usize,
1394 end: usize,
1395 mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>,
1396 ) -> Result<(), ArrowError>
1397 where
1398 W: Write + ?Sized,
1399 O: OffsetSizeTrait,
1400 {
1401 encode_blocked_range(out, start, end, |out, j| {
1402 let j_key = j.saturating_sub(keys_offset);
1403 write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1404 write_item(out, j)
1405 })
1406 }
1407
1408 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1409 let offsets = self.map.offsets();
1410 let start = offsets[idx] as usize;
1411 let end = offsets[idx + 1] as usize;
1412 let write_item = |out: &mut W, j: usize| {
1413 let j_val = j.saturating_sub(self.values_offset);
1414 self.values.encode(out, j_val)
1415 };
1416 match self.keys {
1417 KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1418 out,
1419 arr,
1420 self.keys_offset,
1421 start,
1422 end,
1423 write_item,
1424 ),
1425 KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1426 out,
1427 arr,
1428 self.keys_offset,
1429 start,
1430 end,
1431 write_item,
1432 ),
1433 }
1434 }
1435}
1436
1437struct EnumEncoder<'a> {
1444 keys: &'a PrimitiveArray<Int32Type>,
1445}
1446impl EnumEncoder<'_> {
1447 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> {
1448 write_int(out, self.keys.value(row))
1449 }
1450}
1451
1452struct UnionEncoder<'a> {
1453 encoders: Vec<FieldEncoder<'a>>,
1454 array: &'a UnionArray,
1455}
1456
1457impl<'a> UnionEncoder<'a> {
1458 fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, ArrowError> {
1459 let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1460 return Err(ArrowError::SchemaError("Expected Dense UnionArray".into()));
1461 };
1462
1463 if fields.len() != field_bindings.len() {
1464 return Err(ArrowError::SchemaError(format!(
1465 "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1466 fields.len(),
1467 field_bindings.len()
1468 )));
1469 }
1470 let mut encoders = Vec::with_capacity(fields.len());
1471 for (type_id, field_ref) in fields.iter() {
1472 let binding = field_bindings
1473 .get(type_id as usize)
1474 .ok_or_else(|| ArrowError::SchemaError("Binding and field mismatch".to_string()))?;
1475
1476 let child = array.child(type_id).as_ref();
1477
1478 let encoder = prepare_value_site_encoder(
1479 child,
1480 field_ref.as_ref(),
1481 binding.nullability,
1482 &binding.plan,
1483 )?;
1484 encoders.push(encoder);
1485 }
1486 Ok(Self { encoders, array })
1487 }
1488
1489 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1490 let type_id = self.array.type_ids()[idx];
1491 let branch_index = type_id as usize;
1492 write_int(out, type_id as i32)?;
1493 let child_row = self.array.value_offset(idx);
1494
1495 let encoder = self
1496 .encoders
1497 .get_mut(branch_index)
1498 .ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id {type_id}")))?;
1499
1500 encoder.encode(out, child_row)
1501 }
1502}
1503
1504struct StructEncoder<'a> {
1505 encoders: Vec<FieldEncoder<'a>>,
1506}
1507
1508impl<'a> StructEncoder<'a> {
1509 fn try_new(
1510 array: &'a StructArray,
1511 field_bindings: &[FieldBinding],
1512 ) -> Result<Self, ArrowError> {
1513 let DataType::Struct(fields) = array.data_type() else {
1514 return Err(ArrowError::SchemaError("Expected Struct".into()));
1515 };
1516 let mut encoders = Vec::with_capacity(field_bindings.len());
1517 for field_binding in field_bindings {
1518 let idx = field_binding.arrow_index;
1519 let column = array.columns().get(idx).ok_or_else(|| {
1520 ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1521 })?;
1522 let field = fields.get(idx).ok_or_else(|| {
1523 ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1524 })?;
1525 let encoder = prepare_value_site_encoder(
1526 column.as_ref(),
1527 field,
1528 field_binding.nullability,
1529 &field_binding.plan,
1530 )?;
1531 encoders.push(encoder);
1532 }
1533 Ok(Self { encoders })
1534 }
1535
1536 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1537 for encoder in self.encoders.iter_mut() {
1538 encoder.encode(out, idx)?;
1539 }
1540 Ok(())
1541 }
1542}
1543
1544fn encode_blocked_range<W: Write + ?Sized, F>(
1548 out: &mut W,
1549 start: usize,
1550 end: usize,
1551 mut write_item: F,
1552) -> Result<(), ArrowError>
1553where
1554 F: FnMut(&mut W, usize) -> Result<(), ArrowError>,
1555{
1556 let len = end.saturating_sub(start);
1557 if len == 0 {
1558 write_long(out, 0)?;
1560 return Ok(());
1561 }
1562 write_long(out, len as i64)?;
1564 for row in start..end {
1565 write_item(out, row)?;
1566 }
1567 write_long(out, 0)?;
1568 Ok(())
1569}
1570
1571struct ListEncoder<'a, O: OffsetSizeTrait> {
1572 list: &'a GenericListArray<O>,
1573 values: FieldEncoder<'a>,
1574 values_offset: usize,
1575}
1576
1577type ListEncoder32<'a> = ListEncoder<'a, i32>;
1578type ListEncoder64<'a> = ListEncoder<'a, i64>;
1579
1580impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1581 fn try_new(
1582 list: &'a GenericListArray<O>,
1583 items_nullability: Option<Nullability>,
1584 item_plan: &FieldPlan,
1585 ) -> Result<Self, ArrowError> {
1586 let child_field = match list.data_type() {
1587 DataType::List(field) => field.as_ref(),
1588 DataType::LargeList(field) => field.as_ref(),
1589 _ => {
1590 return Err(ArrowError::SchemaError(
1591 "Expected List or LargeList for ListEncoder".into(),
1592 ));
1593 }
1594 };
1595 let values_enc = prepare_value_site_encoder(
1596 list.values().as_ref(),
1597 child_field,
1598 items_nullability,
1599 item_plan,
1600 )?;
1601 Ok(Self {
1602 list,
1603 values: values_enc,
1604 values_offset: list.values().offset(),
1605 })
1606 }
1607
1608 fn encode_list_range<W: Write + ?Sized>(
1609 &mut self,
1610 out: &mut W,
1611 start: usize,
1612 end: usize,
1613 ) -> Result<(), ArrowError> {
1614 encode_blocked_range(out, start, end, |out, row| {
1615 self.values
1616 .encode(out, row.saturating_sub(self.values_offset))
1617 })
1618 }
1619
1620 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1621 let offsets = self.list.offsets();
1622 let start = offsets[idx].to_usize().ok_or_else(|| {
1623 ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize"))
1624 })?;
1625 let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1626 ArrowError::InvalidArgumentError(format!(
1627 "Error converting offset[{}] to usize",
1628 idx + 1
1629 ))
1630 })?;
1631 self.encode_list_range(out, start, end)
1632 }
1633}
1634
1635struct ListViewEncoder<'a, O: OffsetSizeTrait> {
1637 list: &'a GenericListViewArray<O>,
1638 values: FieldEncoder<'a>,
1639 values_offset: usize,
1640}
1641type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
1642type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
1643
1644impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
1645 fn try_new(
1646 list: &'a GenericListViewArray<O>,
1647 items_nullability: Option<Nullability>,
1648 item_plan: &FieldPlan,
1649 ) -> Result<Self, ArrowError> {
1650 let child_field = match list.data_type() {
1651 DataType::ListView(field) => field.as_ref(),
1652 DataType::LargeListView(field) => field.as_ref(),
1653 _ => {
1654 return Err(ArrowError::SchemaError(
1655 "Expected ListView or LargeListView for ListViewEncoder".into(),
1656 ));
1657 }
1658 };
1659 let values_enc = prepare_value_site_encoder(
1660 list.values().as_ref(),
1661 child_field,
1662 items_nullability,
1663 item_plan,
1664 )?;
1665 Ok(Self {
1666 list,
1667 values: values_enc,
1668 values_offset: list.values().offset(),
1669 })
1670 }
1671
1672 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1673 let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
1674 ArrowError::InvalidArgumentError(format!(
1675 "Error converting value_offset[{idx}] to usize"
1676 ))
1677 })?;
1678 let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
1679 ArrowError::InvalidArgumentError(format!("Error converting value_size[{idx}] to usize"))
1680 })?;
1681 let start = start + self.values_offset;
1682 let end = start + len;
1683 encode_blocked_range(out, start, end, |out, row| {
1684 self.values
1685 .encode(out, row.saturating_sub(self.values_offset))
1686 })
1687 }
1688}
1689
1690struct FixedSizeListEncoder<'a> {
1692 list: &'a FixedSizeListArray,
1693 values: FieldEncoder<'a>,
1694 values_offset: usize,
1695 elem_len: usize,
1696}
1697
1698impl<'a> FixedSizeListEncoder<'a> {
1699 fn try_new(
1700 list: &'a FixedSizeListArray,
1701 items_nullability: Option<Nullability>,
1702 item_plan: &FieldPlan,
1703 ) -> Result<Self, ArrowError> {
1704 let child_field = match list.data_type() {
1705 DataType::FixedSizeList(field, _len) => field.as_ref(),
1706 _ => {
1707 return Err(ArrowError::SchemaError(
1708 "Expected FixedSizeList for FixedSizeListEncoder".into(),
1709 ));
1710 }
1711 };
1712 let values_enc = prepare_value_site_encoder(
1713 list.values().as_ref(),
1714 child_field,
1715 items_nullability,
1716 item_plan,
1717 )?;
1718 Ok(Self {
1719 list,
1720 values: values_enc,
1721 values_offset: list.values().offset(),
1722 elem_len: list.value_length() as usize,
1723 })
1724 }
1725
1726 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1727 let rel = self.list.value_offset(idx) as usize;
1729 let start = self.values_offset + rel;
1730 let end = start + self.elem_len;
1731 encode_blocked_range(out, start, end, |out, row| {
1732 self.values
1733 .encode(out, row.saturating_sub(self.values_offset))
1734 })
1735 }
1736}
1737
1738fn prepare_value_site_encoder<'a>(
1739 values_array: &'a dyn Array,
1740 value_field: &Field,
1741 nullability: Option<Nullability>,
1742 plan: &FieldPlan,
1743) -> Result<FieldEncoder<'a>, ArrowError> {
1744 FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
1746}
1747
1748struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1751impl FixedEncoder<'_> {
1752 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1753 let v = self.0.value(idx); out.write_all(v)
1755 .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e))
1756 }
1757}
1758
1759struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1762impl UuidEncoder<'_> {
1763 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1764 let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1765 buf[0] = 0x48;
1766 let v = self.0.value(idx);
1767 let u = Uuid::from_slice(v)
1768 .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
1769 let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1770 out.write_all(&buf)
1771 .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
1772 }
1773}
1774
/// The three unsigned components that `DurationEncoder` writes for one interval value,
/// each as a little-endian `u32`.
#[derive(Copy, Clone)]
struct DurationParts {
    months: u32,
    days: u32,
    millis: u32,
}
/// Conversion from an Arrow interval type's native value into `DurationParts`.
trait IntervalToDurationParts: ArrowPrimitiveType {
    /// Splits `native` into months, days and milliseconds.
    ///
    /// The implementations in this file return `InvalidArgumentError` when a component
    /// is negative or otherwise cannot be represented.
    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>;
}
1785impl IntervalToDurationParts for IntervalMonthDayNanoType {
1786 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1787 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1788 if months < 0 || days < 0 || nanos < 0 {
1789 return Err(ArrowError::InvalidArgumentError(
1790 "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1791 ));
1792 }
1793 if nanos % 1_000_000 != 0 {
1794 return Err(ArrowError::InvalidArgumentError(
1795 "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1796 .into(),
1797 ));
1798 }
1799 let millis = nanos / 1_000_000;
1800 if millis > u32::MAX as i64 {
1801 return Err(ArrowError::InvalidArgumentError(
1802 "Avro 'duration' milliseconds exceed u32::MAX".into(),
1803 ));
1804 }
1805 Ok(DurationParts {
1806 months: months as u32,
1807 days: days as u32,
1808 millis: millis as u32,
1809 })
1810 }
1811}
1812impl IntervalToDurationParts for IntervalYearMonthType {
1813 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1814 if native < 0 {
1815 return Err(ArrowError::InvalidArgumentError(
1816 "Avro 'duration' cannot encode negative months".into(),
1817 ));
1818 }
1819 Ok(DurationParts {
1820 months: native as u32,
1821 days: 0,
1822 millis: 0,
1823 })
1824 }
1825}
1826impl IntervalToDurationParts for IntervalDayTimeType {
1827 fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1828 let (days, millis) = IntervalDayTimeType::to_parts(native);
1829 if days < 0 || millis < 0 {
1830 return Err(ArrowError::InvalidArgumentError(
1831 "Avro 'duration' cannot encode negative days or milliseconds".into(),
1832 ));
1833 }
1834 Ok(DurationParts {
1835 months: 0,
1836 days: days as u32,
1837 millis: millis as u32,
1838 })
1839 }
1840}
1841
1842struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1845impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1846 #[inline(always)]
1847 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1848 let parts = P::duration_parts(self.0.value(idx))?;
1849 let months = parts.months.to_le_bytes();
1850 let days = parts.days.to_le_bytes();
1851 let ms = parts.millis.to_le_bytes();
1852 let buf = [
1868 months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1869 ms[1], ms[2], ms[3],
1870 ];
1871 out.write_all(&buf)
1872 .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
1873 }
1874}
1875
1876trait DecimalBeBytes<const N: usize> {
1879 fn value_be_bytes(&self, idx: usize) -> [u8; N];
1880}
1881#[cfg(feature = "small_decimals")]
1882impl DecimalBeBytes<4> for Decimal32Array {
1883 fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1884 self.value(idx).to_be_bytes()
1885 }
1886}
1887#[cfg(feature = "small_decimals")]
1888impl DecimalBeBytes<8> for Decimal64Array {
1889 fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1890 self.value(idx).to_be_bytes()
1891 }
1892}
1893impl DecimalBeBytes<16> for Decimal128Array {
1894 fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1895 self.value(idx).to_be_bytes()
1896 }
1897}
1898impl DecimalBeBytes<32> for Decimal256Array {
1899 fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1900 self.value(idx).to_be_bytes()
1902 }
1903}
1904
1905struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1911 arr: &'a A,
1912 fixed_size: Option<usize>,
1913}
1914
1915impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1916 fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1917 Self { arr, fixed_size }
1918 }
1919
1920 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1921 let be = self.arr.value_be_bytes(idx);
1922 match self.fixed_size {
1923 Some(n) => write_sign_extended(out, &be, n),
1924 None => write_len_prefixed(out, minimal_twos_complement(&be)),
1925 }
1926 }
1927}
1928
1929#[cfg(feature = "small_decimals")]
1930type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1931#[cfg(feature = "small_decimals")]
1932type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1933type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1934type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1935
1936struct RunEncodedEncoder<'a, R: RunEndIndexType> {
1940 ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
1941 base: usize,
1942 len: usize,
1943 values: FieldEncoder<'a>,
1944 cur_run: usize,
1946 cur_end: usize,
1948}
1949
1950type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
1951type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
1952type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
1953
1954impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
1955 fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
1956 let ends = arr.run_ends();
1957 let base = ends.get_start_physical_index();
1958 let slice = ends.values();
1959 let len = ends.len();
1960 let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
1961 Self {
1962 ends_slice: slice,
1963 base,
1964 len,
1965 values,
1966 cur_run: 0,
1967 cur_end,
1968 }
1969 }
1970
1971 #[inline(always)]
1974 fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> {
1975 if idx < self.cur_end {
1976 return Ok(());
1977 }
1978 while self.cur_run + 1 < self.len && idx >= self.cur_end {
1980 self.cur_run += 1;
1981 self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
1982 }
1983 if idx < self.cur_end {
1984 Ok(())
1985 } else {
1986 Err(ArrowError::InvalidArgumentError(format!(
1987 "row index {idx} out of bounds for run-ends ({} runs)",
1988 self.len
1989 )))
1990 }
1991 }
1992
1993 #[inline(always)]
1994 fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1995 self.advance_to_row(idx)?;
1996 self.values.encode(out, self.cur_run)
1999 }
2000}
2001
2002#[cfg(test)]
2003mod tests {
2004 use super::*;
2005 use arrow_array::types::Int32Type;
2006 use arrow_array::{
2007 Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
2008 Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
2009 StringArray,
2010 };
2011 use arrow_buffer::Buffer;
2012 use arrow_schema::{DataType, Field, Fields, UnionFields};
2013
/// Reference zig-zag mapping of a signed 64-bit value onto an unsigned one
/// (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...).
fn zigzag_i64(v: i64) -> u64 {
    let doubled = v << 1;
    // Arithmetic shift: all ones for negative input, all zeros otherwise.
    let sign_mask = v >> 63;
    (doubled ^ sign_mask) as u64
}
2017
/// Reference base-128 varint: 7 payload bits per byte, least-significant group first,
/// with the high bit set on every byte except the last.
fn varint(mut x: u64) -> Vec<u8> {
    let mut encoded = Vec::new();
    loop {
        let low_bits = (x & 0x7f) as u8;
        x >>= 7;
        if x == 0 {
            encoded.push(low_bits);
            break;
        }
        encoded.push(low_bits | 0x80);
    }
    encoded
}
2027
2028 fn avro_long_bytes(v: i64) -> Vec<u8> {
2029 varint(zigzag_i64(v))
2030 }
2031
2032 fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
2033 let mut out = avro_long_bytes(payload.len() as i64);
2034 out.extend_from_slice(payload);
2035 out
2036 }
2037
/// Expected 12-byte duration layout: months, days and millis as little-endian `u32`s,
/// in that order.
fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
    let mut fixed = [0u8; 12];
    for (chunk, part) in fixed.chunks_exact_mut(4).zip([months, days, millis]) {
        chunk.copy_from_slice(&part.to_le_bytes());
    }
    fixed
}
2046
2047 fn encode_all(
2048 array: &dyn Array,
2049 plan: &FieldPlan,
2050 nullability: Option<Nullability>,
2051 ) -> Vec<u8> {
2052 let field = Field::new("f", array.data_type().clone(), true);
2053 let mut enc = FieldEncoder::make_encoder(array, &field, plan, nullability).unwrap();
2054 let mut out = Vec::new();
2055 for i in 0..array.len() {
2056 enc.encode(&mut out, i).unwrap();
2057 }
2058 out
2059 }
2060
/// Panics with a hex dump of both slices when `actual` differs from `expected`.
fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
    if actual == expected {
        return;
    }
    // Render bytes as space-separated two-digit uppercase hex.
    fn hex(bytes: &[u8]) -> String {
        let rendered: Vec<String> = bytes.iter().map(|byte| format!("{:02X}", byte)).collect();
        rendered.join(" ")
    }
    panic!(
        "mismatch\n expected: [{}]\n actual: [{}]",
        hex(expected),
        hex(actual)
    );
}
2076
/// `Binary` values are written as length-prefixed bytes, including the empty value.
#[test]
fn binary_encoder() {
    let payloads: [&[u8]; 3] = [b"", b"ab", b"\x00\xFF"];
    let arr = BinaryArray::from_vec(payloads.to_vec());
    let expected: Vec<u8> = payloads
        .iter()
        .flat_map(|payload| avro_len_prefixed_bytes(payload))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
2088
/// `LargeBinary` values are written as length-prefixed bytes.
#[test]
fn large_binary_encoder() {
    let payloads: [&[u8]; 2] = [b"xyz", b""];
    let arr = LargeBinaryArray::from_vec(payloads.to_vec());
    let expected: Vec<u8> = payloads
        .iter()
        .flat_map(|payload| avro_len_prefixed_bytes(payload))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
2100
/// `Utf8` values are written as length-prefixed UTF-8 bytes.
#[test]
fn utf8_encoder() {
    let texts = ["", "A", "BC"];
    let arr = StringArray::from(texts.to_vec());
    let expected: Vec<u8> = texts
        .iter()
        .flat_map(|text| avro_len_prefixed_bytes(text.as_bytes()))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
2111
/// `LargeUtf8` values are written as length-prefixed UTF-8 bytes.
#[test]
fn large_utf8_encoder() {
    let texts = ["hello", ""];
    let arr = LargeStringArray::from(texts.to_vec());
    let expected: Vec<u8> = texts
        .iter()
        .flat_map(|text| avro_len_prefixed_bytes(text.as_bytes()))
        .collect();
    let got = encode_all(&arr, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got, &expected);
}
2122
/// Lists `[1, 2]`, `[]` and `[3]` are written as counted blocks ending in a 0 long.
#[test]
fn list_encoder_int32() {
    let values = Int32Array::from(vec![1, 2, 3]);
    // Three lists: rows 0..2, 2..2 (empty) and 2..3 of `values`.
    let offsets = vec![0, 2, 2, 3];
    let item_field = Field::new("item", DataType::Int32, true);
    let list = ListArray::new(
        item_field.into(),
        arrow_buffer::OffsetBuffer::new(offsets.into()),
        Arc::new(values) as ArrayRef,
        None,
    );

    // [count=2, 1, 2, end] [end] [count=1, 3, end]
    let expected_longs: [i64; 8] = [2, 1, 2, 0, 0, 1, 3, 0];
    let expected: Vec<u8> = expected_longs
        .iter()
        .flat_map(|&long| avro_long_bytes(long))
        .collect();

    let plan = FieldPlan::List {
        items_nullability: None,
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let got = encode_all(&list, &plan, None);
    assert_bytes_eq(&got, &expected);
}
2155
/// Encodes a two-row struct with an `Int32` column `a` and a `Utf8` column `b`; each row
/// is expected as the long for `a` followed by the length-prefixed bytes for `b`.
#[test]
fn struct_encoder_two_fields() {
    let schema_fields = Fields::from(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]);
    let columns = vec![
        Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
        Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef,
    ];
    let struct_arr = StructArray::new(schema_fields, columns, None);

    // One scalar, non-nullable binding per Arrow column, in column order.
    let bindings = (0..2)
        .map(|arrow_index| FieldBinding {
            arrow_index,
            nullability: None,
            plan: FieldPlan::Scalar,
        })
        .collect();
    let plan = FieldPlan::Struct { bindings };
    let actual = encode_all(&struct_arr, &plan, None);

    let mut want = Vec::new();
    for (number, text) in [(1i64, "x"), (2, "y")] {
        want.extend(avro_long_bytes(number));
        want.extend(avro_len_prefixed_bytes(text.as_bytes()));
    }
    assert_bytes_eq(&actual, &want);
}
2193
/// Encodes an `Int32`-keyed dictionary array with an enum plan; the expected output is
/// the long encoding of each key, in row order.
#[test]
fn enum_encoder_dictionary() {
    let names = ["A", "B", "C"];
    let key_order: [i32; 3] = [2, 0, 1];
    let dictionary = DictionaryArray::<Int32Type>::try_new(
        Int32Array::from(key_order.to_vec()),
        Arc::new(StringArray::from(names.to_vec())) as ArrayRef,
    )
    .unwrap();

    // The enum symbols are the same strings as the dictionary values, in the same order.
    let symbols: Arc<[String]> = names.iter().map(|name| name.to_string()).collect();
    let plan = FieldPlan::Enum { symbols };
    let actual = encode_all(&dictionary, &plan, None);

    let want: Vec<u8> = key_order
        .iter()
        .flat_map(|&key| avro_long_bytes(i64::from(key)))
        .collect();
    assert_bytes_eq(&actual, &want);
}
2212
/// Encodes the same `Decimal128Array` twice: once with `size: None` and once with
/// `size: Some(16)`.
#[test]
fn decimal_bytes_and_fixed() {
    let raw = [1i128, -1i128, 0i128];
    let decimals = Decimal128Array::from(raw.to_vec())
        .with_precision_and_scale(20, 0)
        .unwrap();

    // Without a fixed size, each value is expected as a single length-prefixed byte.
    let bytes_plan = FieldPlan::Decimal { size: None };
    let got_bytes = encode_all(&decimals, &bytes_plan, None);
    let want_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|byte| avro_len_prefixed_bytes(&[*byte]))
        .collect();
    assert_bytes_eq(&got_bytes, &want_bytes);

    // With size 16, each value is expected as its full big-endian i128 representation.
    let fixed_plan = FieldPlan::Decimal { size: Some(16) };
    let got_fixed = encode_all(&decimals, &fixed_plan, None);
    let want_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &want_fixed);
}
2237
/// Encodes the same `Decimal256Array` twice: once with `size: None` and once with
/// `size: Some(32)`.
#[test]
fn decimal_bytes_256() {
    use arrow_buffer::i256;
    let raw = [1i128, -1, 0].map(i256::from_i128);
    let decimals = Decimal256Array::from(raw.to_vec())
        .with_precision_and_scale(76, 0)
        .unwrap();

    // Without a fixed size, each value is expected as a single length-prefixed byte.
    let bytes_plan = FieldPlan::Decimal { size: None };
    let got_bytes = encode_all(&decimals, &bytes_plan, None);
    let want_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|byte| avro_len_prefixed_bytes(&[*byte]))
        .collect();
    assert_bytes_eq(&got_bytes, &want_bytes);

    // With size 32, each value is expected as its full big-endian i256 representation.
    let fixed_plan = FieldPlan::Decimal { size: Some(32) };
    let got_fixed = encode_all(&decimals, &fixed_plan, None);
    let want_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &want_fixed);
}
2268
/// Encodes the same `Decimal32Array` twice: once with `size: None` and once with
/// `size: Some(4)`. Only built with the `small_decimals` feature.
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_32() {
    let raw = [1i32, -1i32, 0i32];
    let decimals = Decimal32Array::from(raw.to_vec())
        .with_precision_and_scale(9, 0)
        .unwrap();

    // Without a fixed size, each value is expected as a single length-prefixed byte.
    let bytes_plan = FieldPlan::Decimal { size: None };
    let got_bytes = encode_all(&decimals, &bytes_plan, None);
    let want_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|byte| avro_len_prefixed_bytes(&[*byte]))
        .collect();
    assert_bytes_eq(&got_bytes, &want_bytes);

    // With size 4, each value is expected as its full big-endian i32 representation.
    let fixed_plan = FieldPlan::Decimal { size: Some(4) };
    let got_fixed = encode_all(&decimals, &fixed_plan, None);
    let want_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &want_fixed);
}
2293
/// Encodes the same `Decimal64Array` twice: once with `size: None` and once with
/// `size: Some(8)`. Only built with the `small_decimals` feature.
#[cfg(feature = "small_decimals")]
#[test]
fn decimal_bytes_and_fixed_64() {
    let raw = [1i64, -1i64, 0i64];
    let decimals = Decimal64Array::from(raw.to_vec())
        .with_precision_and_scale(18, 0)
        .unwrap();

    // Without a fixed size, each value is expected as a single length-prefixed byte.
    let bytes_plan = FieldPlan::Decimal { size: None };
    let got_bytes = encode_all(&decimals, &bytes_plan, None);
    let want_bytes: Vec<u8> = [0x01u8, 0xFF, 0x00]
        .iter()
        .flat_map(|byte| avro_len_prefixed_bytes(&[*byte]))
        .collect();
    assert_bytes_eq(&got_bytes, &want_bytes);

    // With size 8, each value is expected as its full big-endian i64 representation.
    let fixed_plan = FieldPlan::Decimal { size: Some(8) };
    let got_fixed = encode_all(&decimals, &fixed_plan, None);
    let want_fixed: Vec<u8> = raw.iter().flat_map(|v| v.to_be_bytes()).collect();
    assert_bytes_eq(&got_fixed, &want_fixed);
}
2318
/// Checks that `Float32` and `Float64` arrays are encoded as the little-endian bytes of
/// each value's bit pattern (the `f32` samples include a NaN bit pattern).
#[test]
fn float32_and_float64_encoders() {
    let singles = [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)];
    let doubles = [0.0f64, -2.25f64];
    let f32_array = Float32Array::from(singles.to_vec());
    let f64_array = Float64Array::from(doubles.to_vec());

    let want32: Vec<u8> = singles
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got32 = encode_all(&f32_array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got32, &want32);

    let want64: Vec<u8> = doubles
        .iter()
        .flat_map(|v| v.to_bits().to_le_bytes())
        .collect();
    let got64 = encode_all(&f64_array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&got64, &want64);
}
2338
/// Encodes an `Int64Array` (small positive and negative values plus `i64::MIN + 1`) and
/// compares the output against `avro_long_bytes` of each value.
#[test]
fn long_encoder_int64() {
    let samples = [0i64, 1, -1, 2, -2, i64::MIN + 1];
    let array = Int64Array::from(samples.to_vec());
    let want: Vec<u8> = samples
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2349
/// Encodes a `FixedSizeBinaryArray` of two 4-byte rows with the scalar plan; the expected
/// output is the rows' bytes concatenated with no prefix.
#[test]
fn fixed_encoder_plain() {
    let rows: [[u8; 4]; 2] = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
    let array =
        FixedSizeBinaryArray::try_from_iter(rows.iter().map(|row| row.to_vec())).unwrap();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    let want = rows.concat();
    assert_bytes_eq(&actual, &want);
}
2362
/// Encodes a 16-byte fixed-size binary value with the UUID plan and expects the
/// hyphenated text form preceded by a single `0x48` byte.
#[test]
fn uuid_encoder_test() {
    let uuid = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
    let raw: Vec<u8> = uuid.as_bytes().to_vec();
    let array = FixedSizeBinaryArray::try_from_iter(std::iter::once(raw)).unwrap();

    // 0x48 (72) is what `write_long` emits for 36, the length of the hyphenated text.
    let mut want = vec![0x48u8];
    want.extend_from_slice(uuid.hyphenated().to_string().as_bytes());

    let actual = encode_all(&array, &FieldPlan::Uuid, None);
    assert_bytes_eq(&actual, &want);
}
2376
/// Feeds a 10-byte fixed-size binary value to the UUID plan and expects encoding to fail
/// with an `InvalidArgumentError` mentioning "Invalid UUID bytes".
#[test]
fn uuid_encoder_error() {
    let ten_bytes = arrow_buffer::Buffer::from(vec![0u8; 10]);
    let arr = FixedSizeBinaryArray::try_new(10, ten_bytes, None).unwrap();
    let field = Field::new("f", arr.data_type().clone(), true);
    let mut encoder = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Uuid, None).unwrap();

    let mut sink = Vec::new();
    match encoder.encode(&mut sink, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("Invalid UUID bytes"))
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2396
2397 fn test_scalar_primitive_encoding<T>(
2398 non_nullable_data: &[T::Native],
2399 nullable_data: &[Option<T::Native>],
2400 ) where
2401 T: ArrowPrimitiveType,
2402 T::Native: Into<i64> + Copy,
2403 PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2404 {
2405 let plan = FieldPlan::Scalar;
2406
2407 let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2408 let got = encode_all(&array, &plan, None);
2409
2410 let mut expected = Vec::new();
2411 for &value in non_nullable_data {
2412 expected.extend(avro_long_bytes(value.into()));
2413 }
2414 assert_bytes_eq(&got, &expected);
2415
2416 let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2417 let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2418
2419 let mut expected_nullable = Vec::new();
2420 for &opt_value in nullable_data {
2421 match opt_value {
2422 Some(value) => {
2423 expected_nullable.extend(avro_long_bytes(1));
2425 expected_nullable.extend(avro_long_bytes(value.into()));
2426 }
2427 None => {
2428 expected_nullable.extend(avro_long_bytes(0));
2430 }
2431 }
2432 }
2433 assert_bytes_eq(&got_nullable, &expected_nullable);
2434 }
2435
/// Runs the shared primitive driver for `Date32Type` with positive, zero and negative
/// values, plus a nullable pair.
#[test]
fn date32_encoder() {
    let required: [i32; 3] = [19345, 0, -1];
    let optional: [Option<i32>; 2] = [Some(19345), None];
    test_scalar_primitive_encoding::<Date32Type>(&required, &optional);
}
2447
/// Runs the shared primitive driver for `Time32MillisecondType`.
#[test]
fn time32_millis_encoder() {
    let required: [i32; 3] = [0, 49530123, 86399999];
    let optional: [Option<i32>; 2] = [None, Some(49530123)];
    test_scalar_primitive_encoding::<Time32MillisecondType>(&required, &optional);
}
2459
/// Runs the shared primitive driver for `Time64MicrosecondType`.
#[test]
fn time64_micros_encoder() {
    let required: [i64; 2] = [0, 86399999999];
    let optional: [Option<i64>; 2] = [Some(86399999999), None];
    test_scalar_primitive_encoding::<Time64MicrosecondType>(&required, &optional);
}
2470
/// Runs the shared primitive driver for `TimestampMillisecondType` with positive, zero
/// and negative values, plus a nullable pair.
#[test]
fn timestamp_millis_encoder() {
    let required: [i64; 3] = [1704067200000, 0, -123456789];
    let optional: [Option<i64>; 2] = [None, Some(1704067200000)];
    test_scalar_primitive_encoding::<TimestampMillisecondType>(&required, &optional);
}
2482
/// Encodes a two-row `MapArray` (string keys, `Int32` values) where the first row holds
/// two entries and the second row is empty.
#[test]
fn map_encoder_string_keys_int_values() {
    let key_col = Arc::new(StringArray::from(vec!["k1", "k2"])) as ArrayRef;
    let value_col = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
    let entry_struct = StructArray::new(
        Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ]),
        vec![key_col, value_col],
        None,
    );
    let entries_field = Field::new("entries", entry_struct.data_type().clone(), false);
    // Offsets [0, 2, 2]: row 0 covers entries 0..2, row 1 covers nothing.
    let row_offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
    let map = MapArray::new(entries_field.into(), row_offsets, entry_struct, None, false);

    let plan = FieldPlan::Map {
        values_nullability: None,
        value_plan: Box::new(FieldPlan::Scalar),
    };
    let actual = encode_all(&map, &plan, None);

    // Row 0: long 2, then each key/value pair, then long 0. Row 1: a lone long 0.
    let mut want = Vec::new();
    want.extend(avro_long_bytes(2));
    for (key, value) in [("k1", 1i64), ("k2", 2)] {
        want.extend(avro_len_prefixed_bytes(key.as_bytes()));
        want.extend(avro_long_bytes(value));
    }
    want.extend(avro_long_bytes(0));
    want.extend(avro_long_bytes(0));
    assert_bytes_eq(&actual, &want);
}
2524
/// Encodes a five-row union of `Utf8` and `Int32` (built with explicit offsets); each row
/// is expected as the long of its type id followed by the selected child value.
#[test]
fn union_encoder_string_int() {
    let union_fields = UnionFields::new(
        vec![0, 1],
        vec![
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    );
    // Row i takes its value from child `type_ids[i]` at position `offsets[i]`.
    let type_ids = Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]);
    let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
    let union_array = UnionArray::try_new(
        union_fields,
        type_ids.into(),
        Some(offsets.into()),
        vec![
            Arc::new(StringArray::from(vec!["hello", "world"])),
            Arc::new(Int32Array::from(vec![10, 20, 30])),
        ],
    )
    .unwrap();

    // One scalar, non-nullable binding per union child.
    let bindings = (0..2)
        .map(|arrow_index| FieldBinding {
            arrow_index,
            nullability: None,
            plan: FieldPlan::Scalar,
        })
        .collect();
    let plan = FieldPlan::Union { bindings };
    let actual = encode_all(&union_array, &plan, None);

    // (branch, encoded value) for each row, in row order.
    let rows = [
        (0i64, avro_len_prefixed_bytes(b"hello")),
        (1, avro_long_bytes(10)),
        (1, avro_long_bytes(20)),
        (0, avro_len_prefixed_bytes(b"world")),
        (1, avro_long_bytes(30)),
    ];
    let mut want = Vec::new();
    for (branch, payload) in rows {
        want.extend(avro_long_bytes(branch));
        want.extend(payload);
    }
    assert_bytes_eq(&actual, &want);
}
2580
/// Encodes a three-row union of `Null`, `Utf8` and `Int32`; the null row is expected as
/// only its branch long, the other rows as branch long followed by the value.
#[test]
fn union_encoder_null_string_int() {
    let union_fields = UnionFields::new(
        vec![0, 1, 2],
        vec![
            Field::new("v_null", DataType::Null, true),
            Field::new("v_str", DataType::Utf8, true),
            Field::new("v_int", DataType::Int32, true),
        ],
    );
    // Each row selects a different child, always at position 0 of that child.
    let type_ids = Buffer::from_slice_ref([0_i8, 1, 2]);
    let offsets = Buffer::from_slice_ref([0_i32, 0, 0]);
    let union_array = UnionArray::try_new(
        union_fields,
        type_ids.into(),
        Some(offsets.into()),
        vec![
            Arc::new(NullArray::new(1)),
            Arc::new(StringArray::from(vec!["hello"])),
            Arc::new(Int32Array::from(vec![10])),
        ],
    )
    .unwrap();

    // One scalar, non-nullable binding per union child.
    let bindings = (0..3)
        .map(|arrow_index| FieldBinding {
            arrow_index,
            nullability: None,
            plan: FieldPlan::Scalar,
        })
        .collect();
    let plan = FieldPlan::Union { bindings };
    let actual = encode_all(&union_array, &plan, None);

    // (branch, encoded value) for each row; the null row has no value bytes.
    let rows = [
        (0i64, Vec::new()),
        (1, avro_len_prefixed_bytes(b"hello")),
        (2, avro_long_bytes(10)),
    ];
    let mut want = Vec::new();
    for (branch, payload) in rows {
        want.extend(avro_long_bytes(branch));
        want.extend(payload);
    }
    assert_bytes_eq(&actual, &want);
}
2641
/// Encodes a two-row `LargeListArray` of `Int32` where the first row holds all three
/// values and the second row is empty.
#[test]
fn list64_encoder_int32() {
    // i64 offsets [0, 3, 3]: rows [1, 2, 3] and [].
    let row_offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 3, 3].into());
    let item_field = Field::new("item", DataType::Int32, true);
    let child = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    let list = LargeListArray::new(item_field.into(), row_offsets, child, None);

    let plan = FieldPlan::List {
        items_nullability: None,
        item_plan: Box::new(FieldPlan::Scalar),
    };
    let actual = encode_all(&list, &plan, None);

    // Expected longs in order: 3, 1, 2, 3, 0 for row 0; 0 for row 1.
    let want: Vec<u8> = [3i64, 1, 2, 3, 0, 0]
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();
    assert_bytes_eq(&actual, &want);
}
2668
/// Encodes an `Int32Array` and compares the output against `avro_long_bytes` of each
/// value widened to `i64`.
#[test]
fn int_encoder_test() {
    let samples = [0i32, -1, 2];
    let array = Int32Array::from(samples.to_vec());
    let want: Vec<u8> = samples
        .iter()
        .flat_map(|&v| avro_long_bytes(i64::from(v)))
        .collect();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2679
/// Encodes `[true, false]` and expects one byte per value: 1 then 0.
#[test]
fn boolean_encoder_test() {
    let array = BooleanArray::from(vec![true, false]);
    let want = [1u8, 0u8];
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2689
/// With `avro_custom_types` enabled, a `DurationSecondType` array is expected to encode
/// as `avro_long_bytes` of each raw value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_seconds() {
    let samples = [0i64, -1, 2];
    let array: PrimitiveArray<DurationSecondType> = samples.to_vec().into();
    let want: Vec<u8> = samples
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2701
/// With `avro_custom_types` enabled, a `DurationMillisecondType` array is expected to
/// encode as `avro_long_bytes` of each raw value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_milliseconds() {
    let samples = [1i64, 0, -2];
    let array: PrimitiveArray<DurationMillisecondType> = samples.to_vec().into();
    let want: Vec<u8> = samples
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2713
/// With `avro_custom_types` enabled, a `DurationMicrosecondType` array is expected to
/// encode as `avro_long_bytes` of each raw value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_microseconds() {
    let samples = [5i64, -6, 7];
    let array: PrimitiveArray<DurationMicrosecondType> = samples.to_vec().into();
    let want: Vec<u8> = samples
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2725
/// With `avro_custom_types` enabled, a `DurationNanosecondType` array is expected to
/// encode as `avro_long_bytes` of each raw value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn duration_encoding_nanoseconds() {
    let samples = [8i64, 9, -10];
    let array: PrimitiveArray<DurationNanosecondType> = samples.to_vec().into();
    let want: Vec<u8> = samples
        .iter()
        .flat_map(|&v| avro_long_bytes(v))
        .collect();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2737
/// Non-negative `IntervalYearMonthType` values are expected to encode as
/// `duration_fixed12(months, 0, 0)`.
#[test]
fn duration_encoder_year_month_happy_path() {
    let array: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
    let want: Vec<u8> = [0u32, 1u32, 25u32]
        .iter()
        .flat_map(|&months| duration_fixed12(months, 0, 0))
        .collect();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2748
/// A negative `IntervalYearMonthType` value must fail to encode with an
/// `InvalidArgumentError` mentioning "cannot encode negative months".
#[test]
fn duration_encoder_year_month_rejects_negative() {
    let negative_months: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::YearMonth), true);
    let mut encoder =
        FieldEncoder::make_encoder(&negative_months, &field, &FieldPlan::Scalar, None).unwrap();

    let mut sink = Vec::new();
    match encoder.encode(&mut sink, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("cannot encode negative months"))
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2763
/// Non-negative `IntervalDayTimeType` values are expected to encode as
/// `duration_fixed12(0, days, millis)`.
#[test]
fn duration_encoder_day_time_happy_path() {
    let array: PrimitiveArray<IntervalDayTimeType> = vec![
        IntervalDayTimeType::make_value(2, 500),
        IntervalDayTimeType::make_value(0, 0),
    ]
    .into();
    let want = [duration_fixed12(0, 2, 500), duration_fixed12(0, 0, 0)].concat();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2775
/// An `IntervalDayTimeType` value with negative days must fail to encode with an
/// `InvalidArgumentError` mentioning "cannot encode negative days".
#[test]
fn duration_encoder_day_time_rejects_negative() {
    let negative_days: PrimitiveArray<IntervalDayTimeType> =
        vec![IntervalDayTimeType::make_value(-1, 0)].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::DayTime), true);
    let mut encoder =
        FieldEncoder::make_encoder(&negative_days, &field, &FieldPlan::Scalar, None).unwrap();

    let mut sink = Vec::new();
    match encoder.encode(&mut sink, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("cannot encode negative days"))
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2791
/// `IntervalMonthDayNanoType` values whose nanoseconds are a whole number of milliseconds
/// are expected to encode as `duration_fixed12(months, days, millis)`.
#[test]
fn duration_encoder_month_day_nano_happy_path() {
    // 3_000_000 ns corresponds to the 3 ms in the expected bytes.
    let array: PrimitiveArray<IntervalMonthDayNanoType> = vec![
        IntervalMonthDayNanoType::make_value(1, 2, 3_000_000),
        IntervalMonthDayNanoType::make_value(0, 0, 0),
    ]
    .into();
    let want = [duration_fixed12(1, 2, 3), duration_fixed12(0, 0, 0)].concat();
    let actual = encode_all(&array, &FieldPlan::Scalar, None);
    assert_bytes_eq(&actual, &want);
}
2803
/// An `IntervalMonthDayNanoType` value of one nanosecond must fail to encode with an
/// `InvalidArgumentError` whose message mentions whole milliseconds or divisibility.
#[test]
fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
    let one_nano: PrimitiveArray<IntervalMonthDayNanoType> =
        vec![IntervalMonthDayNanoType::make_value(0, 0, 1)].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
    let mut encoder =
        FieldEncoder::make_encoder(&one_nano, &field, &FieldPlan::Scalar, None).unwrap();

    let mut sink = Vec::new();
    match encoder.encode(&mut sink, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2819
/// For a positive, a negative and a zero 3-byte input, `minimal_twos_complement` is
/// expected to return only the last byte.
#[test]
fn minimal_twos_complement_test() {
    let cases: [[u8; 3]; 3] = [
        [0x00, 0x00, 0x01],
        [0xFF, 0xFF, 0x80],
        [0x00, 0x00, 0x00],
    ];
    for input in &cases {
        assert_eq!(minimal_twos_complement(input), &input[2..]);
    }
}
2829
/// Exercises `write_sign_extended` for widening a positive value, widening a negative
/// value, narrowing a value whose dropped bytes are pure sign extension, and rejecting a
/// narrowing that would drop significant bytes.
#[test]
fn write_sign_extend_test() {
    // Runs one conversion into a fresh buffer and hands back what was written.
    let extend = |src: &[u8], n| {
        let mut out = Vec::<u8>::new();
        write_sign_extended(&mut out, src, n).map(|_| out)
    };

    // Positive input is padded with 0x00.
    assert_eq!(extend(&[0x01], 4).unwrap(), vec![0x00, 0x00, 0x00, 0x01]);
    // Negative input is padded with 0xFF.
    assert_eq!(extend(&[0xFF], 4).unwrap(), vec![0xFF, 0xFF, 0xFF, 0xFF]);
    // Leading 0xFF of a negative value can be dropped when narrowing.
    assert_eq!(extend(&[0xFF, 0xFF, 0x80], 2).unwrap(), vec![0xFF, 0x80]);

    // 0x0100 does not fit in one byte; report the actual error if the variant is wrong.
    match extend(&[0x01, 0x00], 1).unwrap_err() {
        ArrowError::InvalidArgumentError(_) => {}
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2850
/// An `IntervalMonthDayNanoType` value whose nanoseconds equal `u32::MAX + 1` whole
/// milliseconds must fail to encode with an `InvalidArgumentError` mentioning
/// "exceed u32::MAX".
#[test]
fn duration_month_day_nano_overflow_millis() {
    // (u32::MAX + 1) ms expressed in nanoseconds; computed in i64 so no cast is needed.
    let nanos = (i64::from(u32::MAX) + 1) * 1_000_000;
    let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
    let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
    let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
    let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
    let mut out = Vec::new();
    match enc.encode(&mut out, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => assert!(
            msg.contains("exceed u32::MAX"),
            "unexpected message: {msg}"
        ),
        // Include the actual error so a wrong variant is diagnosable from the test log.
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
2866
/// Building a `FieldPlan` for an Arrow `Decimal128(12, 2)` field against an Avro decimal
/// declared as `Codec::Decimal(10, Some(2), None)` must fail with a `SchemaError`
/// mentioning "Decimal precision/scale mismatch".
#[test]
fn fieldplan_decimal_precision_scale_mismatch_errors() {
    use crate::codec::Codec;
    use std::collections::HashMap;
    let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
    let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
    match FieldPlan::build(&avro_dt, &arrow_field).unwrap_err() {
        ArrowError::SchemaError(msg) => assert!(
            msg.contains("Decimal precision/scale mismatch"),
            "unexpected message: {msg}"
        ),
        // Include the actual error so a wrong variant is diagnosable from the test log.
        other => panic!("expected SchemaError, got {other:?}"),
    }
}
2882
/// Runs the shared primitive driver for `TimestampMicrosecondType` with positive, zero
/// and negative values, plus a nullable pair.
#[test]
fn timestamp_micros_encoder() {
    let required: [i64; 3] = [1_704_067_200_000_000, 0, -123_456_789];
    let optional: [Option<i64>; 2] = [None, Some(1_704_067_200_000_000)];
    test_scalar_primitive_encoding::<TimestampMicrosecondType>(&required, &optional);
}
2895
/// Encodes a single-row `ListArray` whose `Int32` items are nullable, using a list plan
/// with `Nullability::NullFirst` items.
#[test]
fn list_encoder_nullable_items_null_first() {
    let items: [Option<i32>; 3] = [Some(1), None, Some(2)];
    let child = Arc::new(Int32Array::from(items.to_vec())) as ArrayRef;
    let row_offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
    let item_field = Field::new("item", DataType::Int32, true);
    let list = ListArray::new(item_field.into(), row_offsets, child, None);

    let plan = FieldPlan::List {
        items_nullability: Some(Nullability::NullFirst),
        item_plan: Box::new(FieldPlan::Scalar),
    };

    // Long 3, then per item either long 0 (null) or long 1 + value, then long 0.
    let mut want = Vec::new();
    want.extend(avro_long_bytes(3));
    for item in items {
        match item {
            Some(value) => {
                want.extend(avro_long_bytes(1));
                want.extend(avro_long_bytes(i64::from(value)));
            }
            None => want.extend(avro_long_bytes(0)),
        }
    }
    want.extend(avro_long_bytes(0));

    let actual = encode_all(&list, &plan, None);
    assert_bytes_eq(&actual, &want);
}
2927
/// Encodes a single-row `LargeListArray` whose `Int32` items are nullable, using a list
/// plan with `Nullability::NullFirst` items.
#[test]
fn large_list_encoder_nullable_items_null_first() {
    let items: [Option<i32>; 2] = [Some(10), None];
    let child = Arc::new(Int32Array::from(items.to_vec())) as ArrayRef;
    let row_offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
    let item_field = Field::new("item", DataType::Int32, true);
    let list = LargeListArray::new(item_field.into(), row_offsets, child, None);

    let plan = FieldPlan::List {
        items_nullability: Some(Nullability::NullFirst),
        item_plan: Box::new(FieldPlan::Scalar),
    };

    // Long 2, then per item either long 0 (null) or long 1 + value, then long 0.
    let mut want = Vec::new();
    want.extend(avro_long_bytes(2));
    for item in items {
        match item {
            Some(value) => {
                want.extend(avro_long_bytes(1));
                want.extend(avro_long_bytes(i64::from(value)));
            }
            None => want.extend(avro_long_bytes(0)),
        }
    }
    want.extend(avro_long_bytes(0));

    let actual = encode_all(&list, &plan, None);
    assert_bytes_eq(&actual, &want);
}
2955
/// Encodes a single-row `MapArray` with string keys and nullable `Int32` values, using a
/// map plan with `Nullability::NullFirst` values.
#[test]
fn map_encoder_string_keys_nullable_int_values_null_first() {
    let key_col = Arc::new(StringArray::from(vec!["k1", "k2"])) as ArrayRef;
    let value_col = Arc::new(Int32Array::from(vec![Some(7), None])) as ArrayRef;
    let entry_struct = StructArray::new(
        Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ]),
        vec![key_col, value_col],
        None,
    );
    let entries_field = Field::new("entries", entry_struct.data_type().clone(), false);
    // Offsets [0, 2]: one row covering both entries.
    let row_offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
    let map = MapArray::new(entries_field.into(), row_offsets, entry_struct, None, false);

    let plan = FieldPlan::Map {
        values_nullability: Some(Nullability::NullFirst),
        value_plan: Box::new(FieldPlan::Scalar),
    };

    // Long 2, then per entry the key followed by long 0 (null) or long 1 + value, then long 0.
    let mut want = Vec::new();
    want.extend(avro_long_bytes(2));
    for (key, value) in [("k1", Some(7i64)), ("k2", None)] {
        want.extend(avro_len_prefixed_bytes(key.as_bytes()));
        match value {
            Some(number) => {
                want.extend(avro_long_bytes(1));
                want.extend(avro_long_bytes(number));
            }
            None => want.extend(avro_long_bytes(0)),
        }
    }
    want.extend(avro_long_bytes(0));

    let actual = encode_all(&map, &plan, None);
    assert_bytes_eq(&actual, &want);
}
3004
/// `Time32SecondType` values are expected to encode as `avro_long_bytes` of the value
/// multiplied by 1000.
#[test]
fn time32_seconds_to_millis_encoder() {
    let seconds = [0i32, 1, -2, 12_345];
    let array: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
        seconds.to_vec().into();

    let actual = encode_all(&array, &FieldPlan::Scalar, None);

    let want: Vec<u8> = seconds
        .iter()
        .flat_map(|&secs| avro_long_bytes(i64::from(secs) * 1000))
        .collect();
    assert_bytes_eq(&actual, &want);
}
3020
/// A `Time32SecondType` value of `i32::MAX / 1000 + 1` must fail to encode with an
/// `InvalidArgumentError` whose message mentions an overflow.
#[test]
fn time32_seconds_to_millis_overflow() {
    // Multiplying this value by 1000 no longer fits in an i32.
    let too_many_secs: i32 = i32::MAX / 1000 + 1;
    let array: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
        vec![too_many_secs].into();
    let field = Field::new("f", DataType::Time32(arrow_schema::TimeUnit::Second), true);
    let mut encoder =
        FieldEncoder::make_encoder(&array, &field, &FieldPlan::Scalar, None).unwrap();

    let mut sink = Vec::new();
    match encoder.encode(&mut sink, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(
                msg.contains("overflowed") || msg.contains("overflow"),
                "unexpected message: {msg}"
            )
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
3047
/// `TimestampSecondType` values are expected to encode as `avro_long_bytes` of the value
/// multiplied by 1000.
#[test]
fn timestamp_seconds_to_millis_encoder() {
    let seconds = [0i64, 1, -1, 1_234_567_890];
    let array: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
        seconds.to_vec().into();

    let actual = encode_all(&array, &FieldPlan::Scalar, None);

    let want: Vec<u8> = seconds
        .iter()
        .flat_map(|&secs| avro_long_bytes(secs * 1000))
        .collect();
    assert_bytes_eq(&actual, &want);
}
3063
/// A `TimestampSecondType` value of `i64::MAX / 1000 + 1` must fail to encode with an
/// `InvalidArgumentError` whose message mentions an overflow.
#[test]
fn timestamp_seconds_to_millis_overflow() {
    // Multiplying this value by 1000 no longer fits in an i64.
    let too_many_secs: i64 = i64::MAX / 1000 + 1;
    let array: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
        vec![too_many_secs].into();
    let field = Field::new(
        "f",
        DataType::Timestamp(arrow_schema::TimeUnit::Second, None),
        true,
    );
    let mut encoder =
        FieldEncoder::make_encoder(&array, &field, &FieldPlan::Scalar, None).unwrap();

    let mut sink = Vec::new();
    match encoder.encode(&mut sink, 0).unwrap_err() {
        ArrowError::InvalidArgumentError(msg) => {
            assert!(
                msg.contains("overflowed") || msg.contains("overflow"),
                "unexpected message: {msg}"
            )
        }
        other => panic!("expected InvalidArgumentError, got {other:?}"),
    }
}
3090
/// `TimestampNanosecondType` values are expected to encode as `avro_long_bytes` of the
/// raw value, with no unit conversion.
#[test]
fn timestamp_nanos_encoder() {
    let nanos = [0i64, 1, -1, 123];
    let array: arrow_array::PrimitiveArray<arrow_array::types::TimestampNanosecondType> =
        nanos.to_vec().into();

    let actual = encode_all(&array, &FieldPlan::Scalar, None);

    let want: Vec<u8> = nanos
        .iter()
        .flat_map(|&ns| avro_long_bytes(ns))
        .collect();
    assert_bytes_eq(&actual, &want);
}
3104}