arrow_avro/writer/
encoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Encoder for Arrow types.
19
20use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::schema::{Fingerprint, Nullability, Prefix};
22use arrow_array::cast::AsArray;
23use arrow_array::types::{
24    ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
25    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
26    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
27    Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
28    TimestampMillisecondType,
29};
30use arrow_array::types::{
31    RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
32};
33use arrow_array::{
34    Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
35    FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
36    GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
37    ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
38    StringViewArray, StructArray, UnionArray,
39};
40#[cfg(feature = "small_decimals")]
41use arrow_array::{Decimal32Array, Decimal64Array};
42use arrow_buffer::{ArrowNativeType, NullBuffer};
43use arrow_schema::{
44    ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode,
45};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
50/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
51///
52/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
53#[inline]
54pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), ArrowError> {
55    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
56    // At most 10 bytes for 64-bit varint
57    let mut buf = [0u8; 10];
58    let mut i = 0;
59    while (zz & !0x7F) != 0 {
60        buf[i] = ((zz & 0x7F) as u8) | 0x80;
61        i += 1;
62        zz >>= 7;
63    }
64    buf[i] = (zz & 0x7F) as u8;
65    i += 1;
66    out.write_all(&buf[..i])
67        .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
68}
69
70#[inline]
71fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), ArrowError> {
72    write_long(out, value as i64)
73}
74
75#[inline]
76fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
77    write_long(out, bytes.len() as i64)?;
78    out.write_all(bytes)
79        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
80}
81
82#[inline]
83fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> {
84    out.write_all(&[if v { 1 } else { 0 }])
85        .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
86}
87
/// Returns the shortest suffix of `be` that encodes the same signed value in
/// big-endian two's complement.
///
/// Leading bytes that merely repeat the sign (`0x00` for non-negative values,
/// `0xFF` for negative ones) are dropped, as long as the first retained byte
/// still carries the correct sign bit. An empty input is returned unchanged,
/// and at least one byte is always kept for non-empty input.
///
/// Used for Avro decimals stored as `bytes`, which hold the two's-complement
/// big-endian unscaled integer (Avro 1.11.1 specification).
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&msb) = be.first() else {
        return be;
    };
    let sign_byte = if msb & 0x80 != 0 { 0xFF } else { 0x00 };
    // Length of the leading run of pure sign-fill bytes.
    let fill = be.iter().take_while(|&&b| b == sign_byte).count();
    let start = match be.get(fill) {
        // Nothing but sign fill (value is 0 or -1): keep the last byte.
        None => be.len() - 1,
        // The byte after the run already shows the right sign bit, so the
        // whole run is redundant (this also covers an empty run).
        Some(&next) if (next ^ sign_byte) & 0x80 == 0 => fill,
        // Otherwise one fill byte must stay to preserve the sign.
        Some(_) => fill - 1,
    };
    &be[start..]
}
119
120/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n` bytes.
121///
122///
123/// - If shorter than `n`, the slice is sign-extended by left-padding with the
124///   sign byte (`0x00` for positive, `0xFF` for negative).
125/// - If longer than `n`, the slice is truncated from the left. An overflow error
126///   is returned if any of the truncated bytes are not redundant sign bytes,
127///   or if the resulting value's sign bit would differ from the original.
128/// - If the slice is already `n` bytes long, it is copied.
129///
130/// Used for encoding Avro decimal values into `fixed(N)` fields.
131#[inline]
132fn write_sign_extended<W: Write + ?Sized>(
133    out: &mut W,
134    src_be: &[u8],
135    n: usize,
136) -> Result<(), ArrowError> {
137    let len = src_be.len();
138    if len == n {
139        return out
140            .write_all(src_be)
141            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
142    }
143    let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
144        0xFF
145    } else {
146        0x00
147    };
148    if len > n {
149        let extra = len - n;
150        if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
151            return Ok(());
152        }
153        // All truncated bytes must equal the sign byte, and the MSB of the first
154        // retained byte must match the sign (otherwise overflow).
155        if src_be[..extra].iter().any(|&b| b != sign_byte)
156            || ((src_be[extra] ^ sign_byte) & 0x80) != 0
157        {
158            return Err(ArrowError::InvalidArgumentError(format!(
159                "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
160            )));
161        }
162        return out
163            .write_all(&src_be[extra..])
164            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
165    }
166    // len < n: prepend sign bytes (sign extension) then the payload
167    let pad_len = n - len;
168    // Fixed-size stack pads to avoid heap allocation on the hot path
169    const ZPAD: [u8; 64] = [0x00; 64];
170    const FPAD: [u8; 64] = [0xFF; 64];
171    let pad = if sign_byte == 0x00 {
172        &ZPAD[..]
173    } else {
174        &FPAD[..]
175    };
176    // Emit padding in 64‑byte chunks (minimizes write calls without allocating),
177    // then write the original bytes.
178    let mut rem = pad_len;
179    while rem >= pad.len() {
180        out.write_all(pad)
181            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
182        rem -= pad.len();
183    }
184    if rem > 0 {
185        out.write_all(&pad[..rem])
186            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
187    }
188    out.write_all(src_be)
189        .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
190}
191
192/// Write the union branch index for an optional field.
193///
194/// Branch index is 0-based per Avro unions:
195/// - Null-first (default): null => 0, value => 1
196/// - Null-second (Impala): value => 0, null => 1
197fn write_optional_index<W: Write + ?Sized>(
198    out: &mut W,
199    is_null: bool,
200    null_order: Nullability,
201) -> Result<(), ArrowError> {
202    let byte = union_value_branch_byte(null_order, is_null);
203    out.write_all(&[byte])
204        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
205}
206
207#[derive(Debug, Clone)]
208enum NullState {
209    NonNullable,
210    NullableNoNulls {
211        union_value_byte: u8,
212    },
213    Nullable {
214        nulls: NullBuffer,
215        null_order: Nullability,
216    },
217}
218
219/// Arrow to Avro FieldEncoder:
220/// - Holds the inner `Encoder` (by value)
221/// - Carries the per-site nullability **state** as a single enum that enforces invariants
222pub(crate) struct FieldEncoder<'a> {
223    encoder: Encoder<'a>,
224    null_state: NullState,
225}
226
227impl<'a> FieldEncoder<'a> {
228    fn make_encoder(
229        array: &'a dyn Array,
230        field: &Field,
231        plan: &FieldPlan,
232        nullability: Option<Nullability>,
233    ) -> Result<Self, ArrowError> {
234        let encoder = match plan {
235            FieldPlan::Scalar => match array.data_type() {
236                DataType::Null => Encoder::Null,
237                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
238                DataType::Utf8 => {
239                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
240                }
241                DataType::LargeUtf8 => {
242                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
243                }
244                DataType::Utf8View => {
245                    let arr = array
246                        .as_any()
247                        .downcast_ref::<StringViewArray>()
248                        .ok_or_else(|| {
249                            ArrowError::SchemaError("Expected StringViewArray".into())
250                        })?;
251                    Encoder::Utf8View(Utf8ViewEncoder(arr))
252                }
253                DataType::BinaryView => {
254                    let arr = array
255                        .as_any()
256                        .downcast_ref::<BinaryViewArray>()
257                        .ok_or_else(|| {
258                            ArrowError::SchemaError("Expected BinaryViewArray".into())
259                        })?;
260                    Encoder::BinaryView(BinaryViewEncoder(arr))
261                }
262                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
263                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
264                DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
265                DataType::Date64 => {
266                    return Err(ArrowError::NotYetImplemented(
267                        "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
268                            .into(),
269                    ));
270                }
271                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
272                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
273                ),
274                DataType::Time32(TimeUnit::Millisecond) => {
275                    Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
276                }
277                DataType::Time32(TimeUnit::Microsecond) => {
278                    return Err(ArrowError::InvalidArgumentError(
279                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
280                            .into(),
281                    ));
282                }
283                DataType::Time32(TimeUnit::Nanosecond) => {
284                    return Err(ArrowError::InvalidArgumentError(
285                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
286                            .into(),
287                    ));
288                }
289                DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
290                    array.as_primitive::<Time64MicrosecondType>(),
291                )),
292                DataType::Time64(TimeUnit::Nanosecond) => {
293                    return Err(ArrowError::NotYetImplemented(
294                        "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
295                            .into(),
296                    ));
297                }
298                DataType::Time64(TimeUnit::Millisecond) => {
299                    return Err(ArrowError::InvalidArgumentError(
300                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
301                            .into(),
302                    ));
303                }
304                DataType::Time64(TimeUnit::Second) => {
305                    return Err(ArrowError::InvalidArgumentError(
306                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
307                            .into(),
308                    ));
309                }
310                DataType::Float32 => {
311                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
312                }
313                DataType::Float64 => {
314                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
315                }
316                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
317                DataType::LargeBinary => {
318                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
319                }
320                DataType::FixedSizeBinary(_len) => {
321                    let arr = array
322                        .as_any()
323                        .downcast_ref::<FixedSizeBinaryArray>()
324                        .ok_or_else(|| {
325                            ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
326                        })?;
327                    Encoder::Fixed(FixedEncoder(arr))
328                }
329                DataType::Timestamp(unit, _) => match unit {
330                    TimeUnit::Second => {
331                        Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
332                            array.as_primitive::<TimestampSecondType>(),
333                        ))
334                    }
335                    TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
336                        array.as_primitive::<TimestampMillisecondType>(),
337                    )),
338                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
339                        array.as_primitive::<TimestampMicrosecondType>(),
340                    )),
341                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
342                        array.as_primitive::<TimestampNanosecondType>(),
343                    )),
344                },
345                DataType::Interval(unit) => match unit {
346                    IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
347                        array.as_primitive::<IntervalMonthDayNanoType>(),
348                    )),
349                    IntervalUnit::YearMonth => Encoder::IntervalYearMonth(DurationEncoder(
350                        array.as_primitive::<IntervalYearMonthType>(),
351                    )),
352                    IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
353                        array.as_primitive::<IntervalDayTimeType>(),
354                    )),
355                },
356                DataType::Duration(tu) => match tu {
357                    TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
358                        array.as_primitive::<DurationSecondType>(),
359                    )),
360                    TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
361                        array.as_primitive::<DurationMillisecondType>(),
362                    )),
363                    TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
364                        array.as_primitive::<DurationMicrosecondType>(),
365                    )),
366                    TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
367                        array.as_primitive::<DurationNanosecondType>(),
368                    )),
369                },
370                other => {
371                    return Err(ArrowError::NotYetImplemented(format!(
372                        "Avro scalar type not yet supported: {other:?}"
373                    )));
374                }
375            },
376            FieldPlan::Struct { bindings } => {
377                let arr = array
378                    .as_any()
379                    .downcast_ref::<StructArray>()
380                    .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
381                Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
382            }
383            FieldPlan::List {
384                items_nullability,
385                item_plan,
386            } => match array.data_type() {
387                DataType::List(_) => {
388                    let arr = array
389                        .as_any()
390                        .downcast_ref::<ListArray>()
391                        .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
392                    Encoder::List(Box::new(ListEncoder32::try_new(
393                        arr,
394                        *items_nullability,
395                        item_plan.as_ref(),
396                    )?))
397                }
398                DataType::LargeList(_) => {
399                    let arr = array
400                        .as_any()
401                        .downcast_ref::<LargeListArray>()
402                        .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
403                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
404                        arr,
405                        *items_nullability,
406                        item_plan.as_ref(),
407                    )?))
408                }
409                DataType::ListView(_) => {
410                    let arr = array
411                        .as_any()
412                        .downcast_ref::<ListViewArray>()
413                        .ok_or_else(|| ArrowError::SchemaError("Expected ListViewArray".into()))?;
414                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
415                        arr,
416                        *items_nullability,
417                        item_plan.as_ref(),
418                    )?))
419                }
420                DataType::LargeListView(_) => {
421                    let arr = array
422                        .as_any()
423                        .downcast_ref::<LargeListViewArray>()
424                        .ok_or_else(|| {
425                            ArrowError::SchemaError("Expected LargeListViewArray".into())
426                        })?;
427                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
428                        arr,
429                        *items_nullability,
430                        item_plan.as_ref(),
431                    )?))
432                }
433                DataType::FixedSizeList(_, _) => {
434                    let arr = array
435                        .as_any()
436                        .downcast_ref::<FixedSizeListArray>()
437                        .ok_or_else(|| {
438                            ArrowError::SchemaError("Expected FixedSizeListArray".into())
439                        })?;
440                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
441                        arr,
442                        *items_nullability,
443                        item_plan.as_ref(),
444                    )?))
445                }
446                other => {
447                    return Err(ArrowError::SchemaError(format!(
448                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
449                    )));
450                }
451            },
452            FieldPlan::Decimal { size } => match array.data_type() {
453                #[cfg(feature = "small_decimals")]
454                DataType::Decimal32(_, _) => {
455                    let arr = array
456                        .as_any()
457                        .downcast_ref::<Decimal32Array>()
458                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
459                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
460                }
461                #[cfg(feature = "small_decimals")]
462                DataType::Decimal64(_, _) => {
463                    let arr = array
464                        .as_any()
465                        .downcast_ref::<Decimal64Array>()
466                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
467                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
468                }
469                DataType::Decimal128(_, _) => {
470                    let arr = array
471                        .as_any()
472                        .downcast_ref::<Decimal128Array>()
473                        .ok_or_else(|| {
474                            ArrowError::SchemaError("Expected Decimal128Array".into())
475                        })?;
476                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
477                }
478                DataType::Decimal256(_, _) => {
479                    let arr = array
480                        .as_any()
481                        .downcast_ref::<Decimal256Array>()
482                        .ok_or_else(|| {
483                            ArrowError::SchemaError("Expected Decimal256Array".into())
484                        })?;
485                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
486                }
487                other => {
488                    return Err(ArrowError::SchemaError(format!(
489                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
490                    )));
491                }
492            },
493            FieldPlan::Uuid => {
494                let arr = array
495                    .as_any()
496                    .downcast_ref::<FixedSizeBinaryArray>()
497                    .ok_or_else(|| {
498                        ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
499                    })?;
500                Encoder::Uuid(UuidEncoder(arr))
501            }
502            FieldPlan::Map {
503                values_nullability,
504                value_plan,
505            } => {
506                let arr = array
507                    .as_any()
508                    .downcast_ref::<MapArray>()
509                    .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
510                Encoder::Map(Box::new(MapEncoder::try_new(
511                    arr,
512                    *values_nullability,
513                    value_plan.as_ref(),
514                )?))
515            }
516            FieldPlan::Enum { symbols } => match array.data_type() {
517                DataType::Dictionary(key_dt, value_dt) => {
518                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
519                        return Err(ArrowError::SchemaError(
520                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
521                        ));
522                    }
523                    let dict = array
524                        .as_any()
525                        .downcast_ref::<DictionaryArray<Int32Type>>()
526                        .ok_or_else(|| {
527                            ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
528                        })?;
529                    let values = dict
530                        .values()
531                        .as_any()
532                        .downcast_ref::<StringArray>()
533                        .ok_or_else(|| {
534                            ArrowError::SchemaError("Dictionary values must be Utf8".into())
535                        })?;
536                    if values.len() != symbols.len() {
537                        return Err(ArrowError::SchemaError(format!(
538                            "Enum symbol length {} != dictionary size {}",
539                            symbols.len(),
540                            values.len()
541                        )));
542                    }
543                    for i in 0..values.len() {
544                        if values.value(i) != symbols[i].as_str() {
545                            return Err(ArrowError::SchemaError(format!(
546                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
547                                symbols[i],
548                                values.value(i)
549                            )));
550                        }
551                    }
552                    let keys = dict.keys();
553                    Encoder::Enum(EnumEncoder { keys })
554                }
555                other => {
556                    return Err(ArrowError::SchemaError(format!(
557                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
558                    )));
559                }
560            },
561            FieldPlan::Union { bindings } => {
562                let arr = array
563                    .as_any()
564                    .downcast_ref::<UnionArray>()
565                    .ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?;
566
567                Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
568            }
569            FieldPlan::RunEndEncoded {
570                values_nullability,
571                value_plan,
572            } => {
573                let dt = array.data_type();
574                let values_field = match dt {
575                    DataType::RunEndEncoded(_re_field, v_field) => v_field.as_ref(),
576                    other => {
577                        return Err(ArrowError::SchemaError(format!(
578                            "Avro RunEndEncoded site requires Arrow DataType::RunEndEncoded, found: {other:?}"
579                        )));
580                    }
581                };
582                // Helper closure to build a typed RunEncodedEncoder<R>
583                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
584                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
585                        let values_enc = prepare_value_site_encoder(
586                            arr.values().as_ref(),
587                            values_field,
588                            *values_nullability,
589                            value_plan.as_ref(),
590                        )?;
591                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
592                            Int16Type,
593                        >::new(
594                            arr, values_enc
595                        ))));
596                    }
597                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
598                        let values_enc = prepare_value_site_encoder(
599                            arr.values().as_ref(),
600                            values_field,
601                            *values_nullability,
602                            value_plan.as_ref(),
603                        )?;
604                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
605                            Int32Type,
606                        >::new(
607                            arr, values_enc
608                        ))));
609                    }
610                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
611                        let values_enc = prepare_value_site_encoder(
612                            arr.values().as_ref(),
613                            values_field,
614                            *values_nullability,
615                            value_plan.as_ref(),
616                        )?;
617                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
618                            Int64Type,
619                        >::new(
620                            arr, values_enc
621                        ))));
622                    }
623                    Err(ArrowError::SchemaError(
624                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
625                            .into(),
626                    ))
627                };
628                build(array)?
629            }
630        };
631        // Compute the effective null state from writer-declared nullability and data nulls.
632        let null_state = match (nullability, array.null_count() > 0) {
633            (None, false) => NullState::NonNullable,
634            (None, true) => {
635                return Err(ArrowError::InvalidArgumentError(format!(
636                    "Avro site '{}' is non-nullable, but array contains nulls",
637                    field.name()
638                )));
639            }
640            (Some(order), false) => {
641                // Optimization: drop any bitmap; emit a constant "value" branch byte.
642                NullState::NullableNoNulls {
643                    union_value_byte: union_value_branch_byte(order, false),
644                }
645            }
646            (Some(null_order), true) => {
647                let Some(nulls) = array.nulls().cloned() else {
648                    return Err(ArrowError::InvalidArgumentError(format!(
649                        "Array for Avro site '{}' reports nulls but has no null buffer",
650                        field.name()
651                    )));
652                };
653                NullState::Nullable { nulls, null_order }
654            }
655        };
656        Ok(Self {
657            encoder,
658            null_state,
659        })
660    }
661
662    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
663        match &self.null_state {
664            NullState::NonNullable => {}
665            NullState::NullableNoNulls { union_value_byte } => out
666                .write_all(&[*union_value_byte])
667                .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?,
668            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
669                return write_optional_index(out, true, *null_order); // no value to write
670            }
671            NullState::Nullable { null_order, .. } => {
672                write_optional_index(out, false, *null_order)?;
673            }
674        }
675        self.encoder.encode(out, idx)
676    }
677}
678
679fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
680    let nulls_first = null_order == Nullability::default();
681    if nulls_first == is_null { 0x00 } else { 0x02 }
682}
683
/// Per-site encoder plan for a field. This mirrors the Avro structure, so nested
/// optional branch order can be honored exactly as declared by the schema.
///
/// A plan is produced by `FieldPlan::build` from an Avro data type and the
/// Arrow field it is paired with; it holds no array data itself.
#[derive(Debug, Clone)]
enum FieldPlan {
    /// Non-nested scalar/logical type. Also the fallback chosen by
    /// `FieldPlan::build` for any codec it does not handle explicitly.
    Scalar,
    /// Record/Struct with Avro-ordered children. Each binding records the index
    /// of the Arrow child that was matched to the Avro field by name.
    Struct { bindings: Vec<FieldBinding> },
    /// Array with item-site nullability and nested plan
    List {
        /// Nullability/order of the Avro items type (`None` when not nullable).
        items_nullability: Option<Nullability>,
        /// Plan for each list item.
        item_plan: Box<FieldPlan>,
    },
    /// Avro decimal logical type (bytes or fixed). `size=None` => bytes(decimal), `Some(n)` => fixed(n)
    Decimal { size: Option<usize> },
    /// Avro UUID logical type. Selected by `FieldPlan::build` for an Arrow
    /// `FixedSizeBinary(16)` field flagged as a UUID (extension name or
    /// `logicalType` metadata).
    Uuid,
    /// Avro map with value-site nullability and nested plan
    Map {
        /// Nullability/order of the Avro map values type (`None` when not nullable).
        values_nullability: Option<Nullability>,
        /// Plan for each map value.
        value_plan: Box<FieldPlan>,
    },
    /// Avro enum; maps to Arrow Dictionary<Int32, Utf8> with dictionary values
    /// exactly equal and ordered as the Avro enum `symbols`.
    Enum { symbols: Arc<[String]> },
    /// Avro union, maps to Arrow Union. Bindings are in Avro branch order and
    /// `arrow_index` is the branch position.
    Union { bindings: Vec<FieldBinding> },
    /// Avro RunEndEncoded site. Values are encoded per logical row by mapping the
    /// row index to its containing run and emitting that run's value with `value_plan`.
    RunEndEncoded {
        /// Nullability/order applied at the run values site (`None` when not nullable).
        values_nullability: Option<Nullability>,
        /// Plan for the run values.
        value_plan: Box<FieldPlan>,
    },
}
718
/// Links one Avro field site (a record field or union branch) to the Arrow
/// column/child that provides its data, together with how that site is encoded.
#[derive(Debug, Clone)]
struct FieldBinding {
    /// Index of the Arrow field/column associated with this Avro field site
    arrow_index: usize,
    /// Nullability/order for this site (None for required fields)
    nullability: Option<Nullability>,
    /// Nested plan for this site
    plan: FieldPlan,
}
728
729/// Builder for `RecordEncoder` write plan
730#[derive(Debug)]
731pub(crate) struct RecordEncoderBuilder<'a> {
732    avro_root: &'a AvroField,
733    arrow_schema: &'a ArrowSchema,
734    fingerprint: Option<Fingerprint>,
735}
736
737impl<'a> RecordEncoderBuilder<'a> {
738    /// Create a new builder from the Avro root and Arrow schema.
739    pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
740        Self {
741            avro_root,
742            arrow_schema,
743            fingerprint: None,
744        }
745    }
746
747    pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
748        self.fingerprint = fingerprint;
749        self
750    }
751
752    /// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
753    /// resolving each field to an Arrow index by name.
754    pub(crate) fn build(self) -> Result<RecordEncoder, ArrowError> {
755        let avro_root_dt = self.avro_root.data_type();
756        let Codec::Struct(root_fields) = avro_root_dt.codec() else {
757            return Err(ArrowError::SchemaError(
758                "Top-level Avro schema must be a record/struct".into(),
759            ));
760        };
761        let mut columns = Vec::with_capacity(root_fields.len());
762        for root_field in root_fields.as_ref() {
763            let name = root_field.name();
764            let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
765                ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
766            })?;
767            columns.push(FieldBinding {
768                arrow_index,
769                nullability: root_field.data_type().nullability(),
770                plan: FieldPlan::build(
771                    root_field.data_type(),
772                    self.arrow_schema.field(arrow_index),
773                )?,
774            });
775        }
776        Ok(RecordEncoder {
777            columns,
778            prefix: self.fingerprint.map(|fp| fp.make_prefix()),
779        })
780    }
781}
782
783/// A pre-computed plan for encoding a `RecordBatch` to Avro.
784///
785/// Derived from an Avro schema and an Arrow schema. It maps
786/// top-level Avro fields to Arrow columns and contains a nested encoding plan
787/// for each column.
788#[derive(Debug, Clone)]
789pub(crate) struct RecordEncoder {
790    columns: Vec<FieldBinding>,
791    /// Optional pre-built, variable-length prefix written before each record.
792    prefix: Option<Prefix>,
793}
794
795impl RecordEncoder {
796    fn prepare_for_batch<'a>(
797        &'a self,
798        batch: &'a RecordBatch,
799    ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
800        let schema_binding = batch.schema();
801        let fields = schema_binding.fields();
802        let arrays = batch.columns();
803        let mut out = Vec::with_capacity(self.columns.len());
804        for col_plan in self.columns.iter() {
805            let arrow_index = col_plan.arrow_index;
806            let array = arrays.get(arrow_index).ok_or_else(|| {
807                ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
808            })?;
809            let field = fields[arrow_index].as_ref();
810            #[cfg(not(feature = "avro_custom_types"))]
811            let site_nullability = match &col_plan.plan {
812                FieldPlan::RunEndEncoded { .. } => None,
813                _ => col_plan.nullability,
814            };
815            #[cfg(feature = "avro_custom_types")]
816            let site_nullability = col_plan.nullability;
817            let encoder = prepare_value_site_encoder(
818                array.as_ref(),
819                field,
820                site_nullability,
821                &col_plan.plan,
822            )?;
823            out.push(encoder);
824        }
825        Ok(out)
826    }
827
828    /// Encode a `RecordBatch` using this encoder plan.
829    ///
830    /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes.
831    pub(crate) fn encode<W: Write>(
832        &self,
833        out: &mut W,
834        batch: &RecordBatch,
835    ) -> Result<(), ArrowError> {
836        let mut column_encoders = self.prepare_for_batch(batch)?;
837        let n = batch.num_rows();
838        match self.prefix {
839            Some(prefix) => {
840                for row in 0..n {
841                    out.write_all(prefix.as_slice())
842                        .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
843                    for enc in column_encoders.iter_mut() {
844                        enc.encode(out, row)?;
845                    }
846                }
847            }
848            None => {
849                for row in 0..n {
850                    for enc in column_encoders.iter_mut() {
851                        enc.encode(out, row)?;
852                    }
853                }
854            }
855        }
856        Ok(())
857    }
858}
859
860fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
861    fields.iter().position(|f| f.name() == name)
862}
863
864fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
865    // Prefer common Arrow field names; fall back to second child if exactly two
866    find_struct_child_index(fields, "value")
867        .or_else(|| find_struct_child_index(fields, "values"))
868        .or_else(|| if fields.len() == 2 { Some(1) } else { None })
869}
870
871impl FieldPlan {
872    fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
873        #[cfg(not(feature = "avro_custom_types"))]
874        if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
875            let values_nullability = avro_dt.nullability();
876            let value_site_dt: &AvroDataType = match avro_dt.codec() {
877                Codec::Union(branches, _, _) => branches
878                    .iter()
879                    .find(|b| !matches!(b.codec(), Codec::Null))
880                    .ok_or_else(|| {
881                        ArrowError::SchemaError(
882                            "Avro union at RunEndEncoded site has no non-null branch".into(),
883                        )
884                    })?,
885                _ => avro_dt,
886            };
887            return Ok(FieldPlan::RunEndEncoded {
888                values_nullability,
889                value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
890            });
891        }
892        if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
893            // Extension-based detection (only when the feature is enabled)
894            let ext_is_uuid = {
895                #[cfg(feature = "canonical_extension_types")]
896                {
897                    matches!(
898                        arrow_field.extension_type_name(),
899                        Some("arrow.uuid") | Some("uuid")
900                    )
901                }
902                #[cfg(not(feature = "canonical_extension_types"))]
903                {
904                    false
905                }
906            };
907            let md_is_uuid = arrow_field
908                .metadata()
909                .get("logicalType")
910                .map(|s| s.as_str())
911                == Some("uuid");
912            if ext_is_uuid || md_is_uuid {
913                if *len != 16 {
914                    return Err(ArrowError::InvalidArgumentError(
915                        "logicalType=uuid requires FixedSizeBinary(16)".into(),
916                    ));
917                }
918                return Ok(FieldPlan::Uuid);
919            }
920        }
921        match avro_dt.codec() {
922            Codec::Struct(avro_fields) => {
923                let fields = match arrow_field.data_type() {
924                    DataType::Struct(struct_fields) => struct_fields,
925                    other => {
926                        return Err(ArrowError::SchemaError(format!(
927                            "Avro struct maps to Arrow Struct, found: {other:?}"
928                        )));
929                    }
930                };
931                let mut bindings = Vec::with_capacity(avro_fields.len());
932                for avro_field in avro_fields.iter() {
933                    let name = avro_field.name().to_string();
934                    let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
935                        ArrowError::SchemaError(format!(
936                            "Struct field '{name}' not present in Arrow field '{}'",
937                            arrow_field.name()
938                        ))
939                    })?;
940                    bindings.push(FieldBinding {
941                        arrow_index: idx,
942                        nullability: avro_field.data_type().nullability(),
943                        plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
944                    });
945                }
946                Ok(FieldPlan::Struct { bindings })
947            }
948            Codec::List(items_dt) => match arrow_field.data_type() {
949                DataType::List(field_ref)
950                | DataType::LargeList(field_ref)
951                | DataType::ListView(field_ref)
952                | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
953                    items_nullability: items_dt.nullability(),
954                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
955                }),
956                DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
957                    items_nullability: items_dt.nullability(),
958                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
959                }),
960                other => Err(ArrowError::SchemaError(format!(
961                    "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
962                ))),
963            },
964            Codec::Map(values_dt) => {
965                let entries_field = match arrow_field.data_type() {
966                    DataType::Map(entries, _sorted) => entries.as_ref(),
967                    other => {
968                        return Err(ArrowError::SchemaError(format!(
969                            "Avro map maps to Arrow DataType::Map, found: {other:?}"
970                        )));
971                    }
972                };
973                let entries_struct_fields = match entries_field.data_type() {
974                    DataType::Struct(fs) => fs,
975                    other => {
976                        return Err(ArrowError::SchemaError(format!(
977                            "Arrow Map entries must be Struct, found: {other:?}"
978                        )));
979                    }
980                };
981                let value_idx =
982                    find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
983                        ArrowError::SchemaError("Map entries struct missing value field".into())
984                    })?;
985                let value_field = entries_struct_fields[value_idx].as_ref();
986                let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
987                Ok(FieldPlan::Map {
988                    values_nullability: values_dt.nullability(),
989                    value_plan: Box::new(value_plan),
990                })
991            }
992            Codec::Enum(symbols) => match arrow_field.data_type() {
993                DataType::Dictionary(key_dt, value_dt) => {
994                    if **key_dt != DataType::Int32 {
995                        return Err(ArrowError::SchemaError(
996                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
997                        ));
998                    }
999                    if **value_dt != DataType::Utf8 {
1000                        return Err(ArrowError::SchemaError(
1001                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
1002                        ));
1003                    }
1004                    Ok(FieldPlan::Enum {
1005                        symbols: symbols.clone(),
1006                    })
1007                }
1008                other => Err(ArrowError::SchemaError(format!(
1009                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
1010                ))),
1011            },
1012            // decimal site (bytes or fixed(N)) with precision/scale validation
1013            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
1014                let (ap, as_) = match arrow_field.data_type() {
1015                    #[cfg(feature = "small_decimals")]
1016                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
1017                    #[cfg(feature = "small_decimals")]
1018                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
1019                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
1020                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
1021                    other => {
1022                        return Err(ArrowError::SchemaError(format!(
1023                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
1024                            arrow_field.name()
1025                        )));
1026                    }
1027                };
1028                let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent
1029                if ap != *precision || as_ != sc {
1030                    return Err(ArrowError::SchemaError(format!(
1031                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
1032                        arrow_field.name()
1033                    )));
1034                }
1035                Ok(FieldPlan::Decimal {
1036                    size: *fixed_size_opt,
1037                })
1038            }
1039            Codec::Interval => match arrow_field.data_type() {
1040                DataType::Interval(
1041                    IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
1042                ) => Ok(FieldPlan::Scalar),
1043                other => Err(ArrowError::SchemaError(format!(
1044                    "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
1045                ))),
1046            },
1047            Codec::Union(avro_branches, _, UnionMode::Dense) => {
1048                let arrow_union_fields = match arrow_field.data_type() {
1049                    DataType::Union(fields, UnionMode::Dense) => fields,
1050                    DataType::Union(_, UnionMode::Sparse) => {
1051                        return Err(ArrowError::NotYetImplemented(
1052                            "Sparse Arrow unions are not yet supported".to_string(),
1053                        ));
1054                    }
1055                    other => {
1056                        return Err(ArrowError::SchemaError(format!(
1057                            "Avro union maps to Arrow Union, found: {other:?}"
1058                        )));
1059                    }
1060                };
1061                if avro_branches.len() != arrow_union_fields.len() {
1062                    return Err(ArrowError::SchemaError(format!(
1063                        "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
1064                        avro_branches.len(),
1065                        arrow_union_fields.len(),
1066                        arrow_field.name()
1067                    )));
1068                }
1069                let bindings = avro_branches
1070                    .iter()
1071                    .zip(arrow_union_fields.iter())
1072                    .enumerate()
1073                    .map(|(i, (avro_branch, (_, arrow_child_field)))| {
1074                        Ok(FieldBinding {
1075                            arrow_index: i,
1076                            nullability: avro_branch.nullability(),
1077                            plan: FieldPlan::build(avro_branch, arrow_child_field)?,
1078                        })
1079                    })
1080                    .collect::<Result<Vec<_>, ArrowError>>()?;
1081                Ok(FieldPlan::Union { bindings })
1082            }
1083            Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented(
1084                "Sparse Arrow unions are not yet supported".to_string(),
1085            )),
1086            #[cfg(feature = "avro_custom_types")]
1087            Codec::RunEndEncoded(values_dt, _width_code) => {
1088                let values_field = match arrow_field.data_type() {
1089                    DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
1090                    other => {
1091                        return Err(ArrowError::SchemaError(format!(
1092                            "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
1093                        )));
1094                    }
1095                };
1096                Ok(FieldPlan::RunEndEncoded {
1097                    values_nullability: values_dt.nullability(),
1098                    value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
1099                })
1100            }
1101            _ => Ok(FieldPlan::Scalar),
1102        }
1103    }
1104}
1105
/// Value encoder bound to a concrete Arrow array.
///
/// Each variant wraps the encoder for one supported array representation;
/// `Encoder::encode` forwards to the wrapped encoder for the requested row.
/// Nested encoders are boxed.
enum Encoder<'a> {
    Boolean(BooleanEncoder<'a>),
    /// `Int32` values written as Avro `int`
    Int(IntEncoder<'a, Int32Type>),
    /// `Int64` values written as Avro `long`
    Long(LongEncoder<'a, Int64Type>),
    TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
    TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
    TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
    /// Timestamp(Second) values scaled by 1000 before being written as a long
    TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
    Date32(IntEncoder<'a, Date32Type>),
    /// Time32(Second) values scaled by 1000 before being written as an int
    Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
    Time32Millis(IntEncoder<'a, Time32MillisecondType>),
    Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
    DurationSeconds(LongEncoder<'a, DurationSecondType>),
    DurationMillis(LongEncoder<'a, DurationMillisecondType>),
    DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
    DurationNanos(LongEncoder<'a, DurationNanosecondType>),
    Float32(F32Encoder<'a>),
    Float64(F64Encoder<'a>),
    /// Binary array with `i32` offsets
    Binary(BinaryEncoder<'a, i32>),
    /// Binary array with `i64` offsets
    LargeBinary(BinaryEncoder<'a, i64>),
    Utf8(Utf8Encoder<'a>),
    Utf8Large(Utf8LargeEncoder<'a>),
    Utf8View(Utf8ViewEncoder<'a>),
    BinaryView(BinaryViewEncoder<'a>),
    List(Box<ListEncoder32<'a>>),
    LargeList(Box<ListEncoder64<'a>>),
    ListView(Box<ListViewEncoder32<'a>>),
    LargeListView(Box<ListViewEncoder64<'a>>),
    FixedSizeList(Box<FixedSizeListEncoder<'a>>),
    Struct(Box<StructEncoder<'a>>),
    /// Avro `fixed` encoder (raw bytes, no length)
    Fixed(FixedEncoder<'a>),
    /// Avro `uuid` logical type encoder (string with RFC-4122 hyphenated text)
    Uuid(UuidEncoder<'a>),
    /// Avro `duration` logical type (Arrow Interval(MonthDayNano)) encoder
    IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
    /// Avro `duration` logical type (Arrow Interval(YearMonth)) encoder
    IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
    /// Avro `duration` logical type (Arrow Interval(DayTime)) encoder
    IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
    #[cfg(feature = "small_decimals")]
    Decimal32(Decimal32Encoder<'a>),
    #[cfg(feature = "small_decimals")]
    Decimal64(Decimal64Encoder<'a>),
    Decimal128(Decimal128Encoder<'a>),
    Decimal256(Decimal256Encoder<'a>),
    /// Avro `enum` encoder: writes the key (int) as the enum index.
    Enum(EnumEncoder<'a>),
    Map(Box<MapEncoder<'a>>),
    Union(Box<UnionEncoder<'a>>),
    /// Run-end encoded values with specific run-end index widths
    RunEncoded16(Box<RunEncodedEncoder16<'a>>),
    RunEncoded32(Box<RunEncodedEncoder32<'a>>),
    RunEncoded64(Box<RunEncodedEncoder64<'a>>),
    /// Placeholder whose `encode` writes nothing and always succeeds.
    Null,
}
1162
1163impl<'a> Encoder<'a> {
1164    /// Encode the value at `idx`.
1165    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1166        match self {
1167            Encoder::Boolean(e) => e.encode(out, idx),
1168            Encoder::Int(e) => e.encode(out, idx),
1169            Encoder::Long(e) => e.encode(out, idx),
1170            Encoder::TimestampMicros(e) => e.encode(out, idx),
1171            Encoder::TimestampMillis(e) => e.encode(out, idx),
1172            Encoder::TimestampNanos(e) => e.encode(out, idx),
1173            Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
1174            Encoder::Date32(e) => e.encode(out, idx),
1175            Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
1176            Encoder::Time32Millis(e) => e.encode(out, idx),
1177            Encoder::Time64Micros(e) => e.encode(out, idx),
1178            Encoder::DurationSeconds(e) => e.encode(out, idx),
1179            Encoder::DurationMicros(e) => e.encode(out, idx),
1180            Encoder::DurationMillis(e) => e.encode(out, idx),
1181            Encoder::DurationNanos(e) => e.encode(out, idx),
1182            Encoder::Float32(e) => e.encode(out, idx),
1183            Encoder::Float64(e) => e.encode(out, idx),
1184            Encoder::Binary(e) => e.encode(out, idx),
1185            Encoder::LargeBinary(e) => e.encode(out, idx),
1186            Encoder::Utf8(e) => e.encode(out, idx),
1187            Encoder::Utf8Large(e) => e.encode(out, idx),
1188            Encoder::Utf8View(e) => e.encode(out, idx),
1189            Encoder::BinaryView(e) => e.encode(out, idx),
1190            Encoder::List(e) => e.encode(out, idx),
1191            Encoder::LargeList(e) => e.encode(out, idx),
1192            Encoder::ListView(e) => e.encode(out, idx),
1193            Encoder::LargeListView(e) => e.encode(out, idx),
1194            Encoder::FixedSizeList(e) => e.encode(out, idx),
1195            Encoder::Struct(e) => e.encode(out, idx),
1196            Encoder::Fixed(e) => (e).encode(out, idx),
1197            Encoder::Uuid(e) => (e).encode(out, idx),
1198            Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
1199            Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
1200            Encoder::IntervalDayTime(e) => (e).encode(out, idx),
1201            #[cfg(feature = "small_decimals")]
1202            Encoder::Decimal32(e) => (e).encode(out, idx),
1203            #[cfg(feature = "small_decimals")]
1204            Encoder::Decimal64(e) => (e).encode(out, idx),
1205            Encoder::Decimal128(e) => (e).encode(out, idx),
1206            Encoder::Decimal256(e) => (e).encode(out, idx),
1207            Encoder::Map(e) => (e).encode(out, idx),
1208            Encoder::Enum(e) => (e).encode(out, idx),
1209            Encoder::Union(e) => (e).encode(out, idx),
1210            Encoder::RunEncoded16(e) => (e).encode(out, idx),
1211            Encoder::RunEncoded32(e) => (e).encode(out, idx),
1212            Encoder::RunEncoded64(e) => (e).encode(out, idx),
1213            Encoder::Null => Ok(()),
1214        }
1215    }
1216}
1217
1218struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1219impl BooleanEncoder<'_> {
1220    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1221        write_bool(out, self.0.value(idx))
1222    }
1223}
1224
1225/// Generic Avro `int` encoder for primitive arrays with `i32` native values.
1226struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1227impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1228    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1229        write_int(out, self.0.value(idx))
1230    }
1231}
1232
1233/// Generic Avro `long` encoder for primitive arrays with `i64` native values.
1234struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1235impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1236    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1237        write_long(out, self.0.value(idx))
1238    }
1239}
1240
1241/// Time32(Second) to Avro time-millis (int), via safe scaling by 1000
1242struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1243impl<'a> Time32SecondsToMillisEncoder<'a> {
1244    #[inline]
1245    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1246        let secs = self.0.value(idx);
1247        let millis = secs.checked_mul(1000).ok_or_else(|| {
1248            ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into())
1249        })?;
1250        write_int(out, millis)
1251    }
1252}
1253
1254/// Timestamp(Second) to Avro timestamp-millis (long), via safe scaling by 1000
1255struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1256impl<'a> TimestampSecondsToMillisEncoder<'a> {
1257    #[inline]
1258    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1259        let secs = self.0.value(idx);
1260        let millis = secs.checked_mul(1000).ok_or_else(|| {
1261            ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
1262        })?;
1263        write_long(out, millis)
1264    }
1265}
1266
1267/// Unified binary encoder generic over offset size (i32/i64).
1268struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1269impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1270    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1271        write_len_prefixed(out, self.0.value(idx))
1272    }
1273}
1274
1275/// BinaryView (byte view) encoder.
1276struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1277impl BinaryViewEncoder<'_> {
1278    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1279        write_len_prefixed(out, self.0.value(idx))
1280    }
1281}
1282
1283/// StringView encoder.
1284struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1285impl Utf8ViewEncoder<'_> {
1286    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1287        write_len_prefixed(out, self.0.value(idx).as_bytes())
1288    }
1289}
1290
1291struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1292impl F32Encoder<'_> {
1293    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1294        // Avro float: 4 bytes, IEEE-754 little-endian
1295        let bits = self.0.value(idx).to_bits();
1296        out.write_all(&bits.to_le_bytes())
1297            .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
1298    }
1299}
1300
1301struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1302impl F64Encoder<'_> {
1303    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1304        // Avro double: 8 bytes, IEEE-754 little-endian
1305        let bits = self.0.value(idx).to_bits();
1306        out.write_all(&bits.to_le_bytes())
1307            .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
1308    }
1309}
1310
1311struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1312
1313impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1314    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1315        write_len_prefixed(out, self.0.value(idx).as_bytes())
1316    }
1317}
1318
1319type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1320type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1321
/// Internal key array kind used by Map encoder.
///
/// Only string keys are representable; `MapEncoder::try_new` rejects any other
/// Arrow key type with a schema error.
enum KeyKind<'a> {
    /// Keys stored as a string array with `i32` offsets.
    Utf8(&'a GenericStringArray<i32>),
    /// Keys stored as a string array with `i64` offsets.
    LargeUtf8(&'a GenericStringArray<i64>),
}
/// Encoder state for an Arrow `MapArray` site.
struct MapEncoder<'a> {
    /// The map array being encoded; its offsets delimit the entries of each row.
    map: &'a MapArray,
    /// Typed view of the map's key array.
    keys: KeyKind<'a>,
    /// Encoder prepared over the map's values array.
    values: FieldEncoder<'a>,
    /// `offset()` of the key array, captured at construction.
    keys_offset: usize,
    /// `offset()` of the values array, captured at construction.
    values_offset: usize,
}
1334
1335impl<'a> MapEncoder<'a> {
1336    fn try_new(
1337        map: &'a MapArray,
1338        values_nullability: Option<Nullability>,
1339        value_plan: &FieldPlan,
1340    ) -> Result<Self, ArrowError> {
1341        let keys_arr = map.keys();
1342        let keys_kind = match keys_arr.data_type() {
1343            DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1344            DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1345            other => {
1346                return Err(ArrowError::SchemaError(format!(
1347                    "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1348                )));
1349            }
1350        };
1351
1352        let entries_struct_fields = match map.data_type() {
1353            DataType::Map(entries, _) => match entries.data_type() {
1354                DataType::Struct(fs) => fs,
1355                other => {
1356                    return Err(ArrowError::SchemaError(format!(
1357                        "Arrow Map entries must be Struct, found: {other:?}"
1358                    )));
1359                }
1360            },
1361            _ => {
1362                return Err(ArrowError::SchemaError(
1363                    "Expected MapArray with DataType::Map".into(),
1364                ));
1365            }
1366        };
1367
1368        let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
1369            ArrowError::SchemaError("Map entries struct missing value field".into())
1370        })?;
1371        let value_field = entries_struct_fields[v_idx].as_ref();
1372
1373        let values_enc = prepare_value_site_encoder(
1374            map.values().as_ref(),
1375            value_field,
1376            values_nullability,
1377            value_plan,
1378        )?;
1379
1380        Ok(Self {
1381            map,
1382            keys: keys_kind,
1383            values: values_enc,
1384            keys_offset: keys_arr.offset(),
1385            values_offset: map.values().offset(),
1386        })
1387    }
1388
1389    fn encode_map_entries<W, O>(
1390        out: &mut W,
1391        keys: &GenericStringArray<O>,
1392        keys_offset: usize,
1393        start: usize,
1394        end: usize,
1395        mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>,
1396    ) -> Result<(), ArrowError>
1397    where
1398        W: Write + ?Sized,
1399        O: OffsetSizeTrait,
1400    {
1401        encode_blocked_range(out, start, end, |out, j| {
1402            let j_key = j.saturating_sub(keys_offset);
1403            write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1404            write_item(out, j)
1405        })
1406    }
1407
1408    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1409        let offsets = self.map.offsets();
1410        let start = offsets[idx] as usize;
1411        let end = offsets[idx + 1] as usize;
1412        let write_item = |out: &mut W, j: usize| {
1413            let j_val = j.saturating_sub(self.values_offset);
1414            self.values.encode(out, j_val)
1415        };
1416        match self.keys {
1417            KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1418                out,
1419                arr,
1420                self.keys_offset,
1421                start,
1422                end,
1423                write_item,
1424            ),
1425            KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1426                out,
1427                arr,
1428                self.keys_offset,
1429                start,
1430                end,
1431                write_item,
1432            ),
1433        }
1434    }
1435}
1436
1437/// Avro `enum` encoder for Arrow `DictionaryArray<Int32, Utf8>`.
1438///
1439/// Per Avro spec, an enum is encoded as an **int** equal to the
1440/// zero-based position of the symbol in the schema’s `symbols` list.
1441/// We validate at construction that the dictionary values equal the symbols,
1442/// so we can directly write the key value here.
1443struct EnumEncoder<'a> {
1444    keys: &'a PrimitiveArray<Int32Type>,
1445}
1446impl EnumEncoder<'_> {
1447    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> {
1448        write_int(out, self.keys.value(row))
1449    }
1450}
1451
1452struct UnionEncoder<'a> {
1453    encoders: Vec<FieldEncoder<'a>>,
1454    array: &'a UnionArray,
1455}
1456
1457impl<'a> UnionEncoder<'a> {
1458    fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, ArrowError> {
1459        let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1460            return Err(ArrowError::SchemaError("Expected Dense UnionArray".into()));
1461        };
1462
1463        if fields.len() != field_bindings.len() {
1464            return Err(ArrowError::SchemaError(format!(
1465                "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1466                fields.len(),
1467                field_bindings.len()
1468            )));
1469        }
1470        let mut encoders = Vec::with_capacity(fields.len());
1471        for (type_id, field_ref) in fields.iter() {
1472            let binding = field_bindings
1473                .get(type_id as usize)
1474                .ok_or_else(|| ArrowError::SchemaError("Binding and field mismatch".to_string()))?;
1475
1476            let child = array.child(type_id).as_ref();
1477
1478            let encoder = prepare_value_site_encoder(
1479                child,
1480                field_ref.as_ref(),
1481                binding.nullability,
1482                &binding.plan,
1483            )?;
1484            encoders.push(encoder);
1485        }
1486        Ok(Self { encoders, array })
1487    }
1488
1489    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1490        let type_id = self.array.type_ids()[idx];
1491        let branch_index = type_id as usize;
1492        write_int(out, type_id as i32)?;
1493        let child_row = self.array.value_offset(idx);
1494
1495        let encoder = self
1496            .encoders
1497            .get_mut(branch_index)
1498            .ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id {type_id}")))?;
1499
1500        encoder.encode(out, child_row)
1501    }
1502}
1503
1504struct StructEncoder<'a> {
1505    encoders: Vec<FieldEncoder<'a>>,
1506}
1507
1508impl<'a> StructEncoder<'a> {
1509    fn try_new(
1510        array: &'a StructArray,
1511        field_bindings: &[FieldBinding],
1512    ) -> Result<Self, ArrowError> {
1513        let DataType::Struct(fields) = array.data_type() else {
1514            return Err(ArrowError::SchemaError("Expected Struct".into()));
1515        };
1516        let mut encoders = Vec::with_capacity(field_bindings.len());
1517        for field_binding in field_bindings {
1518            let idx = field_binding.arrow_index;
1519            let column = array.columns().get(idx).ok_or_else(|| {
1520                ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1521            })?;
1522            let field = fields.get(idx).ok_or_else(|| {
1523                ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1524            })?;
1525            let encoder = prepare_value_site_encoder(
1526                column.as_ref(),
1527                field,
1528                field_binding.nullability,
1529                &field_binding.plan,
1530            )?;
1531            encoders.push(encoder);
1532        }
1533        Ok(Self { encoders })
1534    }
1535
1536    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1537        for encoder in self.encoders.iter_mut() {
1538            encoder.encode(out, idx)?;
1539        }
1540        Ok(())
1541    }
1542}
1543
1544/// Encode a blocked range of items with Avro array block framing.
1545///
1546/// `write_item` must take `(out, index)` to maintain the "out-first" convention.
1547fn encode_blocked_range<W: Write + ?Sized, F>(
1548    out: &mut W,
1549    start: usize,
1550    end: usize,
1551    mut write_item: F,
1552) -> Result<(), ArrowError>
1553where
1554    F: FnMut(&mut W, usize) -> Result<(), ArrowError>,
1555{
1556    let len = end.saturating_sub(start);
1557    if len == 0 {
1558        // Zero-length terminator per Avro spec.
1559        write_long(out, 0)?;
1560        return Ok(());
1561    }
1562    // Emit a single positive block for performance, then the end marker.
1563    write_long(out, len as i64)?;
1564    for row in start..end {
1565        write_item(out, row)?;
1566    }
1567    write_long(out, 0)?;
1568    Ok(())
1569}
1570
1571struct ListEncoder<'a, O: OffsetSizeTrait> {
1572    list: &'a GenericListArray<O>,
1573    values: FieldEncoder<'a>,
1574    values_offset: usize,
1575}
1576
1577type ListEncoder32<'a> = ListEncoder<'a, i32>;
1578type ListEncoder64<'a> = ListEncoder<'a, i64>;
1579
1580impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1581    fn try_new(
1582        list: &'a GenericListArray<O>,
1583        items_nullability: Option<Nullability>,
1584        item_plan: &FieldPlan,
1585    ) -> Result<Self, ArrowError> {
1586        let child_field = match list.data_type() {
1587            DataType::List(field) => field.as_ref(),
1588            DataType::LargeList(field) => field.as_ref(),
1589            _ => {
1590                return Err(ArrowError::SchemaError(
1591                    "Expected List or LargeList for ListEncoder".into(),
1592                ));
1593            }
1594        };
1595        let values_enc = prepare_value_site_encoder(
1596            list.values().as_ref(),
1597            child_field,
1598            items_nullability,
1599            item_plan,
1600        )?;
1601        Ok(Self {
1602            list,
1603            values: values_enc,
1604            values_offset: list.values().offset(),
1605        })
1606    }
1607
1608    fn encode_list_range<W: Write + ?Sized>(
1609        &mut self,
1610        out: &mut W,
1611        start: usize,
1612        end: usize,
1613    ) -> Result<(), ArrowError> {
1614        encode_blocked_range(out, start, end, |out, row| {
1615            self.values
1616                .encode(out, row.saturating_sub(self.values_offset))
1617        })
1618    }
1619
1620    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1621        let offsets = self.list.offsets();
1622        let start = offsets[idx].to_usize().ok_or_else(|| {
1623            ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize"))
1624        })?;
1625        let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1626            ArrowError::InvalidArgumentError(format!(
1627                "Error converting offset[{}] to usize",
1628                idx + 1
1629            ))
1630        })?;
1631        self.encode_list_range(out, start, end)
1632    }
1633}
1634
1635/// ListView encoder using `(offset, size)` buffers.
1636struct ListViewEncoder<'a, O: OffsetSizeTrait> {
1637    list: &'a GenericListViewArray<O>,
1638    values: FieldEncoder<'a>,
1639    values_offset: usize,
1640}
1641type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
1642type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
1643
1644impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
1645    fn try_new(
1646        list: &'a GenericListViewArray<O>,
1647        items_nullability: Option<Nullability>,
1648        item_plan: &FieldPlan,
1649    ) -> Result<Self, ArrowError> {
1650        let child_field = match list.data_type() {
1651            DataType::ListView(field) => field.as_ref(),
1652            DataType::LargeListView(field) => field.as_ref(),
1653            _ => {
1654                return Err(ArrowError::SchemaError(
1655                    "Expected ListView or LargeListView for ListViewEncoder".into(),
1656                ));
1657            }
1658        };
1659        let values_enc = prepare_value_site_encoder(
1660            list.values().as_ref(),
1661            child_field,
1662            items_nullability,
1663            item_plan,
1664        )?;
1665        Ok(Self {
1666            list,
1667            values: values_enc,
1668            values_offset: list.values().offset(),
1669        })
1670    }
1671
1672    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1673        let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
1674            ArrowError::InvalidArgumentError(format!(
1675                "Error converting value_offset[{idx}] to usize"
1676            ))
1677        })?;
1678        let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
1679            ArrowError::InvalidArgumentError(format!("Error converting value_size[{idx}] to usize"))
1680        })?;
1681        let start = start + self.values_offset;
1682        let end = start + len;
1683        encode_blocked_range(out, start, end, |out, row| {
1684            self.values
1685                .encode(out, row.saturating_sub(self.values_offset))
1686        })
1687    }
1688}
1689
1690/// FixedSizeList encoder.
1691struct FixedSizeListEncoder<'a> {
1692    list: &'a FixedSizeListArray,
1693    values: FieldEncoder<'a>,
1694    values_offset: usize,
1695    elem_len: usize,
1696}
1697
1698impl<'a> FixedSizeListEncoder<'a> {
1699    fn try_new(
1700        list: &'a FixedSizeListArray,
1701        items_nullability: Option<Nullability>,
1702        item_plan: &FieldPlan,
1703    ) -> Result<Self, ArrowError> {
1704        let child_field = match list.data_type() {
1705            DataType::FixedSizeList(field, _len) => field.as_ref(),
1706            _ => {
1707                return Err(ArrowError::SchemaError(
1708                    "Expected FixedSizeList for FixedSizeListEncoder".into(),
1709                ));
1710            }
1711        };
1712        let values_enc = prepare_value_site_encoder(
1713            list.values().as_ref(),
1714            child_field,
1715            items_nullability,
1716            item_plan,
1717        )?;
1718        Ok(Self {
1719            list,
1720            values: values_enc,
1721            values_offset: list.values().offset(),
1722            elem_len: list.value_length() as usize,
1723        })
1724    }
1725
1726    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1727        // Starting index is relative to values() start
1728        let rel = self.list.value_offset(idx) as usize;
1729        let start = self.values_offset + rel;
1730        let end = start + self.elem_len;
1731        encode_blocked_range(out, start, end, |out, row| {
1732            self.values
1733                .encode(out, row.saturating_sub(self.values_offset))
1734        })
1735    }
1736}
1737
1738fn prepare_value_site_encoder<'a>(
1739    values_array: &'a dyn Array,
1740    value_field: &Field,
1741    nullability: Option<Nullability>,
1742    plan: &FieldPlan,
1743) -> Result<FieldEncoder<'a>, ArrowError> {
1744    // Effective nullability is computed here from the writer-declared site nullability and data.
1745    FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
1746}
1747
1748/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
1749/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
1750struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1751impl FixedEncoder<'_> {
1752    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1753        let v = self.0.value(idx); // &[u8] of fixed width
1754        out.write_all(v)
1755            .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e))
1756    }
1757}
1758
1759/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) to Avro string (UUID).
1760/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
1761struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1762impl UuidEncoder<'_> {
1763    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1764        let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1765        buf[0] = 0x48;
1766        let v = self.0.value(idx);
1767        let u = Uuid::from_slice(v)
1768            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
1769        let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1770        out.write_all(&buf)
1771            .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
1772    }
1773}
1774
// The three unsigned components of an Avro `duration` value, as produced by
// `IntervalToDurationParts::duration_parts`.
1775#[derive(Copy, Clone)]
1776struct DurationParts {
    // Month component.
1777    months: u32,
    // Day component.
1778    days: u32,
    // Millisecond component.
1779    millis: u32,
1780}
1781/// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`.
1782trait IntervalToDurationParts: ArrowPrimitiveType {
    // Converts one native interval value into its duration parts; the `Result`
    // lets implementations reject values they cannot represent.
1783    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>;
1784}
1785impl IntervalToDurationParts for IntervalMonthDayNanoType {
1786    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1787        let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1788        if months < 0 || days < 0 || nanos < 0 {
1789            return Err(ArrowError::InvalidArgumentError(
1790                "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1791            ));
1792        }
1793        if nanos % 1_000_000 != 0 {
1794            return Err(ArrowError::InvalidArgumentError(
1795                "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1796                    .into(),
1797            ));
1798        }
1799        let millis = nanos / 1_000_000;
1800        if millis > u32::MAX as i64 {
1801            return Err(ArrowError::InvalidArgumentError(
1802                "Avro 'duration' milliseconds exceed u32::MAX".into(),
1803            ));
1804        }
1805        Ok(DurationParts {
1806            months: months as u32,
1807            days: days as u32,
1808            millis: millis as u32,
1809        })
1810    }
1811}
1812impl IntervalToDurationParts for IntervalYearMonthType {
1813    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1814        if native < 0 {
1815            return Err(ArrowError::InvalidArgumentError(
1816                "Avro 'duration' cannot encode negative months".into(),
1817            ));
1818        }
1819        Ok(DurationParts {
1820            months: native as u32,
1821            days: 0,
1822            millis: 0,
1823        })
1824    }
1825}
1826impl IntervalToDurationParts for IntervalDayTimeType {
1827    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1828        let (days, millis) = IntervalDayTimeType::to_parts(native);
1829        if days < 0 || millis < 0 {
1830            return Err(ArrowError::InvalidArgumentError(
1831                "Avro 'duration' cannot encode negative days or milliseconds".into(),
1832            ));
1833        }
1834        Ok(DurationParts {
1835            months: 0,
1836            days: days as u32,
1837            millis: millis as u32,
1838        })
1839    }
1840}
1841
1842/// Single generic encoder used for all three interval units.
1843/// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
1844struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1845impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1846    #[inline(always)]
1847    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1848        let parts = P::duration_parts(self.0.value(idx))?;
1849        let months = parts.months.to_le_bytes();
1850        let days = parts.days.to_le_bytes();
1851        let ms = parts.millis.to_le_bytes();
1852        // SAFETY
1853        // - Endianness & layout: Avro's `duration` logical type is encoded as fixed(12)
1854        //   with three *little-endian* unsigned 32-bit integers in order: (months, days, millis).
1855        //   We explicitly materialize exactly those 12 bytes.
1856        // - In-bounds indexing: `to_le_bytes()` on `u32` returns `[u8; 4]` by contract,
1857        //   therefore, the constant indices 0..=3 used below are *always* in-bounds.
1858        //   Rust will panic on out-of-bounds indexing, but there is no such path here;
1859        //   the compiler can also elide the bound checks for constant, provably in-range
1860        //   indices. [std docs; Rust Performance Book on bounds-check elimination]
1861        // - Memory safety: The `[u8; 12]` array is built on the stack by value, with no
1862        //   aliasing and no uninitialized memory. There is no `unsafe`.
1863        // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated and mapped
1864        //   into `ArrowError`, so I/O errors are reported, not panicked.
1865        // Consequently, constructing `buf` with the constant indices below is safe and
1866        // panic-free under these validated preconditions.
1867        let buf = [
1868            months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1869            ms[1], ms[2], ms[3],
1870        ];
1871        out.write_all(&buf)
1872            .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
1873    }
1874}
1875
1876/// Minimal trait to obtain a big-endian fixed-size byte array for a decimal's
1877/// unscaled integer value at `idx`.
1878trait DecimalBeBytes<const N: usize> {
1879    fn value_be_bytes(&self, idx: usize) -> [u8; N];
1880}
1881#[cfg(feature = "small_decimals")]
1882impl DecimalBeBytes<4> for Decimal32Array {
1883    fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1884        self.value(idx).to_be_bytes()
1885    }
1886}
1887#[cfg(feature = "small_decimals")]
1888impl DecimalBeBytes<8> for Decimal64Array {
1889    fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1890        self.value(idx).to_be_bytes()
1891    }
1892}
1893impl DecimalBeBytes<16> for Decimal128Array {
1894    fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1895        self.value(idx).to_be_bytes()
1896    }
1897}
1898impl DecimalBeBytes<32> for Decimal256Array {
1899    fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1900        // Arrow i256 → [u8; 32] big-endian
1901        self.value(idx).to_be_bytes()
1902    }
1903}
1904
1905/// Generic Avro decimal encoder over Arrow decimal arrays.
1906/// - When `fixed_size` is `None` → Avro `bytes(decimal)`; writes the minimal
1907///   two's-complement representation with a length prefix.
1908/// - When `Some(n)` → Avro `fixed(n, decimal)`; sign-extends (or validates)
1909///   to exactly `n` bytes and writes them directly.
1910struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1911    arr: &'a A,
1912    fixed_size: Option<usize>,
1913}
1914
1915impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1916    fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1917        Self { arr, fixed_size }
1918    }
1919
1920    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1921        let be = self.arr.value_be_bytes(idx);
1922        match self.fixed_size {
1923            Some(n) => write_sign_extended(out, &be, n),
1924            None => write_len_prefixed(out, minimal_twos_complement(&be)),
1925        }
1926    }
1927}
1928
1929#[cfg(feature = "small_decimals")]
1930type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1931#[cfg(feature = "small_decimals")]
1932type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1933type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1934type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1935
1936/// Generic encoder for Arrow `RunArray<R>`-based sites (run-end encoded).
1937/// Follows the pattern used by other generic encoders (i.e., `ListEncoder<O>`),
1938/// avoiding runtime branching on run-end width.
1939struct RunEncodedEncoder<'a, R: RunEndIndexType> {
1940    ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
1941    base: usize,
1942    len: usize,
1943    values: FieldEncoder<'a>,
1944    // Cached run index used for sequential scans of rows [0..n)
1945    cur_run: usize,
1946    // Cached end (logical index, 1-based per spec) for the current run.
1947    cur_end: usize,
1948}
1949
1950type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
1951type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
1952type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
1953
1954impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
1955    fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
1956        let ends = arr.run_ends();
1957        let base = ends.get_start_physical_index();
1958        let slice = ends.values();
1959        let len = ends.len();
1960        let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
1961        Self {
1962            ends_slice: slice,
1963            base,
1964            len,
1965            values,
1966            cur_run: 0,
1967            cur_end,
1968        }
1969    }
1970
1971    /// Advance `cur_run` so that `idx` is within the run ending at `cur_end`.
1972    /// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based.
1973    #[inline(always)]
1974    fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> {
1975        if idx < self.cur_end {
1976            return Ok(());
1977        }
1978        // Move forward across run boundaries until idx falls within cur_end
1979        while self.cur_run + 1 < self.len && idx >= self.cur_end {
1980            self.cur_run += 1;
1981            self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
1982        }
1983        if idx < self.cur_end {
1984            Ok(())
1985        } else {
1986            Err(ArrowError::InvalidArgumentError(format!(
1987                "row index {idx} out of bounds for run-ends ({} runs)",
1988                self.len
1989            )))
1990        }
1991    }
1992
1993    #[inline(always)]
1994    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1995        self.advance_to_row(idx)?;
1996        // For REE values, the value for any logical row within a run is at
1997        // the physical index of that run.
1998        self.values.encode(out, self.cur_run)
1999    }
2000}
2001
2002#[cfg(test)]
2003mod tests {
2004    use super::*;
2005    use arrow_array::types::Int32Type;
2006    use arrow_array::{
2007        Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
2008        Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
2009        StringArray,
2010    };
2011    use arrow_buffer::Buffer;
2012    use arrow_schema::{DataType, Field, Fields, UnionFields};
2013
2014    fn zigzag_i64(v: i64) -> u64 {
2015        ((v << 1) ^ (v >> 63)) as u64
2016    }
2017
2018    fn varint(mut x: u64) -> Vec<u8> {
2019        let mut out = Vec::new();
2020        while (x & !0x7f) != 0 {
2021            out.push(((x & 0x7f) as u8) | 0x80);
2022            x >>= 7;
2023        }
2024        out.push((x & 0x7f) as u8);
2025        out
2026    }
2027
2028    fn avro_long_bytes(v: i64) -> Vec<u8> {
2029        varint(zigzag_i64(v))
2030    }
2031
2032    fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
2033        let mut out = avro_long_bytes(payload.len() as i64);
2034        out.extend_from_slice(payload);
2035        out
2036    }
2037
2038    fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
2039        let m = months.to_le_bytes();
2040        let d = days.to_le_bytes();
2041        let ms = millis.to_le_bytes();
2042        [
2043            m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
2044        ]
2045    }
2046
2047    fn encode_all(
2048        array: &dyn Array,
2049        plan: &FieldPlan,
2050        nullability: Option<Nullability>,
2051    ) -> Vec<u8> {
2052        let field = Field::new("f", array.data_type().clone(), true);
2053        let mut enc = FieldEncoder::make_encoder(array, &field, plan, nullability).unwrap();
2054        let mut out = Vec::new();
2055        for i in 0..array.len() {
2056            enc.encode(&mut out, i).unwrap();
2057        }
2058        out
2059    }
2060
2061    fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
2062        if actual != expected {
2063            let to_hex = |b: &[u8]| {
2064                b.iter()
2065                    .map(|x| format!("{:02X}", x))
2066                    .collect::<Vec<_>>()
2067                    .join(" ")
2068            };
2069            panic!(
2070                "mismatch\n  expected: [{}]\n    actual: [{}]",
2071                to_hex(expected),
2072                to_hex(actual)
2073            );
2074        }
2075    }
2076
2077    #[test]
2078    fn binary_encoder() {
2079        let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
2080        let arr = BinaryArray::from_vec(values);
2081        let mut expected = Vec::new();
2082        for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
2083            expected.extend(avro_len_prefixed_bytes(payload));
2084        }
2085        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2086        assert_bytes_eq(&got, &expected);
2087    }
2088
2089    #[test]
2090    fn large_binary_encoder() {
2091        let values: Vec<&[u8]> = vec![b"xyz", b""];
2092        let arr = LargeBinaryArray::from_vec(values);
2093        let mut expected = Vec::new();
2094        for payload in [b"xyz" as &[u8], b""] {
2095            expected.extend(avro_len_prefixed_bytes(payload));
2096        }
2097        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2098        assert_bytes_eq(&got, &expected);
2099    }
2100
2101    #[test]
2102    fn utf8_encoder() {
2103        let arr = StringArray::from(vec!["", "A", "BC"]);
2104        let mut expected = Vec::new();
2105        for s in ["", "A", "BC"] {
2106            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2107        }
2108        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2109        assert_bytes_eq(&got, &expected);
2110    }
2111
2112    #[test]
2113    fn large_utf8_encoder() {
2114        let arr = LargeStringArray::from(vec!["hello", ""]);
2115        let mut expected = Vec::new();
2116        for s in ["hello", ""] {
2117            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2118        }
2119        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2120        assert_bytes_eq(&got, &expected);
2121    }
2122
2123    #[test]
2124    fn list_encoder_int32() {
2125        // Build ListArray [[1,2], [], [3]]
2126        let values = Int32Array::from(vec![1, 2, 3]);
2127        let offsets = vec![0, 2, 2, 3];
2128        let list = ListArray::new(
2129            Field::new("item", DataType::Int32, true).into(),
2130            arrow_buffer::OffsetBuffer::new(offsets.into()),
2131            Arc::new(values) as ArrayRef,
2132            None,
2133        );
2134        // Avro array encoding per row
2135        let mut expected = Vec::new();
2136        // row 0: block len 2, items 1,2 then 0
2137        expected.extend(avro_long_bytes(2));
2138        expected.extend(avro_long_bytes(1));
2139        expected.extend(avro_long_bytes(2));
2140        expected.extend(avro_long_bytes(0));
2141        // row 1: empty
2142        expected.extend(avro_long_bytes(0));
2143        // row 2: one item 3
2144        expected.extend(avro_long_bytes(1));
2145        expected.extend(avro_long_bytes(3));
2146        expected.extend(avro_long_bytes(0));
2147
2148        let plan = FieldPlan::List {
2149            items_nullability: None,
2150            item_plan: Box::new(FieldPlan::Scalar),
2151        };
2152        let got = encode_all(&list, &plan, None);
2153        assert_bytes_eq(&got, &expected);
2154    }
2155
2156    #[test]
2157    fn struct_encoder_two_fields() {
2158        // Struct { a: Int32, b: Utf8 }
2159        let a = Int32Array::from(vec![1, 2]);
2160        let b = StringArray::from(vec!["x", "y"]);
2161        let fields = Fields::from(vec![
2162            Field::new("a", DataType::Int32, true),
2163            Field::new("b", DataType::Utf8, true),
2164        ]);
2165        let struct_arr = StructArray::new(
2166            fields.clone(),
2167            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2168            None,
2169        );
2170        let plan = FieldPlan::Struct {
2171            bindings: vec![
2172                FieldBinding {
2173                    arrow_index: 0,
2174                    nullability: None,
2175                    plan: FieldPlan::Scalar,
2176                },
2177                FieldBinding {
2178                    arrow_index: 1,
2179                    nullability: None,
2180                    plan: FieldPlan::Scalar,
2181                },
2182            ],
2183        };
2184        let got = encode_all(&struct_arr, &plan, None);
2185        // Expected: rows concatenated: a then b
2186        let mut expected = Vec::new();
2187        expected.extend(avro_long_bytes(1)); // a=1
2188        expected.extend(avro_len_prefixed_bytes(b"x")); // b="x"
2189        expected.extend(avro_long_bytes(2)); // a=2
2190        expected.extend(avro_len_prefixed_bytes(b"y")); // b="y"
2191        assert_bytes_eq(&got, &expected);
2192    }
2193
2194    #[test]
2195    fn enum_encoder_dictionary() {
2196        // symbols: ["A","B","C"], keys [2,0,1]
2197        let dict_values = StringArray::from(vec!["A", "B", "C"]);
2198        let keys = Int32Array::from(vec![2, 0, 1]);
2199        let dict =
2200            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
2201        let symbols = Arc::<[String]>::from(
2202            vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
2203        );
2204        let plan = FieldPlan::Enum { symbols };
2205        let got = encode_all(&dict, &plan, None);
2206        let mut expected = Vec::new();
2207        expected.extend(avro_long_bytes(2));
2208        expected.extend(avro_long_bytes(0));
2209        expected.extend(avro_long_bytes(1));
2210        assert_bytes_eq(&got, &expected);
2211    }
2212
2213    #[test]
2214    fn decimal_bytes_and_fixed() {
2215        // Use Decimal128 with small positives and negatives
2216        let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
2217            .with_precision_and_scale(20, 0)
2218            .unwrap();
2219        // bytes(decimal): minimal two's complement length-prefixed
2220        let plan_bytes = FieldPlan::Decimal { size: None };
2221        let got_bytes = encode_all(&dec, &plan_bytes, None);
2222        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2223        let mut expected_bytes = Vec::new();
2224        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2225        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2226        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2227        assert_bytes_eq(&got_bytes, &expected_bytes);
2228
2229        let plan_fixed = FieldPlan::Decimal { size: Some(16) };
2230        let got_fixed = encode_all(&dec, &plan_fixed, None);
2231        let mut expected_fixed = Vec::new();
2232        expected_fixed.extend_from_slice(&1i128.to_be_bytes());
2233        expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
2234        expected_fixed.extend_from_slice(&0i128.to_be_bytes());
2235        assert_bytes_eq(&got_fixed, &expected_fixed);
2236    }
2237
2238    #[test]
2239    fn decimal_bytes_256() {
2240        use arrow_buffer::i256;
2241        // Use Decimal256 with small positives and negatives
2242        let dec = Decimal256Array::from(vec![
2243            i256::from_i128(1),
2244            i256::from_i128(-1),
2245            i256::from_i128(0),
2246        ])
2247        .with_precision_and_scale(76, 0)
2248        .unwrap();
2249        // bytes(decimal): minimal two's complement length-prefixed
2250        let plan_bytes = FieldPlan::Decimal { size: None };
2251        let got_bytes = encode_all(&dec, &plan_bytes, None);
2252        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2253        let mut expected_bytes = Vec::new();
2254        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2255        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2256        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2257        assert_bytes_eq(&got_bytes, &expected_bytes);
2258
2259        // fixed(32): 32-byte big-endian two's complement
2260        let plan_fixed = FieldPlan::Decimal { size: Some(32) };
2261        let got_fixed = encode_all(&dec, &plan_fixed, None);
2262        let mut expected_fixed = Vec::new();
2263        expected_fixed.extend_from_slice(&i256::from_i128(1).to_be_bytes());
2264        expected_fixed.extend_from_slice(&i256::from_i128(-1).to_be_bytes());
2265        expected_fixed.extend_from_slice(&i256::from_i128(0).to_be_bytes());
2266        assert_bytes_eq(&got_fixed, &expected_fixed);
2267    }
2268
2269    #[cfg(feature = "small_decimals")]
2270    #[test]
2271    fn decimal_bytes_and_fixed_32() {
2272        // Use Decimal32 with small positives and negatives
2273        let dec = Decimal32Array::from(vec![1i32, -1i32, 0i32])
2274            .with_precision_and_scale(9, 0)
2275            .unwrap();
2276        // bytes(decimal)
2277        let plan_bytes = FieldPlan::Decimal { size: None };
2278        let got_bytes = encode_all(&dec, &plan_bytes, None);
2279        let mut expected_bytes = Vec::new();
2280        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2281        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2282        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2283        assert_bytes_eq(&got_bytes, &expected_bytes);
2284        // fixed(4)
2285        let plan_fixed = FieldPlan::Decimal { size: Some(4) };
2286        let got_fixed = encode_all(&dec, &plan_fixed, None);
2287        let mut expected_fixed = Vec::new();
2288        expected_fixed.extend_from_slice(&1i32.to_be_bytes());
2289        expected_fixed.extend_from_slice(&(-1i32).to_be_bytes());
2290        expected_fixed.extend_from_slice(&0i32.to_be_bytes());
2291        assert_bytes_eq(&got_fixed, &expected_fixed);
2292    }
2293
2294    #[cfg(feature = "small_decimals")]
2295    #[test]
2296    fn decimal_bytes_and_fixed_64() {
2297        // Use Decimal64 with small positives and negatives
2298        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
2299            .with_precision_and_scale(18, 0)
2300            .unwrap();
2301        // bytes(decimal)
2302        let plan_bytes = FieldPlan::Decimal { size: None };
2303        let got_bytes = encode_all(&dec, &plan_bytes, None);
2304        let mut expected_bytes = Vec::new();
2305        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2306        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2307        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2308        assert_bytes_eq(&got_bytes, &expected_bytes);
2309        // fixed(8)
2310        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
2311        let got_fixed = encode_all(&dec, &plan_fixed, None);
2312        let mut expected_fixed = Vec::new();
2313        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
2314        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
2315        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
2316        assert_bytes_eq(&got_fixed, &expected_fixed);
2317    }
2318
2319    #[test]
2320    fn float32_and_float64_encoders() {
2321        let f32a = Float32Array::from(vec![0.0f32, -1.5f32, f32::from_bits(0x7fc00000)]); // includes a quiet NaN bit pattern
2322        let f64a = Float64Array::from(vec![0.0f64, -2.25f64]);
2323        // f32 expected
2324        let mut expected32 = Vec::new();
2325        for v in [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)] {
2326            expected32.extend_from_slice(&v.to_bits().to_le_bytes());
2327        }
2328        let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
2329        assert_bytes_eq(&got32, &expected32);
2330        // f64 expected
2331        let mut expected64 = Vec::new();
2332        for v in [0.0f64, -2.25f64] {
2333            expected64.extend_from_slice(&v.to_bits().to_le_bytes());
2334        }
2335        let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
2336        assert_bytes_eq(&got64, &expected64);
2337    }
2338
2339    #[test]
2340    fn long_encoder_int64() {
2341        let arr = Int64Array::from(vec![0i64, 1i64, -1i64, 2i64, -2i64, i64::MIN + 1]);
2342        let mut expected = Vec::new();
2343        for v in [0, 1, -1, 2, -2, i64::MIN + 1] {
2344            expected.extend(avro_long_bytes(v));
2345        }
2346        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2347        assert_bytes_eq(&got, &expected);
2348    }
2349
2350    #[test]
2351    fn fixed_encoder_plain() {
2352        // Two values of width 4
2353        let data = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
2354        let values: Vec<Vec<u8>> = data.iter().map(|x| x.to_vec()).collect();
2355        let arr = FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap();
2356        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2357        let mut expected = Vec::new();
2358        expected.extend_from_slice(&data[0]);
2359        expected.extend_from_slice(&data[1]);
2360        assert_bytes_eq(&got, &expected);
2361    }
2362
2363    #[test]
2364    fn uuid_encoder_test() {
2365        // Happy path
2366        let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
2367        let bytes = *u.as_bytes();
2368        let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
2369        // Expected: length 36 (0x48) followed by hyphenated lowercase text
2370        let mut expected = Vec::new();
2371        expected.push(0x48);
2372        expected.extend_from_slice(u.hyphenated().to_string().as_bytes());
2373        let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
2374        assert_bytes_eq(&got, &expected);
2375    }
2376
2377    #[test]
2378    fn uuid_encoder_error() {
2379        // Invalid UUID bytes: wrong length
2380        let arr =
2381            FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
2382                .unwrap();
2383        let plan = FieldPlan::Uuid;
2384
2385        let field = Field::new("f", arr.data_type().clone(), true);
2386        let mut enc = FieldEncoder::make_encoder(&arr, &field, &plan, None).unwrap();
2387        let mut out = Vec::new();
2388        let err = enc.encode(&mut out, 0).unwrap_err();
2389        match err {
2390            ArrowError::InvalidArgumentError(msg) => {
2391                assert!(msg.contains("Invalid UUID bytes"))
2392            }
2393            other => panic!("expected InvalidArgumentError, got {other:?}"),
2394        }
2395    }
2396
2397    fn test_scalar_primitive_encoding<T>(
2398        non_nullable_data: &[T::Native],
2399        nullable_data: &[Option<T::Native>],
2400    ) where
2401        T: ArrowPrimitiveType,
2402        T::Native: Into<i64> + Copy,
2403        PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2404    {
2405        let plan = FieldPlan::Scalar;
2406
2407        let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2408        let got = encode_all(&array, &plan, None);
2409
2410        let mut expected = Vec::new();
2411        for &value in non_nullable_data {
2412            expected.extend(avro_long_bytes(value.into()));
2413        }
2414        assert_bytes_eq(&got, &expected);
2415
2416        let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2417        let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2418
2419        let mut expected_nullable = Vec::new();
2420        for &opt_value in nullable_data {
2421            match opt_value {
2422                Some(value) => {
2423                    // Union index 1 for the value, then the value itself
2424                    expected_nullable.extend(avro_long_bytes(1));
2425                    expected_nullable.extend(avro_long_bytes(value.into()));
2426                }
2427                None => {
2428                    // Union index 0 for the null
2429                    expected_nullable.extend(avro_long_bytes(0));
2430                }
2431            }
2432        }
2433        assert_bytes_eq(&got_nullable, &expected_nullable);
2434    }
2435
2436    #[test]
2437    fn date32_encoder() {
2438        test_scalar_primitive_encoding::<Date32Type>(
2439            &[
2440                19345, // 2022-12-20
2441                0,     // 1970-01-01 (epoch)
2442                -1,    // 1969-12-31 (pre-epoch)
2443            ],
2444            &[Some(19345), None],
2445        );
2446    }
2447
2448    #[test]
2449    fn time32_millis_encoder() {
2450        test_scalar_primitive_encoding::<Time32MillisecondType>(
2451            &[
2452                0,        // Midnight
2453                49530123, // 13:45:30.123
2454                86399999, // 23:59:59.999
2455            ],
2456            &[None, Some(49530123)],
2457        );
2458    }
2459
2460    #[test]
2461    fn time64_micros_encoder() {
2462        test_scalar_primitive_encoding::<Time64MicrosecondType>(
2463            &[
2464                0,           // Midnight
2465                86399999999, // 23:59:59.999999
2466            ],
2467            &[Some(86399999999), None],
2468        );
2469    }
2470
2471    #[test]
2472    fn timestamp_millis_encoder() {
2473        test_scalar_primitive_encoding::<TimestampMillisecondType>(
2474            &[
2475                1704067200000, // 2024-01-01T00:00:00Z
2476                0,             // 1970-01-01T00:00:00Z (epoch)
2477                -123456789,    // Pre-epoch timestamp
2478            ],
2479            &[None, Some(1704067200000)],
2480        );
2481    }
2482
2483    #[test]
2484    fn map_encoder_string_keys_int_values() {
2485        // Build MapArray with two rows
2486        // Row0: {"k1":1, "k2":2}
2487        // Row1: {}
2488        let keys = StringArray::from(vec!["k1", "k2"]);
2489        let values = Int32Array::from(vec![1, 2]);
2490        let entries_fields = Fields::from(vec![
2491            Field::new("key", DataType::Utf8, false),
2492            Field::new("value", DataType::Int32, true),
2493        ]);
2494        let entries = StructArray::new(
2495            entries_fields,
2496            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2497            None,
2498        );
2499        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
2500        let map = MapArray::new(
2501            Field::new("entries", entries.data_type().clone(), false).into(),
2502            offsets,
2503            entries,
2504            None,
2505            false,
2506        );
2507        let plan = FieldPlan::Map {
2508            values_nullability: None,
2509            value_plan: Box::new(FieldPlan::Scalar),
2510        };
2511        let got = encode_all(&map, &plan, None);
2512        let mut expected = Vec::new();
2513        // Row0: block 2 then pairs
2514        expected.extend(avro_long_bytes(2));
2515        expected.extend(avro_len_prefixed_bytes(b"k1"));
2516        expected.extend(avro_long_bytes(1));
2517        expected.extend(avro_len_prefixed_bytes(b"k2"));
2518        expected.extend(avro_long_bytes(2));
2519        expected.extend(avro_long_bytes(0));
2520        // Row1: empty
2521        expected.extend(avro_long_bytes(0));
2522        assert_bytes_eq(&got, &expected);
2523    }
2524
2525    #[test]
2526    fn union_encoder_string_int() {
2527        let strings = StringArray::from(vec!["hello", "world"]);
2528        let ints = Int32Array::from(vec![10, 20, 30]);
2529
2530        let union_fields = UnionFields::new(
2531            vec![0, 1],
2532            vec![
2533                Field::new("v_str", DataType::Utf8, true),
2534                Field::new("v_int", DataType::Int32, true),
2535            ],
2536        );
2537
2538        let type_ids = Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]);
2539        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
2540
2541        let union_array = UnionArray::try_new(
2542            union_fields,
2543            type_ids.into(),
2544            Some(offsets.into()),
2545            vec![Arc::new(strings), Arc::new(ints)],
2546        )
2547        .unwrap();
2548
2549        let plan = FieldPlan::Union {
2550            bindings: vec![
2551                FieldBinding {
2552                    arrow_index: 0,
2553                    nullability: None,
2554                    plan: FieldPlan::Scalar,
2555                },
2556                FieldBinding {
2557                    arrow_index: 1,
2558                    nullability: None,
2559                    plan: FieldPlan::Scalar,
2560                },
2561            ],
2562        };
2563
2564        let got = encode_all(&union_array, &plan, None);
2565
2566        let mut expected = Vec::new();
2567        expected.extend(avro_long_bytes(0));
2568        expected.extend(avro_len_prefixed_bytes(b"hello"));
2569        expected.extend(avro_long_bytes(1));
2570        expected.extend(avro_long_bytes(10));
2571        expected.extend(avro_long_bytes(1));
2572        expected.extend(avro_long_bytes(20));
2573        expected.extend(avro_long_bytes(0));
2574        expected.extend(avro_len_prefixed_bytes(b"world"));
2575        expected.extend(avro_long_bytes(1));
2576        expected.extend(avro_long_bytes(30));
2577
2578        assert_bytes_eq(&got, &expected);
2579    }
2580
2581    #[test]
2582    fn union_encoder_null_string_int() {
2583        let nulls = NullArray::new(1);
2584        let strings = StringArray::from(vec!["hello"]);
2585        let ints = Int32Array::from(vec![10]);
2586
2587        let union_fields = UnionFields::new(
2588            vec![0, 1, 2],
2589            vec![
2590                Field::new("v_null", DataType::Null, true),
2591                Field::new("v_str", DataType::Utf8, true),
2592                Field::new("v_int", DataType::Int32, true),
2593            ],
2594        );
2595
2596        let type_ids = Buffer::from_slice_ref([0_i8, 1, 2]);
2597        // For a null value in a dense union, no value is added to a child array.
2598        // The offset points to the last value of that type. Since there's only one
2599        // null, and one of each other type, all offsets are 0.
2600        let offsets = Buffer::from_slice_ref([0_i32, 0, 0]);
2601
2602        let union_array = UnionArray::try_new(
2603            union_fields,
2604            type_ids.into(),
2605            Some(offsets.into()),
2606            vec![Arc::new(nulls), Arc::new(strings), Arc::new(ints)],
2607        )
2608        .unwrap();
2609
2610        let plan = FieldPlan::Union {
2611            bindings: vec![
2612                FieldBinding {
2613                    arrow_index: 0,
2614                    nullability: None,
2615                    plan: FieldPlan::Scalar,
2616                },
2617                FieldBinding {
2618                    arrow_index: 1,
2619                    nullability: None,
2620                    plan: FieldPlan::Scalar,
2621                },
2622                FieldBinding {
2623                    arrow_index: 2,
2624                    nullability: None,
2625                    plan: FieldPlan::Scalar,
2626                },
2627            ],
2628        };
2629
2630        let got = encode_all(&union_array, &plan, None);
2631
2632        let mut expected = Vec::new();
2633        expected.extend(avro_long_bytes(0));
2634        expected.extend(avro_long_bytes(1));
2635        expected.extend(avro_len_prefixed_bytes(b"hello"));
2636        expected.extend(avro_long_bytes(2));
2637        expected.extend(avro_long_bytes(10));
2638
2639        assert_bytes_eq(&got, &expected);
2640    }
2641
2642    #[test]
2643    fn list64_encoder_int32() {
2644        // LargeList [[1,2,3], []]
2645        let values = Int32Array::from(vec![1, 2, 3]);
2646        let offsets: Vec<i64> = vec![0, 3, 3];
2647        let list = LargeListArray::new(
2648            Field::new("item", DataType::Int32, true).into(),
2649            arrow_buffer::OffsetBuffer::new(offsets.into()),
2650            Arc::new(values) as ArrayRef,
2651            None,
2652        );
2653        let plan = FieldPlan::List {
2654            items_nullability: None,
2655            item_plan: Box::new(FieldPlan::Scalar),
2656        };
2657        let got = encode_all(&list, &plan, None);
2658        // Expected one block of 3 and then 0, then empty 0
2659        let mut expected = Vec::new();
2660        expected.extend(avro_long_bytes(3));
2661        expected.extend(avro_long_bytes(1));
2662        expected.extend(avro_long_bytes(2));
2663        expected.extend(avro_long_bytes(3));
2664        expected.extend(avro_long_bytes(0));
2665        expected.extend(avro_long_bytes(0));
2666        assert_bytes_eq(&got, &expected);
2667    }
2668
2669    #[test]
2670    fn int_encoder_test() {
2671        let ints = Int32Array::from(vec![0, -1, 2]);
2672        let mut expected_i = Vec::new();
2673        for v in [0i32, -1, 2] {
2674            expected_i.extend(avro_long_bytes(v as i64));
2675        }
2676        let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
2677        assert_bytes_eq(&got_i, &expected_i);
2678    }
2679
2680    #[test]
2681    fn boolean_encoder_test() {
2682        let bools = BooleanArray::from(vec![true, false]);
2683        let mut expected_b = Vec::new();
2684        expected_b.extend_from_slice(&[1]);
2685        expected_b.extend_from_slice(&[0]);
2686        let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
2687        assert_bytes_eq(&got_b, &expected_b);
2688    }
2689
2690    #[test]
2691    #[cfg(feature = "avro_custom_types")]
2692    fn duration_encoding_seconds() {
2693        let arr: PrimitiveArray<DurationSecondType> = vec![0i64, -1, 2].into();
2694        let mut expected = Vec::new();
2695        for v in [0i64, -1, 2] {
2696            expected.extend_from_slice(&avro_long_bytes(v));
2697        }
2698        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2699        assert_bytes_eq(&got, &expected);
2700    }
2701
2702    #[test]
2703    #[cfg(feature = "avro_custom_types")]
2704    fn duration_encoding_milliseconds() {
2705        let arr: PrimitiveArray<DurationMillisecondType> = vec![1i64, 0, -2].into();
2706        let mut expected = Vec::new();
2707        for v in [1i64, 0, -2] {
2708            expected.extend_from_slice(&avro_long_bytes(v));
2709        }
2710        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2711        assert_bytes_eq(&got, &expected);
2712    }
2713
2714    #[test]
2715    #[cfg(feature = "avro_custom_types")]
2716    fn duration_encoding_microseconds() {
2717        let arr: PrimitiveArray<DurationMicrosecondType> = vec![5i64, -6, 7].into();
2718        let mut expected = Vec::new();
2719        for v in [5i64, -6, 7] {
2720            expected.extend_from_slice(&avro_long_bytes(v));
2721        }
2722        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2723        assert_bytes_eq(&got, &expected);
2724    }
2725
2726    #[test]
2727    #[cfg(feature = "avro_custom_types")]
2728    fn duration_encoding_nanoseconds() {
2729        let arr: PrimitiveArray<DurationNanosecondType> = vec![8i64, 9, -10].into();
2730        let mut expected = Vec::new();
2731        for v in [8i64, 9, -10] {
2732            expected.extend_from_slice(&avro_long_bytes(v));
2733        }
2734        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2735        assert_bytes_eq(&got, &expected);
2736    }
2737
2738    #[test]
2739    fn duration_encoder_year_month_happy_path() {
2740        let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
2741        let mut expected = Vec::new();
2742        for m in [0u32, 1u32, 25u32] {
2743            expected.extend_from_slice(&duration_fixed12(m, 0, 0));
2744        }
2745        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2746        assert_bytes_eq(&got, &expected);
2747    }
2748
2749    #[test]
2750    fn duration_encoder_year_month_rejects_negative() {
2751        let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
2752        let field = Field::new("f", DataType::Interval(IntervalUnit::YearMonth), true);
2753        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
2754        let mut out = Vec::new();
2755        let err = enc.encode(&mut out, 0).unwrap_err();
2756        match err {
2757            ArrowError::InvalidArgumentError(msg) => {
2758                assert!(msg.contains("cannot encode negative months"))
2759            }
2760            other => panic!("expected InvalidArgumentError, got {other:?}"),
2761        }
2762    }
2763
2764    #[test]
2765    fn duration_encoder_day_time_happy_path() {
2766        let v0 = IntervalDayTimeType::make_value(2, 500); // days=2, millis=500
2767        let v1 = IntervalDayTimeType::make_value(0, 0);
2768        let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
2769        let mut expected = Vec::new();
2770        expected.extend_from_slice(&duration_fixed12(0, 2, 500));
2771        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
2772        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2773        assert_bytes_eq(&got, &expected);
2774    }
2775
2776    #[test]
2777    fn duration_encoder_day_time_rejects_negative() {
2778        let bad = IntervalDayTimeType::make_value(-1, 0);
2779        let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
2780        let field = Field::new("f", DataType::Interval(IntervalUnit::DayTime), true);
2781        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
2782        let mut out = Vec::new();
2783        let err = enc.encode(&mut out, 0).unwrap_err();
2784        match err {
2785            ArrowError::InvalidArgumentError(msg) => {
2786                assert!(msg.contains("cannot encode negative days"))
2787            }
2788            other => panic!("expected InvalidArgumentError, got {other:?}"),
2789        }
2790    }
2791
2792    #[test]
2793    fn duration_encoder_month_day_nano_happy_path() {
2794        let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); // -> millis = 3
2795        let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
2796        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
2797        let mut expected = Vec::new();
2798        expected.extend_from_slice(&duration_fixed12(1, 2, 3));
2799        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
2800        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2801        assert_bytes_eq(&got, &expected);
2802    }
2803
2804    #[test]
2805    fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
2806        let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
2807        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
2808        let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
2809        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
2810        let mut out = Vec::new();
2811        let err = enc.encode(&mut out, 0).unwrap_err();
2812        match err {
2813            ArrowError::InvalidArgumentError(msg) => {
2814                assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
2815            }
2816            other => panic!("expected InvalidArgumentError, got {other:?}"),
2817        }
2818    }
2819
2820    #[test]
2821    fn minimal_twos_complement_test() {
2822        let pos = [0x00, 0x00, 0x01];
2823        assert_eq!(minimal_twos_complement(&pos), &pos[2..]);
2824        let neg = [0xFF, 0xFF, 0x80]; // negative minimal is 0x80
2825        assert_eq!(minimal_twos_complement(&neg), &neg[2..]);
2826        let zero = [0x00, 0x00, 0x00];
2827        assert_eq!(minimal_twos_complement(&zero), &zero[2..]);
2828    }
2829
2830    #[test]
2831    fn write_sign_extend_test() {
2832        let mut out = Vec::new();
2833        write_sign_extended(&mut out, &[0x01], 4).unwrap();
2834        assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
2835        out.clear();
2836        write_sign_extended(&mut out, &[0xFF], 4).unwrap();
2837        assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
2838        out.clear();
2839        // truncation success (sign bytes only removed)
2840        write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
2841        assert_eq!(out, vec![0xFF, 0x80]);
2842        out.clear();
2843        // truncation overflow
2844        let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
2845        match err {
2846            ArrowError::InvalidArgumentError(_) => {}
2847            _ => panic!("expected InvalidArgumentError"),
2848        }
2849    }
2850
2851    #[test]
2852    fn duration_month_day_nano_overflow_millis() {
2853        // nanos leading to millis > u32::MAX
2854        let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
2855        let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
2856        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
2857        let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
2858        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
2859        let mut out = Vec::new();
2860        let err = enc.encode(&mut out, 0).unwrap_err();
2861        match err {
2862            ArrowError::InvalidArgumentError(msg) => assert!(msg.contains("exceed u32::MAX")),
2863            _ => panic!("expected InvalidArgumentError"),
2864        }
2865    }
2866
2867    #[test]
2868    fn fieldplan_decimal_precision_scale_mismatch_errors() {
2869        // Avro expects (10,2), Arrow has (12,2)
2870        use crate::codec::Codec;
2871        use std::collections::HashMap;
2872        let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
2873        let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
2874        let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
2875        match err {
2876            ArrowError::SchemaError(msg) => {
2877                assert!(msg.contains("Decimal precision/scale mismatch"))
2878            }
2879            _ => panic!("expected SchemaError"),
2880        }
2881    }
2882
2883    #[test]
2884    fn timestamp_micros_encoder() {
2885        // Mirrors the style used by `timestamp_millis_encoder`
2886        test_scalar_primitive_encoding::<TimestampMicrosecondType>(
2887            &[
2888                1_704_067_200_000_000, // 2024-01-01T00:00:00Z in micros
2889                0,                     // epoch
2890                -123_456_789,          // pre-epoch
2891            ],
2892            &[None, Some(1_704_067_200_000_000)],
2893        );
2894    }
2895
2896    #[test]
2897    fn list_encoder_nullable_items_null_first() {
2898        // One List row with three elements: [Some(1), None, Some(2)]
2899        let values = Int32Array::from(vec![Some(1), None, Some(2)]);
2900        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
2901        let list = ListArray::new(
2902            Field::new("item", DataType::Int32, true).into(),
2903            offsets,
2904            Arc::new(values) as ArrayRef,
2905            None,
2906        );
2907
2908        let plan = FieldPlan::List {
2909            items_nullability: Some(Nullability::NullFirst),
2910            item_plan: Box::new(FieldPlan::Scalar),
2911        };
2912
2913        // Avro array encoding per row: one positive block, then 0 terminator.
2914        // For NullFirst: Some(v) => branch 1 (0x02) then the value; None => branch 0 (0x00)
2915        let mut expected = Vec::new();
2916        expected.extend(avro_long_bytes(3)); // block of 3
2917        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2918        expected.extend(avro_long_bytes(1)); // value 1
2919        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
2920        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2921        expected.extend(avro_long_bytes(2)); // value 2
2922        expected.extend(avro_long_bytes(0)); // block terminator
2923
2924        let got = encode_all(&list, &plan, None);
2925        assert_bytes_eq(&got, &expected);
2926    }
2927
2928    #[test]
2929    fn large_list_encoder_nullable_items_null_first() {
2930        // LargeList single row: [Some(10), None]
2931        let values = Int32Array::from(vec![Some(10), None]);
2932        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
2933        let list = LargeListArray::new(
2934            Field::new("item", DataType::Int32, true).into(),
2935            offsets,
2936            Arc::new(values) as ArrayRef,
2937            None,
2938        );
2939
2940        let plan = FieldPlan::List {
2941            items_nullability: Some(Nullability::NullFirst),
2942            item_plan: Box::new(FieldPlan::Scalar),
2943        };
2944
2945        let mut expected = Vec::new();
2946        expected.extend(avro_long_bytes(2)); // block of 2
2947        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2948        expected.extend(avro_long_bytes(10)); // value 10
2949        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
2950        expected.extend(avro_long_bytes(0)); // block terminator
2951
2952        let got = encode_all(&list, &plan, None);
2953        assert_bytes_eq(&got, &expected);
2954    }
2955
2956    #[test]
2957    fn map_encoder_string_keys_nullable_int_values_null_first() {
2958        // One map row: {"k1": Some(7), "k2": None}
2959        let keys = StringArray::from(vec!["k1", "k2"]);
2960        let values = Int32Array::from(vec![Some(7), None]);
2961
2962        let entries_fields = Fields::from(vec![
2963            Field::new("key", DataType::Utf8, false),
2964            Field::new("value", DataType::Int32, true),
2965        ]);
2966        let entries = StructArray::new(
2967            entries_fields,
2968            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2969            None,
2970        );
2971
2972        // Single row -> offsets [0, 2]
2973        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
2974        let map = MapArray::new(
2975            Field::new("entries", entries.data_type().clone(), false).into(),
2976            offsets,
2977            entries,
2978            None,
2979            false,
2980        );
2981
2982        let plan = FieldPlan::Map {
2983            values_nullability: Some(Nullability::NullFirst),
2984            value_plan: Box::new(FieldPlan::Scalar),
2985        };
2986
2987        // Expected:
2988        // - one positive block (len=2)
2989        // - "k1", branch=1 + value=7
2990        // - "k2", branch=0 (null)
2991        // - end-of-block marker 0
2992        let mut expected = Vec::new();
2993        expected.extend(avro_long_bytes(2)); // block length 2
2994        expected.extend(avro_len_prefixed_bytes(b"k1")); // key "k1"
2995        expected.extend(avro_long_bytes(1)); // union branch 1 (value)
2996        expected.extend(avro_long_bytes(7)); // value 7
2997        expected.extend(avro_len_prefixed_bytes(b"k2")); // key "k2"
2998        expected.extend(avro_long_bytes(0)); // union branch 0 (null)
2999        expected.extend(avro_long_bytes(0)); // block terminator
3000
3001        let got = encode_all(&map, &plan, None);
3002        assert_bytes_eq(&got, &expected);
3003    }
3004
3005    #[test]
3006    fn time32_seconds_to_millis_encoder() {
3007        // Time32(Second) must encode as Avro time-millis (ms since midnight).
3008        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
3009            vec![0i32, 1, -2, 12_345].into();
3010
3011        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3012
3013        let mut expected = Vec::new();
3014        for secs in [0i32, 1, -2, 12_345] {
3015            let millis = (secs as i64) * 1000;
3016            expected.extend_from_slice(&avro_long_bytes(millis));
3017        }
3018        assert_bytes_eq(&got, &expected);
3019    }
3020
3021    #[test]
3022    fn time32_seconds_to_millis_overflow() {
3023        // Choose a value that will overflow i32 when multiplied by 1000.
3024        let overflow_secs: i32 = i32::MAX / 1000 + 1;
3025        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
3026            vec![overflow_secs].into();
3027
3028        let field = arrow_schema::Field::new(
3029            "f",
3030            arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second),
3031            true,
3032        );
3033        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
3034
3035        let mut out = Vec::new();
3036        let err = enc.encode(&mut out, 0).unwrap_err();
3037        match err {
3038            arrow_schema::ArrowError::InvalidArgumentError(msg) => {
3039                assert!(
3040                    msg.contains("overflowed") || msg.contains("overflow"),
3041                    "unexpected message: {msg}"
3042                )
3043            }
3044            other => panic!("expected InvalidArgumentError, got {other:?}"),
3045        }
3046    }
3047
3048    #[test]
3049    fn timestamp_seconds_to_millis_encoder() {
3050        // Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
3051        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
3052            vec![0i64, 1, -1, 1_234_567_890].into();
3053
3054        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3055
3056        let mut expected = Vec::new();
3057        for secs in [0i64, 1, -1, 1_234_567_890] {
3058            let millis = secs * 1000;
3059            expected.extend_from_slice(&avro_long_bytes(millis));
3060        }
3061        assert_bytes_eq(&got, &expected);
3062    }
3063
3064    #[test]
3065    fn timestamp_seconds_to_millis_overflow() {
3066        // Overflow i64 when multiplied by 1000.
3067        let overflow_secs: i64 = i64::MAX / 1000 + 1;
3068        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
3069            vec![overflow_secs].into();
3070
3071        let field = arrow_schema::Field::new(
3072            "f",
3073            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, None),
3074            true,
3075        );
3076        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
3077
3078        let mut out = Vec::new();
3079        let err = enc.encode(&mut out, 0).unwrap_err();
3080        match err {
3081            arrow_schema::ArrowError::InvalidArgumentError(msg) => {
3082                assert!(
3083                    msg.contains("overflowed") || msg.contains("overflow"),
3084                    "unexpected message: {msg}"
3085                )
3086            }
3087            other => panic!("expected InvalidArgumentError, got {other:?}"),
3088        }
3089    }
3090
3091    #[test]
3092    fn timestamp_nanos_encoder() {
3093        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampNanosecondType> =
3094            vec![0i64, 1, -1, 123].into();
3095
3096        let got = encode_all(&arr, &FieldPlan::Scalar, None);
3097
3098        let mut expected = Vec::new();
3099        for ns in [0i64, 1, -1, 123] {
3100            expected.extend_from_slice(&avro_long_bytes(ns));
3101        }
3102        assert_bytes_eq(&got, &expected);
3103    }
3104}