Skip to main content

arrow_avro/writer/
encoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Encoder for Arrow types.
19
20use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::errors::AvroError;
22use crate::schema::{Fingerprint, Nullability, Prefix};
23use arrow_array::Float16Array;
24use arrow_array::cast::AsArray;
25use arrow_array::types::{
26    ArrowPrimitiveType, Date32Type, Date64Type, DurationMicrosecondType, DurationMillisecondType,
27    DurationNanosecondType, DurationSecondType, Float16Type, Float32Type, Float64Type, Int8Type,
28    Int16Type, Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType,
29    IntervalYearMonthType, RunEndIndexType, Time32MillisecondType, Time32SecondType,
30    Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
31    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
32    UInt32Type, UInt64Type,
33};
34use arrow_array::{
35    Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
36    FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
37    GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
38    ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
39    StringViewArray, StructArray, UnionArray,
40};
41#[cfg(feature = "small_decimals")]
42use arrow_array::{Decimal32Array, Decimal64Array};
43use arrow_buffer::{ArrowNativeType, NullBuffer};
44use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode};
45use bytes::{BufMut, BytesMut};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
/// Run `$body` once for each row index `$row` in `0..$n`.
///
/// When `$prefix` is `Some(bytes)`, those bytes are written to `$out` before every
/// row's body (any write failure is returned via `?` as `AvroError::IoError`).
/// The optional prefix is resolved once up front and the two loops are kept
/// separate, so the no-prefix path does not test the option per row.
macro_rules! for_rows_with_prefix {
    ($n:expr, $prefix:expr, $out:ident, |$row:ident| $body:block) => {{
        if let Some(prefix) = $prefix {
            for $row in 0..$n {
                $out.write_all(prefix)
                    .map_err(|e| AvroError::IoError(format!("write prefix: {e}"), e))?;
                $body
            }
        } else {
            for $row in 0..$n {
                $body
            }
        }
    }};
}
68
69/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
70///
71/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
72#[inline]
73pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), AvroError> {
74    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
75    // At most 10 bytes for 64-bit varint
76    let mut buf = [0u8; 10];
77    let mut i = 0;
78    while (zz & !0x7F) != 0 {
79        buf[i] = ((zz & 0x7F) as u8) | 0x80;
80        i += 1;
81        zz >>= 7;
82    }
83    buf[i] = (zz & 0x7F) as u8;
84    i += 1;
85    out.write_all(&buf[..i])
86        .map_err(|e| AvroError::IoError(format!("write long: {e}"), e))
87}
88
89#[inline]
90fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), AvroError> {
91    write_long(out, value as i64)
92}
93
94#[inline]
95fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
96    write_long(out, bytes.len() as i64)?;
97    out.write_all(bytes)
98        .map_err(|e| AvroError::IoError(format!("write bytes: {e}"), e))
99}
100
101#[inline]
102fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), AvroError> {
103    out.write_all(&[if v { 1 } else { 0 }])
104        .map_err(|e| AvroError::IoError(format!("write bool: {e}"), e))
105}
106
/// Return the shortest suffix of a big-endian two's-complement integer that still
/// encodes the same signed value (used for Avro `decimal` over `bytes`).
///
/// Redundant leading sign bytes are stripped: `0x00` for non-negative values,
/// `0xFF` for negative ones. One sign byte is kept whenever dropping it would flip
/// the sign bit of the new leading byte. An input made entirely of sign bytes
/// reduces to its final byte, and an empty input is returned unchanged.
///
/// See the Avro 1.11.1 specification: decimal over `bytes` stores the unscaled
/// integer in two's-complement big-endian form.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&first) = be.first() else {
        return be;
    };
    let sign_byte: u8 = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
    match be.iter().position(|&b| b != sign_byte) {
        // Every byte is a sign byte: a single one suffices.
        None => &be[be.len() - 1..],
        // Leading byte is already significant: nothing to trim.
        Some(0) => be,
        Some(first_significant) => {
            let keeps_sign = ((be[first_significant] ^ sign_byte) & 0x80) == 0;
            if keeps_sign {
                &be[first_significant..]
            } else {
                // Retain one sign byte so the sign bit is preserved.
                &be[first_significant - 1..]
            }
        }
    }
}
138
139/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n` bytes.
140///
141///
142/// - If shorter than `n`, the slice is sign-extended by left-padding with the
143///   sign byte (`0x00` for positive, `0xFF` for negative).
144/// - If longer than `n`, the slice is truncated from the left. An overflow error
145///   is returned if any of the truncated bytes are not redundant sign bytes,
146///   or if the resulting value's sign bit would differ from the original.
147/// - If the slice is already `n` bytes long, it is copied.
148///
149/// Used for encoding Avro decimal values into `fixed(N)` fields.
150#[inline]
151fn write_sign_extended<W: Write + ?Sized>(
152    out: &mut W,
153    src_be: &[u8],
154    n: usize,
155) -> Result<(), AvroError> {
156    let len = src_be.len();
157    if len == n {
158        out.write_all(src_be)?;
159        return Ok(());
160    }
161    let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
162        0xFF
163    } else {
164        0x00
165    };
166    if len > n {
167        let extra = len - n;
168        if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
169            return Ok(());
170        }
171        // All truncated bytes must equal the sign byte, and the MSB of the first
172        // retained byte must match the sign (otherwise overflow).
173        if src_be[..extra].iter().any(|&b| b != sign_byte)
174            || ((src_be[extra] ^ sign_byte) & 0x80) != 0
175        {
176            return Err(AvroError::InvalidArgument(format!(
177                "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
178            )));
179        }
180        return out
181            .write_all(&src_be[extra..])
182            .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e));
183    }
184    // len < n: prepend sign bytes (sign extension) then the payload
185    let pad_len = n - len;
186    // Fixed-size stack pads to avoid heap allocation on the hot path
187    const ZPAD: [u8; 64] = [0x00; 64];
188    const FPAD: [u8; 64] = [0xFF; 64];
189    let pad = if sign_byte == 0x00 {
190        &ZPAD[..]
191    } else {
192        &FPAD[..]
193    };
194    // Emit padding in 64‑byte chunks (minimizes write calls without allocating),
195    // then write the original bytes.
196    let mut rem = pad_len;
197    while rem >= pad.len() {
198        out.write_all(pad)
199            .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
200        rem -= pad.len();
201    }
202    if rem > 0 {
203        out.write_all(&pad[..rem])
204            .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
205    }
206    out.write_all(src_be)
207        .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))
208}
209
210/// Write the union branch index for an optional field.
211///
212/// Branch index is 0-based per Avro unions:
213/// - Null-first (default): null => 0, value => 1
214/// - Null-second (Impala): value => 0, null => 1
215fn write_optional_index<W: Write + ?Sized>(
216    out: &mut W,
217    is_null: bool,
218    null_order: Nullability,
219) -> Result<(), AvroError> {
220    let byte = union_value_branch_byte(null_order, is_null);
221    out.write_all(&[byte])
222        .map_err(|e| AvroError::IoError(format!("write union branch: {e}"), e))
223}
224
225#[derive(Debug, Clone)]
226enum NullState<'a> {
227    NonNullable,
228    NullableNoNulls {
229        union_value_byte: u8,
230    },
231    Nullable {
232        nulls: &'a NullBuffer,
233        null_order: Nullability,
234    },
235}
236
237/// Arrow to Avro FieldEncoder:
238/// - Holds the inner `Encoder` (by value)
239/// - Carries the per-site nullability **state** as a single enum that enforces invariants
240pub(crate) struct FieldEncoder<'a> {
241    encoder: Encoder<'a>,
242    null_state: NullState<'a>,
243}
244
245impl<'a> FieldEncoder<'a> {
246    fn make_encoder(
247        array: &'a dyn Array,
248        plan: &FieldPlan,
249        nullability: Option<Nullability>,
250    ) -> Result<Self, AvroError> {
251        let encoder = match plan {
252            FieldPlan::Scalar => match array.data_type() {
253                DataType::Null => Encoder::Null,
254                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
255                DataType::Utf8 => {
256                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
257                }
258                DataType::LargeUtf8 => {
259                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
260                }
261                DataType::Utf8View => {
262                    let arr = array
263                        .as_any()
264                        .downcast_ref::<StringViewArray>()
265                        .ok_or_else(|| AvroError::SchemaError("Expected StringViewArray".into()))?;
266                    Encoder::Utf8View(Utf8ViewEncoder(arr))
267                }
268                DataType::BinaryView => {
269                    let arr = array
270                        .as_any()
271                        .downcast_ref::<BinaryViewArray>()
272                        .ok_or_else(|| AvroError::SchemaError("Expected BinaryViewArray".into()))?;
273                    Encoder::BinaryView(BinaryViewEncoder(arr))
274                }
275                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
276                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
277                #[cfg(feature = "avro_custom_types")]
278                DataType::Int8 => Encoder::Int8(Int8Encoder(array.as_primitive::<Int8Type>())),
279                #[cfg(not(feature = "avro_custom_types"))]
280                DataType::Int8 => {
281                    Encoder::Int8ToInt(Int8ToIntEncoder(array.as_primitive::<Int8Type>()))
282                }
283                #[cfg(feature = "avro_custom_types")]
284                DataType::Int16 => Encoder::Int16(Int16Encoder(array.as_primitive::<Int16Type>())),
285                #[cfg(not(feature = "avro_custom_types"))]
286                DataType::Int16 => {
287                    Encoder::Int16ToInt(Int16ToIntEncoder(array.as_primitive::<Int16Type>()))
288                }
289                #[cfg(feature = "avro_custom_types")]
290                DataType::UInt8 => Encoder::UInt8(UInt8Encoder(array.as_primitive::<UInt8Type>())),
291                #[cfg(not(feature = "avro_custom_types"))]
292                DataType::UInt8 => {
293                    Encoder::UInt8ToInt(UInt8ToIntEncoder(array.as_primitive::<UInt8Type>()))
294                }
295                #[cfg(feature = "avro_custom_types")]
296                DataType::UInt16 => {
297                    Encoder::UInt16(UInt16Encoder(array.as_primitive::<UInt16Type>()))
298                }
299                #[cfg(not(feature = "avro_custom_types"))]
300                DataType::UInt16 => {
301                    Encoder::UInt16ToInt(UInt16ToIntEncoder(array.as_primitive::<UInt16Type>()))
302                }
303                #[cfg(feature = "avro_custom_types")]
304                DataType::UInt32 => {
305                    Encoder::UInt32(UInt32Encoder(array.as_primitive::<UInt32Type>()))
306                }
307                #[cfg(not(feature = "avro_custom_types"))]
308                DataType::UInt32 => {
309                    Encoder::UInt32ToLong(UInt32ToLongEncoder(array.as_primitive::<UInt32Type>()))
310                }
311                #[cfg(feature = "avro_custom_types")]
312                DataType::UInt64 => {
313                    Encoder::UInt64Fixed(UInt64FixedEncoder(array.as_primitive::<UInt64Type>()))
314                }
315                #[cfg(not(feature = "avro_custom_types"))]
316                DataType::UInt64 => {
317                    Encoder::UInt64ToLong(UInt64ToLongEncoder(array.as_primitive::<UInt64Type>()))
318                }
319                #[cfg(feature = "avro_custom_types")]
320                DataType::Float16 => {
321                    Encoder::Float16Fixed(Float16FixedEncoder(array.as_primitive::<Float16Type>()))
322                }
323                #[cfg(not(feature = "avro_custom_types"))]
324                DataType::Float16 => Encoder::Float16ToFloat(Float16ToFloatEncoder(
325                    array.as_primitive::<Float16Type>(),
326                )),
327                DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
328                #[cfg(not(feature = "avro_custom_types"))]
329                DataType::Date64 => {
330                    // Encode as local-timestamp-millis (long)
331                    Encoder::Date64ToLong(Date64ToLongEncoder(array.as_primitive::<Date64Type>()))
332                }
333                #[cfg(feature = "avro_custom_types")]
334                DataType::Date64 => {
335                    Encoder::Date64(LongEncoder(array.as_primitive::<Date64Type>()))
336                }
337                #[cfg(feature = "avro_custom_types")]
338                DataType::Time32(TimeUnit::Second) => {
339                    Encoder::Time32Secs(IntEncoder(array.as_primitive::<Time32SecondType>()))
340                }
341                DataType::Time32(TimeUnit::Millisecond) => {
342                    Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
343                }
344                DataType::Time32(TimeUnit::Microsecond) => {
345                    return Err(AvroError::InvalidArgument(
346                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
347                            .into(),
348                    ));
349                }
350                DataType::Time32(TimeUnit::Nanosecond) => {
351                    return Err(AvroError::InvalidArgument(
352                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
353                            .into(),
354                    ));
355                }
356                DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
357                    array.as_primitive::<Time64MicrosecondType>(),
358                )),
359                #[cfg(not(feature = "avro_custom_types"))]
360                DataType::Time64(TimeUnit::Nanosecond) => {
361                    // Truncate nanoseconds to microseconds for time-micros logical type
362                    Encoder::Time64NanosToMicros(Time64NanosToMicrosEncoder(
363                        array.as_primitive::<Time64NanosecondType>(),
364                    ))
365                }
366                #[cfg(feature = "avro_custom_types")]
367                DataType::Time64(TimeUnit::Nanosecond) => {
368                    Encoder::Time64Nanos(LongEncoder(array.as_primitive::<Time64NanosecondType>()))
369                }
370                DataType::Time64(TimeUnit::Millisecond) => {
371                    return Err(AvroError::InvalidArgument(
372                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
373                            .into(),
374                    ));
375                }
376                DataType::Time64(TimeUnit::Second) => {
377                    return Err(AvroError::InvalidArgument(
378                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
379                            .into(),
380                    ));
381                }
382                DataType::Float32 => {
383                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
384                }
385                DataType::Float64 => {
386                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
387                }
388                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
389                DataType::LargeBinary => {
390                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
391                }
392                DataType::FixedSizeBinary(_len) => {
393                    let arr = array
394                        .as_any()
395                        .downcast_ref::<FixedSizeBinaryArray>()
396                        .ok_or_else(|| {
397                            AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
398                        })?;
399                    Encoder::Fixed(FixedEncoder(arr))
400                }
401                DataType::Timestamp(unit, _) => match unit {
402                    TimeUnit::Second => {
403                        #[cfg(not(feature = "avro_custom_types"))]
404                        {
405                            Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
406                                array.as_primitive::<TimestampSecondType>(),
407                            ))
408                        }
409                        #[cfg(feature = "avro_custom_types")]
410                        {
411                            Encoder::TimestampSecs(LongEncoder(
412                                array.as_primitive::<TimestampSecondType>(),
413                            ))
414                        }
415                    }
416                    TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
417                        array.as_primitive::<TimestampMillisecondType>(),
418                    )),
419                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
420                        array.as_primitive::<TimestampMicrosecondType>(),
421                    )),
422                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
423                        array.as_primitive::<TimestampNanosecondType>(),
424                    )),
425                },
426                #[cfg(feature = "avro_custom_types")]
427                DataType::Interval(unit) => match unit {
428                    IntervalUnit::MonthDayNano => {
429                        Encoder::IntervalMonthDayNanoFixed(IntervalMonthDayNanoFixedEncoder(
430                            array.as_primitive::<IntervalMonthDayNanoType>(),
431                        ))
432                    }
433                    IntervalUnit::YearMonth => {
434                        Encoder::IntervalYearMonthFixed(IntervalYearMonthFixedEncoder(
435                            array.as_primitive::<IntervalYearMonthType>(),
436                        ))
437                    }
438                    IntervalUnit::DayTime => Encoder::IntervalDayTimeFixed(
439                        IntervalDayTimeFixedEncoder(array.as_primitive::<IntervalDayTimeType>()),
440                    ),
441                },
442                DataType::Duration(tu) => match tu {
443                    TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
444                        array.as_primitive::<DurationSecondType>(),
445                    )),
446                    TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
447                        array.as_primitive::<DurationMillisecondType>(),
448                    )),
449                    TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
450                        array.as_primitive::<DurationMicrosecondType>(),
451                    )),
452                    TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
453                        array.as_primitive::<DurationNanosecondType>(),
454                    )),
455                },
456                other => {
457                    return Err(AvroError::NYI(format!(
458                        "Avro scalar type not yet supported: {other:?}"
459                    )));
460                }
461            },
462            FieldPlan::Struct { bindings } => {
463                let arr = array
464                    .as_any()
465                    .downcast_ref::<StructArray>()
466                    .ok_or_else(|| AvroError::SchemaError("Expected StructArray".into()))?;
467                Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
468            }
469            FieldPlan::List {
470                items_nullability,
471                item_plan,
472            } => match array.data_type() {
473                DataType::List(_) => {
474                    let arr = array
475                        .as_any()
476                        .downcast_ref::<ListArray>()
477                        .ok_or_else(|| AvroError::SchemaError("Expected ListArray".into()))?;
478                    Encoder::List(Box::new(ListEncoder32::try_new(
479                        arr,
480                        *items_nullability,
481                        item_plan.as_ref(),
482                    )?))
483                }
484                DataType::LargeList(_) => {
485                    let arr = array
486                        .as_any()
487                        .downcast_ref::<LargeListArray>()
488                        .ok_or_else(|| AvroError::SchemaError("Expected LargeListArray".into()))?;
489                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
490                        arr,
491                        *items_nullability,
492                        item_plan.as_ref(),
493                    )?))
494                }
495                DataType::ListView(_) => {
496                    let arr = array
497                        .as_any()
498                        .downcast_ref::<ListViewArray>()
499                        .ok_or_else(|| AvroError::SchemaError("Expected ListViewArray".into()))?;
500                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
501                        arr,
502                        *items_nullability,
503                        item_plan.as_ref(),
504                    )?))
505                }
506                DataType::LargeListView(_) => {
507                    let arr = array
508                        .as_any()
509                        .downcast_ref::<LargeListViewArray>()
510                        .ok_or_else(|| {
511                            AvroError::SchemaError("Expected LargeListViewArray".into())
512                        })?;
513                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
514                        arr,
515                        *items_nullability,
516                        item_plan.as_ref(),
517                    )?))
518                }
519                DataType::FixedSizeList(_, _) => {
520                    let arr = array
521                        .as_any()
522                        .downcast_ref::<FixedSizeListArray>()
523                        .ok_or_else(|| {
524                            AvroError::SchemaError("Expected FixedSizeListArray".into())
525                        })?;
526                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
527                        arr,
528                        *items_nullability,
529                        item_plan.as_ref(),
530                    )?))
531                }
532                other => {
533                    return Err(AvroError::SchemaError(format!(
534                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
535                    )));
536                }
537            },
538            FieldPlan::Decimal { size } => match array.data_type() {
539                #[cfg(feature = "small_decimals")]
540                DataType::Decimal32(_, _) => {
541                    let arr = array
542                        .as_any()
543                        .downcast_ref::<Decimal32Array>()
544                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal32Array".into()))?;
545                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
546                }
547                #[cfg(feature = "small_decimals")]
548                DataType::Decimal64(_, _) => {
549                    let arr = array
550                        .as_any()
551                        .downcast_ref::<Decimal64Array>()
552                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal64Array".into()))?;
553                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
554                }
555                DataType::Decimal128(_, _) => {
556                    let arr = array
557                        .as_any()
558                        .downcast_ref::<Decimal128Array>()
559                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal128Array".into()))?;
560                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
561                }
562                DataType::Decimal256(_, _) => {
563                    let arr = array
564                        .as_any()
565                        .downcast_ref::<Decimal256Array>()
566                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal256Array".into()))?;
567                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
568                }
569                other => {
570                    return Err(AvroError::SchemaError(format!(
571                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
572                    )));
573                }
574            },
575            FieldPlan::Uuid => {
576                let arr = array
577                    .as_any()
578                    .downcast_ref::<FixedSizeBinaryArray>()
579                    .ok_or_else(|| {
580                        AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
581                    })?;
582                Encoder::Uuid(UuidEncoder(arr))
583            }
584            FieldPlan::Map {
585                values_nullability,
586                value_plan,
587            } => {
588                let arr = array
589                    .as_any()
590                    .downcast_ref::<MapArray>()
591                    .ok_or_else(|| AvroError::SchemaError("Expected MapArray".into()))?;
592                Encoder::Map(Box::new(MapEncoder::try_new(
593                    arr,
594                    *values_nullability,
595                    value_plan.as_ref(),
596                )?))
597            }
598            FieldPlan::Enum { symbols } => match array.data_type() {
599                DataType::Dictionary(key_dt, value_dt) => {
600                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
601                        return Err(AvroError::SchemaError(
602                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
603                        ));
604                    }
605                    let dict = array
606                        .as_any()
607                        .downcast_ref::<DictionaryArray<Int32Type>>()
608                        .ok_or_else(|| {
609                            AvroError::SchemaError("Expected DictionaryArray<Int32>".into())
610                        })?;
611                    let values = dict
612                        .values()
613                        .as_any()
614                        .downcast_ref::<StringArray>()
615                        .ok_or_else(|| {
616                            AvroError::SchemaError("Dictionary values must be Utf8".into())
617                        })?;
618                    if values.len() != symbols.len() {
619                        return Err(AvroError::SchemaError(format!(
620                            "Enum symbol length {} != dictionary size {}",
621                            symbols.len(),
622                            values.len()
623                        )));
624                    }
625                    for i in 0..values.len() {
626                        if values.value(i) != symbols[i].as_str() {
627                            return Err(AvroError::SchemaError(format!(
628                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
629                                symbols[i],
630                                values.value(i)
631                            )));
632                        }
633                    }
634                    let keys = dict.keys();
635                    Encoder::Enum(EnumEncoder { keys })
636                }
637                other => {
638                    return Err(AvroError::SchemaError(format!(
639                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
640                    )));
641                }
642            },
643            FieldPlan::Union { bindings } => {
644                let arr = array
645                    .as_any()
646                    .downcast_ref::<UnionArray>()
647                    .ok_or_else(|| AvroError::SchemaError("Expected UnionArray".into()))?;
648
649                Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
650            }
651            FieldPlan::RunEndEncoded {
652                values_nullability,
653                value_plan,
654            } => {
655                // Helper closure to build a typed RunEncodedEncoder<R>
656                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, AvroError> {
657                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
658                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
659                            Int16Type,
660                        >::new(
661                            arr,
662                            FieldEncoder::make_encoder(
663                                arr.values().as_ref(),
664                                value_plan.as_ref(),
665                                *values_nullability,
666                            )?,
667                        ))));
668                    }
669                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
670                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
671                            Int32Type,
672                        >::new(
673                            arr,
674                            FieldEncoder::make_encoder(
675                                arr.values().as_ref(),
676                                value_plan.as_ref(),
677                                *values_nullability,
678                            )?,
679                        ))));
680                    }
681                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
682                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
683                            Int64Type,
684                        >::new(
685                            arr,
686                            FieldEncoder::make_encoder(
687                                arr.values().as_ref(),
688                                value_plan.as_ref(),
689                                *values_nullability,
690                            )?,
691                        ))));
692                    }
693                    Err(AvroError::SchemaError(
694                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
695                            .into(),
696                    ))
697                };
698                build(array)?
699            }
700            FieldPlan::Duration => match array.data_type() {
701                DataType::Interval(IntervalUnit::MonthDayNano) => {
702                    Encoder::IntervalMonthDayNanoDuration(DurationEncoder(
703                        array.as_primitive::<IntervalMonthDayNanoType>(),
704                    ))
705                }
706                DataType::Interval(IntervalUnit::YearMonth) => Encoder::IntervalYearMonthDuration(
707                    DurationEncoder(array.as_primitive::<IntervalYearMonthType>()),
708                ),
709                DataType::Interval(IntervalUnit::DayTime) => Encoder::IntervalDayTimeDuration(
710                    DurationEncoder(array.as_primitive::<IntervalDayTimeType>()),
711                ),
712                other => {
713                    return Err(AvroError::SchemaError(format!(
714                        "Avro duration requires Arrow Interval type, found: {other:?}"
715                    )));
716                }
717            },
718            FieldPlan::TimeMillisFromSecs => match array.data_type() {
719                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
720                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
721                ),
722                other => {
723                    return Err(AvroError::SchemaError(format!(
724                        "Avro time-millis-from-seconds requires Arrow Time32(Second), found: {other:?}"
725                    )));
726                }
727            },
728        };
729        // Compute the effective null state from writer-declared nullability and data nulls.
730        let null_state = match nullability {
731            None => NullState::NonNullable,
732            Some(null_order) => match array.nulls() {
733                Some(nulls) if array.null_count() > 0 => NullState::Nullable { nulls, null_order },
734                _ => NullState::NullableNoNulls {
735                    // Nullable site with no null buffer for this view
736                    union_value_byte: union_value_branch_byte(null_order, false),
737                },
738            },
739        };
740        Ok(Self {
741            encoder,
742            null_state,
743        })
744    }
745
746    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
747        match &self.null_state {
748            NullState::NonNullable => {}
749            NullState::NullableNoNulls { union_value_byte } => out
750                .write_all(&[*union_value_byte])
751                .map_err(|e| AvroError::IoError(format!("write union value branch: {e}"), e))?,
752            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
753                return write_optional_index(out, true, *null_order); // no value to write
754            }
755            NullState::Nullable { null_order, .. } => {
756                write_optional_index(out, false, *null_order)?;
757            }
758        }
759        self.encoder.encode(out, idx)
760    }
761}
762
763fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
764    let nulls_first = null_order == Nullability::default();
765    if nulls_first == is_null { 0x00 } else { 0x02 }
766}
767
/// Per-site encoder plan for a field. This mirrors the Avro structure, so nested
/// optional branch order can be honored exactly as declared by the schema.
///
/// Plans are computed once by `FieldPlan::build` from the Avro and Arrow schemas,
/// stored inside `RecordEncoder`, and turned into concrete encoders for each batch.
#[derive(Debug, Clone)]
enum FieldPlan {
    /// Non-nested scalar/logical type. Also the fallback for any Avro codec that
    /// `FieldPlan::build` does not plan specially.
    Scalar,
    /// Record/Struct with Avro-ordered children. Each binding's `arrow_index` is the
    /// position of the same-named child within the Arrow struct's fields.
    Struct { bindings: Vec<FieldBinding> },
    /// Array with item-site nullability and nested plan. Built for Arrow List,
    /// LargeList, ListView, LargeListView and FixedSizeList fields.
    List {
        items_nullability: Option<Nullability>,
        item_plan: Box<FieldPlan>,
    },
    /// Avro decimal logical type (bytes or fixed). `size=None` => bytes(decimal), `Some(n)` => fixed(n)
    /// `FieldPlan::build` has already verified that Arrow precision/scale equal the Avro ones.
    Decimal { size: Option<usize> },
    /// Arrow `FixedSizeBinary(16)` field recognized as a UUID, either through the
    /// `arrow.uuid`/`uuid` extension type (with `canonical_extension_types`) or through
    /// `logicalType=uuid` field metadata.
    Uuid,
    /// Avro map with value-site nullability and nested plan. The plan targets the
    /// value child of the Arrow map's entries struct.
    Map {
        values_nullability: Option<Nullability>,
        value_plan: Box<FieldPlan>,
    },
    /// Avro enum; maps to Arrow Dictionary<Int32, Utf8> with dictionary values
    /// exactly equal and ordered as the Avro enum `symbols`.
    Enum { symbols: Arc<[String]> },
    /// Avro union, maps to a dense Arrow Union. Binding `i` pairs Avro branch `i`
    /// with the `i`-th Arrow union child (`arrow_index == i`).
    Union { bindings: Vec<FieldBinding> },
    /// Avro RunEndEncoded site. Values are encoded per logical row by mapping the
    /// row index to its containing run and emitting that run's value with `value_plan`.
    RunEndEncoded {
        values_nullability: Option<Nullability>,
        value_plan: Box<FieldPlan>,
    },
    /// Standard Avro `duration` logical type (`fixed(12)`, three LE u32 values).
    /// Used when `Codec::Interval` is resolved, ensuring the 12-byte duration encoder
    /// is selected regardless of the `avro_custom_types` feature flag.
    Duration,
    /// Arrow `Time32(Second)` mapped to Avro `time-millis` — values must be scaled × 1000.
    /// Used when `Codec::TimeMillis` is resolved for a `Time32(Second)` Arrow column,
    /// ensuring the scaling encoder is selected regardless of the `avro_custom_types` feature flag.
    TimeMillisFromSecs,
}
810
/// Pairs one Avro field site (a top-level record field, a struct child, or a union
/// branch) with the Arrow field it is encoded from and the plan used to encode it.
#[derive(Debug, Clone)]
struct FieldBinding {
    /// Index of the Arrow field/column associated with this Avro field site: the
    /// batch column index for top-level fields, the child position within the Arrow
    /// struct for struct children, or the child position within the Arrow union for
    /// union branches.
    arrow_index: usize,
    /// Nullability/order for this site (None for required fields)
    nullability: Option<Nullability>,
    /// Nested plan for this site
    plan: FieldPlan,
}
820
/// Builder for `RecordEncoder` write plan.
///
/// Holds the borrowed Avro root and Arrow schema plus an optional fingerprint;
/// `build` resolves each top-level Avro field to an Arrow column by name.
#[derive(Debug)]
pub(crate) struct RecordEncoderBuilder<'a> {
    // Avro root field; `build` requires its codec to be a record/struct.
    avro_root: &'a AvroField,
    // Arrow schema whose column indices are baked into the plan (expected to
    // match the batches later passed to the encoder).
    arrow_schema: &'a ArrowSchema,
    // When set, `build` derives the per-record prefix from this fingerprint.
    fingerprint: Option<Fingerprint>,
}
828
829impl<'a> RecordEncoderBuilder<'a> {
830    /// Create a new builder from the Avro root and Arrow schema.
831    pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
832        Self {
833            avro_root,
834            arrow_schema,
835            fingerprint: None,
836        }
837    }
838
839    pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
840        self.fingerprint = fingerprint;
841        self
842    }
843
844    /// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
845    /// resolving each field to an Arrow index by name.
846    pub(crate) fn build(self) -> Result<RecordEncoder, AvroError> {
847        let avro_root_dt = self.avro_root.data_type();
848        let Codec::Struct(root_fields) = avro_root_dt.codec() else {
849            return Err(AvroError::SchemaError(
850                "Top-level Avro schema must be a record/struct".into(),
851            ));
852        };
853        let mut columns = Vec::with_capacity(root_fields.len());
854        for root_field in root_fields.as_ref() {
855            let name = root_field.name();
856            let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
857                AvroError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
858            })?;
859            columns.push(FieldBinding {
860                arrow_index,
861                nullability: root_field.data_type().nullability(),
862                plan: FieldPlan::build(
863                    root_field.data_type(),
864                    self.arrow_schema.field(arrow_index),
865                )?,
866            });
867        }
868        Ok(RecordEncoder {
869            columns,
870            prefix: self.fingerprint.map(|fp| fp.make_prefix()),
871        })
872    }
873}
874
/// A pre-computed plan for encoding a `RecordBatch` to Avro.
///
/// Derived from an Avro schema and an Arrow schema. It maps
/// top-level Avro fields to Arrow columns and contains a nested encoding plan
/// for each column.
#[derive(Debug, Clone)]
pub(crate) struct RecordEncoder {
    // One binding per top-level Avro field, in Avro declaration order.
    columns: Vec<FieldBinding>,
    /// Optional pre-built, variable-length prefix written before each record.
    prefix: Option<Prefix>,
}
886
887impl RecordEncoder {
888    fn prepare_for_batch<'a>(
889        &'a self,
890        batch: &'a RecordBatch,
891    ) -> Result<Vec<FieldEncoder<'a>>, AvroError> {
892        let arrays = batch.columns();
893        let mut out = Vec::with_capacity(self.columns.len());
894        for col_plan in self.columns.iter() {
895            let arrow_index = col_plan.arrow_index;
896            let array = arrays.get(arrow_index).ok_or_else(|| {
897                AvroError::SchemaError(format!("Column index {arrow_index} out of range"))
898            })?;
899            #[cfg(not(feature = "avro_custom_types"))]
900            let site_nullability = match &col_plan.plan {
901                FieldPlan::RunEndEncoded { .. } => None,
902                _ => col_plan.nullability,
903            };
904            #[cfg(feature = "avro_custom_types")]
905            let site_nullability = col_plan.nullability;
906            out.push(FieldEncoder::make_encoder(
907                array.as_ref(),
908                &col_plan.plan,
909                site_nullability,
910            )?);
911        }
912        Ok(out)
913    }
914
915    /// Encode a `RecordBatch` using this encoder plan.
916    ///
917    /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes.
918    pub(crate) fn encode<W: Write>(
919        &self,
920        out: &mut W,
921        batch: &RecordBatch,
922    ) -> Result<(), AvroError> {
923        let mut column_encoders = self.prepare_for_batch(batch)?;
924        let n = batch.num_rows();
925        let prefix = self.prefix.as_ref().map(|p| p.as_slice());
926        for_rows_with_prefix!(n, prefix, out, |row| {
927            for enc in column_encoders.iter_mut() {
928                enc.encode(out, row)?;
929            }
930        });
931        Ok(())
932    }
933
934    /// Encode rows into a single contiguous `BytesMut` and append row-end offsets.
935    ///
936    /// # Invariants
937    ///
938    /// * `offsets` must be non-empty and seeded with `0` at index 0.
939    /// * `offsets.last()` must equal `out.len()` on entry.
940    /// * On success, exactly `batch.num_rows()` additional offsets are pushed, and
941    ///   `offsets.last()` equals the new `out.len()`.
942    pub(crate) fn encode_rows(
943        &self,
944        batch: &RecordBatch,
945        row_capacity: usize,
946        out: &mut BytesMut,
947        offsets: &mut Vec<usize>,
948    ) -> Result<(), AvroError> {
949        let out_len = out.len();
950        if offsets.first() != Some(&0) || offsets.last() != Some(&out_len) {
951            return Err(AvroError::General(
952                "encode_rows requires offsets to start with 0 and end at out.len()".to_string(),
953            ));
954        }
955        let n = batch.num_rows();
956        if n == 0 {
957            return Ok(());
958        }
959        if offsets.len().checked_add(n).is_none() {
960            return Err(AvroError::General(
961                "encode_rows cannot append offsets: too many rows".to_string(),
962            ));
963        }
964        let mut column_encoders = self.prepare_for_batch(batch)?;
965        offsets.reserve(n);
966        let prefix_bytes = self.prefix.as_ref().map(|p| p.as_slice());
967        let prefix_len = prefix_bytes.map_or(0, |p| p.len());
968        let per_row_hint = row_capacity.max(prefix_len);
969        if let Some(additional) = n
970            .checked_mul(per_row_hint)
971            .filter(|&a| out_len.checked_add(a).is_some())
972        {
973            out.reserve(additional);
974        }
975        let start_out_len = out.len();
976        let start_offsets_len = offsets.len();
977        let res = (|| -> Result<(), AvroError> {
978            let mut w = out.writer();
979            if let [enc0] = column_encoders.as_mut_slice() {
980                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
981                    enc0.encode(&mut w, row)?;
982                    offsets.push(w.get_ref().len());
983                });
984            } else {
985                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
986                    for enc in column_encoders.iter_mut() {
987                        enc.encode(&mut w, row)?;
988                    }
989                    offsets.push(w.get_ref().len());
990                });
991            }
992            Ok(())
993        })();
994        if res.is_err() {
995            out.truncate(start_out_len);
996            offsets.truncate(start_offsets_len);
997        } else {
998            debug_assert_eq!(
999                *offsets.last().unwrap(),
1000                out.len(),
1001                "encode_rows: offsets/out length mismatch after successful encode"
1002            );
1003        }
1004        res
1005    }
1006}
1007
1008fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
1009    fields.iter().position(|f| f.name() == name)
1010}
1011
1012fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
1013    // Prefer common Arrow field names; fall back to second child if exactly two
1014    find_struct_child_index(fields, "value")
1015        .or_else(|| find_struct_child_index(fields, "values"))
1016        .or_else(|| if fields.len() == 2 { Some(1) } else { None })
1017}
1018
1019impl FieldPlan {
1020    fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, AvroError> {
1021        #[cfg(not(feature = "avro_custom_types"))]
1022        if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
1023            let values_nullability = avro_dt.nullability();
1024            let value_site_dt: &AvroDataType = match avro_dt.codec() {
1025                Codec::Union(branches, _, _) => branches
1026                    .iter()
1027                    .find(|b| !matches!(b.codec(), Codec::Null))
1028                    .ok_or_else(|| {
1029                        AvroError::SchemaError(
1030                            "Avro union at RunEndEncoded site has no non-null branch".into(),
1031                        )
1032                    })?,
1033                _ => avro_dt,
1034            };
1035            return Ok(FieldPlan::RunEndEncoded {
1036                values_nullability,
1037                value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
1038            });
1039        }
1040        if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
1041            // Extension-based detection (only when the feature is enabled)
1042            let ext_is_uuid = {
1043                #[cfg(feature = "canonical_extension_types")]
1044                {
1045                    matches!(
1046                        arrow_field.extension_type_name(),
1047                        Some("arrow.uuid") | Some("uuid")
1048                    )
1049                }
1050                #[cfg(not(feature = "canonical_extension_types"))]
1051                {
1052                    false
1053                }
1054            };
1055            let md_is_uuid = arrow_field
1056                .metadata()
1057                .get("logicalType")
1058                .map(|s| s.as_str())
1059                == Some("uuid");
1060            if ext_is_uuid || md_is_uuid {
1061                if *len != 16 {
1062                    return Err(AvroError::InvalidArgument(
1063                        "logicalType=uuid requires FixedSizeBinary(16)".into(),
1064                    ));
1065                }
1066                return Ok(FieldPlan::Uuid);
1067            }
1068        }
1069        match avro_dt.codec() {
1070            Codec::Struct(avro_fields) => {
1071                let fields = match arrow_field.data_type() {
1072                    DataType::Struct(struct_fields) => struct_fields,
1073                    other => {
1074                        return Err(AvroError::SchemaError(format!(
1075                            "Avro struct maps to Arrow Struct, found: {other:?}"
1076                        )));
1077                    }
1078                };
1079                let mut bindings = Vec::with_capacity(avro_fields.len());
1080                for avro_field in avro_fields.iter() {
1081                    let name = avro_field.name().to_string();
1082                    let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
1083                        AvroError::SchemaError(format!(
1084                            "Struct field '{name}' not present in Arrow field '{}'",
1085                            arrow_field.name()
1086                        ))
1087                    })?;
1088                    bindings.push(FieldBinding {
1089                        arrow_index: idx,
1090                        nullability: avro_field.data_type().nullability(),
1091                        plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
1092                    });
1093                }
1094                Ok(FieldPlan::Struct { bindings })
1095            }
1096            Codec::List(items_dt) => match arrow_field.data_type() {
1097                DataType::List(field_ref)
1098                | DataType::LargeList(field_ref)
1099                | DataType::ListView(field_ref)
1100                | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
1101                    items_nullability: items_dt.nullability(),
1102                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
1103                }),
1104                DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
1105                    items_nullability: items_dt.nullability(),
1106                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
1107                }),
1108                other => Err(AvroError::SchemaError(format!(
1109                    "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
1110                ))),
1111            },
1112            Codec::Map(values_dt) => {
1113                let entries_field = match arrow_field.data_type() {
1114                    DataType::Map(entries, _sorted) => entries.as_ref(),
1115                    other => {
1116                        return Err(AvroError::SchemaError(format!(
1117                            "Avro map maps to Arrow DataType::Map, found: {other:?}"
1118                        )));
1119                    }
1120                };
1121                let entries_struct_fields = match entries_field.data_type() {
1122                    DataType::Struct(fs) => fs,
1123                    other => {
1124                        return Err(AvroError::SchemaError(format!(
1125                            "Arrow Map entries must be Struct, found: {other:?}"
1126                        )));
1127                    }
1128                };
1129                let value_idx =
1130                    find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
1131                        AvroError::SchemaError("Map entries struct missing value field".into())
1132                    })?;
1133                let value_field = entries_struct_fields[value_idx].as_ref();
1134                let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
1135                Ok(FieldPlan::Map {
1136                    values_nullability: values_dt.nullability(),
1137                    value_plan: Box::new(value_plan),
1138                })
1139            }
1140            Codec::Enum(symbols) => match arrow_field.data_type() {
1141                DataType::Dictionary(key_dt, value_dt) => {
1142                    if **key_dt != DataType::Int32 {
1143                        return Err(AvroError::SchemaError(
1144                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
1145                        ));
1146                    }
1147                    if **value_dt != DataType::Utf8 {
1148                        return Err(AvroError::SchemaError(
1149                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
1150                        ));
1151                    }
1152                    Ok(FieldPlan::Enum {
1153                        symbols: symbols.clone(),
1154                    })
1155                }
1156                other => Err(AvroError::SchemaError(format!(
1157                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
1158                ))),
1159            },
1160            // decimal site (bytes or fixed(N)) with precision/scale validation
1161            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
1162                let (ap, as_) = match arrow_field.data_type() {
1163                    #[cfg(feature = "small_decimals")]
1164                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
1165                    #[cfg(feature = "small_decimals")]
1166                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
1167                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
1168                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
1169                    other => {
1170                        return Err(AvroError::SchemaError(format!(
1171                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
1172                            arrow_field.name()
1173                        )));
1174                    }
1175                };
1176                let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent
1177                if ap != *precision || as_ != sc {
1178                    return Err(AvroError::SchemaError(format!(
1179                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
1180                        arrow_field.name()
1181                    )));
1182                }
1183                Ok(FieldPlan::Decimal {
1184                    size: *fixed_size_opt,
1185                })
1186            }
1187            Codec::Interval => match arrow_field.data_type() {
1188                DataType::Interval(
1189                    IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
1190                ) => Ok(FieldPlan::Duration),
1191                other => Err(AvroError::SchemaError(format!(
1192                    "Avro 'duration' logical type requires an Arrow Interval (MonthDayNano, YearMonth, or DayTime), found: {other:?}"
1193                ))),
1194            },
1195            Codec::Union(avro_branches, _, UnionMode::Dense) => {
1196                let arrow_union_fields = match arrow_field.data_type() {
1197                    DataType::Union(fields, UnionMode::Dense) => fields,
1198                    DataType::Union(_, UnionMode::Sparse) => {
1199                        return Err(AvroError::NYI(
1200                            "Sparse Arrow unions are not yet supported".to_string(),
1201                        ));
1202                    }
1203                    other => {
1204                        return Err(AvroError::SchemaError(format!(
1205                            "Avro union maps to Arrow Union, found: {other:?}"
1206                        )));
1207                    }
1208                };
1209                if avro_branches.len() != arrow_union_fields.len() {
1210                    return Err(AvroError::SchemaError(format!(
1211                        "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
1212                        avro_branches.len(),
1213                        arrow_union_fields.len(),
1214                        arrow_field.name()
1215                    )));
1216                }
1217                let bindings = avro_branches
1218                    .iter()
1219                    .zip(arrow_union_fields.iter())
1220                    .enumerate()
1221                    .map(|(i, (avro_branch, (_, arrow_child_field)))| {
1222                        Ok(FieldBinding {
1223                            arrow_index: i,
1224                            nullability: avro_branch.nullability(),
1225                            plan: FieldPlan::build(avro_branch, arrow_child_field)?,
1226                        })
1227                    })
1228                    .collect::<Result<Vec<_>, AvroError>>()?;
1229                Ok(FieldPlan::Union { bindings })
1230            }
1231            Codec::Union(_, _, UnionMode::Sparse) => Err(AvroError::NYI(
1232                "Sparse Arrow unions are not yet supported".to_string(),
1233            )),
1234            #[cfg(feature = "avro_custom_types")]
1235            Codec::RunEndEncoded(values_dt, _width_code) => {
1236                let values_field = match arrow_field.data_type() {
1237                    DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
1238                    other => {
1239                        return Err(AvroError::SchemaError(format!(
1240                            "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
1241                        )));
1242                    }
1243                };
1244                Ok(FieldPlan::RunEndEncoded {
1245                    values_nullability: values_dt.nullability(),
1246                    value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
1247                })
1248            }
1249            Codec::TimeMillis => match arrow_field.data_type() {
1250                DataType::Time32(TimeUnit::Second) => Ok(FieldPlan::TimeMillisFromSecs),
1251                _ => Ok(FieldPlan::Scalar),
1252            },
1253            _ => Ok(FieldPlan::Scalar),
1254        }
1255    }
1256}
1257
/// Concrete per-batch value encoder for a single Avro site.
///
/// Each variant wraps an encoder built from a batch's Arrow array, borrowed for
/// the lifetime `'a`; `Encoder::encode` dispatches to the wrapped encoder for one
/// row index. Nested encoders (lists, structs, maps, unions, run-end encoded) are
/// boxed. Variants marked `#[cfg(feature = "avro_custom_types")]` exist only when
/// that feature is enabled; those marked `#[cfg(not(feature = "avro_custom_types"))]`
/// exist only when it is disabled.
enum Encoder<'a> {
    Boolean(BooleanEncoder<'a>),
    Int(IntEncoder<'a, Int32Type>),
    Long(LongEncoder<'a, Int64Type>),
    TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
    TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
    TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
    #[cfg(not(feature = "avro_custom_types"))]
    TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
    Date32(IntEncoder<'a, Date32Type>),
    Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
    Time32Millis(IntEncoder<'a, Time32MillisecondType>),
    Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
    DurationSeconds(LongEncoder<'a, DurationSecondType>),
    DurationMillis(LongEncoder<'a, DurationMillisecondType>),
    DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
    DurationNanos(LongEncoder<'a, DurationNanosecondType>),
    Float32(F32Encoder<'a>),
    Float64(F64Encoder<'a>),
    Binary(BinaryEncoder<'a, i32>),
    LargeBinary(BinaryEncoder<'a, i64>),
    Utf8(Utf8Encoder<'a>),
    Utf8Large(Utf8LargeEncoder<'a>),
    Utf8View(Utf8ViewEncoder<'a>),
    BinaryView(BinaryViewEncoder<'a>),
    List(Box<ListEncoder32<'a>>),
    LargeList(Box<ListEncoder64<'a>>),
    ListView(Box<ListViewEncoder32<'a>>),
    LargeListView(Box<ListViewEncoder64<'a>>),
    FixedSizeList(Box<FixedSizeListEncoder<'a>>),
    Struct(Box<StructEncoder<'a>>),
    /// Avro `fixed` encoder (raw bytes, no length)
    Fixed(FixedEncoder<'a>),
    /// Avro `uuid` logical type encoder (string with RFC-4122 hyphenated text)
    Uuid(UuidEncoder<'a>),
    /// Avro `duration` logical type encoder (`fixed(12)` months/days/millis) for MonthDayNano.
    IntervalMonthDayNanoDuration(DurationEncoder<'a, IntervalMonthDayNanoType>),
    /// Arrow Interval(MonthDayNano) custom logical type encoder (`fixed(16)` months/days/nanos)
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNanoFixed(IntervalMonthDayNanoFixedEncoder<'a>),
    /// Avro `duration` logical type encoder (`fixed(12)` months/days/millis) for YearMonth.
    IntervalYearMonthDuration(DurationEncoder<'a, IntervalYearMonthType>),
    /// Avro `duration` logical type encoder (`fixed(12)` months/days/millis) for DayTime.
    IntervalDayTimeDuration(DurationEncoder<'a, IntervalDayTimeType>),
    #[cfg(feature = "small_decimals")]
    Decimal32(Decimal32Encoder<'a>),
    #[cfg(feature = "small_decimals")]
    Decimal64(Decimal64Encoder<'a>),
    Decimal128(Decimal128Encoder<'a>),
    Decimal256(Decimal256Encoder<'a>),
    /// Avro `enum` encoder: writes the key (int) as the enum index.
    Enum(EnumEncoder<'a>),
    Map(Box<MapEncoder<'a>>),
    Union(Box<UnionEncoder<'a>>),
    /// Run-end encoded values with specific run-end index widths
    RunEncoded16(Box<RunEncodedEncoder16<'a>>),
    RunEncoded32(Box<RunEncodedEncoder32<'a>>),
    RunEncoded64(Box<RunEncodedEncoder64<'a>>),
    Null,
    #[cfg(feature = "avro_custom_types")]
    Int8(Int8Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    Int16(Int16Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt8(UInt8Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt16(UInt16Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt32(UInt32Encoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    UInt64Fixed(UInt64FixedEncoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    Float16Fixed(Float16FixedEncoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    Date64(LongEncoder<'a, Date64Type>),
    #[cfg(feature = "avro_custom_types")]
    Time64Nanos(LongEncoder<'a, Time64NanosecondType>),
    #[cfg(feature = "avro_custom_types")]
    Time32Secs(IntEncoder<'a, Time32SecondType>),
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(LongEncoder<'a, TimestampSecondType>),
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonthFixed(IntervalYearMonthFixedEncoder<'a>),
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTimeFixed(IntervalDayTimeFixedEncoder<'a>),
    // NOTE(review): the `*To*` variants below appear (from their names) to map Arrow
    // types without a standard Avro counterpart onto standard Avro types when custom
    // types are disabled — confirm against the encoder implementations.
    #[cfg(not(feature = "avro_custom_types"))]
    Int8ToInt(Int8ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Int16ToInt(Int16ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt8ToInt(UInt8ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt16ToInt(UInt16ToIntEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt32ToLong(UInt32ToLongEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    UInt64ToLong(UInt64ToLongEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Float16ToFloat(Float16ToFloatEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Date64ToLong(Date64ToLongEncoder<'a>),
    #[cfg(not(feature = "avro_custom_types"))]
    Time64NanosToMicros(Time64NanosToMicrosEncoder<'a>),
}
1362
impl<'a> Encoder<'a> {
    /// Encode the value at logical row `idx` of the wrapped Arrow array into `out`.
    ///
    /// This is pure dispatch: every arm forwards to the variant-specific `encode`,
    /// except `Encoder::Null`, which writes nothing (an Avro `null` has an empty
    /// binary encoding). No validity (null-bitmap) check is made here — the variant
    /// encoders visible in this file read `value(idx)` directly.
    /// NOTE(review): null/union-branch handling is presumably done by the caller
    /// (e.g. `FieldEncoder`) before dispatching here; confirm.
    ///
    /// Arms gated with `#[cfg(...)]` must use exactly the same predicate as the
    /// corresponding variant of `Encoder`; otherwise the match fails to compile
    /// under one of the feature combinations.
    ///
    /// # Errors
    ///
    /// Propagates any `AvroError` from the selected variant encoder (e.g. write
    /// failures, overflow while rescaling units, values out of range for the
    /// target Avro type).
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        match self {
            Encoder::Boolean(e) => e.encode(out, idx),
            Encoder::Int(e) => e.encode(out, idx),
            Encoder::Long(e) => e.encode(out, idx),
            Encoder::TimestampMicros(e) => e.encode(out, idx),
            Encoder::TimestampMillis(e) => e.encode(out, idx),
            Encoder::TimestampNanos(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
            Encoder::Date32(e) => e.encode(out, idx),
            Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
            Encoder::Time32Millis(e) => e.encode(out, idx),
            Encoder::Time64Micros(e) => e.encode(out, idx),
            Encoder::DurationSeconds(e) => e.encode(out, idx),
            Encoder::DurationMicros(e) => e.encode(out, idx),
            Encoder::DurationMillis(e) => e.encode(out, idx),
            Encoder::DurationNanos(e) => e.encode(out, idx),
            Encoder::Float32(e) => e.encode(out, idx),
            Encoder::Float64(e) => e.encode(out, idx),
            Encoder::Binary(e) => e.encode(out, idx),
            Encoder::LargeBinary(e) => e.encode(out, idx),
            Encoder::Utf8(e) => e.encode(out, idx),
            Encoder::Utf8Large(e) => e.encode(out, idx),
            Encoder::Utf8View(e) => e.encode(out, idx),
            Encoder::BinaryView(e) => e.encode(out, idx),
            Encoder::List(e) => e.encode(out, idx),
            Encoder::LargeList(e) => e.encode(out, idx),
            Encoder::ListView(e) => e.encode(out, idx),
            Encoder::LargeListView(e) => e.encode(out, idx),
            Encoder::FixedSizeList(e) => e.encode(out, idx),
            Encoder::Struct(e) => e.encode(out, idx),
            Encoder::Fixed(e) => (e).encode(out, idx),
            Encoder::Uuid(e) => (e).encode(out, idx),
            Encoder::IntervalMonthDayNanoDuration(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::IntervalMonthDayNanoFixed(e) => e.encode(out, idx),
            Encoder::IntervalYearMonthDuration(e) => e.encode(out, idx),
            Encoder::IntervalDayTimeDuration(e) => e.encode(out, idx),
            #[cfg(feature = "small_decimals")]
            Encoder::Decimal32(e) => (e).encode(out, idx),
            #[cfg(feature = "small_decimals")]
            Encoder::Decimal64(e) => (e).encode(out, idx),
            Encoder::Decimal128(e) => (e).encode(out, idx),
            Encoder::Decimal256(e) => (e).encode(out, idx),
            Encoder::Map(e) => (e).encode(out, idx),
            Encoder::Enum(e) => (e).encode(out, idx),
            Encoder::Union(e) => (e).encode(out, idx),
            Encoder::RunEncoded16(e) => (e).encode(out, idx),
            Encoder::RunEncoded32(e) => (e).encode(out, idx),
            Encoder::RunEncoded64(e) => (e).encode(out, idx),
            // Avro `null` carries no payload bytes.
            Encoder::Null => Ok(()),
            // Arrow-specific custom logical types (only with `avro_custom_types`).
            #[cfg(feature = "avro_custom_types")]
            Encoder::Int8(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::Int16(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::UInt8(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::UInt16(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::UInt32(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::UInt64Fixed(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::Float16Fixed(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::Date64(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::Time64Nanos(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::Time32Secs(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::TimestampSecs(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::IntervalYearMonthFixed(e) => e.encode(out, idx),
            #[cfg(feature = "avro_custom_types")]
            Encoder::IntervalDayTimeFixed(e) => e.encode(out, idx),
            // Fallbacks when `avro_custom_types` is disabled: values are widened or
            // rescaled into standard Avro types.
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::Int8ToInt(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::Int16ToInt(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::UInt8ToInt(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::UInt16ToInt(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::UInt32ToLong(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::UInt64ToLong(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::Float16ToFloat(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::Date64ToLong(e) => e.encode(out, idx),
            #[cfg(not(feature = "avro_custom_types"))]
            Encoder::Time64NanosToMicros(e) => e.encode(out, idx),
        }
    }
}
1464
1465struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1466impl BooleanEncoder<'_> {
1467    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1468        write_bool(out, self.0.value(idx))
1469    }
1470}
1471
1472/// Generic Avro `int` encoder for primitive arrays with `i32` native values.
1473struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1474impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1475    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1476        write_int(out, self.0.value(idx))
1477    }
1478}
1479
1480/// Generic Avro `long` encoder for primitive arrays with `i64` native values.
1481struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1482impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1483    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1484        write_long(out, self.0.value(idx))
1485    }
1486}
1487
1488/// Time32(Second) to Avro time-millis (int), via safe scaling by 1000
1489struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1490impl<'a> Time32SecondsToMillisEncoder<'a> {
1491    #[inline]
1492    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1493        let secs = self.0.value(idx);
1494        let millis = secs
1495            .checked_mul(1000)
1496            .ok_or_else(|| AvroError::InvalidArgument("time32(secs) * 1000 overflowed".into()))?;
1497        write_int(out, millis)
1498    }
1499}
1500
1501/// Timestamp(Second) to Avro timestamp-millis (long), via safe scaling by 1000
1502#[cfg(not(feature = "avro_custom_types"))]
1503struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1504#[cfg(not(feature = "avro_custom_types"))]
1505impl<'a> TimestampSecondsToMillisEncoder<'a> {
1506    #[inline]
1507    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1508        let secs = self.0.value(idx);
1509        let millis = secs.checked_mul(1000).ok_or_else(|| {
1510            AvroError::InvalidArgument("timestamp(secs) * 1000 overflowed".into())
1511        })?;
1512        write_long(out, millis)
1513    }
1514}
1515
/// Int8 to Avro int encoder: each `i8` is sign-extended losslessly to `i32`.
#[cfg(feature = "avro_custom_types")]
struct Int8Encoder<'a>(&'a PrimitiveArray<Int8Type>);
#[cfg(feature = "avro_custom_types")]
impl Int8Encoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1525
/// Int16 to Avro int encoder: each `i16` is sign-extended losslessly to `i32`.
#[cfg(feature = "avro_custom_types")]
struct Int16Encoder<'a>(&'a PrimitiveArray<Int16Type>);
#[cfg(feature = "avro_custom_types")]
impl Int16Encoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1535
/// UInt8 to Avro int encoder: each `u8` is zero-extended losslessly to `i32`.
#[cfg(feature = "avro_custom_types")]
struct UInt8Encoder<'a>(&'a PrimitiveArray<UInt8Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt8Encoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1545
/// UInt16 to Avro int encoder: each `u16` is zero-extended losslessly to `i32`.
#[cfg(feature = "avro_custom_types")]
struct UInt16Encoder<'a>(&'a PrimitiveArray<UInt16Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt16Encoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i32::from(self.0.value(idx));
        write_int(out, widened)
    }
}
1555
/// UInt32 to Avro long encoder: each `u32` is zero-extended losslessly to `i64`.
#[cfg(feature = "avro_custom_types")]
struct UInt32Encoder<'a>(&'a PrimitiveArray<UInt32Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt32Encoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let widened = i64::from(self.0.value(idx));
        write_long(out, widened)
    }
}
1565
/// UInt64 to Avro `fixed(8)` encoder: the value's 8 little-endian bytes, with
/// no length prefix.
#[cfg(feature = "avro_custom_types")]
struct UInt64FixedEncoder<'a>(&'a PrimitiveArray<UInt64Type>);
#[cfg(feature = "avro_custom_types")]
impl UInt64FixedEncoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let bytes = self.0.value(idx).to_le_bytes();
        out.write_all(&bytes).map_err(Into::into)
    }
}
1577
/// Float16 to Avro `fixed(2)` encoder: the IEEE-754 half-precision bits in
/// little-endian order, with no length prefix.
#[cfg(feature = "avro_custom_types")]
struct Float16FixedEncoder<'a>(&'a Float16Array);
#[cfg(feature = "avro_custom_types")]
impl Float16FixedEncoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let bytes = self.0.value(idx).to_le_bytes();
        out.write_all(&bytes).map_err(Into::into)
    }
}
1589
/// Interval(MonthDayNano) encoder for the Arrow-specific logical type
/// `arrow.interval-month-day-nano`.
///
/// Each value becomes a `fixed(16)` laid out as:
/// - bytes 0..4: months, `i32` little-endian
/// - bytes 4..8: days, `i32` little-endian
/// - bytes 8..16: nanoseconds, `i64` little-endian
#[cfg(feature = "avro_custom_types")]
struct IntervalMonthDayNanoFixedEncoder<'a>(&'a PrimitiveArray<IntervalMonthDayNanoType>);
#[cfg(feature = "avro_custom_types")]
impl IntervalMonthDayNanoFixedEncoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(self.0.value(idx));
        let months = months.to_le_bytes();
        let days = days.to_le_bytes();
        let nanos = nanos.to_le_bytes();
        for part in [&months[..], &days[..], &nanos[..]] {
            out.write_all(part)?;
        }
        Ok(())
    }
}
1610
/// Interval(YearMonth) to Avro `fixed(4)` encoder: the month count as a
/// little-endian `i32`.
#[cfg(feature = "avro_custom_types")]
struct IntervalYearMonthFixedEncoder<'a>(&'a PrimitiveArray<IntervalYearMonthType>);
#[cfg(feature = "avro_custom_types")]
impl IntervalYearMonthFixedEncoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let month_bytes = self.0.value(idx).to_le_bytes();
        out.write_all(&month_bytes).map_err(Into::into)
    }
}
1622
/// Interval(DayTime) to Avro `fixed(8)` encoder: little-endian `i32` days
/// followed by little-endian `i32` milliseconds.
#[cfg(feature = "avro_custom_types")]
struct IntervalDayTimeFixedEncoder<'a>(&'a PrimitiveArray<IntervalDayTimeType>);
#[cfg(feature = "avro_custom_types")]
impl IntervalDayTimeFixedEncoder<'_> {
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
        let interval = self.0.value(idx);
        for part in [
            interval.days.to_le_bytes(),
            interval.milliseconds.to_le_bytes(),
        ] {
            out.write_all(&part)?;
        }
        Ok(())
    }
}
1635
1636/// Int8 to Avro int encoder (widens i8 to i32)
1637#[cfg(not(feature = "avro_custom_types"))]
1638struct Int8ToIntEncoder<'a>(&'a PrimitiveArray<Int8Type>);
1639#[cfg(not(feature = "avro_custom_types"))]
1640impl Int8ToIntEncoder<'_> {
1641    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1642        write_int(out, self.0.value(idx) as i32)
1643    }
1644}
1645
1646/// Int16 to Avro int encoder (widens i16 to i32)
1647#[cfg(not(feature = "avro_custom_types"))]
1648struct Int16ToIntEncoder<'a>(&'a PrimitiveArray<Int16Type>);
1649#[cfg(not(feature = "avro_custom_types"))]
1650impl Int16ToIntEncoder<'_> {
1651    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1652        write_int(out, self.0.value(idx) as i32)
1653    }
1654}
1655
1656/// UInt8 to Avro int encoder (widens u8 to i32)
1657#[cfg(not(feature = "avro_custom_types"))]
1658struct UInt8ToIntEncoder<'a>(&'a PrimitiveArray<UInt8Type>);
1659#[cfg(not(feature = "avro_custom_types"))]
1660impl UInt8ToIntEncoder<'_> {
1661    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1662        write_int(out, self.0.value(idx) as i32)
1663    }
1664}
1665
1666/// UInt16 to Avro int encoder (widens u16 to i32)
1667#[cfg(not(feature = "avro_custom_types"))]
1668struct UInt16ToIntEncoder<'a>(&'a PrimitiveArray<UInt16Type>);
1669#[cfg(not(feature = "avro_custom_types"))]
1670impl UInt16ToIntEncoder<'_> {
1671    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1672        write_int(out, self.0.value(idx) as i32)
1673    }
1674}
1675
1676/// UInt32 to Avro long encoder (widens u32 to i64)
1677#[cfg(not(feature = "avro_custom_types"))]
1678struct UInt32ToLongEncoder<'a>(&'a PrimitiveArray<UInt32Type>);
1679#[cfg(not(feature = "avro_custom_types"))]
1680impl UInt32ToLongEncoder<'_> {
1681    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1682        write_long(out, self.0.value(idx) as i64)
1683    }
1684}
1685
1686/// UInt64 to Avro long encoder (widens u64 to i64, errors if > i64::MAX)
1687#[cfg(not(feature = "avro_custom_types"))]
1688struct UInt64ToLongEncoder<'a>(&'a PrimitiveArray<UInt64Type>);
1689#[cfg(not(feature = "avro_custom_types"))]
1690impl UInt64ToLongEncoder<'_> {
1691    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1692        let v = self.0.value(idx);
1693        if v > i64::MAX as u64 {
1694            return Err(AvroError::InvalidArgument(format!(
1695                "UInt64 value {v} exceeds i64::MAX; enable avro_custom_types feature for full UInt64 support",
1696            )));
1697        }
1698        write_long(out, v as i64)
1699    }
1700}
1701
1702/// Float16 to Avro float encoder (widens f16 to f32)
1703#[cfg(not(feature = "avro_custom_types"))]
1704struct Float16ToFloatEncoder<'a>(&'a Float16Array);
1705#[cfg(not(feature = "avro_custom_types"))]
1706impl Float16ToFloatEncoder<'_> {
1707    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1708        out.write_all(&self.0.value(idx).to_f32().to_bits().to_le_bytes())?;
1709        Ok(())
1710    }
1711}
1712
1713/// Date64 to Avro long encoder (milliseconds since epoch)
1714#[cfg(not(feature = "avro_custom_types"))]
1715struct Date64ToLongEncoder<'a>(&'a PrimitiveArray<Date64Type>);
1716#[cfg(not(feature = "avro_custom_types"))]
1717impl Date64ToLongEncoder<'_> {
1718    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1719        write_long(out, self.0.value(idx))
1720    }
1721}
1722
1723/// Time64 nanoseconds to microseconds encoder (truncates ns to us)
1724#[cfg(not(feature = "avro_custom_types"))]
1725struct Time64NanosToMicrosEncoder<'a>(&'a PrimitiveArray<Time64NanosecondType>);
1726#[cfg(not(feature = "avro_custom_types"))]
1727impl Time64NanosToMicrosEncoder<'_> {
1728    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1729        let nanos = self.0.value(idx);
1730        let micros = nanos / 1000;
1731        write_long(out, micros)
1732    }
1733}
1734
1735/// Unified binary encoder generic over offset size (i32/i64).
1736struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1737impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1738    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1739        write_len_prefixed(out, self.0.value(idx))
1740    }
1741}
1742
1743/// BinaryView (byte view) encoder.
1744struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1745impl BinaryViewEncoder<'_> {
1746    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1747        write_len_prefixed(out, self.0.value(idx))
1748    }
1749}
1750
1751/// StringView encoder.
1752struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1753impl Utf8ViewEncoder<'_> {
1754    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1755        write_len_prefixed(out, self.0.value(idx).as_bytes())
1756    }
1757}
1758
1759struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1760impl F32Encoder<'_> {
1761    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1762        // Avro float: 4 bytes, IEEE-754 little-endian
1763        out.write_all(&self.0.value(idx).to_bits().to_le_bytes())?;
1764        Ok(())
1765    }
1766}
1767
1768struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1769impl F64Encoder<'_> {
1770    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1771        // Avro double: 8 bytes, IEEE-754 little-endian
1772        out.write_all(&self.0.value(idx).to_bits().to_le_bytes())
1773            .map_err(Into::into)
1774    }
1775}
1776
1777struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1778
1779impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1780    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1781        write_len_prefixed(out, self.0.value(idx).as_bytes())
1782    }
1783}
1784
1785type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1786type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1787
1788/// Internal key array kind used by Map encoder.
1789enum KeyKind<'a> {
1790    Utf8(&'a GenericStringArray<i32>),
1791    LargeUtf8(&'a GenericStringArray<i64>),
1792}
1793struct MapEncoder<'a> {
1794    map: &'a MapArray,
1795    keys: KeyKind<'a>,
1796    values: FieldEncoder<'a>,
1797    keys_offset: usize,
1798    values_offset: usize,
1799}
1800
1801impl<'a> MapEncoder<'a> {
1802    fn try_new(
1803        map: &'a MapArray,
1804        values_nullability: Option<Nullability>,
1805        value_plan: &FieldPlan,
1806    ) -> Result<Self, AvroError> {
1807        let keys_arr = map.keys();
1808        let keys_kind = match keys_arr.data_type() {
1809            DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1810            DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1811            other => {
1812                return Err(AvroError::SchemaError(format!(
1813                    "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1814                )));
1815            }
1816        };
1817        Ok(Self {
1818            map,
1819            keys: keys_kind,
1820            values: FieldEncoder::make_encoder(
1821                map.values().as_ref(),
1822                value_plan,
1823                values_nullability,
1824            )?,
1825            keys_offset: keys_arr.offset(),
1826            values_offset: map.values().offset(),
1827        })
1828    }
1829
1830    fn encode_map_entries<W, O>(
1831        out: &mut W,
1832        keys: &GenericStringArray<O>,
1833        keys_offset: usize,
1834        start: usize,
1835        end: usize,
1836        mut write_item: impl FnMut(&mut W, usize) -> Result<(), AvroError>,
1837    ) -> Result<(), AvroError>
1838    where
1839        W: Write + ?Sized,
1840        O: OffsetSizeTrait,
1841    {
1842        encode_blocked_range(out, start, end, |out, j| {
1843            let j_key = j.saturating_sub(keys_offset);
1844            write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1845            write_item(out, j)
1846        })
1847    }
1848
1849    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1850        let offsets = self.map.offsets();
1851        let start = offsets[idx] as usize;
1852        let end = offsets[idx + 1] as usize;
1853        let write_item = |out: &mut W, j: usize| {
1854            let j_val = j.saturating_sub(self.values_offset);
1855            self.values.encode(out, j_val)
1856        };
1857        match self.keys {
1858            KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1859                out,
1860                arr,
1861                self.keys_offset,
1862                start,
1863                end,
1864                write_item,
1865            ),
1866            KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1867                out,
1868                arr,
1869                self.keys_offset,
1870                start,
1871                end,
1872                write_item,
1873            ),
1874        }
1875    }
1876}
1877
1878/// Avro `enum` encoder for Arrow `DictionaryArray<Int32, Utf8>`.
1879///
1880/// Per Avro spec, an enum is encoded as an **int** equal to the
1881/// zero-based position of the symbol in the schema’s `symbols` list.
1882/// We validate at construction that the dictionary values equal the symbols,
1883/// so we can directly write the key value here.
1884struct EnumEncoder<'a> {
1885    keys: &'a PrimitiveArray<Int32Type>,
1886}
1887impl EnumEncoder<'_> {
1888    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), AvroError> {
1889        write_int(out, self.keys.value(row))
1890    }
1891}
1892
1893struct UnionEncoder<'a> {
1894    encoders: Vec<FieldEncoder<'a>>,
1895    array: &'a UnionArray,
1896    type_id_to_encoder_index: Vec<Option<usize>>,
1897}
1898
1899impl<'a> UnionEncoder<'a> {
1900    fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1901        let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1902            return Err(AvroError::SchemaError("Expected Dense UnionArray".into()));
1903        };
1904        if fields.len() != field_bindings.len() {
1905            return Err(AvroError::SchemaError(format!(
1906                "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1907                fields.len(),
1908                field_bindings.len()
1909            )));
1910        }
1911        let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
1912        let mut type_id_to_encoder_index: Vec<Option<usize>> =
1913            vec![None; (max_type_id + 1) as usize];
1914        let mut encoders = Vec::with_capacity(fields.len());
1915        for (i, (type_id, _)) in fields.iter().enumerate() {
1916            let binding = field_bindings
1917                .get(i)
1918                .ok_or_else(|| AvroError::SchemaError("Binding and field mismatch".to_string()))?;
1919            encoders.push(FieldEncoder::make_encoder(
1920                array.child(type_id).as_ref(),
1921                &binding.plan,
1922                binding.nullability,
1923            )?);
1924            type_id_to_encoder_index[type_id as usize] = Some(i);
1925        }
1926        Ok(Self {
1927            encoders,
1928            array,
1929            type_id_to_encoder_index,
1930        })
1931    }
1932
1933    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1934        // SAFETY: `idx` is always in bounds because:
1935        // 1. The encoder is called from `RecordEncoder::encode,` which iterates over `0..batch.num_rows()`
1936        // 2. `self.array` is a column from the same batch, so its length equals `batch.num_rows()`
1937        // 3. `type_ids()` returns a buffer with exactly `self.array.len()` entries (one per logical element)
1938        let type_id = self.array.type_ids()[idx];
1939        let encoder_index = self
1940            .type_id_to_encoder_index
1941            .get(type_id as usize)
1942            .and_then(|opt| *opt)
1943            .ok_or_else(|| AvroError::SchemaError(format!("Invalid type_id {type_id}")))?;
1944        write_int(out, encoder_index as i32)?;
1945        let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
1946            AvroError::SchemaError(format!("Invalid encoder index {encoder_index}"))
1947        })?;
1948        encoder.encode(out, self.array.value_offset(idx))
1949    }
1950}
1951
1952struct StructEncoder<'a> {
1953    encoders: Vec<FieldEncoder<'a>>,
1954}
1955
1956impl<'a> StructEncoder<'a> {
1957    fn try_new(array: &'a StructArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1958        let mut encoders = Vec::with_capacity(field_bindings.len());
1959        for field_binding in field_bindings {
1960            let idx = field_binding.arrow_index;
1961            let column = array.columns().get(idx).ok_or_else(|| {
1962                AvroError::SchemaError(format!("Struct child index {idx} out of range"))
1963            })?;
1964            let encoder = FieldEncoder::make_encoder(
1965                column.as_ref(),
1966                &field_binding.plan,
1967                field_binding.nullability,
1968            )?;
1969            encoders.push(encoder);
1970        }
1971        Ok(Self { encoders })
1972    }
1973
1974    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1975        for encoder in self.encoders.iter_mut() {
1976            encoder.encode(out, idx)?;
1977        }
1978        Ok(())
1979    }
1980}
1981
1982/// Encode a blocked range of items with Avro array block framing.
1983///
1984/// `write_item` must take `(out, index)` to maintain the "out-first" convention.
1985fn encode_blocked_range<W: Write + ?Sized, F>(
1986    out: &mut W,
1987    start: usize,
1988    end: usize,
1989    mut write_item: F,
1990) -> Result<(), AvroError>
1991where
1992    F: FnMut(&mut W, usize) -> Result<(), AvroError>,
1993{
1994    let len = end.saturating_sub(start);
1995    if len == 0 {
1996        // Zero-length terminator per Avro spec.
1997        write_long(out, 0)?;
1998        return Ok(());
1999    }
2000    // Emit a single positive block for performance, then the end marker.
2001    write_long(out, len as i64)?;
2002    for row in start..end {
2003        write_item(out, row)?;
2004    }
2005    write_long(out, 0)?;
2006    Ok(())
2007}
2008
2009struct ListEncoder<'a, O: OffsetSizeTrait> {
2010    list: &'a GenericListArray<O>,
2011    values: FieldEncoder<'a>,
2012    values_offset: usize,
2013}
2014
2015type ListEncoder32<'a> = ListEncoder<'a, i32>;
2016type ListEncoder64<'a> = ListEncoder<'a, i64>;
2017
2018impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
2019    fn try_new(
2020        list: &'a GenericListArray<O>,
2021        items_nullability: Option<Nullability>,
2022        item_plan: &FieldPlan,
2023    ) -> Result<Self, AvroError> {
2024        Ok(Self {
2025            list,
2026            values: FieldEncoder::make_encoder(
2027                list.values().as_ref(),
2028                item_plan,
2029                items_nullability,
2030            )?,
2031            values_offset: list.values().offset(),
2032        })
2033    }
2034
2035    fn encode_list_range<W: Write + ?Sized>(
2036        &mut self,
2037        out: &mut W,
2038        start: usize,
2039        end: usize,
2040    ) -> Result<(), AvroError> {
2041        encode_blocked_range(out, start, end, |out, row| {
2042            self.values
2043                .encode(out, row.saturating_sub(self.values_offset))
2044        })
2045    }
2046
2047    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2048        let offsets = self.list.offsets();
2049        let start = offsets[idx].to_usize().ok_or_else(|| {
2050            AvroError::InvalidArgument(format!("Error converting offset[{idx}] to usize"))
2051        })?;
2052        let end = offsets[idx + 1].to_usize().ok_or_else(|| {
2053            AvroError::InvalidArgument(format!("Error converting offset[{}] to usize", idx + 1))
2054        })?;
2055        self.encode_list_range(out, start, end)
2056    }
2057}
2058
2059/// ListView encoder using `(offset, size)` buffers.
2060struct ListViewEncoder<'a, O: OffsetSizeTrait> {
2061    list: &'a GenericListViewArray<O>,
2062    values: FieldEncoder<'a>,
2063    values_offset: usize,
2064}
2065type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
2066type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
2067
2068impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
2069    fn try_new(
2070        list: &'a GenericListViewArray<O>,
2071        items_nullability: Option<Nullability>,
2072        item_plan: &FieldPlan,
2073    ) -> Result<Self, AvroError> {
2074        Ok(Self {
2075            list,
2076            values: FieldEncoder::make_encoder(
2077                list.values().as_ref(),
2078                item_plan,
2079                items_nullability,
2080            )?,
2081            values_offset: list.values().offset(),
2082        })
2083    }
2084
2085    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2086        let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
2087            AvroError::InvalidArgument(format!("Error converting value_offset[{idx}] to usize"))
2088        })?;
2089        let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
2090            AvroError::InvalidArgument(format!("Error converting value_size[{idx}] to usize"))
2091        })?;
2092        let start = start + self.values_offset;
2093        let end = start + len;
2094        encode_blocked_range(out, start, end, |out, row| {
2095            self.values
2096                .encode(out, row.saturating_sub(self.values_offset))
2097        })
2098    }
2099}
2100
2101/// FixedSizeList encoder.
2102struct FixedSizeListEncoder<'a> {
2103    list: &'a FixedSizeListArray,
2104    values: FieldEncoder<'a>,
2105    values_offset: usize,
2106    elem_len: usize,
2107}
2108
2109impl<'a> FixedSizeListEncoder<'a> {
2110    fn try_new(
2111        list: &'a FixedSizeListArray,
2112        items_nullability: Option<Nullability>,
2113        item_plan: &FieldPlan,
2114    ) -> Result<Self, AvroError> {
2115        Ok(Self {
2116            list,
2117            values: FieldEncoder::make_encoder(
2118                list.values().as_ref(),
2119                item_plan,
2120                items_nullability,
2121            )?,
2122            values_offset: list.values().offset(),
2123            elem_len: list.value_length() as usize,
2124        })
2125    }
2126
2127    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2128        // Starting index is relative to values() start
2129        let rel = self.list.value_offset(idx) as usize;
2130        let start = self.values_offset + rel;
2131        let end = start + self.elem_len;
2132        encode_blocked_range(out, start, end, |out, row| {
2133            self.values
2134                .encode(out, row.saturating_sub(self.values_offset))
2135        })
2136    }
2137}
2138
2139/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
2140/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
2141struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
2142impl FixedEncoder<'_> {
2143    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2144        let v = self.0.value(idx); // &[u8] of fixed width
2145        out.write_all(v)?;
2146        Ok(())
2147    }
2148}
2149
2150/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) to Avro string (UUID).
2151/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
2152struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
2153impl UuidEncoder<'_> {
2154    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2155        let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
2156        buf[0] = 0x48;
2157        let v = self.0.value(idx);
2158        let u = Uuid::from_slice(v)
2159            .map_err(|e| AvroError::InvalidArgument(format!("Invalid UUID bytes: {e}")))?;
2160        let _ = u.hyphenated().encode_lower(&mut buf[1..]);
2161        out.write_all(&buf)?;
2162        Ok(())
2163    }
2164}
2165
2166#[derive(Copy, Clone)]
2167struct DurationParts {
2168    months: u32,
2169    days: u32,
2170    millis: u32,
2171}
2172/// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`.
2173trait IntervalToDurationParts: ArrowPrimitiveType {
2174    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError>;
2175}
2176impl IntervalToDurationParts for IntervalMonthDayNanoType {
2177    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
2178        let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
2179        if months < 0 || days < 0 || nanos < 0 {
2180            return Err(AvroError::InvalidArgument(
2181                "Avro 'duration' cannot encode negative months/days/nanoseconds; enable `avro_custom_types` to round-trip signed Arrow intervals".into(),
2182            ));
2183        }
2184        if nanos % 1_000_000 != 0 {
2185            return Err(AvroError::InvalidArgument(
2186                "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000 (enable `avro_custom_types` to preserve nanosecond intervals)"
2187                    .into(),
2188            ));
2189        }
2190        let millis = nanos / 1_000_000;
2191        if millis > u32::MAX as i64 {
2192            return Err(AvroError::InvalidArgument(
2193                "Avro 'duration' milliseconds exceed u32::MAX; enable `avro_custom_types` to preserve full Arrow Interval(MonthDayNano) range".into(),
2194            ));
2195        }
2196        Ok(DurationParts {
2197            months: months as u32,
2198            days: days as u32,
2199            millis: millis as u32,
2200        })
2201    }
2202}
2203impl IntervalToDurationParts for IntervalYearMonthType {
2204    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
2205        if native < 0 {
2206            return Err(AvroError::InvalidArgument(
2207                "Avro 'duration' cannot encode negative months; enable `avro_custom_types` to round-trip signed Arrow Interval(YearMonth)".into(),
2208            ));
2209        }
2210        Ok(DurationParts {
2211            months: native as u32,
2212            days: 0,
2213            millis: 0,
2214        })
2215    }
2216}
2217impl IntervalToDurationParts for IntervalDayTimeType {
2218    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
2219        let (days, millis) = IntervalDayTimeType::to_parts(native);
2220        if days < 0 || millis < 0 {
2221            return Err(AvroError::InvalidArgument(
2222                "Avro 'duration' cannot encode negative days or milliseconds; enable `avro_custom_types` to round-trip signed Arrow Interval(DayTime)".into(),
2223            ));
2224        }
2225        Ok(DurationParts {
2226            months: 0,
2227            days: days as u32,
2228            millis: millis as u32,
2229        })
2230    }
2231}
2232
2233/// Single generic encoder used for all three interval units.
2234/// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
2235struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
2236impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
2237    #[inline(always)]
2238    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2239        let parts = P::duration_parts(self.0.value(idx))?;
2240        let months = parts.months.to_le_bytes();
2241        let days = parts.days.to_le_bytes();
2242        let ms = parts.millis.to_le_bytes();
2243        // SAFETY
2244        // - Endianness & layout: Avro's `duration` logical type is encoded as fixed(12)
2245        //   with three *little-endian* unsigned 32-bit integers in order: (months, days, millis).
2246        //   We explicitly materialize exactly those 12 bytes.
2247        // - In-bounds indexing: `to_le_bytes()` on `u32` returns `[u8; 4]` by contract,
2248        //   therefore, the constant indices 0..=3 used below are *always* in-bounds.
2249        //   Rust will panic on out-of-bounds indexing, but there is no such path here;
2250        //   the compiler can also elide the bound checks for constant, provably in-range
2251        //   indices. [std docs; Rust Performance Book on bounds-check elimination]
2252        // - Memory safety: The `[u8; 12]` array is built on the stack by value, with no
2253        //   aliasing and no uninitialized memory. There is no `unsafe`.
2254        // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated as an AvroError,
2255        //   so I/O errors are reported, not panicked.
2256        // Consequently, constructing `buf` with the constant indices below is safe and
2257        // panic-free under these validated preconditions.
2258        let buf = [
2259            months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
2260            ms[1], ms[2], ms[3],
2261        ];
2262        out.write_all(&buf)?;
2263        Ok(())
2264    }
2265}
2266
2267/// Minimal trait to obtain a big-endian fixed-size byte array for a decimal's
2268/// unscaled integer value at `idx`.
2269trait DecimalBeBytes<const N: usize> {
2270    fn value_be_bytes(&self, idx: usize) -> [u8; N];
2271}
2272#[cfg(feature = "small_decimals")]
2273impl DecimalBeBytes<4> for Decimal32Array {
2274    fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
2275        self.value(idx).to_be_bytes()
2276    }
2277}
2278#[cfg(feature = "small_decimals")]
2279impl DecimalBeBytes<8> for Decimal64Array {
2280    fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
2281        self.value(idx).to_be_bytes()
2282    }
2283}
2284impl DecimalBeBytes<16> for Decimal128Array {
2285    fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
2286        self.value(idx).to_be_bytes()
2287    }
2288}
2289impl DecimalBeBytes<32> for Decimal256Array {
2290    fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
2291        // Arrow i256 → [u8; 32] big-endian
2292        self.value(idx).to_be_bytes()
2293    }
2294}
2295
2296/// Generic Avro decimal encoder over Arrow decimal arrays.
2297/// - When `fixed_size` is `None` → Avro `bytes(decimal)`; writes the minimal
2298///   two's-complement representation with a length prefix.
2299/// - When `Some(n)` → Avro `fixed(n, decimal)`; sign-extends (or validates)
2300///   to exactly `n` bytes and writes them directly.
2301struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
2302    arr: &'a A,
2303    fixed_size: Option<usize>,
2304}
2305
2306impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
2307    fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
2308        Self { arr, fixed_size }
2309    }
2310
2311    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2312        let be = self.arr.value_be_bytes(idx);
2313        match self.fixed_size {
2314            Some(n) => write_sign_extended(out, &be, n),
2315            None => write_len_prefixed(out, minimal_twos_complement(&be)),
2316        }
2317    }
2318}
2319
2320#[cfg(feature = "small_decimals")]
2321type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
2322#[cfg(feature = "small_decimals")]
2323type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
2324type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
2325type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
2326
2327/// Generic encoder for Arrow `RunArray<R>`-based sites (run-end encoded).
2328/// Follows the pattern used by other generic encoders (i.e., `ListEncoder<O>`),
2329/// avoiding runtime branching on run-end width.
2330struct RunEncodedEncoder<'a, R: RunEndIndexType> {
2331    ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
2332    base: usize,
2333    len: usize,
2334    values: FieldEncoder<'a>,
2335    // Cached run index used for sequential scans of rows [0..n)
2336    cur_run: usize,
2337    // Cached end (logical index, 1-based per spec) for the current run.
2338    cur_end: usize,
2339}
2340
2341type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
2342type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
2343type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
2344
2345impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
2346    fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
2347        let ends = arr.run_ends();
2348        let base = ends.get_start_physical_index();
2349        let slice = ends.values();
2350        let len = ends.len();
2351        let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
2352        Self {
2353            ends_slice: slice,
2354            base,
2355            len,
2356            values,
2357            cur_run: 0,
2358            cur_end,
2359        }
2360    }
2361
2362    /// Advance `cur_run` so that `idx` is within the run ending at `cur_end`.
2363    /// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based.
2364    #[inline(always)]
2365    fn advance_to_row(&mut self, idx: usize) -> Result<(), AvroError> {
2366        if idx < self.cur_end {
2367            return Ok(());
2368        }
2369        // Move forward across run boundaries until idx falls within cur_end
2370        while self.cur_run + 1 < self.len && idx >= self.cur_end {
2371            self.cur_run += 1;
2372            self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
2373        }
2374        if idx < self.cur_end {
2375            Ok(())
2376        } else {
2377            Err(AvroError::InvalidArgument(format!(
2378                "row index {idx} out of bounds for run-ends ({} runs)",
2379                self.len
2380            )))
2381        }
2382    }
2383
2384    #[inline(always)]
2385    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
2386        self.advance_to_row(idx)?;
2387        // For REE values, the value for any logical row within a run is at
2388        // the physical index of that run.
2389        self.values.encode(out, self.cur_run)
2390    }
2391}
2392
2393#[cfg(test)]
2394mod tests {
2395    use super::*;
2396    use arrow_array::types::Int32Type;
2397    use arrow_array::{
2398        Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
2399        Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
2400        StringArray,
2401    };
2402    use arrow_buffer::Buffer;
2403    use arrow_schema::{DataType, Field, Fields, UnionFields};
2404
2405    fn zigzag_i64(v: i64) -> u64 {
2406        ((v << 1) ^ (v >> 63)) as u64
2407    }
2408
2409    fn varint(mut x: u64) -> Vec<u8> {
2410        let mut out = Vec::new();
2411        while (x & !0x7f) != 0 {
2412            out.push(((x & 0x7f) as u8) | 0x80);
2413            x >>= 7;
2414        }
2415        out.push((x & 0x7f) as u8);
2416        out
2417    }
2418
2419    fn avro_long_bytes(v: i64) -> Vec<u8> {
2420        varint(zigzag_i64(v))
2421    }
2422
2423    fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
2424        let mut out = avro_long_bytes(payload.len() as i64);
2425        out.extend_from_slice(payload);
2426        out
2427    }
2428
2429    fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
2430        let m = months.to_le_bytes();
2431        let d = days.to_le_bytes();
2432        let ms = millis.to_le_bytes();
2433        [
2434            m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
2435        ]
2436    }
2437
2438    #[cfg(feature = "avro_custom_types")]
2439    fn interval_mdn_fixed16(months: i32, days: i32, nanos: i64) -> [u8; 16] {
2440        let m = months.to_le_bytes();
2441        let d = days.to_le_bytes();
2442        let n = nanos.to_le_bytes();
2443        [
2444            m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], n[0], n[1], n[2], n[3], n[4], n[5],
2445            n[6], n[7],
2446        ]
2447    }
2448
2449    fn encode_all(
2450        array: &dyn Array,
2451        plan: &FieldPlan,
2452        nullability: Option<Nullability>,
2453    ) -> Vec<u8> {
2454        let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
2455        let mut out = Vec::new();
2456        for i in 0..array.len() {
2457            enc.encode(&mut out, i).unwrap();
2458        }
2459        out
2460    }
2461
2462    fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
2463        if actual != expected {
2464            let to_hex = |b: &[u8]| {
2465                b.iter()
2466                    .map(|x| format!("{:02X}", x))
2467                    .collect::<Vec<_>>()
2468                    .join(" ")
2469            };
2470            panic!(
2471                "mismatch\n  expected: [{}]\n    actual: [{}]",
2472                to_hex(expected),
2473                to_hex(actual)
2474            );
2475        }
2476    }
2477
2478    fn row_slice<'a>(buf: &'a [u8], offsets: &[usize], row: usize) -> &'a [u8] {
2479        let start = offsets[row];
2480        let end = offsets[row + 1];
2481        &buf[start..end]
2482    }
2483
2484    #[test]
2485    fn binary_encoder() {
2486        let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
2487        let arr = BinaryArray::from_vec(values);
2488        let mut expected = Vec::new();
2489        for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
2490            expected.extend(avro_len_prefixed_bytes(payload));
2491        }
2492        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2493        assert_bytes_eq(&got, &expected);
2494    }
2495
2496    #[test]
2497    fn large_binary_encoder() {
2498        let values: Vec<&[u8]> = vec![b"xyz", b""];
2499        let arr = LargeBinaryArray::from_vec(values);
2500        let mut expected = Vec::new();
2501        for payload in [b"xyz" as &[u8], b""] {
2502            expected.extend(avro_len_prefixed_bytes(payload));
2503        }
2504        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2505        assert_bytes_eq(&got, &expected);
2506    }
2507
2508    #[test]
2509    fn utf8_encoder() {
2510        let arr = StringArray::from(vec!["", "A", "BC"]);
2511        let mut expected = Vec::new();
2512        for s in ["", "A", "BC"] {
2513            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2514        }
2515        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2516        assert_bytes_eq(&got, &expected);
2517    }
2518
2519    #[test]
2520    fn large_utf8_encoder() {
2521        let arr = LargeStringArray::from(vec!["hello", ""]);
2522        let mut expected = Vec::new();
2523        for s in ["hello", ""] {
2524            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2525        }
2526        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2527        assert_bytes_eq(&got, &expected);
2528    }
2529
2530    #[test]
2531    fn list_encoder_int32() {
2532        // Build ListArray [[1,2], [], [3]]
2533        let values = Int32Array::from(vec![1, 2, 3]);
2534        let offsets = vec![0, 2, 2, 3];
2535        let list = ListArray::new(
2536            Field::new("item", DataType::Int32, true).into(),
2537            arrow_buffer::OffsetBuffer::new(offsets.into()),
2538            Arc::new(values) as ArrayRef,
2539            None,
2540        );
2541        // Avro array encoding per row
2542        let mut expected = Vec::new();
2543        // row 0: block len 2, items 1,2 then 0
2544        expected.extend(avro_long_bytes(2));
2545        expected.extend(avro_long_bytes(1));
2546        expected.extend(avro_long_bytes(2));
2547        expected.extend(avro_long_bytes(0));
2548        // row 1: empty
2549        expected.extend(avro_long_bytes(0));
2550        // row 2: one item 3
2551        expected.extend(avro_long_bytes(1));
2552        expected.extend(avro_long_bytes(3));
2553        expected.extend(avro_long_bytes(0));
2554
2555        let plan = FieldPlan::List {
2556            items_nullability: None,
2557            item_plan: Box::new(FieldPlan::Scalar),
2558        };
2559        let got = encode_all(&list, &plan, None);
2560        assert_bytes_eq(&got, &expected);
2561    }
2562
2563    #[test]
2564    fn struct_encoder_two_fields() {
2565        // Struct { a: Int32, b: Utf8 }
2566        let a = Int32Array::from(vec![1, 2]);
2567        let b = StringArray::from(vec!["x", "y"]);
2568        let fields = Fields::from(vec![
2569            Field::new("a", DataType::Int32, true),
2570            Field::new("b", DataType::Utf8, true),
2571        ]);
2572        let struct_arr = StructArray::new(
2573            fields.clone(),
2574            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2575            None,
2576        );
2577        let plan = FieldPlan::Struct {
2578            bindings: vec![
2579                FieldBinding {
2580                    arrow_index: 0,
2581                    nullability: None,
2582                    plan: FieldPlan::Scalar,
2583                },
2584                FieldBinding {
2585                    arrow_index: 1,
2586                    nullability: None,
2587                    plan: FieldPlan::Scalar,
2588                },
2589            ],
2590        };
2591        let got = encode_all(&struct_arr, &plan, None);
2592        // Expected: rows concatenated: a then b
2593        let mut expected = Vec::new();
2594        expected.extend(avro_long_bytes(1)); // a=1
2595        expected.extend(avro_len_prefixed_bytes(b"x")); // b="x"
2596        expected.extend(avro_long_bytes(2)); // a=2
2597        expected.extend(avro_len_prefixed_bytes(b"y")); // b="y"
2598        assert_bytes_eq(&got, &expected);
2599    }
2600
2601    #[test]
2602    fn enum_encoder_dictionary() {
2603        // symbols: ["A","B","C"], keys [2,0,1]
2604        let dict_values = StringArray::from(vec!["A", "B", "C"]);
2605        let keys = Int32Array::from(vec![2, 0, 1]);
2606        let dict =
2607            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
2608        let symbols = Arc::<[String]>::from(
2609            vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
2610        );
2611        let plan = FieldPlan::Enum { symbols };
2612        let got = encode_all(&dict, &plan, None);
2613        let mut expected = Vec::new();
2614        expected.extend(avro_long_bytes(2));
2615        expected.extend(avro_long_bytes(0));
2616        expected.extend(avro_long_bytes(1));
2617        assert_bytes_eq(&got, &expected);
2618    }
2619
2620    #[test]
2621    fn decimal_bytes_and_fixed() {
2622        // Use Decimal128 with small positives and negatives
2623        let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
2624            .with_precision_and_scale(20, 0)
2625            .unwrap();
2626        // bytes(decimal): minimal two's complement length-prefixed
2627        let plan_bytes = FieldPlan::Decimal { size: None };
2628        let got_bytes = encode_all(&dec, &plan_bytes, None);
2629        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2630        let mut expected_bytes = Vec::new();
2631        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2632        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2633        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2634        assert_bytes_eq(&got_bytes, &expected_bytes);
2635
2636        let plan_fixed = FieldPlan::Decimal { size: Some(16) };
2637        let got_fixed = encode_all(&dec, &plan_fixed, None);
2638        let mut expected_fixed = Vec::new();
2639        expected_fixed.extend_from_slice(&1i128.to_be_bytes());
2640        expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
2641        expected_fixed.extend_from_slice(&0i128.to_be_bytes());
2642        assert_bytes_eq(&got_fixed, &expected_fixed);
2643    }
2644
2645    #[test]
2646    fn decimal_bytes_256() {
2647        use arrow_buffer::i256;
2648        // Use Decimal256 with small positives and negatives
2649        let dec = Decimal256Array::from(vec![
2650            i256::from_i128(1),
2651            i256::from_i128(-1),
2652            i256::from_i128(0),
2653        ])
2654        .with_precision_and_scale(76, 0)
2655        .unwrap();
2656        // bytes(decimal): minimal two's complement length-prefixed
2657        let plan_bytes = FieldPlan::Decimal { size: None };
2658        let got_bytes = encode_all(&dec, &plan_bytes, None);
2659        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2660        let mut expected_bytes = Vec::new();
2661        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2662        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2663        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2664        assert_bytes_eq(&got_bytes, &expected_bytes);
2665
2666        // fixed(32): 32-byte big-endian two's complement
2667        let plan_fixed = FieldPlan::Decimal { size: Some(32) };
2668        let got_fixed = encode_all(&dec, &plan_fixed, None);
2669        let mut expected_fixed = Vec::new();
2670        expected_fixed.extend_from_slice(&i256::from_i128(1).to_be_bytes());
2671        expected_fixed.extend_from_slice(&i256::from_i128(-1).to_be_bytes());
2672        expected_fixed.extend_from_slice(&i256::from_i128(0).to_be_bytes());
2673        assert_bytes_eq(&got_fixed, &expected_fixed);
2674    }
2675
2676    #[cfg(feature = "small_decimals")]
2677    #[test]
2678    fn decimal_bytes_and_fixed_32() {
2679        // Use Decimal32 with small positives and negatives
2680        let dec = Decimal32Array::from(vec![1i32, -1i32, 0i32])
2681            .with_precision_and_scale(9, 0)
2682            .unwrap();
2683        // bytes(decimal)
2684        let plan_bytes = FieldPlan::Decimal { size: None };
2685        let got_bytes = encode_all(&dec, &plan_bytes, None);
2686        let mut expected_bytes = Vec::new();
2687        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2688        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2689        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2690        assert_bytes_eq(&got_bytes, &expected_bytes);
2691        // fixed(4)
2692        let plan_fixed = FieldPlan::Decimal { size: Some(4) };
2693        let got_fixed = encode_all(&dec, &plan_fixed, None);
2694        let mut expected_fixed = Vec::new();
2695        expected_fixed.extend_from_slice(&1i32.to_be_bytes());
2696        expected_fixed.extend_from_slice(&(-1i32).to_be_bytes());
2697        expected_fixed.extend_from_slice(&0i32.to_be_bytes());
2698        assert_bytes_eq(&got_fixed, &expected_fixed);
2699    }
2700
2701    #[cfg(feature = "small_decimals")]
2702    #[test]
2703    fn decimal_bytes_and_fixed_64() {
2704        // Use Decimal64 with small positives and negatives
2705        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
2706            .with_precision_and_scale(18, 0)
2707            .unwrap();
2708        // bytes(decimal)
2709        let plan_bytes = FieldPlan::Decimal { size: None };
2710        let got_bytes = encode_all(&dec, &plan_bytes, None);
2711        let mut expected_bytes = Vec::new();
2712        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2713        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2714        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2715        assert_bytes_eq(&got_bytes, &expected_bytes);
2716        // fixed(8)
2717        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
2718        let got_fixed = encode_all(&dec, &plan_fixed, None);
2719        let mut expected_fixed = Vec::new();
2720        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
2721        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
2722        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
2723        assert_bytes_eq(&got_fixed, &expected_fixed);
2724    }
2725
2726    #[test]
2727    fn float32_and_float64_encoders() {
2728        let f32a = Float32Array::from(vec![0.0f32, -1.5f32, f32::from_bits(0x7fc00000)]); // includes a quiet NaN bit pattern
2729        let f64a = Float64Array::from(vec![0.0f64, -2.25f64]);
2730        // f32 expected
2731        let mut expected32 = Vec::new();
2732        for v in [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)] {
2733            expected32.extend_from_slice(&v.to_bits().to_le_bytes());
2734        }
2735        let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
2736        assert_bytes_eq(&got32, &expected32);
2737        // f64 expected
2738        let mut expected64 = Vec::new();
2739        for v in [0.0f64, -2.25f64] {
2740            expected64.extend_from_slice(&v.to_bits().to_le_bytes());
2741        }
2742        let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
2743        assert_bytes_eq(&got64, &expected64);
2744    }
2745
2746    #[test]
2747    fn long_encoder_int64() {
2748        let arr = Int64Array::from(vec![0i64, 1i64, -1i64, 2i64, -2i64, i64::MIN + 1]);
2749        let mut expected = Vec::new();
2750        for v in [0, 1, -1, 2, -2, i64::MIN + 1] {
2751            expected.extend(avro_long_bytes(v));
2752        }
2753        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2754        assert_bytes_eq(&got, &expected);
2755    }
2756
2757    #[test]
2758    fn fixed_encoder_plain() {
2759        // Two values of width 4
2760        let data = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
2761        let values: Vec<Vec<u8>> = data.iter().map(|x| x.to_vec()).collect();
2762        let arr = FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap();
2763        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2764        let mut expected = Vec::new();
2765        expected.extend_from_slice(&data[0]);
2766        expected.extend_from_slice(&data[1]);
2767        assert_bytes_eq(&got, &expected);
2768    }
2769
2770    #[test]
2771    fn uuid_encoder_test() {
2772        // Happy path
2773        let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
2774        let bytes = *u.as_bytes();
2775        let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
2776        // Expected: length 36 (0x48) followed by hyphenated lowercase text
2777        let mut expected = Vec::new();
2778        expected.push(0x48);
2779        expected.extend_from_slice(u.hyphenated().to_string().as_bytes());
2780        let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
2781        assert_bytes_eq(&got, &expected);
2782    }
2783
2784    #[test]
2785    fn uuid_encoder_error() {
2786        // Invalid UUID bytes: wrong length
2787        let arr =
2788            FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
2789                .unwrap();
2790        let plan = FieldPlan::Uuid;
2791        let mut enc = FieldEncoder::make_encoder(&arr, &plan, None).unwrap();
2792        let mut out = Vec::new();
2793        let err = enc.encode(&mut out, 0).unwrap_err();
2794        match err {
2795            AvroError::InvalidArgument(msg) => {
2796                assert!(msg.contains("Invalid UUID bytes"))
2797            }
2798            other => panic!("expected InvalidArgument, got {other:?}"),
2799        }
2800    }
2801
2802    fn test_scalar_primitive_encoding<T>(
2803        non_nullable_data: &[T::Native],
2804        nullable_data: &[Option<T::Native>],
2805    ) where
2806        T: ArrowPrimitiveType,
2807        T::Native: Into<i64> + Copy,
2808        PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2809    {
2810        let plan = FieldPlan::Scalar;
2811
2812        let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2813        let got = encode_all(&array, &plan, None);
2814
2815        let mut expected = Vec::new();
2816        for &value in non_nullable_data {
2817            expected.extend(avro_long_bytes(value.into()));
2818        }
2819        assert_bytes_eq(&got, &expected);
2820
2821        let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2822        let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2823
2824        let mut expected_nullable = Vec::new();
2825        for &opt_value in nullable_data {
2826            match opt_value {
2827                Some(value) => {
2828                    // Union index 1 for the value, then the value itself
2829                    expected_nullable.extend(avro_long_bytes(1));
2830                    expected_nullable.extend(avro_long_bytes(value.into()));
2831                }
2832                None => {
2833                    // Union index 0 for the null
2834                    expected_nullable.extend(avro_long_bytes(0));
2835                }
2836            }
2837        }
2838        assert_bytes_eq(&got_nullable, &expected_nullable);
2839    }
2840
2841    #[test]
2842    fn date32_encoder() {
2843        test_scalar_primitive_encoding::<Date32Type>(
2844            &[
2845                19345, // 2022-12-20
2846                0,     // 1970-01-01 (epoch)
2847                -1,    // 1969-12-31 (pre-epoch)
2848            ],
2849            &[Some(19345), None],
2850        );
2851    }
2852
2853    #[test]
2854    fn time32_millis_encoder() {
2855        test_scalar_primitive_encoding::<Time32MillisecondType>(
2856            &[
2857                0,        // Midnight
2858                49530123, // 13:45:30.123
2859                86399999, // 23:59:59.999
2860            ],
2861            &[None, Some(49530123)],
2862        );
2863    }
2864
2865    #[test]
2866    fn time64_micros_encoder() {
2867        test_scalar_primitive_encoding::<Time64MicrosecondType>(
2868            &[
2869                0,           // Midnight
2870                86399999999, // 23:59:59.999999
2871            ],
2872            &[Some(86399999999), None],
2873        );
2874    }
2875
2876    #[test]
2877    fn timestamp_millis_encoder() {
2878        test_scalar_primitive_encoding::<TimestampMillisecondType>(
2879            &[
2880                1704067200000, // 2024-01-01T00:00:00Z
2881                0,             // 1970-01-01T00:00:00Z (epoch)
2882                -123456789,    // Pre-epoch timestamp
2883            ],
2884            &[None, Some(1704067200000)],
2885        );
2886    }
2887
2888    #[test]
2889    fn map_encoder_string_keys_int_values() {
2890        // Build MapArray with two rows
2891        // Row0: {"k1":1, "k2":2}
2892        // Row1: {}
2893        let keys = StringArray::from(vec!["k1", "k2"]);
2894        let values = Int32Array::from(vec![1, 2]);
2895        let entries_fields = Fields::from(vec![
2896            Field::new("key", DataType::Utf8, false),
2897            Field::new("value", DataType::Int32, true),
2898        ]);
2899        let entries = StructArray::new(
2900            entries_fields,
2901            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2902            None,
2903        );
2904        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
2905        let map = MapArray::new(
2906            Field::new("entries", entries.data_type().clone(), false).into(),
2907            offsets,
2908            entries,
2909            None,
2910            false,
2911        );
2912        let plan = FieldPlan::Map {
2913            values_nullability: None,
2914            value_plan: Box::new(FieldPlan::Scalar),
2915        };
2916        let got = encode_all(&map, &plan, None);
2917        let mut expected = Vec::new();
2918        // Row0: block 2 then pairs
2919        expected.extend(avro_long_bytes(2));
2920        expected.extend(avro_len_prefixed_bytes(b"k1"));
2921        expected.extend(avro_long_bytes(1));
2922        expected.extend(avro_len_prefixed_bytes(b"k2"));
2923        expected.extend(avro_long_bytes(2));
2924        expected.extend(avro_long_bytes(0));
2925        // Row1: empty
2926        expected.extend(avro_long_bytes(0));
2927        assert_bytes_eq(&got, &expected);
2928    }
2929
2930    #[test]
2931    fn union_encoder_string_int() {
2932        let strings = StringArray::from(vec!["hello", "world"]);
2933        let ints = Int32Array::from(vec![10, 20, 30]);
2934
2935        let union_fields = UnionFields::try_new(
2936            vec![0, 1],
2937            vec![
2938                Field::new("v_str", DataType::Utf8, true),
2939                Field::new("v_int", DataType::Int32, true),
2940            ],
2941        )
2942        .unwrap();
2943
2944        let type_ids = Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]);
2945        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
2946
2947        let union_array = UnionArray::try_new(
2948            union_fields,
2949            type_ids.into(),
2950            Some(offsets.into()),
2951            vec![Arc::new(strings), Arc::new(ints)],
2952        )
2953        .unwrap();
2954
2955        let plan = FieldPlan::Union {
2956            bindings: vec![
2957                FieldBinding {
2958                    arrow_index: 0,
2959                    nullability: None,
2960                    plan: FieldPlan::Scalar,
2961                },
2962                FieldBinding {
2963                    arrow_index: 1,
2964                    nullability: None,
2965                    plan: FieldPlan::Scalar,
2966                },
2967            ],
2968        };
2969
2970        let got = encode_all(&union_array, &plan, None);
2971
2972        let mut expected = Vec::new();
2973        expected.extend(avro_long_bytes(0));
2974        expected.extend(avro_len_prefixed_bytes(b"hello"));
2975        expected.extend(avro_long_bytes(1));
2976        expected.extend(avro_long_bytes(10));
2977        expected.extend(avro_long_bytes(1));
2978        expected.extend(avro_long_bytes(20));
2979        expected.extend(avro_long_bytes(0));
2980        expected.extend(avro_len_prefixed_bytes(b"world"));
2981        expected.extend(avro_long_bytes(1));
2982        expected.extend(avro_long_bytes(30));
2983
2984        assert_bytes_eq(&got, &expected);
2985    }
2986
2987    #[test]
2988    fn union_encoder_null_string_int() {
2989        let nulls = NullArray::new(1);
2990        let strings = StringArray::from(vec!["hello"]);
2991        let ints = Int32Array::from(vec![10]);
2992
2993        let union_fields = UnionFields::try_new(
2994            vec![0, 1, 2],
2995            vec![
2996                Field::new("v_null", DataType::Null, true),
2997                Field::new("v_str", DataType::Utf8, true),
2998                Field::new("v_int", DataType::Int32, true),
2999            ],
3000        )
3001        .unwrap();
3002
3003        let type_ids = Buffer::from_slice_ref([0_i8, 1, 2]);
3004        // For a null value in a dense union, no value is added to a child array.
3005        // The offset points to the last value of that type. Since there's only one
3006        // null, and one of each other type, all offsets are 0.
3007        let offsets = Buffer::from_slice_ref([0_i32, 0, 0]);
3008
3009        let union_array = UnionArray::try_new(
3010            union_fields,
3011            type_ids.into(),
3012            Some(offsets.into()),
3013            vec![Arc::new(nulls), Arc::new(strings), Arc::new(ints)],
3014        )
3015        .unwrap();
3016
3017        let plan = FieldPlan::Union {
3018            bindings: vec![
3019                FieldBinding {
3020                    arrow_index: 0,
3021                    nullability: None,
3022                    plan: FieldPlan::Scalar,
3023                },
3024                FieldBinding {
3025                    arrow_index: 1,
3026                    nullability: None,
3027                    plan: FieldPlan::Scalar,
3028                },
3029                FieldBinding {
3030                    arrow_index: 2,
3031                    nullability: None,
3032                    plan: FieldPlan::Scalar,
3033                },
3034            ],
3035        };
3036
3037        let got = encode_all(&union_array, &plan, None);
3038
3039        let mut expected = Vec::new();
3040        expected.extend(avro_long_bytes(0));
3041        expected.extend(avro_long_bytes(1));
3042        expected.extend(avro_len_prefixed_bytes(b"hello"));
3043        expected.extend(avro_long_bytes(2));
3044        expected.extend(avro_long_bytes(10));
3045
3046        assert_bytes_eq(&got, &expected);
3047    }
3048
3049    #[test]
3050    fn list64_encoder_int32() {
3051        // LargeList [[1,2,3], []]
3052        let values = Int32Array::from(vec![1, 2, 3]);
3053        let offsets: Vec<i64> = vec![0, 3, 3];
3054        let list = LargeListArray::new(
3055            Field::new("item", DataType::Int32, true).into(),
3056            arrow_buffer::OffsetBuffer::new(offsets.into()),
3057            Arc::new(values) as ArrayRef,
3058            None,
3059        );
3060        let plan = FieldPlan::List {
3061            items_nullability: None,
3062            item_plan: Box::new(FieldPlan::Scalar),
3063        };
3064        let got = encode_all(&list, &plan, None);
3065        // Expected one block of 3 and then 0, then empty 0
3066        let mut expected = Vec::new();
3067        expected.extend(avro_long_bytes(3));
3068        expected.extend(avro_long_bytes(1));
3069        expected.extend(avro_long_bytes(2));
3070        expected.extend(avro_long_bytes(3));
3071        expected.extend(avro_long_bytes(0));
3072        expected.extend(avro_long_bytes(0));
3073        assert_bytes_eq(&got, &expected);
3074    }
3075
3076    #[test]
3077    fn int_encoder_test() {
3078        let ints = Int32Array::from(vec![0, -1, 2]);
3079        let mut expected_i = Vec::new();
3080        for v in [0i32, -1, 2] {
3081            expected_i.extend(avro_long_bytes(v as i64));
3082        }
3083        let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
3084        assert_bytes_eq(&got_i, &expected_i);
3085    }
3086
3087    #[test]
3088    fn boolean_encoder_test() {
3089        let bools = BooleanArray::from(vec![true, false]);
3090        let mut expected_b = Vec::new();
3091        expected_b.extend_from_slice(&[1]);
3092        expected_b.extend_from_slice(&[0]);
3093        let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
3094        assert_bytes_eq(&got_b, &expected_b);
3095    }
3096
3097    #[test]
3098    #[cfg(feature = "avro_custom_types")]
3099    fn duration_encoding_seconds() {
3100        let arr: PrimitiveArray<DurationSecondType> = vec![0i64, -1, 2].into();
3101        let mut expected = Vec::new();
3102        for v in [0i64, -1, 2] {
3103            expected.extend_from_slice(&avro_long_bytes(v));
3104        }
3105        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3106        assert_bytes_eq(&got, &expected);
3107    }
3108
3109    #[test]
3110    #[cfg(feature = "avro_custom_types")]
3111    fn duration_encoding_milliseconds() {
3112        let arr: PrimitiveArray<DurationMillisecondType> = vec![1i64, 0, -2].into();
3113        let mut expected = Vec::new();
3114        for v in [1i64, 0, -2] {
3115            expected.extend_from_slice(&avro_long_bytes(v));
3116        }
3117        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3118        assert_bytes_eq(&got, &expected);
3119    }
3120
3121    #[test]
3122    #[cfg(feature = "avro_custom_types")]
3123    fn duration_encoding_microseconds() {
3124        let arr: PrimitiveArray<DurationMicrosecondType> = vec![5i64, -6, 7].into();
3125        let mut expected = Vec::new();
3126        for v in [5i64, -6, 7] {
3127            expected.extend_from_slice(&avro_long_bytes(v));
3128        }
3129        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3130        assert_bytes_eq(&got, &expected);
3131    }
3132
3133    #[test]
3134    #[cfg(feature = "avro_custom_types")]
3135    fn duration_encoding_nanoseconds() {
3136        let arr: PrimitiveArray<DurationNanosecondType> = vec![8i64, 9, -10].into();
3137        let mut expected = Vec::new();
3138        for v in [8i64, 9, -10] {
3139            expected.extend_from_slice(&avro_long_bytes(v));
3140        }
3141        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3142        assert_bytes_eq(&got, &expected);
3143    }
3144
3145    #[test]
3146    fn duration_encoder_year_month_happy_path() {
3147        let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
3148        let mut expected = Vec::new();
3149        for m in [0u32, 1u32, 25u32] {
3150            expected.extend_from_slice(&duration_fixed12(m, 0, 0));
3151        }
3152        let got = encode_all(&arr, &FieldPlan::Duration, None);
3153        assert_bytes_eq(&got, &expected);
3154    }
3155
3156    #[test]
3157    fn duration_encoder_year_month_rejects_negative() {
3158        let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
3159        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3160        let mut out = Vec::new();
3161        let err = enc.encode(&mut out, 0).unwrap_err();
3162        match err {
3163            AvroError::InvalidArgument(msg) => {
3164                assert!(msg.contains("cannot encode negative months"))
3165            }
3166            other => panic!("expected InvalidArgument, got {other:?}"),
3167        }
3168    }
3169
3170    #[test]
3171    fn duration_encoder_day_time_happy_path() {
3172        let v0 = IntervalDayTimeType::make_value(2, 500); // days=2, millis=500
3173        let v1 = IntervalDayTimeType::make_value(0, 0);
3174        let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
3175        let mut expected = Vec::new();
3176        expected.extend_from_slice(&duration_fixed12(0, 2, 500));
3177        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
3178        let got = encode_all(&arr, &FieldPlan::Duration, None);
3179        assert_bytes_eq(&got, &expected);
3180    }
3181
3182    #[test]
3183    fn duration_encoder_day_time_rejects_negative() {
3184        let bad = IntervalDayTimeType::make_value(-1, 0);
3185        let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
3186        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3187        let mut out = Vec::new();
3188        let err = enc.encode(&mut out, 0).unwrap_err();
3189        match err {
3190            AvroError::InvalidArgument(msg) => {
3191                assert!(msg.contains("cannot encode negative days"))
3192            }
3193            other => panic!("expected InvalidArgument, got {other:?}"),
3194        }
3195    }
3196
3197    #[cfg(feature = "avro_custom_types")]
3198    #[test]
3199    fn interval_month_day_nano_fixed_encoder_happy_path() {
3200        // Custom encoding stores raw i32/i32/i64 values in fixed(16)
3201        let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3); // sub-millisecond nanos OK
3202        let v1 = IntervalMonthDayNanoType::make_value(-4, -5, -6);
3203        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
3204
3205        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3206        let mut expected = Vec::new();
3207        expected.extend_from_slice(&interval_mdn_fixed16(1, 2, 3));
3208        expected.extend_from_slice(&interval_mdn_fixed16(-4, -5, -6));
3209        assert_bytes_eq(&got, &expected);
3210    }
3211
3212    #[test]
3213    fn duration_encoder_month_day_nano_happy_path() {
3214        let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); // -> millis = 3
3215        let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
3216        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
3217        let mut expected = Vec::new();
3218        expected.extend_from_slice(&duration_fixed12(1, 2, 3));
3219        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
3220        let got = encode_all(&arr, &FieldPlan::Duration, None);
3221        assert_bytes_eq(&got, &expected);
3222    }
3223
3224    #[test]
3225    fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
3226        let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
3227        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
3228        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3229        let mut out = Vec::new();
3230        let err = enc.encode(&mut out, 0).unwrap_err();
3231        match err {
3232            AvroError::InvalidArgument(msg) => {
3233                assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
3234            }
3235            other => panic!("expected InvalidArgument, got {other:?}"),
3236        }
3237    }
3238
3239    #[test]
3240    fn minimal_twos_complement_test() {
3241        let pos = [0x00, 0x00, 0x01];
3242        assert_eq!(minimal_twos_complement(&pos), &pos[2..]);
3243        let neg = [0xFF, 0xFF, 0x80]; // negative minimal is 0x80
3244        assert_eq!(minimal_twos_complement(&neg), &neg[2..]);
3245        let zero = [0x00, 0x00, 0x00];
3246        assert_eq!(minimal_twos_complement(&zero), &zero[2..]);
3247    }
3248
3249    #[test]
3250    fn write_sign_extend_test() {
3251        let mut out = Vec::new();
3252        write_sign_extended(&mut out, &[0x01], 4).unwrap();
3253        assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
3254        out.clear();
3255        write_sign_extended(&mut out, &[0xFF], 4).unwrap();
3256        assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
3257        out.clear();
3258        // truncation success (sign bytes only removed)
3259        write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
3260        assert_eq!(out, vec![0xFF, 0x80]);
3261        out.clear();
3262        // truncation overflow
3263        let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
3264        match err {
3265            AvroError::InvalidArgument(_) => {}
3266            _ => panic!("expected InvalidArgument"),
3267        }
3268    }
3269
3270    #[test]
3271    fn duration_month_day_nano_overflow_millis() {
3272        // nanos leading to millis > u32::MAX
3273        let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
3274        let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
3275        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
3276        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Duration, None).unwrap();
3277        let mut out = Vec::new();
3278        let err = enc.encode(&mut out, 0).unwrap_err();
3279        match err {
3280            AvroError::InvalidArgument(msg) => assert!(msg.contains("exceed u32::MAX")),
3281            _ => panic!("expected InvalidArgument"),
3282        }
3283    }
3284
3285    #[test]
3286    fn fieldplan_decimal_precision_scale_mismatch_errors() {
3287        // Avro expects (10,2), Arrow has (12,2)
3288        use crate::codec::Codec;
3289        use std::collections::HashMap;
3290        let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
3291        let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
3292        let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
3293        match err {
3294            AvroError::SchemaError(msg) => {
3295                assert!(msg.contains("Decimal precision/scale mismatch"))
3296            }
3297            _ => panic!("expected SchemaError"),
3298        }
3299    }
3300
3301    #[test]
3302    fn timestamp_micros_encoder() {
3303        // Mirrors the style used by `timestamp_millis_encoder`
3304        test_scalar_primitive_encoding::<TimestampMicrosecondType>(
3305            &[
3306                1_704_067_200_000_000, // 2024-01-01T00:00:00Z in micros
3307                0,                     // epoch
3308                -123_456_789,          // pre-epoch
3309            ],
3310            &[None, Some(1_704_067_200_000_000)],
3311        );
3312    }
3313
3314    #[test]
3315    fn list_encoder_nullable_items_null_first() {
3316        // One List row with three elements: [Some(1), None, Some(2)]
3317        let values = Int32Array::from(vec![Some(1), None, Some(2)]);
3318        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
3319        let list = ListArray::new(
3320            Field::new("item", DataType::Int32, true).into(),
3321            offsets,
3322            Arc::new(values) as ArrayRef,
3323            None,
3324        );
3325
3326        let plan = FieldPlan::List {
3327            items_nullability: Some(Nullability::NullFirst),
3328            item_plan: Box::new(FieldPlan::Scalar),
3329        };
3330
3331        // Avro array encoding per row: one positive block, then 0 terminator.
3332        // For NullFirst: Some(v) => branch 1 (0x02) then the value; None => branch 0 (0x00)
3333        let mut expected = Vec::new();
3334        expected.extend(avro_long_bytes(3)); // block of 3
3335        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
3336        expected.extend(avro_long_bytes(1)); // value 1
3337        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
3338        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
3339        expected.extend(avro_long_bytes(2)); // value 2
3340        expected.extend(avro_long_bytes(0)); // block terminator
3341
3342        let got = encode_all(&list, &plan, None);
3343        assert_bytes_eq(&got, &expected);
3344    }
3345
3346    #[test]
3347    fn large_list_encoder_nullable_items_null_first() {
3348        // LargeList single row: [Some(10), None]
3349        let values = Int32Array::from(vec![Some(10), None]);
3350        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
3351        let list = LargeListArray::new(
3352            Field::new("item", DataType::Int32, true).into(),
3353            offsets,
3354            Arc::new(values) as ArrayRef,
3355            None,
3356        );
3357
3358        let plan = FieldPlan::List {
3359            items_nullability: Some(Nullability::NullFirst),
3360            item_plan: Box::new(FieldPlan::Scalar),
3361        };
3362
3363        let mut expected = Vec::new();
3364        expected.extend(avro_long_bytes(2)); // block of 2
3365        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
3366        expected.extend(avro_long_bytes(10)); // value 10
3367        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
3368        expected.extend(avro_long_bytes(0)); // block terminator
3369
3370        let got = encode_all(&list, &plan, None);
3371        assert_bytes_eq(&got, &expected);
3372    }
3373
3374    #[test]
3375    fn map_encoder_string_keys_nullable_int_values_null_first() {
3376        // One map row: {"k1": Some(7), "k2": None}
3377        let keys = StringArray::from(vec!["k1", "k2"]);
3378        let values = Int32Array::from(vec![Some(7), None]);
3379
3380        let entries_fields = Fields::from(vec![
3381            Field::new("key", DataType::Utf8, false),
3382            Field::new("value", DataType::Int32, true),
3383        ]);
3384        let entries = StructArray::new(
3385            entries_fields,
3386            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
3387            None,
3388        );
3389
3390        // Single row -> offsets [0, 2]
3391        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
3392        let map = MapArray::new(
3393            Field::new("entries", entries.data_type().clone(), false).into(),
3394            offsets,
3395            entries,
3396            None,
3397            false,
3398        );
3399
3400        let plan = FieldPlan::Map {
3401            values_nullability: Some(Nullability::NullFirst),
3402            value_plan: Box::new(FieldPlan::Scalar),
3403        };
3404
3405        // Expected:
3406        // - one positive block (len=2)
3407        // - "k1", branch=1 + value=7
3408        // - "k2", branch=0 (null)
3409        // - end-of-block marker 0
3410        let mut expected = Vec::new();
3411        expected.extend(avro_long_bytes(2)); // block length 2
3412        expected.extend(avro_len_prefixed_bytes(b"k1")); // key "k1"
3413        expected.extend(avro_long_bytes(1)); // union branch 1 (value)
3414        expected.extend(avro_long_bytes(7)); // value 7
3415        expected.extend(avro_len_prefixed_bytes(b"k2")); // key "k2"
3416        expected.extend(avro_long_bytes(0)); // union branch 0 (null)
3417        expected.extend(avro_long_bytes(0)); // block terminator
3418
3419        let got = encode_all(&map, &plan, None);
3420        assert_bytes_eq(&got, &expected);
3421    }
3422
3423    #[test]
3424    fn time32_seconds_to_millis_encoder() {
3425        // Time32(Second) must encode as Avro time-millis (ms since midnight).
3426        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
3427            vec![0i32, 1, -2, 12_345].into();
3428        let got = encode_all(&arr, &FieldPlan::TimeMillisFromSecs, None);
3429        let mut expected = Vec::new();
3430        for secs in [0i32, 1, -2, 12_345] {
3431            let millis = (secs as i64) * 1000;
3432            expected.extend_from_slice(&avro_long_bytes(millis));
3433        }
3434        assert_bytes_eq(&got, &expected);
3435    }
3436
3437    #[test]
3438    fn time32_seconds_to_millis_overflow() {
3439        // Choose a value that will overflow i32 when multiplied by 1000.
3440        let overflow_secs: i32 = i32::MAX / 1000 + 1;
3441        let arr: PrimitiveArray<Time32SecondType> = vec![overflow_secs].into();
3442        let mut enc =
3443            FieldEncoder::make_encoder(&arr, &FieldPlan::TimeMillisFromSecs, None).unwrap();
3444        let mut out = Vec::new();
3445        let err = enc.encode(&mut out, 0).unwrap_err();
3446        match err {
3447            AvroError::InvalidArgument(msg) => {
3448                assert!(
3449                    msg.contains("overflowed") || msg.contains("overflow"),
3450                    "unexpected message: {msg}"
3451                )
3452            }
3453            other => panic!("expected InvalidArgument, got {other:?}"),
3454        }
3455    }
3456
3457    #[test]
3458    fn time32_seconds_to_millis_type_mismatch_returns_schema_error() {
3459        let arr = Int32Array::from(vec![1, 2, 3]);
3460        match FieldEncoder::make_encoder(&arr, &FieldPlan::TimeMillisFromSecs, None) {
3461            Err(AvroError::SchemaError(msg)) => {
3462                assert!(msg.contains("Time32(Second)"), "unexpected message: {msg}");
3463                assert!(msg.contains("Int32"), "unexpected message: {msg}");
3464            }
3465            Ok(_) => panic!("expected SchemaError"),
3466            Err(other) => panic!("expected SchemaError, got {other:?}"),
3467        }
3468    }
3469
3470    #[test]
3471    fn encode_rows_time32_seconds_plan_rejects_millisecond_column() {
3472        let schema = ArrowSchema::new(vec![Field::new(
3473            "t",
3474            DataType::Time32(TimeUnit::Millisecond),
3475            false,
3476        )]);
3477        let arr: PrimitiveArray<Time32MillisecondType> = vec![1i32].into();
3478        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(arr)]).unwrap();
3479        let encoder = RecordEncoder {
3480            columns: vec![FieldBinding {
3481                arrow_index: 0,
3482                nullability: None,
3483                plan: FieldPlan::TimeMillisFromSecs,
3484            }],
3485            prefix: None,
3486        };
3487
3488        let mut out = BytesMut::new();
3489        let mut offsets = vec![0usize];
3490        let err = encoder
3491            .encode_rows(&batch, 16, &mut out, &mut offsets)
3492            .unwrap_err();
3493        match err {
3494            AvroError::SchemaError(msg) => {
3495                assert!(msg.contains("Time32(Second)"), "unexpected message: {msg}");
3496                assert!(
3497                    msg.contains("Time32(Millisecond)"),
3498                    "unexpected message: {msg}"
3499                );
3500            }
3501            other => panic!("expected SchemaError, got {other:?}"),
3502        }
3503    }
3504
3505    #[cfg(not(feature = "avro_custom_types"))]
3506    #[test]
3507    fn timestamp_seconds_to_millis_encoder() {
3508        // Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
3509        let arr: PrimitiveArray<TimestampSecondType> = vec![0i64, 1, -1, 1_234_567_890].into();
3510        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3511        let mut expected = Vec::new();
3512        for secs in [0i64, 1, -1, 1_234_567_890] {
3513            let millis = secs * 1000;
3514            expected.extend_from_slice(&avro_long_bytes(millis));
3515        }
3516        assert_bytes_eq(&got, &expected);
3517    }
3518
3519    #[cfg(not(feature = "avro_custom_types"))]
3520    #[test]
3521    fn timestamp_seconds_to_millis_overflow() {
3522        // Overflow i64 when multiplied by 1000.
3523        let overflow_secs: i64 = i64::MAX / 1000 + 1;
3524        let arr: PrimitiveArray<TimestampSecondType> = vec![overflow_secs].into();
3525        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
3526        let mut out = Vec::new();
3527        let err = enc.encode(&mut out, 0).unwrap_err();
3528        match err {
3529            AvroError::InvalidArgument(msg) => {
3530                assert!(
3531                    msg.contains("overflowed") || msg.contains("overflow"),
3532                    "unexpected message: {msg}"
3533                )
3534            }
3535            other => panic!("expected InvalidArgument, got {other:?}"),
3536        }
3537    }
3538
3539    #[test]
3540    fn timestamp_nanos_encoder() {
3541        let arr: PrimitiveArray<TimestampNanosecondType> = vec![0i64, 1, -1, 123].into();
3542        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3543        let mut expected = Vec::new();
3544        for ns in [0i64, 1, -1, 123] {
3545            expected.extend_from_slice(&avro_long_bytes(ns));
3546        }
3547        assert_bytes_eq(&got, &expected);
3548    }
3549
3550    #[test]
3551    fn union_encoder_string_int_nonzero_type_ids() {
3552        let strings = StringArray::from(vec!["hello", "world"]);
3553        let ints = Int32Array::from(vec![10, 20, 30]);
3554        let union_fields = UnionFields::try_new(
3555            vec![2, 5],
3556            vec![
3557                Field::new("v_str", DataType::Utf8, true),
3558                Field::new("v_int", DataType::Int32, true),
3559            ],
3560        )
3561        .unwrap();
3562        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
3563        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
3564        let union_array = UnionArray::try_new(
3565            union_fields,
3566            type_ids.into(),
3567            Some(offsets.into()),
3568            vec![Arc::new(strings), Arc::new(ints)],
3569        )
3570        .unwrap();
3571        let plan = FieldPlan::Union {
3572            bindings: vec![
3573                FieldBinding {
3574                    arrow_index: 0,
3575                    nullability: None,
3576                    plan: FieldPlan::Scalar,
3577                },
3578                FieldBinding {
3579                    arrow_index: 1,
3580                    nullability: None,
3581                    plan: FieldPlan::Scalar,
3582                },
3583            ],
3584        };
3585        let got = encode_all(&union_array, &plan, None);
3586        let mut expected = Vec::new();
3587        expected.extend(avro_long_bytes(0));
3588        expected.extend(avro_len_prefixed_bytes(b"hello"));
3589        expected.extend(avro_long_bytes(1));
3590        expected.extend(avro_long_bytes(10));
3591        expected.extend(avro_long_bytes(1));
3592        expected.extend(avro_long_bytes(20));
3593        expected.extend(avro_long_bytes(0));
3594        expected.extend(avro_len_prefixed_bytes(b"world"));
3595        expected.extend(avro_long_bytes(1));
3596        expected.extend(avro_long_bytes(30));
3597        assert_bytes_eq(&got, &expected);
3598    }
3599
3600    #[test]
3601    fn nullable_state_with_null_buffer_and_zero_nulls() {
3602        let values = vec![1i32, 2, 3];
3603        let arr = Int32Array::from_iter_values_with_nulls(values, Some(NullBuffer::new_valid(3)));
3604        assert_eq!(arr.null_count(), 0);
3605        assert!(arr.nulls().is_some());
3606        let plan = FieldPlan::Scalar;
3607        let enc = FieldEncoder::make_encoder(&arr, &plan, Some(Nullability::NullFirst)).unwrap();
3608        match enc.null_state {
3609            NullState::NullableNoNulls { union_value_byte } => {
3610                assert_eq!(
3611                    union_value_byte,
3612                    union_value_branch_byte(Nullability::NullFirst, false)
3613                );
3614            }
3615            other => panic!("expected NullableNoNulls, got {other:?}"),
3616        }
3617    }
3618
3619    #[test]
3620    fn encode_rows_single_column_int32() {
3621        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3622        let arr = Int32Array::from(vec![1, 2, 3]);
3623        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3624        let encoder = RecordEncoder {
3625            columns: vec![FieldBinding {
3626                arrow_index: 0,
3627                nullability: None,
3628                plan: FieldPlan::Scalar,
3629            }],
3630            prefix: None,
3631        };
3632        let mut out = BytesMut::new();
3633        let mut offsets: Vec<usize> = vec![0];
3634        encoder
3635            .encode_rows(&batch, 16, &mut out, &mut offsets)
3636            .unwrap();
3637        assert_eq!(offsets.len(), 4);
3638        assert_eq!(*offsets.last().unwrap(), out.len());
3639        assert_bytes_eq(row_slice(&out, &offsets, 0), &avro_long_bytes(1));
3640        assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(2));
3641        assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(3));
3642    }
3643
3644    #[test]
3645    fn encode_rows_multiple_columns() {
3646        let schema = ArrowSchema::new(vec![
3647            Field::new("a", DataType::Int32, false),
3648            Field::new("b", DataType::Utf8, false),
3649        ]);
3650        let int_arr = Int32Array::from(vec![10, 20]);
3651        let str_arr = StringArray::from(vec!["hello", "world"]);
3652        let batch = RecordBatch::try_new(
3653            Arc::new(schema.clone()),
3654            vec![Arc::new(int_arr), Arc::new(str_arr)],
3655        )
3656        .unwrap();
3657        let encoder = RecordEncoder {
3658            columns: vec![
3659                FieldBinding {
3660                    arrow_index: 0,
3661                    nullability: None,
3662                    plan: FieldPlan::Scalar,
3663                },
3664                FieldBinding {
3665                    arrow_index: 1,
3666                    nullability: None,
3667                    plan: FieldPlan::Scalar,
3668                },
3669            ],
3670            prefix: None,
3671        };
3672        let mut out = BytesMut::new();
3673        let mut offsets: Vec<usize> = vec![0];
3674        encoder
3675            .encode_rows(&batch, 32, &mut out, &mut offsets)
3676            .unwrap();
3677        assert_eq!(offsets.len(), 3);
3678        assert_eq!(*offsets.last().unwrap(), out.len());
3679        let mut expected_row0 = Vec::new();
3680        expected_row0.extend(avro_long_bytes(10));
3681        expected_row0.extend(avro_len_prefixed_bytes(b"hello"));
3682        assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
3683        let mut expected_row1 = Vec::new();
3684        expected_row1.extend(avro_long_bytes(20));
3685        expected_row1.extend(avro_len_prefixed_bytes(b"world"));
3686        assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
3687    }
3688
3689    #[test]
3690    fn encode_rows_with_prefix() {
3691        use crate::codec::AvroFieldBuilder;
3692        use crate::schema::AvroSchema;
3693        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3694        let arr = Int32Array::from(vec![42]);
3695        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3696        let avro_schema = AvroSchema::try_from(&schema).unwrap();
3697        let fingerprint = avro_schema
3698            .fingerprint(crate::schema::FingerprintAlgorithm::Rabin)
3699            .unwrap();
3700        let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
3701            .build()
3702            .unwrap();
3703        let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
3704            .with_fingerprint(Some(fingerprint))
3705            .build()
3706            .unwrap();
3707        let mut out = BytesMut::new();
3708        let mut offsets: Vec<usize> = vec![0];
3709        encoder
3710            .encode_rows(&batch, 32, &mut out, &mut offsets)
3711            .unwrap();
3712        assert_eq!(offsets.len(), 2);
3713        let row0 = row_slice(&out, &offsets, 0);
3714        assert!(row0.len() > 10, "Row should contain prefix + encoded value");
3715        assert_eq!(row0[0], 0xC3);
3716        assert_eq!(row0[1], 0x01);
3717    }
3718
3719    #[test]
3720    fn encode_rows_empty_batch() {
3721        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3722        let arr = Int32Array::from(Vec::<i32>::new());
3723        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3724        let encoder = RecordEncoder {
3725            columns: vec![FieldBinding {
3726                arrow_index: 0,
3727                nullability: None,
3728                plan: FieldPlan::Scalar,
3729            }],
3730            prefix: None,
3731        };
3732        let mut out = BytesMut::new();
3733        let mut offsets: Vec<usize> = vec![0];
3734        encoder
3735            .encode_rows(&batch, 16, &mut out, &mut offsets)
3736            .unwrap();
3737        assert_eq!(offsets, vec![0]);
3738        assert!(out.is_empty());
3739    }
3740
3741    #[test]
3742    fn encode_rows_matches_encode_output() {
3743        let schema = ArrowSchema::new(vec![
3744            Field::new("a", DataType::Int64, false),
3745            Field::new("b", DataType::Float64, false),
3746        ]);
3747        let int_arr = Int64Array::from(vec![100i64, 200, 300]);
3748        let float_arr = Float64Array::from(vec![1.5, 2.5, 3.5]);
3749        let batch = RecordBatch::try_new(
3750            Arc::new(schema.clone()),
3751            vec![Arc::new(int_arr), Arc::new(float_arr)],
3752        )
3753        .unwrap();
3754        let encoder = RecordEncoder {
3755            columns: vec![
3756                FieldBinding {
3757                    arrow_index: 0,
3758                    nullability: None,
3759                    plan: FieldPlan::Scalar,
3760                },
3761                FieldBinding {
3762                    arrow_index: 1,
3763                    nullability: None,
3764                    plan: FieldPlan::Scalar,
3765                },
3766            ],
3767            prefix: None,
3768        };
3769        let mut stream_buf = Vec::new();
3770        encoder.encode(&mut stream_buf, &batch).unwrap();
3771        let mut out = BytesMut::new();
3772        let mut offsets: Vec<usize> = vec![0];
3773        encoder
3774            .encode_rows(&batch, 32, &mut out, &mut offsets)
3775            .unwrap();
3776        assert_eq!(offsets.len(), 1 + batch.num_rows());
3777        assert_bytes_eq(&out[..], &stream_buf);
3778    }
3779
3780    #[test]
3781    fn encode_rows_appends_to_existing_buffer() {
3782        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3783        let arr = Int32Array::from(vec![5, 6]);
3784        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3785        let encoder = RecordEncoder {
3786            columns: vec![FieldBinding {
3787                arrow_index: 0,
3788                nullability: None,
3789                plan: FieldPlan::Scalar,
3790            }],
3791            prefix: None,
3792        };
3793        let mut out = BytesMut::new();
3794        out.extend_from_slice(&[0xAA, 0xBB]);
3795        let mut offsets: Vec<usize> = vec![0, out.len()];
3796        encoder
3797            .encode_rows(&batch, 16, &mut out, &mut offsets)
3798            .unwrap();
3799        assert_eq!(offsets.len(), 4);
3800        assert_eq!(*offsets.last().unwrap(), out.len());
3801        assert_bytes_eq(row_slice(&out, &offsets, 0), &[0xAA, 0xBB]);
3802        assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(5));
3803        assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(6));
3804    }
3805
3806    #[test]
3807    fn encode_rows_nullable_column() {
3808        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, true)]);
3809        let arr = Int32Array::from(vec![Some(1), None, Some(3)]);
3810        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3811        let encoder = RecordEncoder {
3812            columns: vec![FieldBinding {
3813                arrow_index: 0,
3814                nullability: Some(Nullability::NullFirst),
3815                plan: FieldPlan::Scalar,
3816            }],
3817            prefix: None,
3818        };
3819        let mut out = BytesMut::new();
3820        let mut offsets: Vec<usize> = vec![0];
3821        encoder
3822            .encode_rows(&batch, 16, &mut out, &mut offsets)
3823            .unwrap();
3824        assert_eq!(offsets.len(), 4);
3825        let mut expected_row0 = Vec::new();
3826        expected_row0.extend(avro_long_bytes(1)); // union branch for value
3827        expected_row0.extend(avro_long_bytes(1)); // value
3828        assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
3829        let expected_row1 = avro_long_bytes(0); // union branch for null
3830        assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
3831        let mut expected_row2 = Vec::new();
3832        expected_row2.extend(avro_long_bytes(1)); // union branch for value
3833        expected_row2.extend(avro_long_bytes(3)); // value
3834        assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2);
3835    }
3836
3837    #[test]
3838    fn encode_prefix_write_error() {
3839        use crate::codec::AvroFieldBuilder;
3840        use crate::schema::{AvroSchema, FingerprintAlgorithm};
3841        use std::io;
3842
3843        struct FailWriter {
3844            failed: bool,
3845        }
3846
3847        impl io::Write for FailWriter {
3848            fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
3849                if !self.failed {
3850                    self.failed = true;
3851                    Err(io::Error::other("fail write"))
3852                } else {
3853                    Ok(0)
3854                }
3855            }
3856
3857            fn flush(&mut self) -> io::Result<()> {
3858                Ok(())
3859            }
3860        }
3861
3862        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3863        let arr = Int32Array::from(vec![42]);
3864        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3865        let avro_schema = AvroSchema::try_from(&schema).unwrap();
3866        let fingerprint = avro_schema
3867            .fingerprint(FingerprintAlgorithm::Rabin)
3868            .unwrap();
3869        let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
3870            .build()
3871            .unwrap();
3872        let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
3873            .with_fingerprint(Some(fingerprint))
3874            .build()
3875            .unwrap();
3876
3877        let mut writer = FailWriter { failed: false };
3878        let err = encoder.encode(&mut writer, &batch).unwrap_err();
3879        let msg = format!("{err}");
3880        assert!(msg.contains("write prefix"), "unexpected error: {msg}");
3881    }
3882}