Skip to main content

arrow_avro/writer/
encoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Encoder for Arrow types.
19
20use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::errors::AvroError;
22use crate::schema::{Fingerprint, Nullability, Prefix};
23use arrow_array::cast::AsArray;
24use arrow_array::types::{
25    ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
26    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
27    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
28    Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
29    TimestampMillisecondType,
30};
31use arrow_array::types::{
32    RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
33};
34use arrow_array::{
35    Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
36    FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
37    GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
38    ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
39    StringViewArray, StructArray, UnionArray,
40};
41#[cfg(feature = "small_decimals")]
42use arrow_array::{Decimal32Array, Decimal64Array};
43use arrow_buffer::{ArrowNativeType, NullBuffer};
44use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode};
45use bytes::{BufMut, BytesMut};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
/// Runs `$body` once for every row index in `0..$n`, binding the index to `$row`.
///
/// When `$prefix` is `Some(bytes)`, those bytes are written to `$out` before each
/// row's body; when it is `None`, the rows are processed with no extra output.
/// The `Some`/`None` decision is taken once, outside the row loop.
///
/// A failed prefix write is mapped to `AvroError::IoError` and propagated with
/// `?`, so the macro must be expanded inside a function returning
/// `Result<_, AvroError>` with `std::io::Write` in scope.
macro_rules! for_rows_with_prefix {
    ($n:expr, $prefix:expr, $out:ident, |$row:ident| $body:block) => {{
        if let Some(prefix_bytes) = $prefix {
            for $row in 0..$n {
                $out.write_all(prefix_bytes)
                    .map_err(|e| AvroError::IoError(format!("write prefix: {e}"), e))?;
                $body
            }
        } else {
            for $row in 0..$n {
                $body
            }
        }
    }};
}
68
69/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
70///
71/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
72#[inline]
73pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), AvroError> {
74    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
75    // At most 10 bytes for 64-bit varint
76    let mut buf = [0u8; 10];
77    let mut i = 0;
78    while (zz & !0x7F) != 0 {
79        buf[i] = ((zz & 0x7F) as u8) | 0x80;
80        i += 1;
81        zz >>= 7;
82    }
83    buf[i] = (zz & 0x7F) as u8;
84    i += 1;
85    out.write_all(&buf[..i])
86        .map_err(|e| AvroError::IoError(format!("write long: {e}"), e))
87}
88
89#[inline]
90fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), AvroError> {
91    write_long(out, value as i64)
92}
93
94#[inline]
95fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
96    write_long(out, bytes.len() as i64)?;
97    out.write_all(bytes)
98        .map_err(|e| AvroError::IoError(format!("write bytes: {e}"), e))
99}
100
101#[inline]
102fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), AvroError> {
103    out.write_all(&[if v { 1 } else { 0 }])
104        .map_err(|e| AvroError::IoError(format!("write bool: {e}"), e))
105}
106
/// Returns the shortest suffix of the big-endian two's-complement integer `be`
/// that still encodes the same signed value.
///
/// Leading sign-extension bytes are stripped: `0x00` for non-negative values
/// and `0xFF` for negative ones. One sign byte is kept when stripping it would
/// flip the sign bit of the next byte. Input made entirely of sign bytes
/// shrinks to its last byte, and empty input is returned unchanged.
///
/// Avro `decimal` over `bytes` stores the unscaled value as big-endian two's
/// complement (Avro 1.11.1 specification).
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let first = match be.first() {
        Some(&b) => b,
        None => return be,
    };
    let sign: u8 = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
    match be.iter().position(|&b| b != sign) {
        // Only sign bytes: a single one is enough (the value is 0 or -1).
        None => &be[be.len() - 1..],
        // The leading byte already carries information; nothing to trim.
        Some(0) => be,
        Some(first_essential) => {
            if ((be[first_essential] ^ sign) & 0x80) == 0 {
                &be[first_essential..]
            } else {
                // Keep one sign byte so the top bit still reflects the sign.
                &be[first_essential - 1..]
            }
        }
    }
}
138
139/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n` bytes.
140///
141///
142/// - If shorter than `n`, the slice is sign-extended by left-padding with the
143///   sign byte (`0x00` for positive, `0xFF` for negative).
144/// - If longer than `n`, the slice is truncated from the left. An overflow error
145///   is returned if any of the truncated bytes are not redundant sign bytes,
146///   or if the resulting value's sign bit would differ from the original.
147/// - If the slice is already `n` bytes long, it is copied.
148///
149/// Used for encoding Avro decimal values into `fixed(N)` fields.
150#[inline]
151fn write_sign_extended<W: Write + ?Sized>(
152    out: &mut W,
153    src_be: &[u8],
154    n: usize,
155) -> Result<(), AvroError> {
156    let len = src_be.len();
157    if len == n {
158        out.write_all(src_be)?;
159        return Ok(());
160    }
161    let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
162        0xFF
163    } else {
164        0x00
165    };
166    if len > n {
167        let extra = len - n;
168        if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
169            return Ok(());
170        }
171        // All truncated bytes must equal the sign byte, and the MSB of the first
172        // retained byte must match the sign (otherwise overflow).
173        if src_be[..extra].iter().any(|&b| b != sign_byte)
174            || ((src_be[extra] ^ sign_byte) & 0x80) != 0
175        {
176            return Err(AvroError::InvalidArgument(format!(
177                "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
178            )));
179        }
180        return out
181            .write_all(&src_be[extra..])
182            .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e));
183    }
184    // len < n: prepend sign bytes (sign extension) then the payload
185    let pad_len = n - len;
186    // Fixed-size stack pads to avoid heap allocation on the hot path
187    const ZPAD: [u8; 64] = [0x00; 64];
188    const FPAD: [u8; 64] = [0xFF; 64];
189    let pad = if sign_byte == 0x00 {
190        &ZPAD[..]
191    } else {
192        &FPAD[..]
193    };
194    // Emit padding in 64‑byte chunks (minimizes write calls without allocating),
195    // then write the original bytes.
196    let mut rem = pad_len;
197    while rem >= pad.len() {
198        out.write_all(pad)
199            .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
200        rem -= pad.len();
201    }
202    if rem > 0 {
203        out.write_all(&pad[..rem])
204            .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
205    }
206    out.write_all(src_be)
207        .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))
208}
209
210/// Write the union branch index for an optional field.
211///
212/// Branch index is 0-based per Avro unions:
213/// - Null-first (default): null => 0, value => 1
214/// - Null-second (Impala): value => 0, null => 1
215fn write_optional_index<W: Write + ?Sized>(
216    out: &mut W,
217    is_null: bool,
218    null_order: Nullability,
219) -> Result<(), AvroError> {
220    let byte = union_value_branch_byte(null_order, is_null);
221    out.write_all(&[byte])
222        .map_err(|e| AvroError::IoError(format!("write union branch: {e}"), e))
223}
224
225#[derive(Debug, Clone)]
226enum NullState<'a> {
227    NonNullable,
228    NullableNoNulls {
229        union_value_byte: u8,
230    },
231    Nullable {
232        nulls: &'a NullBuffer,
233        null_order: Nullability,
234    },
235}
236
237/// Arrow to Avro FieldEncoder:
238/// - Holds the inner `Encoder` (by value)
239/// - Carries the per-site nullability **state** as a single enum that enforces invariants
240pub(crate) struct FieldEncoder<'a> {
241    encoder: Encoder<'a>,
242    null_state: NullState<'a>,
243}
244
245impl<'a> FieldEncoder<'a> {
246    fn make_encoder(
247        array: &'a dyn Array,
248        plan: &FieldPlan,
249        nullability: Option<Nullability>,
250    ) -> Result<Self, AvroError> {
251        let encoder = match plan {
252            FieldPlan::Scalar => match array.data_type() {
253                DataType::Null => Encoder::Null,
254                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
255                DataType::Utf8 => {
256                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
257                }
258                DataType::LargeUtf8 => {
259                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
260                }
261                DataType::Utf8View => {
262                    let arr = array
263                        .as_any()
264                        .downcast_ref::<StringViewArray>()
265                        .ok_or_else(|| AvroError::SchemaError("Expected StringViewArray".into()))?;
266                    Encoder::Utf8View(Utf8ViewEncoder(arr))
267                }
268                DataType::BinaryView => {
269                    let arr = array
270                        .as_any()
271                        .downcast_ref::<BinaryViewArray>()
272                        .ok_or_else(|| AvroError::SchemaError("Expected BinaryViewArray".into()))?;
273                    Encoder::BinaryView(BinaryViewEncoder(arr))
274                }
275                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
276                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
277                DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
278                DataType::Date64 => {
279                    return Err(AvroError::NYI(
280                        "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
281                            .into(),
282                    ));
283                }
284                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
285                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
286                ),
287                DataType::Time32(TimeUnit::Millisecond) => {
288                    Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
289                }
290                DataType::Time32(TimeUnit::Microsecond) => {
291                    return Err(AvroError::InvalidArgument(
292                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
293                            .into(),
294                    ));
295                }
296                DataType::Time32(TimeUnit::Nanosecond) => {
297                    return Err(AvroError::InvalidArgument(
298                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
299                            .into(),
300                    ));
301                }
302                DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
303                    array.as_primitive::<Time64MicrosecondType>(),
304                )),
305                DataType::Time64(TimeUnit::Nanosecond) => {
306                    return Err(AvroError::NYI(
307                        "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
308                            .into(),
309                    ));
310                }
311                DataType::Time64(TimeUnit::Millisecond) => {
312                    return Err(AvroError::InvalidArgument(
313                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
314                            .into(),
315                    ));
316                }
317                DataType::Time64(TimeUnit::Second) => {
318                    return Err(AvroError::InvalidArgument(
319                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
320                            .into(),
321                    ));
322                }
323                DataType::Float32 => {
324                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
325                }
326                DataType::Float64 => {
327                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
328                }
329                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
330                DataType::LargeBinary => {
331                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
332                }
333                DataType::FixedSizeBinary(_len) => {
334                    let arr = array
335                        .as_any()
336                        .downcast_ref::<FixedSizeBinaryArray>()
337                        .ok_or_else(|| {
338                            AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
339                        })?;
340                    Encoder::Fixed(FixedEncoder(arr))
341                }
342                DataType::Timestamp(unit, _) => match unit {
343                    TimeUnit::Second => {
344                        Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
345                            array.as_primitive::<TimestampSecondType>(),
346                        ))
347                    }
348                    TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
349                        array.as_primitive::<TimestampMillisecondType>(),
350                    )),
351                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
352                        array.as_primitive::<TimestampMicrosecondType>(),
353                    )),
354                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
355                        array.as_primitive::<TimestampNanosecondType>(),
356                    )),
357                },
358                DataType::Interval(unit) => match unit {
359                    IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
360                        array.as_primitive::<IntervalMonthDayNanoType>(),
361                    )),
362                    IntervalUnit::YearMonth => Encoder::IntervalYearMonth(DurationEncoder(
363                        array.as_primitive::<IntervalYearMonthType>(),
364                    )),
365                    IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
366                        array.as_primitive::<IntervalDayTimeType>(),
367                    )),
368                },
369                DataType::Duration(tu) => match tu {
370                    TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
371                        array.as_primitive::<DurationSecondType>(),
372                    )),
373                    TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
374                        array.as_primitive::<DurationMillisecondType>(),
375                    )),
376                    TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
377                        array.as_primitive::<DurationMicrosecondType>(),
378                    )),
379                    TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
380                        array.as_primitive::<DurationNanosecondType>(),
381                    )),
382                },
383                other => {
384                    return Err(AvroError::NYI(format!(
385                        "Avro scalar type not yet supported: {other:?}"
386                    )));
387                }
388            },
389            FieldPlan::Struct { bindings } => {
390                let arr = array
391                    .as_any()
392                    .downcast_ref::<StructArray>()
393                    .ok_or_else(|| AvroError::SchemaError("Expected StructArray".into()))?;
394                Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
395            }
396            FieldPlan::List {
397                items_nullability,
398                item_plan,
399            } => match array.data_type() {
400                DataType::List(_) => {
401                    let arr = array
402                        .as_any()
403                        .downcast_ref::<ListArray>()
404                        .ok_or_else(|| AvroError::SchemaError("Expected ListArray".into()))?;
405                    Encoder::List(Box::new(ListEncoder32::try_new(
406                        arr,
407                        *items_nullability,
408                        item_plan.as_ref(),
409                    )?))
410                }
411                DataType::LargeList(_) => {
412                    let arr = array
413                        .as_any()
414                        .downcast_ref::<LargeListArray>()
415                        .ok_or_else(|| AvroError::SchemaError("Expected LargeListArray".into()))?;
416                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
417                        arr,
418                        *items_nullability,
419                        item_plan.as_ref(),
420                    )?))
421                }
422                DataType::ListView(_) => {
423                    let arr = array
424                        .as_any()
425                        .downcast_ref::<ListViewArray>()
426                        .ok_or_else(|| AvroError::SchemaError("Expected ListViewArray".into()))?;
427                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
428                        arr,
429                        *items_nullability,
430                        item_plan.as_ref(),
431                    )?))
432                }
433                DataType::LargeListView(_) => {
434                    let arr = array
435                        .as_any()
436                        .downcast_ref::<LargeListViewArray>()
437                        .ok_or_else(|| {
438                            AvroError::SchemaError("Expected LargeListViewArray".into())
439                        })?;
440                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
441                        arr,
442                        *items_nullability,
443                        item_plan.as_ref(),
444                    )?))
445                }
446                DataType::FixedSizeList(_, _) => {
447                    let arr = array
448                        .as_any()
449                        .downcast_ref::<FixedSizeListArray>()
450                        .ok_or_else(|| {
451                            AvroError::SchemaError("Expected FixedSizeListArray".into())
452                        })?;
453                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
454                        arr,
455                        *items_nullability,
456                        item_plan.as_ref(),
457                    )?))
458                }
459                other => {
460                    return Err(AvroError::SchemaError(format!(
461                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
462                    )));
463                }
464            },
465            FieldPlan::Decimal { size } => match array.data_type() {
466                #[cfg(feature = "small_decimals")]
467                DataType::Decimal32(_, _) => {
468                    let arr = array
469                        .as_any()
470                        .downcast_ref::<Decimal32Array>()
471                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal32Array".into()))?;
472                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
473                }
474                #[cfg(feature = "small_decimals")]
475                DataType::Decimal64(_, _) => {
476                    let arr = array
477                        .as_any()
478                        .downcast_ref::<Decimal64Array>()
479                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal64Array".into()))?;
480                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
481                }
482                DataType::Decimal128(_, _) => {
483                    let arr = array
484                        .as_any()
485                        .downcast_ref::<Decimal128Array>()
486                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal128Array".into()))?;
487                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
488                }
489                DataType::Decimal256(_, _) => {
490                    let arr = array
491                        .as_any()
492                        .downcast_ref::<Decimal256Array>()
493                        .ok_or_else(|| AvroError::SchemaError("Expected Decimal256Array".into()))?;
494                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
495                }
496                other => {
497                    return Err(AvroError::SchemaError(format!(
498                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
499                    )));
500                }
501            },
502            FieldPlan::Uuid => {
503                let arr = array
504                    .as_any()
505                    .downcast_ref::<FixedSizeBinaryArray>()
506                    .ok_or_else(|| {
507                        AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
508                    })?;
509                Encoder::Uuid(UuidEncoder(arr))
510            }
511            FieldPlan::Map {
512                values_nullability,
513                value_plan,
514            } => {
515                let arr = array
516                    .as_any()
517                    .downcast_ref::<MapArray>()
518                    .ok_or_else(|| AvroError::SchemaError("Expected MapArray".into()))?;
519                Encoder::Map(Box::new(MapEncoder::try_new(
520                    arr,
521                    *values_nullability,
522                    value_plan.as_ref(),
523                )?))
524            }
525            FieldPlan::Enum { symbols } => match array.data_type() {
526                DataType::Dictionary(key_dt, value_dt) => {
527                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
528                        return Err(AvroError::SchemaError(
529                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
530                        ));
531                    }
532                    let dict = array
533                        .as_any()
534                        .downcast_ref::<DictionaryArray<Int32Type>>()
535                        .ok_or_else(|| {
536                            AvroError::SchemaError("Expected DictionaryArray<Int32>".into())
537                        })?;
538                    let values = dict
539                        .values()
540                        .as_any()
541                        .downcast_ref::<StringArray>()
542                        .ok_or_else(|| {
543                            AvroError::SchemaError("Dictionary values must be Utf8".into())
544                        })?;
545                    if values.len() != symbols.len() {
546                        return Err(AvroError::SchemaError(format!(
547                            "Enum symbol length {} != dictionary size {}",
548                            symbols.len(),
549                            values.len()
550                        )));
551                    }
552                    for i in 0..values.len() {
553                        if values.value(i) != symbols[i].as_str() {
554                            return Err(AvroError::SchemaError(format!(
555                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
556                                symbols[i],
557                                values.value(i)
558                            )));
559                        }
560                    }
561                    let keys = dict.keys();
562                    Encoder::Enum(EnumEncoder { keys })
563                }
564                other => {
565                    return Err(AvroError::SchemaError(format!(
566                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
567                    )));
568                }
569            },
570            FieldPlan::Union { bindings } => {
571                let arr = array
572                    .as_any()
573                    .downcast_ref::<UnionArray>()
574                    .ok_or_else(|| AvroError::SchemaError("Expected UnionArray".into()))?;
575
576                Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
577            }
578            FieldPlan::RunEndEncoded {
579                values_nullability,
580                value_plan,
581            } => {
582                // Helper closure to build a typed RunEncodedEncoder<R>
583                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, AvroError> {
584                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
585                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
586                            Int16Type,
587                        >::new(
588                            arr,
589                            FieldEncoder::make_encoder(
590                                arr.values().as_ref(),
591                                value_plan.as_ref(),
592                                *values_nullability,
593                            )?,
594                        ))));
595                    }
596                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
597                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
598                            Int32Type,
599                        >::new(
600                            arr,
601                            FieldEncoder::make_encoder(
602                                arr.values().as_ref(),
603                                value_plan.as_ref(),
604                                *values_nullability,
605                            )?,
606                        ))));
607                    }
608                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
609                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
610                            Int64Type,
611                        >::new(
612                            arr,
613                            FieldEncoder::make_encoder(
614                                arr.values().as_ref(),
615                                value_plan.as_ref(),
616                                *values_nullability,
617                            )?,
618                        ))));
619                    }
620                    Err(AvroError::SchemaError(
621                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
622                            .into(),
623                    ))
624                };
625                build(array)?
626            }
627        };
628        // Compute the effective null state from writer-declared nullability and data nulls.
629        let null_state = match nullability {
630            None => NullState::NonNullable,
631            Some(null_order) => match array.nulls() {
632                Some(nulls) if array.null_count() > 0 => NullState::Nullable { nulls, null_order },
633                _ => NullState::NullableNoNulls {
634                    // Nullable site with no null buffer for this view
635                    union_value_byte: union_value_branch_byte(null_order, false),
636                },
637            },
638        };
639        Ok(Self {
640            encoder,
641            null_state,
642        })
643    }
644
645    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
646        match &self.null_state {
647            NullState::NonNullable => {}
648            NullState::NullableNoNulls { union_value_byte } => out
649                .write_all(&[*union_value_byte])
650                .map_err(|e| AvroError::IoError(format!("write union value branch: {e}"), e))?,
651            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
652                return write_optional_index(out, true, *null_order); // no value to write
653            }
654            NullState::Nullable { null_order, .. } => {
655                write_optional_index(out, false, *null_order)?;
656            }
657        }
658        self.encoder.encode(out, idx)
659    }
660}
661
662fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
663    let nulls_first = null_order == Nullability::default();
664    if nulls_first == is_null { 0x00 } else { 0x02 }
665}
666
/// Per-site encoder plan for a field. This mirrors the Avro structure, so nested
/// optional branch order can be honored exactly as declared by the schema.
///
/// Plans are produced by `FieldPlan::build` from an Avro data type and the Arrow
/// field that supplies its values.
#[derive(Debug, Clone)]
enum FieldPlan {
    /// Non-nested scalar/logical type. Also used for Avro `duration` sites backed by
    /// Arrow intervals and for any codec without a dedicated arm in `FieldPlan::build`.
    Scalar,
    /// Record/Struct with Avro-ordered children: one binding per Avro record field,
    /// in Avro declaration order, each pointing at the matching Arrow struct child.
    Struct { bindings: Vec<FieldBinding> },
    /// Array with item-site nullability and nested plan. Used for Arrow
    /// List/LargeList/ListView/LargeListView/FixedSizeList sources.
    List {
        /// Nullability/order of the Avro item type (`None` when items are required).
        items_nullability: Option<Nullability>,
        /// Plan for encoding each item.
        item_plan: Box<FieldPlan>,
    },
    /// Avro decimal logical type (bytes or fixed). `size=None` => bytes(decimal), `Some(n)` => fixed(n)
    Decimal { size: Option<usize> },
    /// Avro UUID logical type (fixed). Chosen for Arrow `FixedSizeBinary(16)` fields
    /// tagged as UUID via extension type name or `logicalType=uuid` metadata.
    Uuid,
    /// Avro map with value-site nullability and nested plan
    Map {
        /// Nullability/order of the Avro map value type (`None` when values are required).
        values_nullability: Option<Nullability>,
        /// Plan for encoding each map value.
        value_plan: Box<FieldPlan>,
    },
    /// Avro enum; maps to Arrow Dictionary<Int32, Utf8> whose dictionary values are
    /// expected to equal, in order, the Avro enum `symbols`.
    /// NOTE(review): `FieldPlan::build` only checks the key/value types, not the
    /// dictionary contents.
    Enum { symbols: Arc<[String]> },
    /// Avro union, maps to Arrow Union. One binding per branch, in Avro branch order;
    /// each binding's `arrow_index` is the branch position.
    Union { bindings: Vec<FieldBinding> },
    /// Avro RunEndEncoded site. Values are encoded per logical row by mapping the
    /// row index to its containing run and emitting that run's value with `value_plan`.
    RunEndEncoded {
        /// Nullability/order recorded for the run values.
        values_nullability: Option<Nullability>,
        /// Plan for encoding a run's value.
        value_plan: Box<FieldPlan>,
    },
}
701
/// Binding between one Avro field site (a top-level column, a record field, or a
/// union branch) and the Arrow column/child that supplies its values.
#[derive(Debug, Clone)]
struct FieldBinding {
    /// Index of the Arrow field/column associated with this Avro field site.
    /// For union branches this is the branch's position within the union.
    arrow_index: usize,
    /// Nullability/order for this site (None for required fields)
    nullability: Option<Nullability>,
    /// Nested plan for this site
    plan: FieldPlan,
}
711
/// Builder for `RecordEncoder` write plan.
///
/// Pairs the Avro root (which `build` requires to be a record) with the Arrow
/// schema whose fields are matched to it by name, plus an optional fingerprint.
#[derive(Debug)]
pub(crate) struct RecordEncoderBuilder<'a> {
    /// Avro root field; `build` rejects anything whose codec is not `Codec::Struct`.
    avro_root: &'a AvroField,
    /// Arrow schema whose fields are resolved by name against the Avro record fields.
    arrow_schema: &'a ArrowSchema,
    /// Optional fingerprint used to derive the per-record prefix (`None` => no prefix).
    fingerprint: Option<Fingerprint>,
}
719
720impl<'a> RecordEncoderBuilder<'a> {
721    /// Create a new builder from the Avro root and Arrow schema.
722    pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
723        Self {
724            avro_root,
725            arrow_schema,
726            fingerprint: None,
727        }
728    }
729
730    pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
731        self.fingerprint = fingerprint;
732        self
733    }
734
735    /// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
736    /// resolving each field to an Arrow index by name.
737    pub(crate) fn build(self) -> Result<RecordEncoder, AvroError> {
738        let avro_root_dt = self.avro_root.data_type();
739        let Codec::Struct(root_fields) = avro_root_dt.codec() else {
740            return Err(AvroError::SchemaError(
741                "Top-level Avro schema must be a record/struct".into(),
742            ));
743        };
744        let mut columns = Vec::with_capacity(root_fields.len());
745        for root_field in root_fields.as_ref() {
746            let name = root_field.name();
747            let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
748                AvroError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
749            })?;
750            columns.push(FieldBinding {
751                arrow_index,
752                nullability: root_field.data_type().nullability(),
753                plan: FieldPlan::build(
754                    root_field.data_type(),
755                    self.arrow_schema.field(arrow_index),
756                )?,
757            });
758        }
759        Ok(RecordEncoder {
760            columns,
761            prefix: self.fingerprint.map(|fp| fp.make_prefix()),
762        })
763    }
764}
765
/// A pre-computed plan for encoding a `RecordBatch` to Avro.
///
/// Derived from an Avro schema and an Arrow schema. It maps
/// top-level Avro fields to Arrow columns and contains a nested encoding plan
/// for each column.
#[derive(Debug, Clone)]
pub(crate) struct RecordEncoder {
    /// One binding per top-level Avro record field, in Avro field order; each row
    /// is written by encoding these columns in this order.
    columns: Vec<FieldBinding>,
    /// Optional pre-built, variable-length prefix written before each record.
    prefix: Option<Prefix>,
}
777
impl RecordEncoder {
    /// Builds one `FieldEncoder` per planned top-level column for `batch`.
    ///
    /// Each column is looked up by the Arrow index stored in its `FieldBinding`.
    /// Without the `avro_custom_types` feature, top-level `RunEndEncoded` columns are
    /// given no site nullability (`None`) here. NOTE(review): presumably their
    /// nullability is handled through the `values_nullability` stored in the plan;
    /// confirm in `FieldEncoder::make_encoder`.
    ///
    /// # Errors
    ///
    /// Returns `AvroError::SchemaError` if a planned Arrow index is out of range for
    /// `batch`, and propagates any error from `FieldEncoder::make_encoder`.
    fn prepare_for_batch<'a>(
        &'a self,
        batch: &'a RecordBatch,
    ) -> Result<Vec<FieldEncoder<'a>>, AvroError> {
        let arrays = batch.columns();
        let mut out = Vec::with_capacity(self.columns.len());
        for col_plan in self.columns.iter() {
            let arrow_index = col_plan.arrow_index;
            // The plan was built against a schema; guard against a batch with fewer columns.
            let array = arrays.get(arrow_index).ok_or_else(|| {
                AvroError::SchemaError(format!("Column index {arrow_index} out of range"))
            })?;
            #[cfg(not(feature = "avro_custom_types"))]
            let site_nullability = match &col_plan.plan {
                FieldPlan::RunEndEncoded { .. } => None,
                _ => col_plan.nullability,
            };
            #[cfg(feature = "avro_custom_types")]
            let site_nullability = col_plan.nullability;
            out.push(FieldEncoder::make_encoder(
                array.as_ref(),
                &col_plan.plan,
                site_nullability,
            )?);
        }
        Ok(out)
    }

    /// Encode a `RecordBatch` using this encoder plan.
    ///
    /// Rows are written one after another; within a row, columns are encoded in
    /// the order of `self.columns`. The optional `prefix` is handed to
    /// `for_rows_with_prefix!` (presumably written before each row — confirm in the
    /// macro definition).
    ///
    /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes.
    ///
    /// # Errors
    ///
    /// Propagates errors from encoder preparation and from any column encoder; on
    /// error `out` may already contain a partially written batch.
    pub(crate) fn encode<W: Write>(
        &self,
        out: &mut W,
        batch: &RecordBatch,
    ) -> Result<(), AvroError> {
        let mut column_encoders = self.prepare_for_batch(batch)?;
        let n = batch.num_rows();
        let prefix = self.prefix.as_ref().map(|p| p.as_slice());
        for_rows_with_prefix!(n, prefix, out, |row| {
            for enc in column_encoders.iter_mut() {
                enc.encode(out, row)?;
            }
        });
        Ok(())
    }

    /// Encode rows into a single contiguous `BytesMut` and append row-end offsets.
    ///
    /// `row_capacity` is a per-row size hint used only to pre-reserve space in `out`.
    ///
    /// # Invariants
    ///
    /// * `offsets` must be non-empty and seeded with `0` at index 0.
    /// * `offsets.last()` must equal `out.len()` on entry.
    /// * On success, exactly `batch.num_rows()` additional offsets are pushed, and
    ///   `offsets.last()` equals the new `out.len()`.
    ///
    /// # Errors
    ///
    /// Returns `AvroError::General` if the invariants above are violated on entry or
    /// `offsets` cannot grow by `batch.num_rows()`; propagates encoding errors. When
    /// encoding fails, `out` and `offsets` are truncated back to their entry lengths.
    pub(crate) fn encode_rows(
        &self,
        batch: &RecordBatch,
        row_capacity: usize,
        out: &mut BytesMut,
        offsets: &mut Vec<usize>,
    ) -> Result<(), AvroError> {
        let out_len = out.len();
        // Reject calls that violate the documented offsets/out invariants.
        if offsets.first() != Some(&0) || offsets.last() != Some(&out_len) {
            return Err(AvroError::General(
                "encode_rows requires offsets to start with 0 and end at out.len()".to_string(),
            ));
        }
        let n = batch.num_rows();
        // Nothing to encode: leave `out` and `offsets` untouched.
        if n == 0 {
            return Ok(());
        }
        // Make sure `n` more offsets can be represented before reserving/pushing.
        if offsets.len().checked_add(n).is_none() {
            return Err(AvroError::General(
                "encode_rows cannot append offsets: too many rows".to_string(),
            ));
        }
        let mut column_encoders = self.prepare_for_batch(batch)?;
        offsets.reserve(n);
        let prefix_bytes = self.prefix.as_ref().map(|p| p.as_slice());
        let prefix_len = prefix_bytes.map_or(0, |p| p.len());
        // Best-effort pre-allocation of n * max(row_capacity, prefix_len) bytes,
        // skipped entirely if that size computation would overflow.
        let per_row_hint = row_capacity.max(prefix_len);
        if let Some(additional) = n
            .checked_mul(per_row_hint)
            .filter(|&a| out_len.checked_add(a).is_some())
        {
            out.reserve(additional);
        }
        // Snapshot lengths so a failed encode can be rolled back below.
        let start_out_len = out.len();
        let start_offsets_len = offsets.len();
        // Immediately-invoked closure: an early `?` return yields `res` instead of
        // leaving this function, so the rollback below always runs.
        let res = (|| -> Result<(), AvroError> {
            let mut w = out.writer();
            // Single-column fast path avoids iterating the encoder vector per row.
            if let [enc0] = column_encoders.as_mut_slice() {
                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
                    enc0.encode(&mut w, row)?;
                    // Row-end offset = absolute length of `out` after this row.
                    offsets.push(w.get_ref().len());
                });
            } else {
                for_rows_with_prefix!(n, prefix_bytes, w, |row| {
                    for enc in column_encoders.iter_mut() {
                        enc.encode(&mut w, row)?;
                    }
                    // Row-end offset = absolute length of `out` after this row.
                    offsets.push(w.get_ref().len());
                });
            }
            Ok(())
        })();
        if res.is_err() {
            // Drop any partially written rows and their offsets.
            out.truncate(start_out_len);
            offsets.truncate(start_offsets_len);
        } else {
            debug_assert_eq!(
                *offsets.last().unwrap(),
                out.len(),
                "encode_rows: offsets/out length mismatch after successful encode"
            );
        }
        res
    }
}
898
899fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
900    fields.iter().position(|f| f.name() == name)
901}
902
903fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
904    // Prefer common Arrow field names; fall back to second child if exactly two
905    find_struct_child_index(fields, "value")
906        .or_else(|| find_struct_child_index(fields, "values"))
907        .or_else(|| if fields.len() == 2 { Some(1) } else { None })
908}
909
910impl FieldPlan {
911    fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, AvroError> {
912        #[cfg(not(feature = "avro_custom_types"))]
913        if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
914            let values_nullability = avro_dt.nullability();
915            let value_site_dt: &AvroDataType = match avro_dt.codec() {
916                Codec::Union(branches, _, _) => branches
917                    .iter()
918                    .find(|b| !matches!(b.codec(), Codec::Null))
919                    .ok_or_else(|| {
920                        AvroError::SchemaError(
921                            "Avro union at RunEndEncoded site has no non-null branch".into(),
922                        )
923                    })?,
924                _ => avro_dt,
925            };
926            return Ok(FieldPlan::RunEndEncoded {
927                values_nullability,
928                value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
929            });
930        }
931        if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
932            // Extension-based detection (only when the feature is enabled)
933            let ext_is_uuid = {
934                #[cfg(feature = "canonical_extension_types")]
935                {
936                    matches!(
937                        arrow_field.extension_type_name(),
938                        Some("arrow.uuid") | Some("uuid")
939                    )
940                }
941                #[cfg(not(feature = "canonical_extension_types"))]
942                {
943                    false
944                }
945            };
946            let md_is_uuid = arrow_field
947                .metadata()
948                .get("logicalType")
949                .map(|s| s.as_str())
950                == Some("uuid");
951            if ext_is_uuid || md_is_uuid {
952                if *len != 16 {
953                    return Err(AvroError::InvalidArgument(
954                        "logicalType=uuid requires FixedSizeBinary(16)".into(),
955                    ));
956                }
957                return Ok(FieldPlan::Uuid);
958            }
959        }
960        match avro_dt.codec() {
961            Codec::Struct(avro_fields) => {
962                let fields = match arrow_field.data_type() {
963                    DataType::Struct(struct_fields) => struct_fields,
964                    other => {
965                        return Err(AvroError::SchemaError(format!(
966                            "Avro struct maps to Arrow Struct, found: {other:?}"
967                        )));
968                    }
969                };
970                let mut bindings = Vec::with_capacity(avro_fields.len());
971                for avro_field in avro_fields.iter() {
972                    let name = avro_field.name().to_string();
973                    let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
974                        AvroError::SchemaError(format!(
975                            "Struct field '{name}' not present in Arrow field '{}'",
976                            arrow_field.name()
977                        ))
978                    })?;
979                    bindings.push(FieldBinding {
980                        arrow_index: idx,
981                        nullability: avro_field.data_type().nullability(),
982                        plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
983                    });
984                }
985                Ok(FieldPlan::Struct { bindings })
986            }
987            Codec::List(items_dt) => match arrow_field.data_type() {
988                DataType::List(field_ref)
989                | DataType::LargeList(field_ref)
990                | DataType::ListView(field_ref)
991                | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
992                    items_nullability: items_dt.nullability(),
993                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
994                }),
995                DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
996                    items_nullability: items_dt.nullability(),
997                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
998                }),
999                other => Err(AvroError::SchemaError(format!(
1000                    "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
1001                ))),
1002            },
1003            Codec::Map(values_dt) => {
1004                let entries_field = match arrow_field.data_type() {
1005                    DataType::Map(entries, _sorted) => entries.as_ref(),
1006                    other => {
1007                        return Err(AvroError::SchemaError(format!(
1008                            "Avro map maps to Arrow DataType::Map, found: {other:?}"
1009                        )));
1010                    }
1011                };
1012                let entries_struct_fields = match entries_field.data_type() {
1013                    DataType::Struct(fs) => fs,
1014                    other => {
1015                        return Err(AvroError::SchemaError(format!(
1016                            "Arrow Map entries must be Struct, found: {other:?}"
1017                        )));
1018                    }
1019                };
1020                let value_idx =
1021                    find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
1022                        AvroError::SchemaError("Map entries struct missing value field".into())
1023                    })?;
1024                let value_field = entries_struct_fields[value_idx].as_ref();
1025                let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
1026                Ok(FieldPlan::Map {
1027                    values_nullability: values_dt.nullability(),
1028                    value_plan: Box::new(value_plan),
1029                })
1030            }
1031            Codec::Enum(symbols) => match arrow_field.data_type() {
1032                DataType::Dictionary(key_dt, value_dt) => {
1033                    if **key_dt != DataType::Int32 {
1034                        return Err(AvroError::SchemaError(
1035                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
1036                        ));
1037                    }
1038                    if **value_dt != DataType::Utf8 {
1039                        return Err(AvroError::SchemaError(
1040                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
1041                        ));
1042                    }
1043                    Ok(FieldPlan::Enum {
1044                        symbols: symbols.clone(),
1045                    })
1046                }
1047                other => Err(AvroError::SchemaError(format!(
1048                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
1049                ))),
1050            },
1051            // decimal site (bytes or fixed(N)) with precision/scale validation
1052            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
1053                let (ap, as_) = match arrow_field.data_type() {
1054                    #[cfg(feature = "small_decimals")]
1055                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
1056                    #[cfg(feature = "small_decimals")]
1057                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
1058                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
1059                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
1060                    other => {
1061                        return Err(AvroError::SchemaError(format!(
1062                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
1063                            arrow_field.name()
1064                        )));
1065                    }
1066                };
1067                let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent
1068                if ap != *precision || as_ != sc {
1069                    return Err(AvroError::SchemaError(format!(
1070                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
1071                        arrow_field.name()
1072                    )));
1073                }
1074                Ok(FieldPlan::Decimal {
1075                    size: *fixed_size_opt,
1076                })
1077            }
1078            Codec::Interval => match arrow_field.data_type() {
1079                DataType::Interval(
1080                    IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
1081                ) => Ok(FieldPlan::Scalar),
1082                other => Err(AvroError::SchemaError(format!(
1083                    "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
1084                ))),
1085            },
1086            Codec::Union(avro_branches, _, UnionMode::Dense) => {
1087                let arrow_union_fields = match arrow_field.data_type() {
1088                    DataType::Union(fields, UnionMode::Dense) => fields,
1089                    DataType::Union(_, UnionMode::Sparse) => {
1090                        return Err(AvroError::NYI(
1091                            "Sparse Arrow unions are not yet supported".to_string(),
1092                        ));
1093                    }
1094                    other => {
1095                        return Err(AvroError::SchemaError(format!(
1096                            "Avro union maps to Arrow Union, found: {other:?}"
1097                        )));
1098                    }
1099                };
1100                if avro_branches.len() != arrow_union_fields.len() {
1101                    return Err(AvroError::SchemaError(format!(
1102                        "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
1103                        avro_branches.len(),
1104                        arrow_union_fields.len(),
1105                        arrow_field.name()
1106                    )));
1107                }
1108                let bindings = avro_branches
1109                    .iter()
1110                    .zip(arrow_union_fields.iter())
1111                    .enumerate()
1112                    .map(|(i, (avro_branch, (_, arrow_child_field)))| {
1113                        Ok(FieldBinding {
1114                            arrow_index: i,
1115                            nullability: avro_branch.nullability(),
1116                            plan: FieldPlan::build(avro_branch, arrow_child_field)?,
1117                        })
1118                    })
1119                    .collect::<Result<Vec<_>, AvroError>>()?;
1120                Ok(FieldPlan::Union { bindings })
1121            }
1122            Codec::Union(_, _, UnionMode::Sparse) => Err(AvroError::NYI(
1123                "Sparse Arrow unions are not yet supported".to_string(),
1124            )),
1125            #[cfg(feature = "avro_custom_types")]
1126            Codec::RunEndEncoded(values_dt, _width_code) => {
1127                let values_field = match arrow_field.data_type() {
1128                    DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
1129                    other => {
1130                        return Err(AvroError::SchemaError(format!(
1131                            "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
1132                        )));
1133                    }
1134                };
1135                Ok(FieldPlan::RunEndEncoded {
1136                    values_nullability: values_dt.nullability(),
1137                    value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
1138                })
1139            }
1140            _ => Ok(FieldPlan::Scalar),
1141        }
1142    }
1143}
1144
/// Type-specific value encoder for one Arrow array.
///
/// Each variant wraps an encoder borrowing the source array for `'a`;
/// `Encoder::encode` dispatches to the wrapped encoder, and `Encoder::Null`
/// writes nothing. Container and union variants are boxed.
enum Encoder<'a> {
    Boolean(BooleanEncoder<'a>),
    Int(IntEncoder<'a, Int32Type>),
    Long(LongEncoder<'a, Int64Type>),
    TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
    TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
    TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
    /// Arrow Timestamp(Second) scaled to milliseconds before writing.
    TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
    Date32(IntEncoder<'a, Date32Type>),
    /// Arrow Time32(Second) scaled to milliseconds before writing.
    Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
    Time32Millis(IntEncoder<'a, Time32MillisecondType>),
    Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
    DurationSeconds(LongEncoder<'a, DurationSecondType>),
    DurationMillis(LongEncoder<'a, DurationMillisecondType>),
    DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
    DurationNanos(LongEncoder<'a, DurationNanosecondType>),
    Float32(F32Encoder<'a>),
    Float64(F64Encoder<'a>),
    Binary(BinaryEncoder<'a, i32>),
    LargeBinary(BinaryEncoder<'a, i64>),
    Utf8(Utf8Encoder<'a>),
    Utf8Large(Utf8LargeEncoder<'a>),
    Utf8View(Utf8ViewEncoder<'a>),
    BinaryView(BinaryViewEncoder<'a>),
    List(Box<ListEncoder32<'a>>),
    LargeList(Box<ListEncoder64<'a>>),
    ListView(Box<ListViewEncoder32<'a>>),
    LargeListView(Box<ListViewEncoder64<'a>>),
    FixedSizeList(Box<FixedSizeListEncoder<'a>>),
    Struct(Box<StructEncoder<'a>>),
    /// Avro `fixed` encoder (raw bytes, no length)
    Fixed(FixedEncoder<'a>),
    /// Avro `uuid` logical type encoder (string with RFC‑4122 hyphenated text)
    /// NOTE(review): `FieldPlan::Uuid` documents the UUID site as `fixed`; confirm
    /// which wire form `UuidEncoder` actually writes and align the two docs.
    Uuid(UuidEncoder<'a>),
    /// Avro `duration` logical type (Arrow Interval(MonthDayNano)) encoder
    IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
    /// Avro `duration` logical type (Arrow Interval(YearMonth)) encoder
    IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
    /// Avro `duration` logical type (Arrow Interval(DayTime)) encoder
    IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
    #[cfg(feature = "small_decimals")]
    Decimal32(Decimal32Encoder<'a>),
    #[cfg(feature = "small_decimals")]
    Decimal64(Decimal64Encoder<'a>),
    Decimal128(Decimal128Encoder<'a>),
    Decimal256(Decimal256Encoder<'a>),
    /// Avro `enum` encoder: writes the key (int) as the enum index.
    Enum(EnumEncoder<'a>),
    Map(Box<MapEncoder<'a>>),
    Union(Box<UnionEncoder<'a>>),
    /// Run-end encoded values with specific run-end index widths
    RunEncoded16(Box<RunEncodedEncoder16<'a>>),
    RunEncoded32(Box<RunEncodedEncoder32<'a>>),
    RunEncoded64(Box<RunEncodedEncoder64<'a>>),
    /// Writes nothing for each row.
    Null,
}
1201
1202impl<'a> Encoder<'a> {
1203    /// Encode the value at `idx`.
1204    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1205        match self {
1206            Encoder::Boolean(e) => e.encode(out, idx),
1207            Encoder::Int(e) => e.encode(out, idx),
1208            Encoder::Long(e) => e.encode(out, idx),
1209            Encoder::TimestampMicros(e) => e.encode(out, idx),
1210            Encoder::TimestampMillis(e) => e.encode(out, idx),
1211            Encoder::TimestampNanos(e) => e.encode(out, idx),
1212            Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
1213            Encoder::Date32(e) => e.encode(out, idx),
1214            Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
1215            Encoder::Time32Millis(e) => e.encode(out, idx),
1216            Encoder::Time64Micros(e) => e.encode(out, idx),
1217            Encoder::DurationSeconds(e) => e.encode(out, idx),
1218            Encoder::DurationMicros(e) => e.encode(out, idx),
1219            Encoder::DurationMillis(e) => e.encode(out, idx),
1220            Encoder::DurationNanos(e) => e.encode(out, idx),
1221            Encoder::Float32(e) => e.encode(out, idx),
1222            Encoder::Float64(e) => e.encode(out, idx),
1223            Encoder::Binary(e) => e.encode(out, idx),
1224            Encoder::LargeBinary(e) => e.encode(out, idx),
1225            Encoder::Utf8(e) => e.encode(out, idx),
1226            Encoder::Utf8Large(e) => e.encode(out, idx),
1227            Encoder::Utf8View(e) => e.encode(out, idx),
1228            Encoder::BinaryView(e) => e.encode(out, idx),
1229            Encoder::List(e) => e.encode(out, idx),
1230            Encoder::LargeList(e) => e.encode(out, idx),
1231            Encoder::ListView(e) => e.encode(out, idx),
1232            Encoder::LargeListView(e) => e.encode(out, idx),
1233            Encoder::FixedSizeList(e) => e.encode(out, idx),
1234            Encoder::Struct(e) => e.encode(out, idx),
1235            Encoder::Fixed(e) => (e).encode(out, idx),
1236            Encoder::Uuid(e) => (e).encode(out, idx),
1237            Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
1238            Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
1239            Encoder::IntervalDayTime(e) => (e).encode(out, idx),
1240            #[cfg(feature = "small_decimals")]
1241            Encoder::Decimal32(e) => (e).encode(out, idx),
1242            #[cfg(feature = "small_decimals")]
1243            Encoder::Decimal64(e) => (e).encode(out, idx),
1244            Encoder::Decimal128(e) => (e).encode(out, idx),
1245            Encoder::Decimal256(e) => (e).encode(out, idx),
1246            Encoder::Map(e) => (e).encode(out, idx),
1247            Encoder::Enum(e) => (e).encode(out, idx),
1248            Encoder::Union(e) => (e).encode(out, idx),
1249            Encoder::RunEncoded16(e) => (e).encode(out, idx),
1250            Encoder::RunEncoded32(e) => (e).encode(out, idx),
1251            Encoder::RunEncoded64(e) => (e).encode(out, idx),
1252            Encoder::Null => Ok(()),
1253        }
1254    }
1255}
1256
1257struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1258impl BooleanEncoder<'_> {
1259    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1260        write_bool(out, self.0.value(idx))
1261    }
1262}
1263
1264/// Generic Avro `int` encoder for primitive arrays with `i32` native values.
1265struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1266impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1267    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1268        write_int(out, self.0.value(idx))
1269    }
1270}
1271
1272/// Generic Avro `long` encoder for primitive arrays with `i64` native values.
1273struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1274impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1275    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1276        write_long(out, self.0.value(idx))
1277    }
1278}
1279
1280/// Time32(Second) to Avro time-millis (int), via safe scaling by 1000
1281struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1282impl<'a> Time32SecondsToMillisEncoder<'a> {
1283    #[inline]
1284    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1285        let secs = self.0.value(idx);
1286        let millis = secs
1287            .checked_mul(1000)
1288            .ok_or_else(|| AvroError::InvalidArgument("time32(secs) * 1000 overflowed".into()))?;
1289        write_int(out, millis)
1290    }
1291}
1292
1293/// Timestamp(Second) to Avro timestamp-millis (long), via safe scaling by 1000
1294struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1295impl<'a> TimestampSecondsToMillisEncoder<'a> {
1296    #[inline]
1297    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1298        let secs = self.0.value(idx);
1299        let millis = secs.checked_mul(1000).ok_or_else(|| {
1300            AvroError::InvalidArgument("timestamp(secs) * 1000 overflowed".into())
1301        })?;
1302        write_long(out, millis)
1303    }
1304}
1305
1306/// Unified binary encoder generic over offset size (i32/i64).
1307struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1308impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1309    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1310        write_len_prefixed(out, self.0.value(idx))
1311    }
1312}
1313
1314/// BinaryView (byte view) encoder.
1315struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1316impl BinaryViewEncoder<'_> {
1317    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1318        write_len_prefixed(out, self.0.value(idx))
1319    }
1320}
1321
1322/// StringView encoder.
1323struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1324impl Utf8ViewEncoder<'_> {
1325    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1326        write_len_prefixed(out, self.0.value(idx).as_bytes())
1327    }
1328}
1329
1330struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1331impl F32Encoder<'_> {
1332    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1333        // Avro float: 4 bytes, IEEE-754 little-endian
1334        let bits = self.0.value(idx).to_bits();
1335        out.write_all(&bits.to_le_bytes())?;
1336        Ok(())
1337    }
1338}
1339
1340struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1341impl F64Encoder<'_> {
1342    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1343        // Avro double: 8 bytes, IEEE-754 little-endian
1344        let bits = self.0.value(idx).to_bits();
1345        out.write_all(&bits.to_le_bytes()).map_err(Into::into)
1346    }
1347}
1348
1349struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1350
1351impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1352    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1353        write_len_prefixed(out, self.0.value(idx).as_bytes())
1354    }
1355}
1356
1357type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1358type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1359
/// Internal key array kind used by Map encoder.
///
/// Avro map keys are always strings, so only Arrow `Utf8` (32-bit offsets) and
/// `LargeUtf8` (64-bit offsets) key arrays are represented.
enum KeyKind<'a> {
    /// Arrow `Utf8` key array.
    Utf8(&'a GenericStringArray<i32>),
    /// Arrow `LargeUtf8` key array.
    LargeUtf8(&'a GenericStringArray<i64>),
}
/// Encoder for an Arrow `MapArray` as an Avro `map`.
///
/// Entries for each row are located through the map's offsets; keys are written
/// as length-prefixed strings and values through the nested `values` encoder.
struct MapEncoder<'a> {
    /// Source map array (provides the per-row entry offsets).
    map: &'a MapArray,
    /// Typed view of the map's key column.
    keys: KeyKind<'a>,
    /// Encoder for the map's value column.
    values: FieldEncoder<'a>,
    /// `offset()` of the key array; subtracted (saturating) from entry indices
    /// before a key is looked up.
    /// NOTE(review): Arrow `value(i)` accessors already apply the array's own
    /// offset; confirm this extra subtraction is intended for sliced inputs.
    keys_offset: usize,
    /// `offset()` of the value array; subtracted (saturating) from entry indices
    /// before a value is encoded.
    values_offset: usize,
}
1372
1373impl<'a> MapEncoder<'a> {
1374    fn try_new(
1375        map: &'a MapArray,
1376        values_nullability: Option<Nullability>,
1377        value_plan: &FieldPlan,
1378    ) -> Result<Self, AvroError> {
1379        let keys_arr = map.keys();
1380        let keys_kind = match keys_arr.data_type() {
1381            DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1382            DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1383            other => {
1384                return Err(AvroError::SchemaError(format!(
1385                    "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1386                )));
1387            }
1388        };
1389        Ok(Self {
1390            map,
1391            keys: keys_kind,
1392            values: FieldEncoder::make_encoder(
1393                map.values().as_ref(),
1394                value_plan,
1395                values_nullability,
1396            )?,
1397            keys_offset: keys_arr.offset(),
1398            values_offset: map.values().offset(),
1399        })
1400    }
1401
1402    fn encode_map_entries<W, O>(
1403        out: &mut W,
1404        keys: &GenericStringArray<O>,
1405        keys_offset: usize,
1406        start: usize,
1407        end: usize,
1408        mut write_item: impl FnMut(&mut W, usize) -> Result<(), AvroError>,
1409    ) -> Result<(), AvroError>
1410    where
1411        W: Write + ?Sized,
1412        O: OffsetSizeTrait,
1413    {
1414        encode_blocked_range(out, start, end, |out, j| {
1415            let j_key = j.saturating_sub(keys_offset);
1416            write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1417            write_item(out, j)
1418        })
1419    }
1420
1421    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1422        let offsets = self.map.offsets();
1423        let start = offsets[idx] as usize;
1424        let end = offsets[idx + 1] as usize;
1425        let write_item = |out: &mut W, j: usize| {
1426            let j_val = j.saturating_sub(self.values_offset);
1427            self.values.encode(out, j_val)
1428        };
1429        match self.keys {
1430            KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1431                out,
1432                arr,
1433                self.keys_offset,
1434                start,
1435                end,
1436                write_item,
1437            ),
1438            KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1439                out,
1440                arr,
1441                self.keys_offset,
1442                start,
1443                end,
1444                write_item,
1445            ),
1446        }
1447    }
1448}
1449
1450/// Avro `enum` encoder for Arrow `DictionaryArray<Int32, Utf8>`.
1451///
1452/// Per Avro spec, an enum is encoded as an **int** equal to the
1453/// zero-based position of the symbol in the schema’s `symbols` list.
1454/// We validate at construction that the dictionary values equal the symbols,
1455/// so we can directly write the key value here.
1456struct EnumEncoder<'a> {
1457    keys: &'a PrimitiveArray<Int32Type>,
1458}
1459impl EnumEncoder<'_> {
1460    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), AvroError> {
1461        write_int(out, self.keys.value(row))
1462    }
1463}
1464
1465struct UnionEncoder<'a> {
1466    encoders: Vec<FieldEncoder<'a>>,
1467    array: &'a UnionArray,
1468    type_id_to_encoder_index: Vec<Option<usize>>,
1469}
1470
1471impl<'a> UnionEncoder<'a> {
1472    fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1473        let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1474            return Err(AvroError::SchemaError("Expected Dense UnionArray".into()));
1475        };
1476        if fields.len() != field_bindings.len() {
1477            return Err(AvroError::SchemaError(format!(
1478                "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1479                fields.len(),
1480                field_bindings.len()
1481            )));
1482        }
1483        let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
1484        let mut type_id_to_encoder_index: Vec<Option<usize>> =
1485            vec![None; (max_type_id + 1) as usize];
1486        let mut encoders = Vec::with_capacity(fields.len());
1487        for (i, (type_id, _)) in fields.iter().enumerate() {
1488            let binding = field_bindings
1489                .get(i)
1490                .ok_or_else(|| AvroError::SchemaError("Binding and field mismatch".to_string()))?;
1491            encoders.push(FieldEncoder::make_encoder(
1492                array.child(type_id).as_ref(),
1493                &binding.plan,
1494                binding.nullability,
1495            )?);
1496            type_id_to_encoder_index[type_id as usize] = Some(i);
1497        }
1498        Ok(Self {
1499            encoders,
1500            array,
1501            type_id_to_encoder_index,
1502        })
1503    }
1504
1505    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1506        // SAFETY: `idx` is always in bounds because:
1507        // 1. The encoder is called from `RecordEncoder::encode,` which iterates over `0..batch.num_rows()`
1508        // 2. `self.array` is a column from the same batch, so its length equals `batch.num_rows()`
1509        // 3. `type_ids()` returns a buffer with exactly `self.array.len()` entries (one per logical element)
1510        let type_id = self.array.type_ids()[idx];
1511        let encoder_index = self
1512            .type_id_to_encoder_index
1513            .get(type_id as usize)
1514            .and_then(|opt| *opt)
1515            .ok_or_else(|| AvroError::SchemaError(format!("Invalid type_id {type_id}")))?;
1516        write_int(out, encoder_index as i32)?;
1517        let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
1518            AvroError::SchemaError(format!("Invalid encoder index {encoder_index}"))
1519        })?;
1520        encoder.encode(out, self.array.value_offset(idx))
1521    }
1522}
1523
1524struct StructEncoder<'a> {
1525    encoders: Vec<FieldEncoder<'a>>,
1526}
1527
1528impl<'a> StructEncoder<'a> {
1529    fn try_new(array: &'a StructArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
1530        let mut encoders = Vec::with_capacity(field_bindings.len());
1531        for field_binding in field_bindings {
1532            let idx = field_binding.arrow_index;
1533            let column = array.columns().get(idx).ok_or_else(|| {
1534                AvroError::SchemaError(format!("Struct child index {idx} out of range"))
1535            })?;
1536            let encoder = FieldEncoder::make_encoder(
1537                column.as_ref(),
1538                &field_binding.plan,
1539                field_binding.nullability,
1540            )?;
1541            encoders.push(encoder);
1542        }
1543        Ok(Self { encoders })
1544    }
1545
1546    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1547        for encoder in self.encoders.iter_mut() {
1548            encoder.encode(out, idx)?;
1549        }
1550        Ok(())
1551    }
1552}
1553
1554/// Encode a blocked range of items with Avro array block framing.
1555///
1556/// `write_item` must take `(out, index)` to maintain the "out-first" convention.
1557fn encode_blocked_range<W: Write + ?Sized, F>(
1558    out: &mut W,
1559    start: usize,
1560    end: usize,
1561    mut write_item: F,
1562) -> Result<(), AvroError>
1563where
1564    F: FnMut(&mut W, usize) -> Result<(), AvroError>,
1565{
1566    let len = end.saturating_sub(start);
1567    if len == 0 {
1568        // Zero-length terminator per Avro spec.
1569        write_long(out, 0)?;
1570        return Ok(());
1571    }
1572    // Emit a single positive block for performance, then the end marker.
1573    write_long(out, len as i64)?;
1574    for row in start..end {
1575        write_item(out, row)?;
1576    }
1577    write_long(out, 0)?;
1578    Ok(())
1579}
1580
1581struct ListEncoder<'a, O: OffsetSizeTrait> {
1582    list: &'a GenericListArray<O>,
1583    values: FieldEncoder<'a>,
1584    values_offset: usize,
1585}
1586
1587type ListEncoder32<'a> = ListEncoder<'a, i32>;
1588type ListEncoder64<'a> = ListEncoder<'a, i64>;
1589
1590impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1591    fn try_new(
1592        list: &'a GenericListArray<O>,
1593        items_nullability: Option<Nullability>,
1594        item_plan: &FieldPlan,
1595    ) -> Result<Self, AvroError> {
1596        Ok(Self {
1597            list,
1598            values: FieldEncoder::make_encoder(
1599                list.values().as_ref(),
1600                item_plan,
1601                items_nullability,
1602            )?,
1603            values_offset: list.values().offset(),
1604        })
1605    }
1606
1607    fn encode_list_range<W: Write + ?Sized>(
1608        &mut self,
1609        out: &mut W,
1610        start: usize,
1611        end: usize,
1612    ) -> Result<(), AvroError> {
1613        encode_blocked_range(out, start, end, |out, row| {
1614            self.values
1615                .encode(out, row.saturating_sub(self.values_offset))
1616        })
1617    }
1618
1619    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1620        let offsets = self.list.offsets();
1621        let start = offsets[idx].to_usize().ok_or_else(|| {
1622            AvroError::InvalidArgument(format!("Error converting offset[{idx}] to usize"))
1623        })?;
1624        let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1625            AvroError::InvalidArgument(format!("Error converting offset[{}] to usize", idx + 1))
1626        })?;
1627        self.encode_list_range(out, start, end)
1628    }
1629}
1630
1631/// ListView encoder using `(offset, size)` buffers.
1632struct ListViewEncoder<'a, O: OffsetSizeTrait> {
1633    list: &'a GenericListViewArray<O>,
1634    values: FieldEncoder<'a>,
1635    values_offset: usize,
1636}
1637type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
1638type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
1639
1640impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
1641    fn try_new(
1642        list: &'a GenericListViewArray<O>,
1643        items_nullability: Option<Nullability>,
1644        item_plan: &FieldPlan,
1645    ) -> Result<Self, AvroError> {
1646        Ok(Self {
1647            list,
1648            values: FieldEncoder::make_encoder(
1649                list.values().as_ref(),
1650                item_plan,
1651                items_nullability,
1652            )?,
1653            values_offset: list.values().offset(),
1654        })
1655    }
1656
1657    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1658        let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
1659            AvroError::InvalidArgument(format!("Error converting value_offset[{idx}] to usize"))
1660        })?;
1661        let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
1662            AvroError::InvalidArgument(format!("Error converting value_size[{idx}] to usize"))
1663        })?;
1664        let start = start + self.values_offset;
1665        let end = start + len;
1666        encode_blocked_range(out, start, end, |out, row| {
1667            self.values
1668                .encode(out, row.saturating_sub(self.values_offset))
1669        })
1670    }
1671}
1672
1673/// FixedSizeList encoder.
1674struct FixedSizeListEncoder<'a> {
1675    list: &'a FixedSizeListArray,
1676    values: FieldEncoder<'a>,
1677    values_offset: usize,
1678    elem_len: usize,
1679}
1680
1681impl<'a> FixedSizeListEncoder<'a> {
1682    fn try_new(
1683        list: &'a FixedSizeListArray,
1684        items_nullability: Option<Nullability>,
1685        item_plan: &FieldPlan,
1686    ) -> Result<Self, AvroError> {
1687        Ok(Self {
1688            list,
1689            values: FieldEncoder::make_encoder(
1690                list.values().as_ref(),
1691                item_plan,
1692                items_nullability,
1693            )?,
1694            values_offset: list.values().offset(),
1695            elem_len: list.value_length() as usize,
1696        })
1697    }
1698
1699    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1700        // Starting index is relative to values() start
1701        let rel = self.list.value_offset(idx) as usize;
1702        let start = self.values_offset + rel;
1703        let end = start + self.elem_len;
1704        encode_blocked_range(out, start, end, |out, row| {
1705            self.values
1706                .encode(out, row.saturating_sub(self.values_offset))
1707        })
1708    }
1709}
1710
1711/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
1712/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
1713struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1714impl FixedEncoder<'_> {
1715    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1716        let v = self.0.value(idx); // &[u8] of fixed width
1717        out.write_all(v)?;
1718        Ok(())
1719    }
1720}
1721
1722/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) to Avro string (UUID).
1723/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
1724struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1725impl UuidEncoder<'_> {
1726    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1727        let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1728        buf[0] = 0x48;
1729        let v = self.0.value(idx);
1730        let u = Uuid::from_slice(v)
1731            .map_err(|e| AvroError::InvalidArgument(format!("Invalid UUID bytes: {e}")))?;
1732        let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1733        out.write_all(&buf)?;
1734        Ok(())
1735    }
1736}
1737
/// The three unsigned components of an Avro `duration`.
#[derive(Clone, Copy)]
struct DurationParts {
    /// Whole months.
    months: u32,
    /// Whole days.
    days: u32,
    /// Milliseconds.
    millis: u32,
}
1744/// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`.
1745trait IntervalToDurationParts: ArrowPrimitiveType {
1746    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError>;
1747}
1748impl IntervalToDurationParts for IntervalMonthDayNanoType {
1749    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
1750        let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1751        if months < 0 || days < 0 || nanos < 0 {
1752            return Err(AvroError::InvalidArgument(
1753                "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1754            ));
1755        }
1756        if nanos % 1_000_000 != 0 {
1757            return Err(AvroError::InvalidArgument(
1758                "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1759                    .into(),
1760            ));
1761        }
1762        let millis = nanos / 1_000_000;
1763        if millis > u32::MAX as i64 {
1764            return Err(AvroError::InvalidArgument(
1765                "Avro 'duration' milliseconds exceed u32::MAX".into(),
1766            ));
1767        }
1768        Ok(DurationParts {
1769            months: months as u32,
1770            days: days as u32,
1771            millis: millis as u32,
1772        })
1773    }
1774}
1775impl IntervalToDurationParts for IntervalYearMonthType {
1776    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
1777        if native < 0 {
1778            return Err(AvroError::InvalidArgument(
1779                "Avro 'duration' cannot encode negative months".into(),
1780            ));
1781        }
1782        Ok(DurationParts {
1783            months: native as u32,
1784            days: 0,
1785            millis: 0,
1786        })
1787    }
1788}
1789impl IntervalToDurationParts for IntervalDayTimeType {
1790    fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
1791        let (days, millis) = IntervalDayTimeType::to_parts(native);
1792        if days < 0 || millis < 0 {
1793            return Err(AvroError::InvalidArgument(
1794                "Avro 'duration' cannot encode negative days or milliseconds".into(),
1795            ));
1796        }
1797        Ok(DurationParts {
1798            months: 0,
1799            days: days as u32,
1800            millis: millis as u32,
1801        })
1802    }
1803}
1804
1805/// Single generic encoder used for all three interval units.
1806/// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
1807struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1808impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1809    #[inline(always)]
1810    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1811        let parts = P::duration_parts(self.0.value(idx))?;
1812        let months = parts.months.to_le_bytes();
1813        let days = parts.days.to_le_bytes();
1814        let ms = parts.millis.to_le_bytes();
1815        // SAFETY
1816        // - Endianness & layout: Avro's `duration` logical type is encoded as fixed(12)
1817        //   with three *little-endian* unsigned 32-bit integers in order: (months, days, millis).
1818        //   We explicitly materialize exactly those 12 bytes.
1819        // - In-bounds indexing: `to_le_bytes()` on `u32` returns `[u8; 4]` by contract,
1820        //   therefore, the constant indices 0..=3 used below are *always* in-bounds.
1821        //   Rust will panic on out-of-bounds indexing, but there is no such path here;
1822        //   the compiler can also elide the bound checks for constant, provably in-range
1823        //   indices. [std docs; Rust Performance Book on bounds-check elimination]
1824        // - Memory safety: The `[u8; 12]` array is built on the stack by value, with no
1825        //   aliasing and no uninitialized memory. There is no `unsafe`.
1826        // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated as an AvroError,
1827        //   so I/O errors are reported, not panicked.
1828        // Consequently, constructing `buf` with the constant indices below is safe and
1829        // panic-free under these validated preconditions.
1830        let buf = [
1831            months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1832            ms[1], ms[2], ms[3],
1833        ];
1834        out.write_all(&buf)?;
1835        Ok(())
1836    }
1837}
1838
/// Source of the big-endian two's-complement bytes of a decimal's unscaled value.
///
/// `N` is the byte width of the decimal's native integer (e.g. 16 for `Decimal128`).
trait DecimalBeBytes<const N: usize> {
    /// Returns the unscaled integer stored at `idx` as `N` big-endian bytes.
    fn value_be_bytes(&self, idx: usize) -> [u8; N];
}
1844#[cfg(feature = "small_decimals")]
1845impl DecimalBeBytes<4> for Decimal32Array {
1846    fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1847        self.value(idx).to_be_bytes()
1848    }
1849}
1850#[cfg(feature = "small_decimals")]
1851impl DecimalBeBytes<8> for Decimal64Array {
1852    fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1853        self.value(idx).to_be_bytes()
1854    }
1855}
1856impl DecimalBeBytes<16> for Decimal128Array {
1857    fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1858        self.value(idx).to_be_bytes()
1859    }
1860}
1861impl DecimalBeBytes<32> for Decimal256Array {
1862    fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1863        // Arrow i256 → [u8; 32] big-endian
1864        self.value(idx).to_be_bytes()
1865    }
1866}
1867
1868/// Generic Avro decimal encoder over Arrow decimal arrays.
1869/// - When `fixed_size` is `None` → Avro `bytes(decimal)`; writes the minimal
1870///   two's-complement representation with a length prefix.
1871/// - When `Some(n)` → Avro `fixed(n, decimal)`; sign-extends (or validates)
1872///   to exactly `n` bytes and writes them directly.
1873struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1874    arr: &'a A,
1875    fixed_size: Option<usize>,
1876}
1877
1878impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1879    fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1880        Self { arr, fixed_size }
1881    }
1882
1883    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1884        let be = self.arr.value_be_bytes(idx);
1885        match self.fixed_size {
1886            Some(n) => write_sign_extended(out, &be, n),
1887            None => write_len_prefixed(out, minimal_twos_complement(&be)),
1888        }
1889    }
1890}
1891
1892#[cfg(feature = "small_decimals")]
1893type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1894#[cfg(feature = "small_decimals")]
1895type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1896type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1897type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1898
1899/// Generic encoder for Arrow `RunArray<R>`-based sites (run-end encoded).
1900/// Follows the pattern used by other generic encoders (i.e., `ListEncoder<O>`),
1901/// avoiding runtime branching on run-end width.
1902struct RunEncodedEncoder<'a, R: RunEndIndexType> {
1903    ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
1904    base: usize,
1905    len: usize,
1906    values: FieldEncoder<'a>,
1907    // Cached run index used for sequential scans of rows [0..n)
1908    cur_run: usize,
1909    // Cached end (logical index, 1-based per spec) for the current run.
1910    cur_end: usize,
1911}
1912
1913type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
1914type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
1915type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
1916
1917impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
1918    fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
1919        let ends = arr.run_ends();
1920        let base = ends.get_start_physical_index();
1921        let slice = ends.values();
1922        let len = ends.len();
1923        let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
1924        Self {
1925            ends_slice: slice,
1926            base,
1927            len,
1928            values,
1929            cur_run: 0,
1930            cur_end,
1931        }
1932    }
1933
1934    /// Advance `cur_run` so that `idx` is within the run ending at `cur_end`.
1935    /// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based.
1936    #[inline(always)]
1937    fn advance_to_row(&mut self, idx: usize) -> Result<(), AvroError> {
1938        if idx < self.cur_end {
1939            return Ok(());
1940        }
1941        // Move forward across run boundaries until idx falls within cur_end
1942        while self.cur_run + 1 < self.len && idx >= self.cur_end {
1943            self.cur_run += 1;
1944            self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
1945        }
1946        if idx < self.cur_end {
1947            Ok(())
1948        } else {
1949            Err(AvroError::InvalidArgument(format!(
1950                "row index {idx} out of bounds for run-ends ({} runs)",
1951                self.len
1952            )))
1953        }
1954    }
1955
1956    #[inline(always)]
1957    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
1958        self.advance_to_row(idx)?;
1959        // For REE values, the value for any logical row within a run is at
1960        // the physical index of that run.
1961        self.values.encode(out, self.cur_run)
1962    }
1963}
1964
1965#[cfg(test)]
1966mod tests {
1967    use super::*;
1968    use arrow_array::types::Int32Type;
1969    use arrow_array::{
1970        Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
1971        Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
1972        StringArray,
1973    };
1974    use arrow_buffer::Buffer;
1975    use arrow_schema::{DataType, Field, Fields, UnionFields};
1976
1977    fn zigzag_i64(v: i64) -> u64 {
1978        ((v << 1) ^ (v >> 63)) as u64
1979    }
1980
1981    fn varint(mut x: u64) -> Vec<u8> {
1982        let mut out = Vec::new();
1983        while (x & !0x7f) != 0 {
1984            out.push(((x & 0x7f) as u8) | 0x80);
1985            x >>= 7;
1986        }
1987        out.push((x & 0x7f) as u8);
1988        out
1989    }
1990
1991    fn avro_long_bytes(v: i64) -> Vec<u8> {
1992        varint(zigzag_i64(v))
1993    }
1994
1995    fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
1996        let mut out = avro_long_bytes(payload.len() as i64);
1997        out.extend_from_slice(payload);
1998        out
1999    }
2000
2001    fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
2002        let m = months.to_le_bytes();
2003        let d = days.to_le_bytes();
2004        let ms = millis.to_le_bytes();
2005        [
2006            m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
2007        ]
2008    }
2009
2010    fn encode_all(
2011        array: &dyn Array,
2012        plan: &FieldPlan,
2013        nullability: Option<Nullability>,
2014    ) -> Vec<u8> {
2015        let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
2016        let mut out = Vec::new();
2017        for i in 0..array.len() {
2018            enc.encode(&mut out, i).unwrap();
2019        }
2020        out
2021    }
2022
2023    fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
2024        if actual != expected {
2025            let to_hex = |b: &[u8]| {
2026                b.iter()
2027                    .map(|x| format!("{:02X}", x))
2028                    .collect::<Vec<_>>()
2029                    .join(" ")
2030            };
2031            panic!(
2032                "mismatch\n  expected: [{}]\n    actual: [{}]",
2033                to_hex(expected),
2034                to_hex(actual)
2035            );
2036        }
2037    }
2038
2039    fn row_slice<'a>(buf: &'a [u8], offsets: &[usize], row: usize) -> &'a [u8] {
2040        let start = offsets[row];
2041        let end = offsets[row + 1];
2042        &buf[start..end]
2043    }
2044
2045    #[test]
2046    fn binary_encoder() {
2047        let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
2048        let arr = BinaryArray::from_vec(values);
2049        let mut expected = Vec::new();
2050        for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
2051            expected.extend(avro_len_prefixed_bytes(payload));
2052        }
2053        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2054        assert_bytes_eq(&got, &expected);
2055    }
2056
2057    #[test]
2058    fn large_binary_encoder() {
2059        let values: Vec<&[u8]> = vec![b"xyz", b""];
2060        let arr = LargeBinaryArray::from_vec(values);
2061        let mut expected = Vec::new();
2062        for payload in [b"xyz" as &[u8], b""] {
2063            expected.extend(avro_len_prefixed_bytes(payload));
2064        }
2065        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2066        assert_bytes_eq(&got, &expected);
2067    }
2068
2069    #[test]
2070    fn utf8_encoder() {
2071        let arr = StringArray::from(vec!["", "A", "BC"]);
2072        let mut expected = Vec::new();
2073        for s in ["", "A", "BC"] {
2074            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2075        }
2076        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2077        assert_bytes_eq(&got, &expected);
2078    }
2079
2080    #[test]
2081    fn large_utf8_encoder() {
2082        let arr = LargeStringArray::from(vec!["hello", ""]);
2083        let mut expected = Vec::new();
2084        for s in ["hello", ""] {
2085            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2086        }
2087        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2088        assert_bytes_eq(&got, &expected);
2089    }
2090
2091    #[test]
2092    fn list_encoder_int32() {
2093        // Build ListArray [[1,2], [], [3]]
2094        let values = Int32Array::from(vec![1, 2, 3]);
2095        let offsets = vec![0, 2, 2, 3];
2096        let list = ListArray::new(
2097            Field::new("item", DataType::Int32, true).into(),
2098            arrow_buffer::OffsetBuffer::new(offsets.into()),
2099            Arc::new(values) as ArrayRef,
2100            None,
2101        );
2102        // Avro array encoding per row
2103        let mut expected = Vec::new();
2104        // row 0: block len 2, items 1,2 then 0
2105        expected.extend(avro_long_bytes(2));
2106        expected.extend(avro_long_bytes(1));
2107        expected.extend(avro_long_bytes(2));
2108        expected.extend(avro_long_bytes(0));
2109        // row 1: empty
2110        expected.extend(avro_long_bytes(0));
2111        // row 2: one item 3
2112        expected.extend(avro_long_bytes(1));
2113        expected.extend(avro_long_bytes(3));
2114        expected.extend(avro_long_bytes(0));
2115
2116        let plan = FieldPlan::List {
2117            items_nullability: None,
2118            item_plan: Box::new(FieldPlan::Scalar),
2119        };
2120        let got = encode_all(&list, &plan, None);
2121        assert_bytes_eq(&got, &expected);
2122    }
2123
2124    #[test]
2125    fn struct_encoder_two_fields() {
2126        // Struct { a: Int32, b: Utf8 }
2127        let a = Int32Array::from(vec![1, 2]);
2128        let b = StringArray::from(vec!["x", "y"]);
2129        let fields = Fields::from(vec![
2130            Field::new("a", DataType::Int32, true),
2131            Field::new("b", DataType::Utf8, true),
2132        ]);
2133        let struct_arr = StructArray::new(
2134            fields.clone(),
2135            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2136            None,
2137        );
2138        let plan = FieldPlan::Struct {
2139            bindings: vec![
2140                FieldBinding {
2141                    arrow_index: 0,
2142                    nullability: None,
2143                    plan: FieldPlan::Scalar,
2144                },
2145                FieldBinding {
2146                    arrow_index: 1,
2147                    nullability: None,
2148                    plan: FieldPlan::Scalar,
2149                },
2150            ],
2151        };
2152        let got = encode_all(&struct_arr, &plan, None);
2153        // Expected: rows concatenated: a then b
2154        let mut expected = Vec::new();
2155        expected.extend(avro_long_bytes(1)); // a=1
2156        expected.extend(avro_len_prefixed_bytes(b"x")); // b="x"
2157        expected.extend(avro_long_bytes(2)); // a=2
2158        expected.extend(avro_len_prefixed_bytes(b"y")); // b="y"
2159        assert_bytes_eq(&got, &expected);
2160    }
2161
2162    #[test]
2163    fn enum_encoder_dictionary() {
2164        // symbols: ["A","B","C"], keys [2,0,1]
2165        let dict_values = StringArray::from(vec!["A", "B", "C"]);
2166        let keys = Int32Array::from(vec![2, 0, 1]);
2167        let dict =
2168            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
2169        let symbols = Arc::<[String]>::from(
2170            vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
2171        );
2172        let plan = FieldPlan::Enum { symbols };
2173        let got = encode_all(&dict, &plan, None);
2174        let mut expected = Vec::new();
2175        expected.extend(avro_long_bytes(2));
2176        expected.extend(avro_long_bytes(0));
2177        expected.extend(avro_long_bytes(1));
2178        assert_bytes_eq(&got, &expected);
2179    }
2180
2181    #[test]
2182    fn decimal_bytes_and_fixed() {
2183        // Use Decimal128 with small positives and negatives
2184        let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
2185            .with_precision_and_scale(20, 0)
2186            .unwrap();
2187        // bytes(decimal): minimal two's complement length-prefixed
2188        let plan_bytes = FieldPlan::Decimal { size: None };
2189        let got_bytes = encode_all(&dec, &plan_bytes, None);
2190        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2191        let mut expected_bytes = Vec::new();
2192        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2193        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2194        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2195        assert_bytes_eq(&got_bytes, &expected_bytes);
2196
2197        let plan_fixed = FieldPlan::Decimal { size: Some(16) };
2198        let got_fixed = encode_all(&dec, &plan_fixed, None);
2199        let mut expected_fixed = Vec::new();
2200        expected_fixed.extend_from_slice(&1i128.to_be_bytes());
2201        expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
2202        expected_fixed.extend_from_slice(&0i128.to_be_bytes());
2203        assert_bytes_eq(&got_fixed, &expected_fixed);
2204    }
2205
2206    #[test]
2207    fn decimal_bytes_256() {
2208        use arrow_buffer::i256;
2209        // Use Decimal256 with small positives and negatives
2210        let dec = Decimal256Array::from(vec![
2211            i256::from_i128(1),
2212            i256::from_i128(-1),
2213            i256::from_i128(0),
2214        ])
2215        .with_precision_and_scale(76, 0)
2216        .unwrap();
2217        // bytes(decimal): minimal two's complement length-prefixed
2218        let plan_bytes = FieldPlan::Decimal { size: None };
2219        let got_bytes = encode_all(&dec, &plan_bytes, None);
2220        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2221        let mut expected_bytes = Vec::new();
2222        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2223        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2224        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2225        assert_bytes_eq(&got_bytes, &expected_bytes);
2226
2227        // fixed(32): 32-byte big-endian two's complement
2228        let plan_fixed = FieldPlan::Decimal { size: Some(32) };
2229        let got_fixed = encode_all(&dec, &plan_fixed, None);
2230        let mut expected_fixed = Vec::new();
2231        expected_fixed.extend_from_slice(&i256::from_i128(1).to_be_bytes());
2232        expected_fixed.extend_from_slice(&i256::from_i128(-1).to_be_bytes());
2233        expected_fixed.extend_from_slice(&i256::from_i128(0).to_be_bytes());
2234        assert_bytes_eq(&got_fixed, &expected_fixed);
2235    }
2236
2237    #[cfg(feature = "small_decimals")]
2238    #[test]
2239    fn decimal_bytes_and_fixed_32() {
2240        // Use Decimal32 with small positives and negatives
2241        let dec = Decimal32Array::from(vec![1i32, -1i32, 0i32])
2242            .with_precision_and_scale(9, 0)
2243            .unwrap();
2244        // bytes(decimal)
2245        let plan_bytes = FieldPlan::Decimal { size: None };
2246        let got_bytes = encode_all(&dec, &plan_bytes, None);
2247        let mut expected_bytes = Vec::new();
2248        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2249        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2250        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2251        assert_bytes_eq(&got_bytes, &expected_bytes);
2252        // fixed(4)
2253        let plan_fixed = FieldPlan::Decimal { size: Some(4) };
2254        let got_fixed = encode_all(&dec, &plan_fixed, None);
2255        let mut expected_fixed = Vec::new();
2256        expected_fixed.extend_from_slice(&1i32.to_be_bytes());
2257        expected_fixed.extend_from_slice(&(-1i32).to_be_bytes());
2258        expected_fixed.extend_from_slice(&0i32.to_be_bytes());
2259        assert_bytes_eq(&got_fixed, &expected_fixed);
2260    }
2261
2262    #[cfg(feature = "small_decimals")]
2263    #[test]
2264    fn decimal_bytes_and_fixed_64() {
2265        // Use Decimal64 with small positives and negatives
2266        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
2267            .with_precision_and_scale(18, 0)
2268            .unwrap();
2269        // bytes(decimal)
2270        let plan_bytes = FieldPlan::Decimal { size: None };
2271        let got_bytes = encode_all(&dec, &plan_bytes, None);
2272        let mut expected_bytes = Vec::new();
2273        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2274        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2275        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2276        assert_bytes_eq(&got_bytes, &expected_bytes);
2277        // fixed(8)
2278        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
2279        let got_fixed = encode_all(&dec, &plan_fixed, None);
2280        let mut expected_fixed = Vec::new();
2281        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
2282        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
2283        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
2284        assert_bytes_eq(&got_fixed, &expected_fixed);
2285    }
2286
2287    #[test]
2288    fn float32_and_float64_encoders() {
2289        let f32a = Float32Array::from(vec![0.0f32, -1.5f32, f32::from_bits(0x7fc00000)]); // includes a quiet NaN bit pattern
2290        let f64a = Float64Array::from(vec![0.0f64, -2.25f64]);
2291        // f32 expected
2292        let mut expected32 = Vec::new();
2293        for v in [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)] {
2294            expected32.extend_from_slice(&v.to_bits().to_le_bytes());
2295        }
2296        let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
2297        assert_bytes_eq(&got32, &expected32);
2298        // f64 expected
2299        let mut expected64 = Vec::new();
2300        for v in [0.0f64, -2.25f64] {
2301            expected64.extend_from_slice(&v.to_bits().to_le_bytes());
2302        }
2303        let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
2304        assert_bytes_eq(&got64, &expected64);
2305    }
2306
2307    #[test]
2308    fn long_encoder_int64() {
2309        let arr = Int64Array::from(vec![0i64, 1i64, -1i64, 2i64, -2i64, i64::MIN + 1]);
2310        let mut expected = Vec::new();
2311        for v in [0, 1, -1, 2, -2, i64::MIN + 1] {
2312            expected.extend(avro_long_bytes(v));
2313        }
2314        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2315        assert_bytes_eq(&got, &expected);
2316    }
2317
2318    #[test]
2319    fn fixed_encoder_plain() {
2320        // Two values of width 4
2321        let data = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
2322        let values: Vec<Vec<u8>> = data.iter().map(|x| x.to_vec()).collect();
2323        let arr = FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap();
2324        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2325        let mut expected = Vec::new();
2326        expected.extend_from_slice(&data[0]);
2327        expected.extend_from_slice(&data[1]);
2328        assert_bytes_eq(&got, &expected);
2329    }
2330
2331    #[test]
2332    fn uuid_encoder_test() {
2333        // Happy path
2334        let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
2335        let bytes = *u.as_bytes();
2336        let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
2337        // Expected: length 36 (0x48) followed by hyphenated lowercase text
2338        let mut expected = Vec::new();
2339        expected.push(0x48);
2340        expected.extend_from_slice(u.hyphenated().to_string().as_bytes());
2341        let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
2342        assert_bytes_eq(&got, &expected);
2343    }
2344
2345    #[test]
2346    fn uuid_encoder_error() {
2347        // Invalid UUID bytes: wrong length
2348        let arr =
2349            FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
2350                .unwrap();
2351        let plan = FieldPlan::Uuid;
2352        let mut enc = FieldEncoder::make_encoder(&arr, &plan, None).unwrap();
2353        let mut out = Vec::new();
2354        let err = enc.encode(&mut out, 0).unwrap_err();
2355        match err {
2356            AvroError::InvalidArgument(msg) => {
2357                assert!(msg.contains("Invalid UUID bytes"))
2358            }
2359            other => panic!("expected InvalidArgument, got {other:?}"),
2360        }
2361    }
2362
2363    fn test_scalar_primitive_encoding<T>(
2364        non_nullable_data: &[T::Native],
2365        nullable_data: &[Option<T::Native>],
2366    ) where
2367        T: ArrowPrimitiveType,
2368        T::Native: Into<i64> + Copy,
2369        PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2370    {
2371        let plan = FieldPlan::Scalar;
2372
2373        let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2374        let got = encode_all(&array, &plan, None);
2375
2376        let mut expected = Vec::new();
2377        for &value in non_nullable_data {
2378            expected.extend(avro_long_bytes(value.into()));
2379        }
2380        assert_bytes_eq(&got, &expected);
2381
2382        let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2383        let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2384
2385        let mut expected_nullable = Vec::new();
2386        for &opt_value in nullable_data {
2387            match opt_value {
2388                Some(value) => {
2389                    // Union index 1 for the value, then the value itself
2390                    expected_nullable.extend(avro_long_bytes(1));
2391                    expected_nullable.extend(avro_long_bytes(value.into()));
2392                }
2393                None => {
2394                    // Union index 0 for the null
2395                    expected_nullable.extend(avro_long_bytes(0));
2396                }
2397            }
2398        }
2399        assert_bytes_eq(&got_nullable, &expected_nullable);
2400    }
2401
2402    #[test]
2403    fn date32_encoder() {
2404        test_scalar_primitive_encoding::<Date32Type>(
2405            &[
2406                19345, // 2022-12-20
2407                0,     // 1970-01-01 (epoch)
2408                -1,    // 1969-12-31 (pre-epoch)
2409            ],
2410            &[Some(19345), None],
2411        );
2412    }
2413
2414    #[test]
2415    fn time32_millis_encoder() {
2416        test_scalar_primitive_encoding::<Time32MillisecondType>(
2417            &[
2418                0,        // Midnight
2419                49530123, // 13:45:30.123
2420                86399999, // 23:59:59.999
2421            ],
2422            &[None, Some(49530123)],
2423        );
2424    }
2425
2426    #[test]
2427    fn time64_micros_encoder() {
2428        test_scalar_primitive_encoding::<Time64MicrosecondType>(
2429            &[
2430                0,           // Midnight
2431                86399999999, // 23:59:59.999999
2432            ],
2433            &[Some(86399999999), None],
2434        );
2435    }
2436
2437    #[test]
2438    fn timestamp_millis_encoder() {
2439        test_scalar_primitive_encoding::<TimestampMillisecondType>(
2440            &[
2441                1704067200000, // 2024-01-01T00:00:00Z
2442                0,             // 1970-01-01T00:00:00Z (epoch)
2443                -123456789,    // Pre-epoch timestamp
2444            ],
2445            &[None, Some(1704067200000)],
2446        );
2447    }
2448
2449    #[test]
2450    fn map_encoder_string_keys_int_values() {
2451        // Build MapArray with two rows
2452        // Row0: {"k1":1, "k2":2}
2453        // Row1: {}
2454        let keys = StringArray::from(vec!["k1", "k2"]);
2455        let values = Int32Array::from(vec![1, 2]);
2456        let entries_fields = Fields::from(vec![
2457            Field::new("key", DataType::Utf8, false),
2458            Field::new("value", DataType::Int32, true),
2459        ]);
2460        let entries = StructArray::new(
2461            entries_fields,
2462            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2463            None,
2464        );
2465        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
2466        let map = MapArray::new(
2467            Field::new("entries", entries.data_type().clone(), false).into(),
2468            offsets,
2469            entries,
2470            None,
2471            false,
2472        );
2473        let plan = FieldPlan::Map {
2474            values_nullability: None,
2475            value_plan: Box::new(FieldPlan::Scalar),
2476        };
2477        let got = encode_all(&map, &plan, None);
2478        let mut expected = Vec::new();
2479        // Row0: block 2 then pairs
2480        expected.extend(avro_long_bytes(2));
2481        expected.extend(avro_len_prefixed_bytes(b"k1"));
2482        expected.extend(avro_long_bytes(1));
2483        expected.extend(avro_len_prefixed_bytes(b"k2"));
2484        expected.extend(avro_long_bytes(2));
2485        expected.extend(avro_long_bytes(0));
2486        // Row1: empty
2487        expected.extend(avro_long_bytes(0));
2488        assert_bytes_eq(&got, &expected);
2489    }
2490
2491    #[test]
2492    fn union_encoder_string_int() {
2493        let strings = StringArray::from(vec!["hello", "world"]);
2494        let ints = Int32Array::from(vec![10, 20, 30]);
2495
2496        let union_fields = UnionFields::try_new(
2497            vec![0, 1],
2498            vec![
2499                Field::new("v_str", DataType::Utf8, true),
2500                Field::new("v_int", DataType::Int32, true),
2501            ],
2502        )
2503        .unwrap();
2504
2505        let type_ids = Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]);
2506        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
2507
2508        let union_array = UnionArray::try_new(
2509            union_fields,
2510            type_ids.into(),
2511            Some(offsets.into()),
2512            vec![Arc::new(strings), Arc::new(ints)],
2513        )
2514        .unwrap();
2515
2516        let plan = FieldPlan::Union {
2517            bindings: vec![
2518                FieldBinding {
2519                    arrow_index: 0,
2520                    nullability: None,
2521                    plan: FieldPlan::Scalar,
2522                },
2523                FieldBinding {
2524                    arrow_index: 1,
2525                    nullability: None,
2526                    plan: FieldPlan::Scalar,
2527                },
2528            ],
2529        };
2530
2531        let got = encode_all(&union_array, &plan, None);
2532
2533        let mut expected = Vec::new();
2534        expected.extend(avro_long_bytes(0));
2535        expected.extend(avro_len_prefixed_bytes(b"hello"));
2536        expected.extend(avro_long_bytes(1));
2537        expected.extend(avro_long_bytes(10));
2538        expected.extend(avro_long_bytes(1));
2539        expected.extend(avro_long_bytes(20));
2540        expected.extend(avro_long_bytes(0));
2541        expected.extend(avro_len_prefixed_bytes(b"world"));
2542        expected.extend(avro_long_bytes(1));
2543        expected.extend(avro_long_bytes(30));
2544
2545        assert_bytes_eq(&got, &expected);
2546    }
2547
2548    #[test]
2549    fn union_encoder_null_string_int() {
2550        let nulls = NullArray::new(1);
2551        let strings = StringArray::from(vec!["hello"]);
2552        let ints = Int32Array::from(vec![10]);
2553
2554        let union_fields = UnionFields::try_new(
2555            vec![0, 1, 2],
2556            vec![
2557                Field::new("v_null", DataType::Null, true),
2558                Field::new("v_str", DataType::Utf8, true),
2559                Field::new("v_int", DataType::Int32, true),
2560            ],
2561        )
2562        .unwrap();
2563
2564        let type_ids = Buffer::from_slice_ref([0_i8, 1, 2]);
2565        // For a null value in a dense union, no value is added to a child array.
2566        // The offset points to the last value of that type. Since there's only one
2567        // null, and one of each other type, all offsets are 0.
2568        let offsets = Buffer::from_slice_ref([0_i32, 0, 0]);
2569
2570        let union_array = UnionArray::try_new(
2571            union_fields,
2572            type_ids.into(),
2573            Some(offsets.into()),
2574            vec![Arc::new(nulls), Arc::new(strings), Arc::new(ints)],
2575        )
2576        .unwrap();
2577
2578        let plan = FieldPlan::Union {
2579            bindings: vec![
2580                FieldBinding {
2581                    arrow_index: 0,
2582                    nullability: None,
2583                    plan: FieldPlan::Scalar,
2584                },
2585                FieldBinding {
2586                    arrow_index: 1,
2587                    nullability: None,
2588                    plan: FieldPlan::Scalar,
2589                },
2590                FieldBinding {
2591                    arrow_index: 2,
2592                    nullability: None,
2593                    plan: FieldPlan::Scalar,
2594                },
2595            ],
2596        };
2597
2598        let got = encode_all(&union_array, &plan, None);
2599
2600        let mut expected = Vec::new();
2601        expected.extend(avro_long_bytes(0));
2602        expected.extend(avro_long_bytes(1));
2603        expected.extend(avro_len_prefixed_bytes(b"hello"));
2604        expected.extend(avro_long_bytes(2));
2605        expected.extend(avro_long_bytes(10));
2606
2607        assert_bytes_eq(&got, &expected);
2608    }
2609
2610    #[test]
2611    fn list64_encoder_int32() {
2612        // LargeList [[1,2,3], []]
2613        let values = Int32Array::from(vec![1, 2, 3]);
2614        let offsets: Vec<i64> = vec![0, 3, 3];
2615        let list = LargeListArray::new(
2616            Field::new("item", DataType::Int32, true).into(),
2617            arrow_buffer::OffsetBuffer::new(offsets.into()),
2618            Arc::new(values) as ArrayRef,
2619            None,
2620        );
2621        let plan = FieldPlan::List {
2622            items_nullability: None,
2623            item_plan: Box::new(FieldPlan::Scalar),
2624        };
2625        let got = encode_all(&list, &plan, None);
2626        // Expected one block of 3 and then 0, then empty 0
2627        let mut expected = Vec::new();
2628        expected.extend(avro_long_bytes(3));
2629        expected.extend(avro_long_bytes(1));
2630        expected.extend(avro_long_bytes(2));
2631        expected.extend(avro_long_bytes(3));
2632        expected.extend(avro_long_bytes(0));
2633        expected.extend(avro_long_bytes(0));
2634        assert_bytes_eq(&got, &expected);
2635    }
2636
2637    #[test]
2638    fn int_encoder_test() {
2639        let ints = Int32Array::from(vec![0, -1, 2]);
2640        let mut expected_i = Vec::new();
2641        for v in [0i32, -1, 2] {
2642            expected_i.extend(avro_long_bytes(v as i64));
2643        }
2644        let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
2645        assert_bytes_eq(&got_i, &expected_i);
2646    }
2647
2648    #[test]
2649    fn boolean_encoder_test() {
2650        let bools = BooleanArray::from(vec![true, false]);
2651        let mut expected_b = Vec::new();
2652        expected_b.extend_from_slice(&[1]);
2653        expected_b.extend_from_slice(&[0]);
2654        let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
2655        assert_bytes_eq(&got_b, &expected_b);
2656    }
2657
2658    #[test]
2659    #[cfg(feature = "avro_custom_types")]
2660    fn duration_encoding_seconds() {
2661        let arr: PrimitiveArray<DurationSecondType> = vec![0i64, -1, 2].into();
2662        let mut expected = Vec::new();
2663        for v in [0i64, -1, 2] {
2664            expected.extend_from_slice(&avro_long_bytes(v));
2665        }
2666        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2667        assert_bytes_eq(&got, &expected);
2668    }
2669
2670    #[test]
2671    #[cfg(feature = "avro_custom_types")]
2672    fn duration_encoding_milliseconds() {
2673        let arr: PrimitiveArray<DurationMillisecondType> = vec![1i64, 0, -2].into();
2674        let mut expected = Vec::new();
2675        for v in [1i64, 0, -2] {
2676            expected.extend_from_slice(&avro_long_bytes(v));
2677        }
2678        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2679        assert_bytes_eq(&got, &expected);
2680    }
2681
2682    #[test]
2683    #[cfg(feature = "avro_custom_types")]
2684    fn duration_encoding_microseconds() {
2685        let arr: PrimitiveArray<DurationMicrosecondType> = vec![5i64, -6, 7].into();
2686        let mut expected = Vec::new();
2687        for v in [5i64, -6, 7] {
2688            expected.extend_from_slice(&avro_long_bytes(v));
2689        }
2690        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2691        assert_bytes_eq(&got, &expected);
2692    }
2693
2694    #[test]
2695    #[cfg(feature = "avro_custom_types")]
2696    fn duration_encoding_nanoseconds() {
2697        let arr: PrimitiveArray<DurationNanosecondType> = vec![8i64, 9, -10].into();
2698        let mut expected = Vec::new();
2699        for v in [8i64, 9, -10] {
2700            expected.extend_from_slice(&avro_long_bytes(v));
2701        }
2702        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2703        assert_bytes_eq(&got, &expected);
2704    }
2705
2706    #[test]
2707    fn duration_encoder_year_month_happy_path() {
2708        let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
2709        let mut expected = Vec::new();
2710        for m in [0u32, 1u32, 25u32] {
2711            expected.extend_from_slice(&duration_fixed12(m, 0, 0));
2712        }
2713        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2714        assert_bytes_eq(&got, &expected);
2715    }
2716
2717    #[test]
2718    fn duration_encoder_year_month_rejects_negative() {
2719        let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
2720        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2721        let mut out = Vec::new();
2722        let err = enc.encode(&mut out, 0).unwrap_err();
2723        match err {
2724            AvroError::InvalidArgument(msg) => {
2725                assert!(msg.contains("cannot encode negative months"))
2726            }
2727            other => panic!("expected InvalidArgument, got {other:?}"),
2728        }
2729    }
2730
2731    #[test]
2732    fn duration_encoder_day_time_happy_path() {
2733        let v0 = IntervalDayTimeType::make_value(2, 500); // days=2, millis=500
2734        let v1 = IntervalDayTimeType::make_value(0, 0);
2735        let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
2736        let mut expected = Vec::new();
2737        expected.extend_from_slice(&duration_fixed12(0, 2, 500));
2738        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
2739        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2740        assert_bytes_eq(&got, &expected);
2741    }
2742
2743    #[test]
2744    fn duration_encoder_day_time_rejects_negative() {
2745        let bad = IntervalDayTimeType::make_value(-1, 0);
2746        let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
2747        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2748        let mut out = Vec::new();
2749        let err = enc.encode(&mut out, 0).unwrap_err();
2750        match err {
2751            AvroError::InvalidArgument(msg) => {
2752                assert!(msg.contains("cannot encode negative days"))
2753            }
2754            other => panic!("expected InvalidArgument, got {other:?}"),
2755        }
2756    }
2757
2758    #[test]
2759    fn duration_encoder_month_day_nano_happy_path() {
2760        let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); // -> millis = 3
2761        let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
2762        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
2763        let mut expected = Vec::new();
2764        expected.extend_from_slice(&duration_fixed12(1, 2, 3));
2765        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
2766        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2767        assert_bytes_eq(&got, &expected);
2768    }
2769
2770    #[test]
2771    fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
2772        let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
2773        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
2774        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2775        let mut out = Vec::new();
2776        let err = enc.encode(&mut out, 0).unwrap_err();
2777        match err {
2778            AvroError::InvalidArgument(msg) => {
2779                assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
2780            }
2781            other => panic!("expected InvalidArgument, got {other:?}"),
2782        }
2783    }
2784
2785    #[test]
2786    fn minimal_twos_complement_test() {
2787        let pos = [0x00, 0x00, 0x01];
2788        assert_eq!(minimal_twos_complement(&pos), &pos[2..]);
2789        let neg = [0xFF, 0xFF, 0x80]; // negative minimal is 0x80
2790        assert_eq!(minimal_twos_complement(&neg), &neg[2..]);
2791        let zero = [0x00, 0x00, 0x00];
2792        assert_eq!(minimal_twos_complement(&zero), &zero[2..]);
2793    }
2794
2795    #[test]
2796    fn write_sign_extend_test() {
2797        let mut out = Vec::new();
2798        write_sign_extended(&mut out, &[0x01], 4).unwrap();
2799        assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
2800        out.clear();
2801        write_sign_extended(&mut out, &[0xFF], 4).unwrap();
2802        assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
2803        out.clear();
2804        // truncation success (sign bytes only removed)
2805        write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
2806        assert_eq!(out, vec![0xFF, 0x80]);
2807        out.clear();
2808        // truncation overflow
2809        let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
2810        match err {
2811            AvroError::InvalidArgument(_) => {}
2812            _ => panic!("expected InvalidArgument"),
2813        }
2814    }
2815
2816    #[test]
2817    fn duration_month_day_nano_overflow_millis() {
2818        // nanos leading to millis > u32::MAX
2819        let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
2820        let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
2821        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
2822        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2823        let mut out = Vec::new();
2824        let err = enc.encode(&mut out, 0).unwrap_err();
2825        match err {
2826            AvroError::InvalidArgument(msg) => assert!(msg.contains("exceed u32::MAX")),
2827            _ => panic!("expected InvalidArgument"),
2828        }
2829    }
2830
2831    #[test]
2832    fn fieldplan_decimal_precision_scale_mismatch_errors() {
2833        // Avro expects (10,2), Arrow has (12,2)
2834        use crate::codec::Codec;
2835        use std::collections::HashMap;
2836        let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
2837        let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
2838        let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
2839        match err {
2840            AvroError::SchemaError(msg) => {
2841                assert!(msg.contains("Decimal precision/scale mismatch"))
2842            }
2843            _ => panic!("expected SchemaError"),
2844        }
2845    }
2846
2847    #[test]
2848    fn timestamp_micros_encoder() {
2849        // Mirrors the style used by `timestamp_millis_encoder`
2850        test_scalar_primitive_encoding::<TimestampMicrosecondType>(
2851            &[
2852                1_704_067_200_000_000, // 2024-01-01T00:00:00Z in micros
2853                0,                     // epoch
2854                -123_456_789,          // pre-epoch
2855            ],
2856            &[None, Some(1_704_067_200_000_000)],
2857        );
2858    }
2859
2860    #[test]
2861    fn list_encoder_nullable_items_null_first() {
2862        // One List row with three elements: [Some(1), None, Some(2)]
2863        let values = Int32Array::from(vec![Some(1), None, Some(2)]);
2864        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
2865        let list = ListArray::new(
2866            Field::new("item", DataType::Int32, true).into(),
2867            offsets,
2868            Arc::new(values) as ArrayRef,
2869            None,
2870        );
2871
2872        let plan = FieldPlan::List {
2873            items_nullability: Some(Nullability::NullFirst),
2874            item_plan: Box::new(FieldPlan::Scalar),
2875        };
2876
2877        // Avro array encoding per row: one positive block, then 0 terminator.
2878        // For NullFirst: Some(v) => branch 1 (0x02) then the value; None => branch 0 (0x00)
2879        let mut expected = Vec::new();
2880        expected.extend(avro_long_bytes(3)); // block of 3
2881        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2882        expected.extend(avro_long_bytes(1)); // value 1
2883        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
2884        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2885        expected.extend(avro_long_bytes(2)); // value 2
2886        expected.extend(avro_long_bytes(0)); // block terminator
2887
2888        let got = encode_all(&list, &plan, None);
2889        assert_bytes_eq(&got, &expected);
2890    }
2891
2892    #[test]
2893    fn large_list_encoder_nullable_items_null_first() {
2894        // LargeList single row: [Some(10), None]
2895        let values = Int32Array::from(vec![Some(10), None]);
2896        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
2897        let list = LargeListArray::new(
2898            Field::new("item", DataType::Int32, true).into(),
2899            offsets,
2900            Arc::new(values) as ArrayRef,
2901            None,
2902        );
2903
2904        let plan = FieldPlan::List {
2905            items_nullability: Some(Nullability::NullFirst),
2906            item_plan: Box::new(FieldPlan::Scalar),
2907        };
2908
2909        let mut expected = Vec::new();
2910        expected.extend(avro_long_bytes(2)); // block of 2
2911        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2912        expected.extend(avro_long_bytes(10)); // value 10
2913        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
2914        expected.extend(avro_long_bytes(0)); // block terminator
2915
2916        let got = encode_all(&list, &plan, None);
2917        assert_bytes_eq(&got, &expected);
2918    }
2919
2920    #[test]
2921    fn map_encoder_string_keys_nullable_int_values_null_first() {
2922        // One map row: {"k1": Some(7), "k2": None}
2923        let keys = StringArray::from(vec!["k1", "k2"]);
2924        let values = Int32Array::from(vec![Some(7), None]);
2925
2926        let entries_fields = Fields::from(vec![
2927            Field::new("key", DataType::Utf8, false),
2928            Field::new("value", DataType::Int32, true),
2929        ]);
2930        let entries = StructArray::new(
2931            entries_fields,
2932            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2933            None,
2934        );
2935
2936        // Single row -> offsets [0, 2]
2937        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
2938        let map = MapArray::new(
2939            Field::new("entries", entries.data_type().clone(), false).into(),
2940            offsets,
2941            entries,
2942            None,
2943            false,
2944        );
2945
2946        let plan = FieldPlan::Map {
2947            values_nullability: Some(Nullability::NullFirst),
2948            value_plan: Box::new(FieldPlan::Scalar),
2949        };
2950
2951        // Expected:
2952        // - one positive block (len=2)
2953        // - "k1", branch=1 + value=7
2954        // - "k2", branch=0 (null)
2955        // - end-of-block marker 0
2956        let mut expected = Vec::new();
2957        expected.extend(avro_long_bytes(2)); // block length 2
2958        expected.extend(avro_len_prefixed_bytes(b"k1")); // key "k1"
2959        expected.extend(avro_long_bytes(1)); // union branch 1 (value)
2960        expected.extend(avro_long_bytes(7)); // value 7
2961        expected.extend(avro_len_prefixed_bytes(b"k2")); // key "k2"
2962        expected.extend(avro_long_bytes(0)); // union branch 0 (null)
2963        expected.extend(avro_long_bytes(0)); // block terminator
2964
2965        let got = encode_all(&map, &plan, None);
2966        assert_bytes_eq(&got, &expected);
2967    }
2968
2969    #[test]
2970    fn time32_seconds_to_millis_encoder() {
2971        // Time32(Second) must encode as Avro time-millis (ms since midnight).
2972        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
2973            vec![0i32, 1, -2, 12_345].into();
2974        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2975        let mut expected = Vec::new();
2976        for secs in [0i32, 1, -2, 12_345] {
2977            let millis = (secs as i64) * 1000;
2978            expected.extend_from_slice(&avro_long_bytes(millis));
2979        }
2980        assert_bytes_eq(&got, &expected);
2981    }
2982
2983    #[test]
2984    fn time32_seconds_to_millis_overflow() {
2985        // Choose a value that will overflow i32 when multiplied by 1000.
2986        let overflow_secs: i32 = i32::MAX / 1000 + 1;
2987        let arr: PrimitiveArray<Time32SecondType> = vec![overflow_secs].into();
2988        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2989        let mut out = Vec::new();
2990        let err = enc.encode(&mut out, 0).unwrap_err();
2991        match err {
2992            AvroError::InvalidArgument(msg) => {
2993                assert!(
2994                    msg.contains("overflowed") || msg.contains("overflow"),
2995                    "unexpected message: {msg}"
2996                )
2997            }
2998            other => panic!("expected InvalidArgument, got {other:?}"),
2999        }
3000    }
3001
3002    #[test]
3003    fn timestamp_seconds_to_millis_encoder() {
3004        // Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
3005        let arr: PrimitiveArray<TimestampSecondType> = vec![0i64, 1, -1, 1_234_567_890].into();
3006        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3007        let mut expected = Vec::new();
3008        for secs in [0i64, 1, -1, 1_234_567_890] {
3009            let millis = secs * 1000;
3010            expected.extend_from_slice(&avro_long_bytes(millis));
3011        }
3012        assert_bytes_eq(&got, &expected);
3013    }
3014
3015    #[test]
3016    fn timestamp_seconds_to_millis_overflow() {
3017        // Overflow i64 when multiplied by 1000.
3018        let overflow_secs: i64 = i64::MAX / 1000 + 1;
3019        let arr: PrimitiveArray<TimestampSecondType> = vec![overflow_secs].into();
3020        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
3021        let mut out = Vec::new();
3022        let err = enc.encode(&mut out, 0).unwrap_err();
3023        match err {
3024            AvroError::InvalidArgument(msg) => {
3025                assert!(
3026                    msg.contains("overflowed") || msg.contains("overflow"),
3027                    "unexpected message: {msg}"
3028                )
3029            }
3030            other => panic!("expected InvalidArgument, got {other:?}"),
3031        }
3032    }
3033
3034    #[test]
3035    fn timestamp_nanos_encoder() {
3036        let arr: PrimitiveArray<TimestampNanosecondType> = vec![0i64, 1, -1, 123].into();
3037        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3038        let mut expected = Vec::new();
3039        for ns in [0i64, 1, -1, 123] {
3040            expected.extend_from_slice(&avro_long_bytes(ns));
3041        }
3042        assert_bytes_eq(&got, &expected);
3043    }
3044
3045    #[test]
3046    fn union_encoder_string_int_nonzero_type_ids() {
3047        let strings = StringArray::from(vec!["hello", "world"]);
3048        let ints = Int32Array::from(vec![10, 20, 30]);
3049        let union_fields = UnionFields::try_new(
3050            vec![2, 5],
3051            vec![
3052                Field::new("v_str", DataType::Utf8, true),
3053                Field::new("v_int", DataType::Int32, true),
3054            ],
3055        )
3056        .unwrap();
3057        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
3058        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
3059        let union_array = UnionArray::try_new(
3060            union_fields,
3061            type_ids.into(),
3062            Some(offsets.into()),
3063            vec![Arc::new(strings), Arc::new(ints)],
3064        )
3065        .unwrap();
3066        let plan = FieldPlan::Union {
3067            bindings: vec![
3068                FieldBinding {
3069                    arrow_index: 0,
3070                    nullability: None,
3071                    plan: FieldPlan::Scalar,
3072                },
3073                FieldBinding {
3074                    arrow_index: 1,
3075                    nullability: None,
3076                    plan: FieldPlan::Scalar,
3077                },
3078            ],
3079        };
3080        let got = encode_all(&union_array, &plan, None);
3081        let mut expected = Vec::new();
3082        expected.extend(avro_long_bytes(0));
3083        expected.extend(avro_len_prefixed_bytes(b"hello"));
3084        expected.extend(avro_long_bytes(1));
3085        expected.extend(avro_long_bytes(10));
3086        expected.extend(avro_long_bytes(1));
3087        expected.extend(avro_long_bytes(20));
3088        expected.extend(avro_long_bytes(0));
3089        expected.extend(avro_len_prefixed_bytes(b"world"));
3090        expected.extend(avro_long_bytes(1));
3091        expected.extend(avro_long_bytes(30));
3092        assert_bytes_eq(&got, &expected);
3093    }
3094
3095    #[test]
3096    fn nullable_state_with_null_buffer_and_zero_nulls() {
3097        let values = vec![1i32, 2, 3];
3098        let arr = Int32Array::from_iter_values_with_nulls(values, Some(NullBuffer::new_valid(3)));
3099        assert_eq!(arr.null_count(), 0);
3100        assert!(arr.nulls().is_some());
3101        let plan = FieldPlan::Scalar;
3102        let enc = FieldEncoder::make_encoder(&arr, &plan, Some(Nullability::NullFirst)).unwrap();
3103        match enc.null_state {
3104            NullState::NullableNoNulls { union_value_byte } => {
3105                assert_eq!(
3106                    union_value_byte,
3107                    union_value_branch_byte(Nullability::NullFirst, false)
3108                );
3109            }
3110            other => panic!("expected NullableNoNulls, got {other:?}"),
3111        }
3112    }
3113
3114    #[test]
3115    fn encode_rows_single_column_int32() {
3116        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3117        let arr = Int32Array::from(vec![1, 2, 3]);
3118        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3119        let encoder = RecordEncoder {
3120            columns: vec![FieldBinding {
3121                arrow_index: 0,
3122                nullability: None,
3123                plan: FieldPlan::Scalar,
3124            }],
3125            prefix: None,
3126        };
3127        let mut out = BytesMut::new();
3128        let mut offsets: Vec<usize> = vec![0];
3129        encoder
3130            .encode_rows(&batch, 16, &mut out, &mut offsets)
3131            .unwrap();
3132        assert_eq!(offsets.len(), 4);
3133        assert_eq!(*offsets.last().unwrap(), out.len());
3134        assert_bytes_eq(row_slice(&out, &offsets, 0), &avro_long_bytes(1));
3135        assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(2));
3136        assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(3));
3137    }
3138
3139    #[test]
3140    fn encode_rows_multiple_columns() {
3141        let schema = ArrowSchema::new(vec![
3142            Field::new("a", DataType::Int32, false),
3143            Field::new("b", DataType::Utf8, false),
3144        ]);
3145        let int_arr = Int32Array::from(vec![10, 20]);
3146        let str_arr = StringArray::from(vec!["hello", "world"]);
3147        let batch = RecordBatch::try_new(
3148            Arc::new(schema.clone()),
3149            vec![Arc::new(int_arr), Arc::new(str_arr)],
3150        )
3151        .unwrap();
3152        let encoder = RecordEncoder {
3153            columns: vec![
3154                FieldBinding {
3155                    arrow_index: 0,
3156                    nullability: None,
3157                    plan: FieldPlan::Scalar,
3158                },
3159                FieldBinding {
3160                    arrow_index: 1,
3161                    nullability: None,
3162                    plan: FieldPlan::Scalar,
3163                },
3164            ],
3165            prefix: None,
3166        };
3167        let mut out = BytesMut::new();
3168        let mut offsets: Vec<usize> = vec![0];
3169        encoder
3170            .encode_rows(&batch, 32, &mut out, &mut offsets)
3171            .unwrap();
3172        assert_eq!(offsets.len(), 3);
3173        assert_eq!(*offsets.last().unwrap(), out.len());
3174        let mut expected_row0 = Vec::new();
3175        expected_row0.extend(avro_long_bytes(10));
3176        expected_row0.extend(avro_len_prefixed_bytes(b"hello"));
3177        assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
3178        let mut expected_row1 = Vec::new();
3179        expected_row1.extend(avro_long_bytes(20));
3180        expected_row1.extend(avro_len_prefixed_bytes(b"world"));
3181        assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
3182    }
3183
3184    #[test]
3185    fn encode_rows_with_prefix() {
3186        use crate::codec::AvroFieldBuilder;
3187        use crate::schema::AvroSchema;
3188        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3189        let arr = Int32Array::from(vec![42]);
3190        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3191        let avro_schema = AvroSchema::try_from(&schema).unwrap();
3192        let fingerprint = avro_schema
3193            .fingerprint(crate::schema::FingerprintAlgorithm::Rabin)
3194            .unwrap();
3195        let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
3196            .build()
3197            .unwrap();
3198        let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
3199            .with_fingerprint(Some(fingerprint))
3200            .build()
3201            .unwrap();
3202        let mut out = BytesMut::new();
3203        let mut offsets: Vec<usize> = vec![0];
3204        encoder
3205            .encode_rows(&batch, 32, &mut out, &mut offsets)
3206            .unwrap();
3207        assert_eq!(offsets.len(), 2);
3208        let row0 = row_slice(&out, &offsets, 0);
3209        assert!(row0.len() > 10, "Row should contain prefix + encoded value");
3210        assert_eq!(row0[0], 0xC3);
3211        assert_eq!(row0[1], 0x01);
3212    }
3213
3214    #[test]
3215    fn encode_rows_empty_batch() {
3216        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3217        let arr = Int32Array::from(Vec::<i32>::new());
3218        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3219        let encoder = RecordEncoder {
3220            columns: vec![FieldBinding {
3221                arrow_index: 0,
3222                nullability: None,
3223                plan: FieldPlan::Scalar,
3224            }],
3225            prefix: None,
3226        };
3227        let mut out = BytesMut::new();
3228        let mut offsets: Vec<usize> = vec![0];
3229        encoder
3230            .encode_rows(&batch, 16, &mut out, &mut offsets)
3231            .unwrap();
3232        assert_eq!(offsets, vec![0]);
3233        assert!(out.is_empty());
3234    }
3235
3236    #[test]
3237    fn encode_rows_matches_encode_output() {
3238        let schema = ArrowSchema::new(vec![
3239            Field::new("a", DataType::Int64, false),
3240            Field::new("b", DataType::Float64, false),
3241        ]);
3242        let int_arr = Int64Array::from(vec![100i64, 200, 300]);
3243        let float_arr = Float64Array::from(vec![1.5, 2.5, 3.5]);
3244        let batch = RecordBatch::try_new(
3245            Arc::new(schema.clone()),
3246            vec![Arc::new(int_arr), Arc::new(float_arr)],
3247        )
3248        .unwrap();
3249        let encoder = RecordEncoder {
3250            columns: vec![
3251                FieldBinding {
3252                    arrow_index: 0,
3253                    nullability: None,
3254                    plan: FieldPlan::Scalar,
3255                },
3256                FieldBinding {
3257                    arrow_index: 1,
3258                    nullability: None,
3259                    plan: FieldPlan::Scalar,
3260                },
3261            ],
3262            prefix: None,
3263        };
3264        let mut stream_buf = Vec::new();
3265        encoder.encode(&mut stream_buf, &batch).unwrap();
3266        let mut out = BytesMut::new();
3267        let mut offsets: Vec<usize> = vec![0];
3268        encoder
3269            .encode_rows(&batch, 32, &mut out, &mut offsets)
3270            .unwrap();
3271        assert_eq!(offsets.len(), 1 + batch.num_rows());
3272        assert_bytes_eq(&out[..], &stream_buf);
3273    }
3274
3275    #[test]
3276    fn encode_rows_appends_to_existing_buffer() {
3277        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3278        let arr = Int32Array::from(vec![5, 6]);
3279        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3280        let encoder = RecordEncoder {
3281            columns: vec![FieldBinding {
3282                arrow_index: 0,
3283                nullability: None,
3284                plan: FieldPlan::Scalar,
3285            }],
3286            prefix: None,
3287        };
3288        let mut out = BytesMut::new();
3289        out.extend_from_slice(&[0xAA, 0xBB]);
3290        let mut offsets: Vec<usize> = vec![0, out.len()];
3291        encoder
3292            .encode_rows(&batch, 16, &mut out, &mut offsets)
3293            .unwrap();
3294        assert_eq!(offsets.len(), 4);
3295        assert_eq!(*offsets.last().unwrap(), out.len());
3296        assert_bytes_eq(row_slice(&out, &offsets, 0), &[0xAA, 0xBB]);
3297        assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(5));
3298        assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(6));
3299    }
3300
3301    #[test]
3302    fn encode_rows_nullable_column() {
3303        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, true)]);
3304        let arr = Int32Array::from(vec![Some(1), None, Some(3)]);
3305        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3306        let encoder = RecordEncoder {
3307            columns: vec![FieldBinding {
3308                arrow_index: 0,
3309                nullability: Some(Nullability::NullFirst),
3310                plan: FieldPlan::Scalar,
3311            }],
3312            prefix: None,
3313        };
3314        let mut out = BytesMut::new();
3315        let mut offsets: Vec<usize> = vec![0];
3316        encoder
3317            .encode_rows(&batch, 16, &mut out, &mut offsets)
3318            .unwrap();
3319        assert_eq!(offsets.len(), 4);
3320        let mut expected_row0 = Vec::new();
3321        expected_row0.extend(avro_long_bytes(1)); // union branch for value
3322        expected_row0.extend(avro_long_bytes(1)); // value
3323        assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0);
3324        let expected_row1 = avro_long_bytes(0); // union branch for null
3325        assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1);
3326        let mut expected_row2 = Vec::new();
3327        expected_row2.extend(avro_long_bytes(1)); // union branch for value
3328        expected_row2.extend(avro_long_bytes(3)); // value
3329        assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2);
3330    }
3331
3332    #[test]
3333    fn encode_prefix_write_error() {
3334        use crate::codec::AvroFieldBuilder;
3335        use crate::schema::{AvroSchema, FingerprintAlgorithm};
3336        use std::io;
3337
3338        struct FailWriter {
3339            failed: bool,
3340        }
3341
3342        impl io::Write for FailWriter {
3343            fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
3344                if !self.failed {
3345                    self.failed = true;
3346                    Err(io::Error::other("fail write"))
3347                } else {
3348                    Ok(0)
3349                }
3350            }
3351
3352            fn flush(&mut self) -> io::Result<()> {
3353                Ok(())
3354            }
3355        }
3356
3357        let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
3358        let arr = Int32Array::from(vec![42]);
3359        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
3360        let avro_schema = AvroSchema::try_from(&schema).unwrap();
3361        let fingerprint = avro_schema
3362            .fingerprint(FingerprintAlgorithm::Rabin)
3363            .unwrap();
3364        let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
3365            .build()
3366            .unwrap();
3367        let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
3368            .with_fingerprint(Some(fingerprint))
3369            .build()
3370            .unwrap();
3371
3372        let mut writer = FailWriter { failed: false };
3373        let err = encoder.encode(&mut writer, &batch).unwrap_err();
3374        let msg = format!("{err}");
3375        assert!(msg.contains("write prefix"), "unexpected error: {msg}");
3376    }
3377}