arrow_avro/writer/
encoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Encoder for Arrow types.
19
20use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::schema::{Fingerprint, Nullability, Prefix};
22use arrow_array::cast::AsArray;
23use arrow_array::types::{
24    ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
25    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
26    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
27    Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
28    TimestampMillisecondType,
29};
30use arrow_array::types::{
31    RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
32};
33use arrow_array::{
34    Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
35    FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
36    GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
37    ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
38    StringViewArray, StructArray, UnionArray,
39};
40#[cfg(feature = "small_decimals")]
41use arrow_array::{Decimal32Array, Decimal64Array};
42use arrow_buffer::{ArrowNativeType, NullBuffer};
43use arrow_schema::{
44    ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode,
45};
46use std::io::Write;
47use std::sync::Arc;
48use uuid::Uuid;
49
50/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
51///
52/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
53#[inline]
54pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), ArrowError> {
55    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
56    // At most 10 bytes for 64-bit varint
57    let mut buf = [0u8; 10];
58    let mut i = 0;
59    while (zz & !0x7F) != 0 {
60        buf[i] = ((zz & 0x7F) as u8) | 0x80;
61        i += 1;
62        zz >>= 7;
63    }
64    buf[i] = (zz & 0x7F) as u8;
65    i += 1;
66    out.write_all(&buf[..i])
67        .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
68}
69
70#[inline]
71fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), ArrowError> {
72    write_long(out, value as i64)
73}
74
75#[inline]
76fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
77    write_long(out, bytes.len() as i64)?;
78    out.write_all(bytes)
79        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
80}
81
82#[inline]
83fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> {
84    out.write_all(&[if v { 1 } else { 0 }])
85        .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
86}
87
/// Shrink a big-endian two's-complement integer to its shortest equivalent encoding.
///
/// Leading bytes that only repeat the sign (`0x00` for non-negative, `0xFF` for negative
/// values) are dropped, as long as the first remaining byte still carries the same sign bit.
/// At least one byte is always kept for non-empty input; empty input is returned unchanged.
/// The returned slice borrows from `be` and encodes the same signed value.
///
/// See Avro spec: decimal over `bytes` uses two's-complement big-endian
/// representation of the unscaled integer value. 1.11.1 specification.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&lead) = be.first() else {
        return be;
    };
    let sign_byte: u8 = if (lead & 0x80) != 0 { 0xFF } else { 0x00 };
    match be.iter().position(|&b| b != sign_byte) {
        // Every byte is pure sign padding (the value is 0 or -1): a single byte suffices.
        None => &be[be.len() - 1..],
        Some(first_payload) => {
            // If the first payload byte's top bit disagrees with the sign, one padding byte
            // must stay in front of it to preserve the value's sign.
            let sign_preserved = ((be[first_payload] ^ sign_byte) & 0x80) == 0;
            let start = if sign_preserved {
                first_payload
            } else {
                first_payload - 1
            };
            &be[start..]
        }
    }
}
119
120/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n` bytes.
121///
122///
123/// - If shorter than `n`, the slice is sign-extended by left-padding with the
124///   sign byte (`0x00` for positive, `0xFF` for negative).
125/// - If longer than `n`, the slice is truncated from the left. An overflow error
126///   is returned if any of the truncated bytes are not redundant sign bytes,
127///   or if the resulting value's sign bit would differ from the original.
128/// - If the slice is already `n` bytes long, it is copied.
129///
130/// Used for encoding Avro decimal values into `fixed(N)` fields.
131#[inline]
132fn write_sign_extended<W: Write + ?Sized>(
133    out: &mut W,
134    src_be: &[u8],
135    n: usize,
136) -> Result<(), ArrowError> {
137    let len = src_be.len();
138    if len == n {
139        return out
140            .write_all(src_be)
141            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
142    }
143    let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
144        0xFF
145    } else {
146        0x00
147    };
148    if len > n {
149        let extra = len - n;
150        if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
151            return Ok(());
152        }
153        // All truncated bytes must equal the sign byte, and the MSB of the first
154        // retained byte must match the sign (otherwise overflow).
155        if src_be[..extra].iter().any(|&b| b != sign_byte)
156            || ((src_be[extra] ^ sign_byte) & 0x80) != 0
157        {
158            return Err(ArrowError::InvalidArgumentError(format!(
159                "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
160            )));
161        }
162        return out
163            .write_all(&src_be[extra..])
164            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
165    }
166    // len < n: prepend sign bytes (sign extension) then the payload
167    let pad_len = n - len;
168    // Fixed-size stack pads to avoid heap allocation on the hot path
169    const ZPAD: [u8; 64] = [0x00; 64];
170    const FPAD: [u8; 64] = [0xFF; 64];
171    let pad = if sign_byte == 0x00 {
172        &ZPAD[..]
173    } else {
174        &FPAD[..]
175    };
176    // Emit padding in 64‑byte chunks (minimizes write calls without allocating),
177    // then write the original bytes.
178    let mut rem = pad_len;
179    while rem >= pad.len() {
180        out.write_all(pad)
181            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
182        rem -= pad.len();
183    }
184    if rem > 0 {
185        out.write_all(&pad[..rem])
186            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
187    }
188    out.write_all(src_be)
189        .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
190}
191
192/// Write the union branch index for an optional field.
193///
194/// Branch index is 0-based per Avro unions:
195/// - Null-first (default): null => 0, value => 1
196/// - Null-second (Impala): value => 0, null => 1
197fn write_optional_index<W: Write + ?Sized>(
198    out: &mut W,
199    is_null: bool,
200    null_order: Nullability,
201) -> Result<(), ArrowError> {
202    let byte = union_value_branch_byte(null_order, is_null);
203    out.write_all(&[byte])
204        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
205}
206
/// Null-handling mode of a single encoding site, decided once when its `FieldEncoder` is built.
#[derive(Debug, Clone)]
enum NullState<'a> {
    /// The site was declared without nullability: no union branch byte is written.
    NonNullable,
    /// The site is declared nullable but the array has no nulls: every row is prefixed with
    /// this precomputed value-branch byte.
    NullableNoNulls {
        union_value_byte: u8,
    },
    /// The site is declared nullable and the array contains nulls: the branch byte is chosen
    /// per row from `nulls`, using the branch order in `null_order`.
    Nullable {
        nulls: &'a NullBuffer,
        null_order: Nullability,
    },
}
218
/// Arrow to Avro FieldEncoder:
/// - Holds the inner `Encoder` (by value)
/// - Carries the per-site nullability **state** as a single enum that enforces invariants
pub(crate) struct FieldEncoder<'a> {
    /// Type-specific encoder that writes the value of a row.
    encoder: Encoder<'a>,
    /// Decides whether (and which) union branch byte precedes each value.
    null_state: NullState<'a>,
}
226
impl<'a> FieldEncoder<'a> {
    /// Build the encoder for `array` at a site described by `plan`.
    ///
    /// `plan` selects the encoder family (scalar, struct, list, decimal, ...), and the array's
    /// Arrow `DataType` selects the concrete encoder within that family. `nullability` is the
    /// writer-declared nullability of the site: `None` means the site is required, `Some(order)`
    /// means each value is preceded by a union branch byte using that branch order.
    ///
    /// # Errors
    /// - `ArrowError::SchemaError` when the array's type does not match what `plan` requires
    ///   (including an enum dictionary whose values differ from the schema symbols).
    /// - `ArrowError::NotYetImplemented` for Arrow types this writer does not support.
    /// - `ArrowError::InvalidArgumentError` for time-unit combinations rejected below.
    /// - Any error returned while building nested encoders.
    fn make_encoder(
        array: &'a dyn Array,
        plan: &FieldPlan,
        nullability: Option<Nullability>,
    ) -> Result<Self, ArrowError> {
        let encoder = match plan {
            // Scalar sites: dispatch purely on the Arrow data type.
            FieldPlan::Scalar => match array.data_type() {
                DataType::Null => Encoder::Null,
                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
                DataType::Utf8 => {
                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
                }
                DataType::LargeUtf8 => {
                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
                }
                DataType::Utf8View => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<StringViewArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected StringViewArray".into())
                        })?;
                    Encoder::Utf8View(Utf8ViewEncoder(arr))
                }
                DataType::BinaryView => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<BinaryViewArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected BinaryViewArray".into())
                        })?;
                    Encoder::BinaryView(BinaryViewEncoder(arr))
                }
                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
                DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
                DataType::Date64 => {
                    return Err(ArrowError::NotYetImplemented(
                        "Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
                            .into(),
                    ));
                }
                // Time32 in seconds gets a dedicated encoder; judging by its name it emits
                // milliseconds (the encoder itself is defined outside this block).
                DataType::Time32(TimeUnit::Second) => Encoder::Time32SecsToMillis(
                    Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
                ),
                DataType::Time32(TimeUnit::Millisecond) => {
                    Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
                }
                DataType::Time32(TimeUnit::Microsecond) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
                            .into(),
                    ));
                }
                DataType::Time32(TimeUnit::Nanosecond) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
                    array.as_primitive::<Time64MicrosecondType>(),
                )),
                DataType::Time64(TimeUnit::Nanosecond) => {
                    return Err(ArrowError::NotYetImplemented(
                        "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Millisecond) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
                            .into(),
                    ));
                }
                DataType::Time64(TimeUnit::Second) => {
                    return Err(ArrowError::InvalidArgumentError(
                        "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
                            .into(),
                    ));
                }
                DataType::Float32 => {
                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
                }
                DataType::Float64 => {
                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
                }
                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
                DataType::LargeBinary => {
                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
                }
                DataType::FixedSizeBinary(_len) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeBinaryArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
                        })?;
                    Encoder::Fixed(FixedEncoder(arr))
                }
                // The timestamp's timezone component is ignored when choosing the encoder.
                DataType::Timestamp(unit, _) => match unit {
                    TimeUnit::Second => {
                        Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
                            array.as_primitive::<TimestampSecondType>(),
                        ))
                    }
                    TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
                        array.as_primitive::<TimestampMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
                        array.as_primitive::<TimestampMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
                        array.as_primitive::<TimestampNanosecondType>(),
                    )),
                },
                DataType::Interval(unit) => match unit {
                    IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
                        array.as_primitive::<IntervalMonthDayNanoType>(),
                    )),
                    IntervalUnit::YearMonth => Encoder::IntervalYearMonth(DurationEncoder(
                        array.as_primitive::<IntervalYearMonthType>(),
                    )),
                    IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
                        array.as_primitive::<IntervalDayTimeType>(),
                    )),
                },
                DataType::Duration(tu) => match tu {
                    TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
                        array.as_primitive::<DurationSecondType>(),
                    )),
                    TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
                        array.as_primitive::<DurationMillisecondType>(),
                    )),
                    TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
                        array.as_primitive::<DurationMicrosecondType>(),
                    )),
                    TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
                        array.as_primitive::<DurationNanosecondType>(),
                    )),
                },
                other => {
                    return Err(ArrowError::NotYetImplemented(format!(
                        "Avro scalar type not yet supported: {other:?}"
                    )));
                }
            },
            FieldPlan::Struct { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
                Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
            }
            // Array sites: every Arrow list flavour is accepted; the item plan and item
            // nullability are passed through to the flavour-specific encoder.
            FieldPlan::List {
                items_nullability,
                item_plan,
            } => match array.data_type() {
                DataType::List(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListArray>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
                    Encoder::List(Box::new(ListEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeList(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListArray>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::ListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<ListViewArray>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected ListViewArray".into()))?;
                    Encoder::ListView(Box::new(ListViewEncoder32::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::LargeListView(_) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<LargeListViewArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected LargeListViewArray".into())
                        })?;
                    Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                DataType::FixedSizeList(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<FixedSizeListArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected FixedSizeListArray".into())
                        })?;
                    Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
                        arr,
                        *items_nullability,
                        item_plan.as_ref(),
                    )?))
                }
                other => {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
                    )));
                }
            },
            // Decimal sites: `size` (None => bytes, Some(n) => fixed(n)) is forwarded to the
            // encoder; the const generic is the native byte width of the Arrow decimal type.
            FieldPlan::Decimal { size } => match array.data_type() {
                #[cfg(feature = "small_decimals")]
                DataType::Decimal32(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal32Array>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
                }
                #[cfg(feature = "small_decimals")]
                DataType::Decimal64(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal64Array>()
                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
                }
                DataType::Decimal128(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal128Array>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected Decimal128Array".into())
                        })?;
                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
                }
                DataType::Decimal256(_, _) => {
                    let arr = array
                        .as_any()
                        .downcast_ref::<Decimal256Array>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected Decimal256Array".into())
                        })?;
                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
                }
                other => {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
                    )));
                }
            },
            // NOTE(review): only the array type is checked here, not the fixed width; any
            // width validation presumably happens in `UuidEncoder` - confirm.
            FieldPlan::Uuid => {
                let arr = array
                    .as_any()
                    .downcast_ref::<FixedSizeBinaryArray>()
                    .ok_or_else(|| {
                        ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
                    })?;
                Encoder::Uuid(UuidEncoder(arr))
            }
            FieldPlan::Map {
                values_nullability,
                value_plan,
            } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<MapArray>()
                    .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
                Encoder::Map(Box::new(MapEncoder::try_new(
                    arr,
                    *values_nullability,
                    value_plan.as_ref(),
                )?))
            }
            // Enum sites: only Dictionary<Int32, Utf8> is accepted, and the dictionary values
            // must equal the schema symbols position by position, so that the dictionary keys
            // can be handed to the encoder unchanged.
            FieldPlan::Enum { symbols } => match array.data_type() {
                DataType::Dictionary(key_dt, value_dt) => {
                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
                        return Err(ArrowError::SchemaError(
                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
                        ));
                    }
                    let dict = array
                        .as_any()
                        .downcast_ref::<DictionaryArray<Int32Type>>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
                        })?;
                    let values = dict
                        .values()
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            ArrowError::SchemaError("Dictionary values must be Utf8".into())
                        })?;
                    if values.len() != symbols.len() {
                        return Err(ArrowError::SchemaError(format!(
                            "Enum symbol length {} != dictionary size {}",
                            symbols.len(),
                            values.len()
                        )));
                    }
                    // Lengths are equal at this point, so indexing `symbols[i]` is in bounds.
                    for i in 0..values.len() {
                        if values.value(i) != symbols[i].as_str() {
                            return Err(ArrowError::SchemaError(format!(
                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
                                symbols[i],
                                values.value(i)
                            )));
                        }
                    }
                    let keys = dict.keys();
                    Encoder::Enum(EnumEncoder { keys })
                }
                other => {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
                    )));
                }
            },
            FieldPlan::Union { bindings } => {
                let arr = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?;
                Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
            }
            // Run-end-encoded sites: try each supported run-end index type in turn and build
            // the nested encoder over the run array's values with the site's value plan.
            FieldPlan::RunEndEncoded {
                values_nullability,
                value_plan,
            } => {
                // Helper closure to build a typed RunEncodedEncoder<R>
                let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
                        return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
                            Int16Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
                        return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
                            Int32Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
                        return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
                            Int64Type,
                        >::new(
                            arr,
                            FieldEncoder::make_encoder(
                                arr.values().as_ref(),
                                value_plan.as_ref(),
                                *values_nullability,
                            )?,
                        ))));
                    }
                    Err(ArrowError::SchemaError(
                        "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
                            .into(),
                    ))
                };
                build(array)?
            }
        };
        // Compute the effective null state from writer-declared nullability and data nulls.
        let null_state = match nullability {
            None => NullState::NonNullable,
            Some(null_order) => {
                match array.nulls() {
                    // Only keep the null buffer when it actually marks at least one null.
                    Some(nulls) if array.null_count() > 0 => {
                        NullState::Nullable { nulls, null_order }
                    }
                    _ => NullState::NullableNoNulls {
                        // Nullable site with no null buffer for this view
                        union_value_byte: union_value_branch_byte(null_order, false),
                    },
                }
            }
        };
        Ok(Self {
            encoder,
            null_state,
        })
    }

    /// Encode row `idx` of this site into `out`.
    ///
    /// For nullable sites the union branch byte is written first. If the row is null, only
    /// the null-branch byte is written and the inner encoder is skipped; otherwise the value
    /// is written by the inner encoder.
    ///
    /// # Errors
    /// Returns `ArrowError::IoError` if writing the branch byte fails, or whatever error the
    /// inner encoder returns.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
        match &self.null_state {
            NullState::NonNullable => {}
            // No nulls in the data: emit the precomputed value-branch byte directly.
            NullState::NullableNoNulls { union_value_byte } => out
                .write_all(&[*union_value_byte])
                .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?,
            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
                return write_optional_index(out, true, *null_order); // no value to write
            }
            NullState::Nullable { null_order, .. } => {
                write_optional_index(out, false, *null_order)?;
            }
        }
        self.encoder.encode(out, idx)
    }
}
654
655fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
656    let nulls_first = null_order == Nullability::default();
657    if nulls_first == is_null { 0x00 } else { 0x02 }
658}
659
/// Per‑site encoder plan for a field. This mirrors the Avro structure, so nested
/// optional branch order can be honored exactly as declared by the schema.
///
/// A plan only describes the shape of a site; `FieldEncoder::make_encoder` pairs it with a
/// concrete Arrow array to produce the actual encoder.
#[derive(Debug, Clone)]
enum FieldPlan {
    /// Non-nested scalar/logical type
    Scalar,
    /// Record/Struct with Avro‑ordered children
    Struct { bindings: Vec<FieldBinding> },
    /// Array with item‑site nullability and nested plan
    List {
        /// Nullability/branch order of the items (`None` when items are required).
        items_nullability: Option<Nullability>,
        /// Plan used to encode each item.
        item_plan: Box<FieldPlan>,
    },
    /// Avro decimal logical type (bytes or fixed). `size=None` => bytes(decimal), `Some(n)` => fixed(n)
    Decimal { size: Option<usize> },
    /// Avro UUID logical type (fixed)
    Uuid,
    /// Avro map with value‑site nullability and nested plan
    Map {
        /// Nullability/branch order of the map values (`None` when values are required).
        values_nullability: Option<Nullability>,
        /// Plan used to encode each map value.
        value_plan: Box<FieldPlan>,
    },
    /// Avro enum; maps to Arrow Dictionary<Int32, Utf8> with dictionary values
    /// exactly equal and ordered as the Avro enum `symbols`.
    Enum { symbols: Arc<[String]> },
    /// Avro union, maps to Arrow Union.
    Union { bindings: Vec<FieldBinding> },
    /// Avro RunEndEncoded site. Values are encoded per logical row by mapping the
    /// row index to its containing run and emitting that run's value with `value_plan`.
    RunEndEncoded {
        /// Nullability/branch order of the run values (`None` when values are required).
        values_nullability: Option<Nullability>,
        /// Plan used to encode the value of a run.
        value_plan: Box<FieldPlan>,
    },
}
694
/// Associates one Avro site (record field or union branch) with the Arrow
/// column/child it is encoded from, together with the plan for that site.
#[derive(Debug, Clone)]
struct FieldBinding {
    /// Index of the Arrow field/column associated with this Avro field site
    arrow_index: usize,
    /// Nullability/order for this site (None for required fields)
    nullability: Option<Nullability>,
    /// Nested plan for this site
    plan: FieldPlan,
}
704
/// Builder for `RecordEncoder` write plan
///
/// Borrows both schemas for `'a`; nothing is resolved until `build` is called.
#[derive(Debug)]
pub(crate) struct RecordEncoderBuilder<'a> {
    /// Root Avro field; `build` requires its codec to be a record/struct.
    avro_root: &'a AvroField,
    /// Arrow schema in which the Avro root fields are looked up by name.
    arrow_schema: &'a ArrowSchema,
    /// Optional fingerprint; when set, `build` turns it into the per-record prefix.
    fingerprint: Option<Fingerprint>,
}
712
713impl<'a> RecordEncoderBuilder<'a> {
714    /// Create a new builder from the Avro root and Arrow schema.
715    pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
716        Self {
717            avro_root,
718            arrow_schema,
719            fingerprint: None,
720        }
721    }
722
723    pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
724        self.fingerprint = fingerprint;
725        self
726    }
727
728    /// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
729    /// resolving each field to an Arrow index by name.
730    pub(crate) fn build(self) -> Result<RecordEncoder, ArrowError> {
731        let avro_root_dt = self.avro_root.data_type();
732        let Codec::Struct(root_fields) = avro_root_dt.codec() else {
733            return Err(ArrowError::SchemaError(
734                "Top-level Avro schema must be a record/struct".into(),
735            ));
736        };
737        let mut columns = Vec::with_capacity(root_fields.len());
738        for root_field in root_fields.as_ref() {
739            let name = root_field.name();
740            let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
741                ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
742            })?;
743            columns.push(FieldBinding {
744                arrow_index,
745                nullability: root_field.data_type().nullability(),
746                plan: FieldPlan::build(
747                    root_field.data_type(),
748                    self.arrow_schema.field(arrow_index),
749                )?,
750            });
751        }
752        Ok(RecordEncoder {
753            columns,
754            prefix: self.fingerprint.map(|fp| fp.make_prefix()),
755        })
756    }
757}
758
/// A pre-computed plan for encoding a `RecordBatch` to Avro.
///
/// Derived from an Avro schema and an Arrow schema. It maps
/// top-level Avro fields to Arrow columns and contains a nested encoding plan
/// for each column.
#[derive(Debug, Clone)]
pub(crate) struct RecordEncoder {
    /// One binding per top-level Avro field, in Avro field order.
    columns: Vec<FieldBinding>,
    /// Optional pre-built, variable-length prefix written before each record.
    prefix: Option<Prefix>,
}
770
771impl RecordEncoder {
772    fn prepare_for_batch<'a>(
773        &'a self,
774        batch: &'a RecordBatch,
775    ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
776        let arrays = batch.columns();
777        let mut out = Vec::with_capacity(self.columns.len());
778        for col_plan in self.columns.iter() {
779            let arrow_index = col_plan.arrow_index;
780            let array = arrays.get(arrow_index).ok_or_else(|| {
781                ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
782            })?;
783            #[cfg(not(feature = "avro_custom_types"))]
784            let site_nullability = match &col_plan.plan {
785                FieldPlan::RunEndEncoded { .. } => None,
786                _ => col_plan.nullability,
787            };
788            #[cfg(feature = "avro_custom_types")]
789            let site_nullability = col_plan.nullability;
790            out.push(FieldEncoder::make_encoder(
791                array.as_ref(),
792                &col_plan.plan,
793                site_nullability,
794            )?);
795        }
796        Ok(out)
797    }
798
799    /// Encode a `RecordBatch` using this encoder plan.
800    ///
801    /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes.
802    pub(crate) fn encode<W: Write>(
803        &self,
804        out: &mut W,
805        batch: &RecordBatch,
806    ) -> Result<(), ArrowError> {
807        let mut column_encoders = self.prepare_for_batch(batch)?;
808        let n = batch.num_rows();
809        match self.prefix {
810            Some(prefix) => {
811                for row in 0..n {
812                    out.write_all(prefix.as_slice())
813                        .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
814                    for enc in column_encoders.iter_mut() {
815                        enc.encode(out, row)?;
816                    }
817                }
818            }
819            None => {
820                for row in 0..n {
821                    for enc in column_encoders.iter_mut() {
822                        enc.encode(out, row)?;
823                    }
824                }
825            }
826        }
827        Ok(())
828    }
829}
830
831fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
832    fields.iter().position(|f| f.name() == name)
833}
834
835fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
836    // Prefer common Arrow field names; fall back to second child if exactly two
837    find_struct_child_index(fields, "value")
838        .or_else(|| find_struct_child_index(fields, "values"))
839        .or_else(|| if fields.len() == 2 { Some(1) } else { None })
840}
841
842impl FieldPlan {
843    fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
844        #[cfg(not(feature = "avro_custom_types"))]
845        if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
846            let values_nullability = avro_dt.nullability();
847            let value_site_dt: &AvroDataType = match avro_dt.codec() {
848                Codec::Union(branches, _, _) => branches
849                    .iter()
850                    .find(|b| !matches!(b.codec(), Codec::Null))
851                    .ok_or_else(|| {
852                        ArrowError::SchemaError(
853                            "Avro union at RunEndEncoded site has no non-null branch".into(),
854                        )
855                    })?,
856                _ => avro_dt,
857            };
858            return Ok(FieldPlan::RunEndEncoded {
859                values_nullability,
860                value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?),
861            });
862        }
863        if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
864            // Extension-based detection (only when the feature is enabled)
865            let ext_is_uuid = {
866                #[cfg(feature = "canonical_extension_types")]
867                {
868                    matches!(
869                        arrow_field.extension_type_name(),
870                        Some("arrow.uuid") | Some("uuid")
871                    )
872                }
873                #[cfg(not(feature = "canonical_extension_types"))]
874                {
875                    false
876                }
877            };
878            let md_is_uuid = arrow_field
879                .metadata()
880                .get("logicalType")
881                .map(|s| s.as_str())
882                == Some("uuid");
883            if ext_is_uuid || md_is_uuid {
884                if *len != 16 {
885                    return Err(ArrowError::InvalidArgumentError(
886                        "logicalType=uuid requires FixedSizeBinary(16)".into(),
887                    ));
888                }
889                return Ok(FieldPlan::Uuid);
890            }
891        }
892        match avro_dt.codec() {
893            Codec::Struct(avro_fields) => {
894                let fields = match arrow_field.data_type() {
895                    DataType::Struct(struct_fields) => struct_fields,
896                    other => {
897                        return Err(ArrowError::SchemaError(format!(
898                            "Avro struct maps to Arrow Struct, found: {other:?}"
899                        )));
900                    }
901                };
902                let mut bindings = Vec::with_capacity(avro_fields.len());
903                for avro_field in avro_fields.iter() {
904                    let name = avro_field.name().to_string();
905                    let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
906                        ArrowError::SchemaError(format!(
907                            "Struct field '{name}' not present in Arrow field '{}'",
908                            arrow_field.name()
909                        ))
910                    })?;
911                    bindings.push(FieldBinding {
912                        arrow_index: idx,
913                        nullability: avro_field.data_type().nullability(),
914                        plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
915                    });
916                }
917                Ok(FieldPlan::Struct { bindings })
918            }
919            Codec::List(items_dt) => match arrow_field.data_type() {
920                DataType::List(field_ref)
921                | DataType::LargeList(field_ref)
922                | DataType::ListView(field_ref)
923                | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
924                    items_nullability: items_dt.nullability(),
925                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
926                }),
927                DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
928                    items_nullability: items_dt.nullability(),
929                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
930                }),
931                other => Err(ArrowError::SchemaError(format!(
932                    "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
933                ))),
934            },
935            Codec::Map(values_dt) => {
936                let entries_field = match arrow_field.data_type() {
937                    DataType::Map(entries, _sorted) => entries.as_ref(),
938                    other => {
939                        return Err(ArrowError::SchemaError(format!(
940                            "Avro map maps to Arrow DataType::Map, found: {other:?}"
941                        )));
942                    }
943                };
944                let entries_struct_fields = match entries_field.data_type() {
945                    DataType::Struct(fs) => fs,
946                    other => {
947                        return Err(ArrowError::SchemaError(format!(
948                            "Arrow Map entries must be Struct, found: {other:?}"
949                        )));
950                    }
951                };
952                let value_idx =
953                    find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
954                        ArrowError::SchemaError("Map entries struct missing value field".into())
955                    })?;
956                let value_field = entries_struct_fields[value_idx].as_ref();
957                let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
958                Ok(FieldPlan::Map {
959                    values_nullability: values_dt.nullability(),
960                    value_plan: Box::new(value_plan),
961                })
962            }
963            Codec::Enum(symbols) => match arrow_field.data_type() {
964                DataType::Dictionary(key_dt, value_dt) => {
965                    if **key_dt != DataType::Int32 {
966                        return Err(ArrowError::SchemaError(
967                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
968                        ));
969                    }
970                    if **value_dt != DataType::Utf8 {
971                        return Err(ArrowError::SchemaError(
972                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
973                        ));
974                    }
975                    Ok(FieldPlan::Enum {
976                        symbols: symbols.clone(),
977                    })
978                }
979                other => Err(ArrowError::SchemaError(format!(
980                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
981                ))),
982            },
983            // decimal site (bytes or fixed(N)) with precision/scale validation
984            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
985                let (ap, as_) = match arrow_field.data_type() {
986                    #[cfg(feature = "small_decimals")]
987                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
988                    #[cfg(feature = "small_decimals")]
989                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
990                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
991                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
992                    other => {
993                        return Err(ArrowError::SchemaError(format!(
994                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
995                            arrow_field.name()
996                        )));
997                    }
998                };
999                let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent
1000                if ap != *precision || as_ != sc {
1001                    return Err(ArrowError::SchemaError(format!(
1002                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
1003                        arrow_field.name()
1004                    )));
1005                }
1006                Ok(FieldPlan::Decimal {
1007                    size: *fixed_size_opt,
1008                })
1009            }
1010            Codec::Interval => match arrow_field.data_type() {
1011                DataType::Interval(
1012                    IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
1013                ) => Ok(FieldPlan::Scalar),
1014                other => Err(ArrowError::SchemaError(format!(
1015                    "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
1016                ))),
1017            },
1018            Codec::Union(avro_branches, _, UnionMode::Dense) => {
1019                let arrow_union_fields = match arrow_field.data_type() {
1020                    DataType::Union(fields, UnionMode::Dense) => fields,
1021                    DataType::Union(_, UnionMode::Sparse) => {
1022                        return Err(ArrowError::NotYetImplemented(
1023                            "Sparse Arrow unions are not yet supported".to_string(),
1024                        ));
1025                    }
1026                    other => {
1027                        return Err(ArrowError::SchemaError(format!(
1028                            "Avro union maps to Arrow Union, found: {other:?}"
1029                        )));
1030                    }
1031                };
1032                if avro_branches.len() != arrow_union_fields.len() {
1033                    return Err(ArrowError::SchemaError(format!(
1034                        "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
1035                        avro_branches.len(),
1036                        arrow_union_fields.len(),
1037                        arrow_field.name()
1038                    )));
1039                }
1040                let bindings = avro_branches
1041                    .iter()
1042                    .zip(arrow_union_fields.iter())
1043                    .enumerate()
1044                    .map(|(i, (avro_branch, (_, arrow_child_field)))| {
1045                        Ok(FieldBinding {
1046                            arrow_index: i,
1047                            nullability: avro_branch.nullability(),
1048                            plan: FieldPlan::build(avro_branch, arrow_child_field)?,
1049                        })
1050                    })
1051                    .collect::<Result<Vec<_>, ArrowError>>()?;
1052                Ok(FieldPlan::Union { bindings })
1053            }
1054            Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented(
1055                "Sparse Arrow unions are not yet supported".to_string(),
1056            )),
1057            #[cfg(feature = "avro_custom_types")]
1058            Codec::RunEndEncoded(values_dt, _width_code) => {
1059                let values_field = match arrow_field.data_type() {
1060                    DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(),
1061                    other => {
1062                        return Err(ArrowError::SchemaError(format!(
1063                            "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
1064                        )));
1065                    }
1066                };
1067                Ok(FieldPlan::RunEndEncoded {
1068                    values_nullability: values_dt.nullability(),
1069                    value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?),
1070                })
1071            }
1072            _ => Ok(FieldPlan::Scalar),
1073        }
1074    }
1075}
1076
/// Value encoder bound to a concrete Arrow array for the lifetime `'a`.
///
/// Each variant wraps the encoder for one supported array kind; `Encoder::encode`
/// forwards to the wrapped encoder. Nested encoders (lists, struct, map, union,
/// run-end encoded) are stored behind a `Box`.
enum Encoder<'a> {
    Boolean(BooleanEncoder<'a>),
    Int(IntEncoder<'a, Int32Type>),
    Long(LongEncoder<'a, Int64Type>),
    TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
    TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
    TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
    /// Arrow Timestamp(Second) values, scaled to milliseconds when written
    TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
    Date32(IntEncoder<'a, Date32Type>),
    /// Arrow Time32(Second) values, scaled to milliseconds when written
    Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
    Time32Millis(IntEncoder<'a, Time32MillisecondType>),
    Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
    DurationSeconds(LongEncoder<'a, DurationSecondType>),
    DurationMillis(LongEncoder<'a, DurationMillisecondType>),
    DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
    DurationNanos(LongEncoder<'a, DurationNanosecondType>),
    Float32(F32Encoder<'a>),
    Float64(F64Encoder<'a>),
    Binary(BinaryEncoder<'a, i32>),
    LargeBinary(BinaryEncoder<'a, i64>),
    Utf8(Utf8Encoder<'a>),
    Utf8Large(Utf8LargeEncoder<'a>),
    Utf8View(Utf8ViewEncoder<'a>),
    BinaryView(BinaryViewEncoder<'a>),
    List(Box<ListEncoder32<'a>>),
    LargeList(Box<ListEncoder64<'a>>),
    ListView(Box<ListViewEncoder32<'a>>),
    LargeListView(Box<ListViewEncoder64<'a>>),
    FixedSizeList(Box<FixedSizeListEncoder<'a>>),
    Struct(Box<StructEncoder<'a>>),
    /// Avro `fixed` encoder (raw bytes, no length)
    Fixed(FixedEncoder<'a>),
    /// Avro `uuid` logical type encoder (string with RFC-4122 hyphenated text)
    Uuid(UuidEncoder<'a>),
    /// Avro `duration` logical type (Arrow Interval(MonthDayNano)) encoder
    IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
    /// Avro `duration` logical type (Arrow Interval(YearMonth)) encoder
    IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
    /// Avro `duration` logical type (Arrow Interval(DayTime)) encoder
    IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
    #[cfg(feature = "small_decimals")]
    Decimal32(Decimal32Encoder<'a>),
    #[cfg(feature = "small_decimals")]
    Decimal64(Decimal64Encoder<'a>),
    Decimal128(Decimal128Encoder<'a>),
    Decimal256(Decimal256Encoder<'a>),
    /// Avro `enum` encoder: writes the key (int) as the enum index.
    Enum(EnumEncoder<'a>),
    Map(Box<MapEncoder<'a>>),
    Union(Box<UnionEncoder<'a>>),
    /// Run-end encoded values with specific run-end index widths
    RunEncoded16(Box<RunEncodedEncoder16<'a>>),
    RunEncoded32(Box<RunEncodedEncoder32<'a>>),
    RunEncoded64(Box<RunEncodedEncoder64<'a>>),
    /// Null site: `Encoder::encode` writes no bytes for this variant.
    Null,
}
1133
1134impl<'a> Encoder<'a> {
1135    /// Encode the value at `idx`.
1136    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1137        match self {
1138            Encoder::Boolean(e) => e.encode(out, idx),
1139            Encoder::Int(e) => e.encode(out, idx),
1140            Encoder::Long(e) => e.encode(out, idx),
1141            Encoder::TimestampMicros(e) => e.encode(out, idx),
1142            Encoder::TimestampMillis(e) => e.encode(out, idx),
1143            Encoder::TimestampNanos(e) => e.encode(out, idx),
1144            Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
1145            Encoder::Date32(e) => e.encode(out, idx),
1146            Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
1147            Encoder::Time32Millis(e) => e.encode(out, idx),
1148            Encoder::Time64Micros(e) => e.encode(out, idx),
1149            Encoder::DurationSeconds(e) => e.encode(out, idx),
1150            Encoder::DurationMicros(e) => e.encode(out, idx),
1151            Encoder::DurationMillis(e) => e.encode(out, idx),
1152            Encoder::DurationNanos(e) => e.encode(out, idx),
1153            Encoder::Float32(e) => e.encode(out, idx),
1154            Encoder::Float64(e) => e.encode(out, idx),
1155            Encoder::Binary(e) => e.encode(out, idx),
1156            Encoder::LargeBinary(e) => e.encode(out, idx),
1157            Encoder::Utf8(e) => e.encode(out, idx),
1158            Encoder::Utf8Large(e) => e.encode(out, idx),
1159            Encoder::Utf8View(e) => e.encode(out, idx),
1160            Encoder::BinaryView(e) => e.encode(out, idx),
1161            Encoder::List(e) => e.encode(out, idx),
1162            Encoder::LargeList(e) => e.encode(out, idx),
1163            Encoder::ListView(e) => e.encode(out, idx),
1164            Encoder::LargeListView(e) => e.encode(out, idx),
1165            Encoder::FixedSizeList(e) => e.encode(out, idx),
1166            Encoder::Struct(e) => e.encode(out, idx),
1167            Encoder::Fixed(e) => (e).encode(out, idx),
1168            Encoder::Uuid(e) => (e).encode(out, idx),
1169            Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
1170            Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
1171            Encoder::IntervalDayTime(e) => (e).encode(out, idx),
1172            #[cfg(feature = "small_decimals")]
1173            Encoder::Decimal32(e) => (e).encode(out, idx),
1174            #[cfg(feature = "small_decimals")]
1175            Encoder::Decimal64(e) => (e).encode(out, idx),
1176            Encoder::Decimal128(e) => (e).encode(out, idx),
1177            Encoder::Decimal256(e) => (e).encode(out, idx),
1178            Encoder::Map(e) => (e).encode(out, idx),
1179            Encoder::Enum(e) => (e).encode(out, idx),
1180            Encoder::Union(e) => (e).encode(out, idx),
1181            Encoder::RunEncoded16(e) => (e).encode(out, idx),
1182            Encoder::RunEncoded32(e) => (e).encode(out, idx),
1183            Encoder::RunEncoded64(e) => (e).encode(out, idx),
1184            Encoder::Null => Ok(()),
1185        }
1186    }
1187}
1188
1189struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
1190impl BooleanEncoder<'_> {
1191    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1192        write_bool(out, self.0.value(idx))
1193    }
1194}
1195
1196/// Generic Avro `int` encoder for primitive arrays with `i32` native values.
1197struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
1198impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
1199    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1200        write_int(out, self.0.value(idx))
1201    }
1202}
1203
1204/// Generic Avro `long` encoder for primitive arrays with `i64` native values.
1205struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
1206impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
1207    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1208        write_long(out, self.0.value(idx))
1209    }
1210}
1211
1212/// Time32(Second) to Avro time-millis (int), via safe scaling by 1000
1213struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
1214impl<'a> Time32SecondsToMillisEncoder<'a> {
1215    #[inline]
1216    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1217        let secs = self.0.value(idx);
1218        let millis = secs.checked_mul(1000).ok_or_else(|| {
1219            ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into())
1220        })?;
1221        write_int(out, millis)
1222    }
1223}
1224
1225/// Timestamp(Second) to Avro timestamp-millis (long), via safe scaling by 1000
1226struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
1227impl<'a> TimestampSecondsToMillisEncoder<'a> {
1228    #[inline]
1229    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1230        let secs = self.0.value(idx);
1231        let millis = secs.checked_mul(1000).ok_or_else(|| {
1232            ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
1233        })?;
1234        write_long(out, millis)
1235    }
1236}
1237
1238/// Unified binary encoder generic over offset size (i32/i64).
1239struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
1240impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
1241    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1242        write_len_prefixed(out, self.0.value(idx))
1243    }
1244}
1245
1246/// BinaryView (byte view) encoder.
1247struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
1248impl BinaryViewEncoder<'_> {
1249    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1250        write_len_prefixed(out, self.0.value(idx))
1251    }
1252}
1253
1254/// StringView encoder.
1255struct Utf8ViewEncoder<'a>(&'a StringViewArray);
1256impl Utf8ViewEncoder<'_> {
1257    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1258        write_len_prefixed(out, self.0.value(idx).as_bytes())
1259    }
1260}
1261
1262struct F32Encoder<'a>(&'a arrow_array::Float32Array);
1263impl F32Encoder<'_> {
1264    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1265        // Avro float: 4 bytes, IEEE-754 little-endian
1266        let bits = self.0.value(idx).to_bits();
1267        out.write_all(&bits.to_le_bytes())
1268            .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
1269    }
1270}
1271
1272struct F64Encoder<'a>(&'a arrow_array::Float64Array);
1273impl F64Encoder<'_> {
1274    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1275        // Avro double: 8 bytes, IEEE-754 little-endian
1276        let bits = self.0.value(idx).to_bits();
1277        out.write_all(&bits.to_le_bytes())
1278            .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
1279    }
1280}
1281
1282struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
1283
1284impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
1285    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1286        write_len_prefixed(out, self.0.value(idx).as_bytes())
1287    }
1288}
1289
1290type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
1291type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
1292
/// Internal key array kind used by Map encoder.
///
/// Only string keys are representable: 32-bit-offset or 64-bit-offset string arrays.
enum KeyKind<'a> {
    /// Keys stored with `i32` offsets
    Utf8(&'a GenericStringArray<i32>),
    /// Keys stored with `i64` offsets
    LargeUtf8(&'a GenericStringArray<i64>),
}
/// Encoder over an Arrow `MapArray`: each row is written as its key/value entries.
struct MapEncoder<'a> {
    /// Source map array; its offsets delimit the entries of each row.
    map: &'a MapArray,
    /// Typed view of the map's key array.
    keys: KeyKind<'a>,
    /// Encoder for the map's value array.
    values: FieldEncoder<'a>,
    /// `offset()` of the keys array captured at construction; subtracted
    /// (saturating) from the entry index before reading a key.
    keys_offset: usize,
    /// `offset()` of the values array captured at construction; subtracted
    /// (saturating) from the entry index before encoding a value.
    values_offset: usize,
}
1305
1306impl<'a> MapEncoder<'a> {
1307    fn try_new(
1308        map: &'a MapArray,
1309        values_nullability: Option<Nullability>,
1310        value_plan: &FieldPlan,
1311    ) -> Result<Self, ArrowError> {
1312        let keys_arr = map.keys();
1313        let keys_kind = match keys_arr.data_type() {
1314            DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
1315            DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
1316            other => {
1317                return Err(ArrowError::SchemaError(format!(
1318                    "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
1319                )));
1320            }
1321        };
1322        Ok(Self {
1323            map,
1324            keys: keys_kind,
1325            values: FieldEncoder::make_encoder(
1326                map.values().as_ref(),
1327                value_plan,
1328                values_nullability,
1329            )?,
1330            keys_offset: keys_arr.offset(),
1331            values_offset: map.values().offset(),
1332        })
1333    }
1334
1335    fn encode_map_entries<W, O>(
1336        out: &mut W,
1337        keys: &GenericStringArray<O>,
1338        keys_offset: usize,
1339        start: usize,
1340        end: usize,
1341        mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>,
1342    ) -> Result<(), ArrowError>
1343    where
1344        W: Write + ?Sized,
1345        O: OffsetSizeTrait,
1346    {
1347        encode_blocked_range(out, start, end, |out, j| {
1348            let j_key = j.saturating_sub(keys_offset);
1349            write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1350            write_item(out, j)
1351        })
1352    }
1353
1354    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1355        let offsets = self.map.offsets();
1356        let start = offsets[idx] as usize;
1357        let end = offsets[idx + 1] as usize;
1358        let write_item = |out: &mut W, j: usize| {
1359            let j_val = j.saturating_sub(self.values_offset);
1360            self.values.encode(out, j_val)
1361        };
1362        match self.keys {
1363            KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1364                out,
1365                arr,
1366                self.keys_offset,
1367                start,
1368                end,
1369                write_item,
1370            ),
1371            KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1372                out,
1373                arr,
1374                self.keys_offset,
1375                start,
1376                end,
1377                write_item,
1378            ),
1379        }
1380    }
1381}
1382
1383/// Avro `enum` encoder for Arrow `DictionaryArray<Int32, Utf8>`.
1384///
1385/// Per Avro spec, an enum is encoded as an **int** equal to the
1386/// zero-based position of the symbol in the schema’s `symbols` list.
1387/// We validate at construction that the dictionary values equal the symbols,
1388/// so we can directly write the key value here.
1389struct EnumEncoder<'a> {
1390    keys: &'a PrimitiveArray<Int32Type>,
1391}
1392impl EnumEncoder<'_> {
1393    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> {
1394        write_int(out, self.keys.value(row))
1395    }
1396}
1397
1398struct UnionEncoder<'a> {
1399    encoders: Vec<FieldEncoder<'a>>,
1400    array: &'a UnionArray,
1401    type_id_to_encoder_index: Vec<Option<usize>>,
1402}
1403
1404impl<'a> UnionEncoder<'a> {
1405    fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, ArrowError> {
1406        let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
1407            return Err(ArrowError::SchemaError("Expected Dense UnionArray".into()));
1408        };
1409        if fields.len() != field_bindings.len() {
1410            return Err(ArrowError::SchemaError(format!(
1411                "Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
1412                fields.len(),
1413                field_bindings.len()
1414            )));
1415        }
1416        let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
1417        let mut type_id_to_encoder_index: Vec<Option<usize>> =
1418            vec![None; (max_type_id + 1) as usize];
1419        let mut encoders = Vec::with_capacity(fields.len());
1420        for (i, (type_id, _)) in fields.iter().enumerate() {
1421            let binding = field_bindings
1422                .get(i)
1423                .ok_or_else(|| ArrowError::SchemaError("Binding and field mismatch".to_string()))?;
1424            encoders.push(FieldEncoder::make_encoder(
1425                array.child(type_id).as_ref(),
1426                &binding.plan,
1427                binding.nullability,
1428            )?);
1429            type_id_to_encoder_index[type_id as usize] = Some(i);
1430        }
1431        Ok(Self {
1432            encoders,
1433            array,
1434            type_id_to_encoder_index,
1435        })
1436    }
1437
1438    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1439        // SAFETY: `idx` is always in bounds because:
1440        // 1. The encoder is called from `RecordEncoder::encode,` which iterates over `0..batch.num_rows()`
1441        // 2. `self.array` is a column from the same batch, so its length equals `batch.num_rows()`
1442        // 3. `type_ids()` returns a buffer with exactly `self.array.len()` entries (one per logical element)
1443        let type_id = self.array.type_ids()[idx];
1444        let encoder_index = self
1445            .type_id_to_encoder_index
1446            .get(type_id as usize)
1447            .and_then(|opt| *opt)
1448            .ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id {type_id}")))?;
1449        write_int(out, encoder_index as i32)?;
1450        let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
1451            ArrowError::SchemaError(format!("Invalid encoder index {encoder_index}"))
1452        })?;
1453        encoder.encode(out, self.array.value_offset(idx))
1454    }
1455}
1456
1457struct StructEncoder<'a> {
1458    encoders: Vec<FieldEncoder<'a>>,
1459}
1460
1461impl<'a> StructEncoder<'a> {
1462    fn try_new(
1463        array: &'a StructArray,
1464        field_bindings: &[FieldBinding],
1465    ) -> Result<Self, ArrowError> {
1466        let mut encoders = Vec::with_capacity(field_bindings.len());
1467        for field_binding in field_bindings {
1468            let idx = field_binding.arrow_index;
1469            let column = array.columns().get(idx).ok_or_else(|| {
1470                ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1471            })?;
1472            let encoder = FieldEncoder::make_encoder(
1473                column.as_ref(),
1474                &field_binding.plan,
1475                field_binding.nullability,
1476            )?;
1477            encoders.push(encoder);
1478        }
1479        Ok(Self { encoders })
1480    }
1481
1482    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1483        for encoder in self.encoders.iter_mut() {
1484            encoder.encode(out, idx)?;
1485        }
1486        Ok(())
1487    }
1488}
1489
1490/// Encode a blocked range of items with Avro array block framing.
1491///
1492/// `write_item` must take `(out, index)` to maintain the "out-first" convention.
1493fn encode_blocked_range<W: Write + ?Sized, F>(
1494    out: &mut W,
1495    start: usize,
1496    end: usize,
1497    mut write_item: F,
1498) -> Result<(), ArrowError>
1499where
1500    F: FnMut(&mut W, usize) -> Result<(), ArrowError>,
1501{
1502    let len = end.saturating_sub(start);
1503    if len == 0 {
1504        // Zero-length terminator per Avro spec.
1505        write_long(out, 0)?;
1506        return Ok(());
1507    }
1508    // Emit a single positive block for performance, then the end marker.
1509    write_long(out, len as i64)?;
1510    for row in start..end {
1511        write_item(out, row)?;
1512    }
1513    write_long(out, 0)?;
1514    Ok(())
1515}
1516
1517struct ListEncoder<'a, O: OffsetSizeTrait> {
1518    list: &'a GenericListArray<O>,
1519    values: FieldEncoder<'a>,
1520    values_offset: usize,
1521}
1522
1523type ListEncoder32<'a> = ListEncoder<'a, i32>;
1524type ListEncoder64<'a> = ListEncoder<'a, i64>;
1525
1526impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1527    fn try_new(
1528        list: &'a GenericListArray<O>,
1529        items_nullability: Option<Nullability>,
1530        item_plan: &FieldPlan,
1531    ) -> Result<Self, ArrowError> {
1532        Ok(Self {
1533            list,
1534            values: FieldEncoder::make_encoder(
1535                list.values().as_ref(),
1536                item_plan,
1537                items_nullability,
1538            )?,
1539            values_offset: list.values().offset(),
1540        })
1541    }
1542
1543    fn encode_list_range<W: Write + ?Sized>(
1544        &mut self,
1545        out: &mut W,
1546        start: usize,
1547        end: usize,
1548    ) -> Result<(), ArrowError> {
1549        encode_blocked_range(out, start, end, |out, row| {
1550            self.values
1551                .encode(out, row.saturating_sub(self.values_offset))
1552        })
1553    }
1554
1555    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1556        let offsets = self.list.offsets();
1557        let start = offsets[idx].to_usize().ok_or_else(|| {
1558            ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize"))
1559        })?;
1560        let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1561            ArrowError::InvalidArgumentError(format!(
1562                "Error converting offset[{}] to usize",
1563                idx + 1
1564            ))
1565        })?;
1566        self.encode_list_range(out, start, end)
1567    }
1568}
1569
1570/// ListView encoder using `(offset, size)` buffers.
1571struct ListViewEncoder<'a, O: OffsetSizeTrait> {
1572    list: &'a GenericListViewArray<O>,
1573    values: FieldEncoder<'a>,
1574    values_offset: usize,
1575}
1576type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
1577type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
1578
1579impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
1580    fn try_new(
1581        list: &'a GenericListViewArray<O>,
1582        items_nullability: Option<Nullability>,
1583        item_plan: &FieldPlan,
1584    ) -> Result<Self, ArrowError> {
1585        Ok(Self {
1586            list,
1587            values: FieldEncoder::make_encoder(
1588                list.values().as_ref(),
1589                item_plan,
1590                items_nullability,
1591            )?,
1592            values_offset: list.values().offset(),
1593        })
1594    }
1595
1596    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1597        let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
1598            ArrowError::InvalidArgumentError(format!(
1599                "Error converting value_offset[{idx}] to usize"
1600            ))
1601        })?;
1602        let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
1603            ArrowError::InvalidArgumentError(format!("Error converting value_size[{idx}] to usize"))
1604        })?;
1605        let start = start + self.values_offset;
1606        let end = start + len;
1607        encode_blocked_range(out, start, end, |out, row| {
1608            self.values
1609                .encode(out, row.saturating_sub(self.values_offset))
1610        })
1611    }
1612}
1613
1614/// FixedSizeList encoder.
1615struct FixedSizeListEncoder<'a> {
1616    list: &'a FixedSizeListArray,
1617    values: FieldEncoder<'a>,
1618    values_offset: usize,
1619    elem_len: usize,
1620}
1621
1622impl<'a> FixedSizeListEncoder<'a> {
1623    fn try_new(
1624        list: &'a FixedSizeListArray,
1625        items_nullability: Option<Nullability>,
1626        item_plan: &FieldPlan,
1627    ) -> Result<Self, ArrowError> {
1628        Ok(Self {
1629            list,
1630            values: FieldEncoder::make_encoder(
1631                list.values().as_ref(),
1632                item_plan,
1633                items_nullability,
1634            )?,
1635            values_offset: list.values().offset(),
1636            elem_len: list.value_length() as usize,
1637        })
1638    }
1639
1640    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1641        // Starting index is relative to values() start
1642        let rel = self.list.value_offset(idx) as usize;
1643        let start = self.values_offset + rel;
1644        let end = start + self.elem_len;
1645        encode_blocked_range(out, start, end, |out, row| {
1646            self.values
1647                .encode(out, row.saturating_sub(self.values_offset))
1648        })
1649    }
1650}
1651
1652/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
1653/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
1654struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1655impl FixedEncoder<'_> {
1656    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1657        let v = self.0.value(idx); // &[u8] of fixed width
1658        out.write_all(v)
1659            .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e))
1660    }
1661}
1662
1663/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) to Avro string (UUID).
1664/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
1665struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1666impl UuidEncoder<'_> {
1667    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1668        let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1669        buf[0] = 0x48;
1670        let v = self.0.value(idx);
1671        let u = Uuid::from_slice(v)
1672            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
1673        let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1674        out.write_all(&buf)
1675            .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
1676    }
1677}
1678
/// The three unsigned components of an Avro `duration` value.
#[derive(Copy, Clone)]
struct DurationParts {
    /// Whole months.
    months: u32,
    /// Whole days.
    days: u32,
    /// Whole milliseconds.
    millis: u32,
}
/// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`.
trait IntervalToDurationParts: ArrowPrimitiveType {
    /// Converts one native interval value into its duration components, or
    /// returns an error when the value cannot be represented.
    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>;
}
1689impl IntervalToDurationParts for IntervalMonthDayNanoType {
1690    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1691        let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1692        if months < 0 || days < 0 || nanos < 0 {
1693            return Err(ArrowError::InvalidArgumentError(
1694                "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1695            ));
1696        }
1697        if nanos % 1_000_000 != 0 {
1698            return Err(ArrowError::InvalidArgumentError(
1699                "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1700                    .into(),
1701            ));
1702        }
1703        let millis = nanos / 1_000_000;
1704        if millis > u32::MAX as i64 {
1705            return Err(ArrowError::InvalidArgumentError(
1706                "Avro 'duration' milliseconds exceed u32::MAX".into(),
1707            ));
1708        }
1709        Ok(DurationParts {
1710            months: months as u32,
1711            days: days as u32,
1712            millis: millis as u32,
1713        })
1714    }
1715}
1716impl IntervalToDurationParts for IntervalYearMonthType {
1717    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1718        if native < 0 {
1719            return Err(ArrowError::InvalidArgumentError(
1720                "Avro 'duration' cannot encode negative months".into(),
1721            ));
1722        }
1723        Ok(DurationParts {
1724            months: native as u32,
1725            days: 0,
1726            millis: 0,
1727        })
1728    }
1729}
1730impl IntervalToDurationParts for IntervalDayTimeType {
1731    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1732        let (days, millis) = IntervalDayTimeType::to_parts(native);
1733        if days < 0 || millis < 0 {
1734            return Err(ArrowError::InvalidArgumentError(
1735                "Avro 'duration' cannot encode negative days or milliseconds".into(),
1736            ));
1737        }
1738        Ok(DurationParts {
1739            months: 0,
1740            days: days as u32,
1741            millis: millis as u32,
1742        })
1743    }
1744}
1745
1746/// Single generic encoder used for all three interval units.
1747/// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
1748struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1749impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1750    #[inline(always)]
1751    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1752        let parts = P::duration_parts(self.0.value(idx))?;
1753        let months = parts.months.to_le_bytes();
1754        let days = parts.days.to_le_bytes();
1755        let ms = parts.millis.to_le_bytes();
1756        // SAFETY
1757        // - Endianness & layout: Avro's `duration` logical type is encoded as fixed(12)
1758        //   with three *little-endian* unsigned 32-bit integers in order: (months, days, millis).
1759        //   We explicitly materialize exactly those 12 bytes.
1760        // - In-bounds indexing: `to_le_bytes()` on `u32` returns `[u8; 4]` by contract,
1761        //   therefore, the constant indices 0..=3 used below are *always* in-bounds.
1762        //   Rust will panic on out-of-bounds indexing, but there is no such path here;
1763        //   the compiler can also elide the bound checks for constant, provably in-range
1764        //   indices. [std docs; Rust Performance Book on bounds-check elimination]
1765        // - Memory safety: The `[u8; 12]` array is built on the stack by value, with no
1766        //   aliasing and no uninitialized memory. There is no `unsafe`.
1767        // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated and mapped
1768        //   into `ArrowError`, so I/O errors are reported, not panicked.
1769        // Consequently, constructing `buf` with the constant indices below is safe and
1770        // panic-free under these validated preconditions.
1771        let buf = [
1772            months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1773            ms[1], ms[2], ms[3],
1774        ];
1775        out.write_all(&buf)
1776            .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
1777    }
1778}
1779
1780/// Minimal trait to obtain a big-endian fixed-size byte array for a decimal's
1781/// unscaled integer value at `idx`.
1782trait DecimalBeBytes<const N: usize> {
1783    fn value_be_bytes(&self, idx: usize) -> [u8; N];
1784}
1785#[cfg(feature = "small_decimals")]
1786impl DecimalBeBytes<4> for Decimal32Array {
1787    fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1788        self.value(idx).to_be_bytes()
1789    }
1790}
1791#[cfg(feature = "small_decimals")]
1792impl DecimalBeBytes<8> for Decimal64Array {
1793    fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1794        self.value(idx).to_be_bytes()
1795    }
1796}
1797impl DecimalBeBytes<16> for Decimal128Array {
1798    fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1799        self.value(idx).to_be_bytes()
1800    }
1801}
1802impl DecimalBeBytes<32> for Decimal256Array {
1803    fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1804        // Arrow i256 → [u8; 32] big-endian
1805        self.value(idx).to_be_bytes()
1806    }
1807}
1808
1809/// Generic Avro decimal encoder over Arrow decimal arrays.
1810/// - When `fixed_size` is `None` → Avro `bytes(decimal)`; writes the minimal
1811///   two's-complement representation with a length prefix.
1812/// - When `Some(n)` → Avro `fixed(n, decimal)`; sign-extends (or validates)
1813///   to exactly `n` bytes and writes them directly.
1814struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1815    arr: &'a A,
1816    fixed_size: Option<usize>,
1817}
1818
1819impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1820    fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1821        Self { arr, fixed_size }
1822    }
1823
1824    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1825        let be = self.arr.value_be_bytes(idx);
1826        match self.fixed_size {
1827            Some(n) => write_sign_extended(out, &be, n),
1828            None => write_len_prefixed(out, minimal_twos_complement(&be)),
1829        }
1830    }
1831}
1832
1833#[cfg(feature = "small_decimals")]
1834type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1835#[cfg(feature = "small_decimals")]
1836type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1837type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1838type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1839
1840/// Generic encoder for Arrow `RunArray<R>`-based sites (run-end encoded).
1841/// Follows the pattern used by other generic encoders (i.e., `ListEncoder<O>`),
1842/// avoiding runtime branching on run-end width.
1843struct RunEncodedEncoder<'a, R: RunEndIndexType> {
1844    ends_slice: &'a [<R as ArrowPrimitiveType>::Native],
1845    base: usize,
1846    len: usize,
1847    values: FieldEncoder<'a>,
1848    // Cached run index used for sequential scans of rows [0..n)
1849    cur_run: usize,
1850    // Cached end (logical index, 1-based per spec) for the current run.
1851    cur_end: usize,
1852}
1853
1854type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>;
1855type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>;
1856type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>;
1857
1858impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
1859    fn new(arr: &'a RunArray<R>, values: FieldEncoder<'a>) -> Self {
1860        let ends = arr.run_ends();
1861        let base = ends.get_start_physical_index();
1862        let slice = ends.values();
1863        let len = ends.len();
1864        let cur_end = if len == 0 { 0 } else { slice[base].as_usize() };
1865        Self {
1866            ends_slice: slice,
1867            base,
1868            len,
1869            values,
1870            cur_run: 0,
1871            cur_end,
1872        }
1873    }
1874
1875    /// Advance `cur_run` so that `idx` is within the run ending at `cur_end`.
1876    /// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based.
1877    #[inline(always)]
1878    fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> {
1879        if idx < self.cur_end {
1880            return Ok(());
1881        }
1882        // Move forward across run boundaries until idx falls within cur_end
1883        while self.cur_run + 1 < self.len && idx >= self.cur_end {
1884            self.cur_run += 1;
1885            self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize();
1886        }
1887        if idx < self.cur_end {
1888            Ok(())
1889        } else {
1890            Err(ArrowError::InvalidArgumentError(format!(
1891                "row index {idx} out of bounds for run-ends ({} runs)",
1892                self.len
1893            )))
1894        }
1895    }
1896
1897    #[inline(always)]
1898    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1899        self.advance_to_row(idx)?;
1900        // For REE values, the value for any logical row within a run is at
1901        // the physical index of that run.
1902        self.values.encode(out, self.cur_run)
1903    }
1904}
1905
1906#[cfg(test)]
1907mod tests {
1908    use super::*;
1909    use arrow_array::types::Int32Type;
1910    use arrow_array::{
1911        Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
1912        Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, NullArray,
1913        StringArray,
1914    };
1915    use arrow_buffer::Buffer;
1916    use arrow_schema::{DataType, Field, Fields, UnionFields};
1917
1918    fn zigzag_i64(v: i64) -> u64 {
1919        ((v << 1) ^ (v >> 63)) as u64
1920    }
1921
1922    fn varint(mut x: u64) -> Vec<u8> {
1923        let mut out = Vec::new();
1924        while (x & !0x7f) != 0 {
1925            out.push(((x & 0x7f) as u8) | 0x80);
1926            x >>= 7;
1927        }
1928        out.push((x & 0x7f) as u8);
1929        out
1930    }
1931
1932    fn avro_long_bytes(v: i64) -> Vec<u8> {
1933        varint(zigzag_i64(v))
1934    }
1935
1936    fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
1937        let mut out = avro_long_bytes(payload.len() as i64);
1938        out.extend_from_slice(payload);
1939        out
1940    }
1941
1942    fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
1943        let m = months.to_le_bytes();
1944        let d = days.to_le_bytes();
1945        let ms = millis.to_le_bytes();
1946        [
1947            m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
1948        ]
1949    }
1950
1951    fn encode_all(
1952        array: &dyn Array,
1953        plan: &FieldPlan,
1954        nullability: Option<Nullability>,
1955    ) -> Vec<u8> {
1956        let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
1957        let mut out = Vec::new();
1958        for i in 0..array.len() {
1959            enc.encode(&mut out, i).unwrap();
1960        }
1961        out
1962    }
1963
1964    fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
1965        if actual != expected {
1966            let to_hex = |b: &[u8]| {
1967                b.iter()
1968                    .map(|x| format!("{:02X}", x))
1969                    .collect::<Vec<_>>()
1970                    .join(" ")
1971            };
1972            panic!(
1973                "mismatch\n  expected: [{}]\n    actual: [{}]",
1974                to_hex(expected),
1975                to_hex(actual)
1976            );
1977        }
1978    }
1979
1980    #[test]
1981    fn binary_encoder() {
1982        let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
1983        let arr = BinaryArray::from_vec(values);
1984        let mut expected = Vec::new();
1985        for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
1986            expected.extend(avro_len_prefixed_bytes(payload));
1987        }
1988        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1989        assert_bytes_eq(&got, &expected);
1990    }
1991
1992    #[test]
1993    fn large_binary_encoder() {
1994        let values: Vec<&[u8]> = vec![b"xyz", b""];
1995        let arr = LargeBinaryArray::from_vec(values);
1996        let mut expected = Vec::new();
1997        for payload in [b"xyz" as &[u8], b""] {
1998            expected.extend(avro_len_prefixed_bytes(payload));
1999        }
2000        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2001        assert_bytes_eq(&got, &expected);
2002    }
2003
2004    #[test]
2005    fn utf8_encoder() {
2006        let arr = StringArray::from(vec!["", "A", "BC"]);
2007        let mut expected = Vec::new();
2008        for s in ["", "A", "BC"] {
2009            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2010        }
2011        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2012        assert_bytes_eq(&got, &expected);
2013    }
2014
2015    #[test]
2016    fn large_utf8_encoder() {
2017        let arr = LargeStringArray::from(vec!["hello", ""]);
2018        let mut expected = Vec::new();
2019        for s in ["hello", ""] {
2020            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
2021        }
2022        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2023        assert_bytes_eq(&got, &expected);
2024    }
2025
2026    #[test]
2027    fn list_encoder_int32() {
2028        // Build ListArray [[1,2], [], [3]]
2029        let values = Int32Array::from(vec![1, 2, 3]);
2030        let offsets = vec![0, 2, 2, 3];
2031        let list = ListArray::new(
2032            Field::new("item", DataType::Int32, true).into(),
2033            arrow_buffer::OffsetBuffer::new(offsets.into()),
2034            Arc::new(values) as ArrayRef,
2035            None,
2036        );
2037        // Avro array encoding per row
2038        let mut expected = Vec::new();
2039        // row 0: block len 2, items 1,2 then 0
2040        expected.extend(avro_long_bytes(2));
2041        expected.extend(avro_long_bytes(1));
2042        expected.extend(avro_long_bytes(2));
2043        expected.extend(avro_long_bytes(0));
2044        // row 1: empty
2045        expected.extend(avro_long_bytes(0));
2046        // row 2: one item 3
2047        expected.extend(avro_long_bytes(1));
2048        expected.extend(avro_long_bytes(3));
2049        expected.extend(avro_long_bytes(0));
2050
2051        let plan = FieldPlan::List {
2052            items_nullability: None,
2053            item_plan: Box::new(FieldPlan::Scalar),
2054        };
2055        let got = encode_all(&list, &plan, None);
2056        assert_bytes_eq(&got, &expected);
2057    }
2058
2059    #[test]
2060    fn struct_encoder_two_fields() {
2061        // Struct { a: Int32, b: Utf8 }
2062        let a = Int32Array::from(vec![1, 2]);
2063        let b = StringArray::from(vec!["x", "y"]);
2064        let fields = Fields::from(vec![
2065            Field::new("a", DataType::Int32, true),
2066            Field::new("b", DataType::Utf8, true),
2067        ]);
2068        let struct_arr = StructArray::new(
2069            fields.clone(),
2070            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2071            None,
2072        );
2073        let plan = FieldPlan::Struct {
2074            bindings: vec![
2075                FieldBinding {
2076                    arrow_index: 0,
2077                    nullability: None,
2078                    plan: FieldPlan::Scalar,
2079                },
2080                FieldBinding {
2081                    arrow_index: 1,
2082                    nullability: None,
2083                    plan: FieldPlan::Scalar,
2084                },
2085            ],
2086        };
2087        let got = encode_all(&struct_arr, &plan, None);
2088        // Expected: rows concatenated: a then b
2089        let mut expected = Vec::new();
2090        expected.extend(avro_long_bytes(1)); // a=1
2091        expected.extend(avro_len_prefixed_bytes(b"x")); // b="x"
2092        expected.extend(avro_long_bytes(2)); // a=2
2093        expected.extend(avro_len_prefixed_bytes(b"y")); // b="y"
2094        assert_bytes_eq(&got, &expected);
2095    }
2096
2097    #[test]
2098    fn enum_encoder_dictionary() {
2099        // symbols: ["A","B","C"], keys [2,0,1]
2100        let dict_values = StringArray::from(vec!["A", "B", "C"]);
2101        let keys = Int32Array::from(vec![2, 0, 1]);
2102        let dict =
2103            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
2104        let symbols = Arc::<[String]>::from(
2105            vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
2106        );
2107        let plan = FieldPlan::Enum { symbols };
2108        let got = encode_all(&dict, &plan, None);
2109        let mut expected = Vec::new();
2110        expected.extend(avro_long_bytes(2));
2111        expected.extend(avro_long_bytes(0));
2112        expected.extend(avro_long_bytes(1));
2113        assert_bytes_eq(&got, &expected);
2114    }
2115
2116    #[test]
2117    fn decimal_bytes_and_fixed() {
2118        // Use Decimal128 with small positives and negatives
2119        let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
2120            .with_precision_and_scale(20, 0)
2121            .unwrap();
2122        // bytes(decimal): minimal two's complement length-prefixed
2123        let plan_bytes = FieldPlan::Decimal { size: None };
2124        let got_bytes = encode_all(&dec, &plan_bytes, None);
2125        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2126        let mut expected_bytes = Vec::new();
2127        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2128        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2129        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2130        assert_bytes_eq(&got_bytes, &expected_bytes);
2131
2132        let plan_fixed = FieldPlan::Decimal { size: Some(16) };
2133        let got_fixed = encode_all(&dec, &plan_fixed, None);
2134        let mut expected_fixed = Vec::new();
2135        expected_fixed.extend_from_slice(&1i128.to_be_bytes());
2136        expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
2137        expected_fixed.extend_from_slice(&0i128.to_be_bytes());
2138        assert_bytes_eq(&got_fixed, &expected_fixed);
2139    }
2140
2141    #[test]
2142    fn decimal_bytes_256() {
2143        use arrow_buffer::i256;
2144        // Use Decimal256 with small positives and negatives
2145        let dec = Decimal256Array::from(vec![
2146            i256::from_i128(1),
2147            i256::from_i128(-1),
2148            i256::from_i128(0),
2149        ])
2150        .with_precision_and_scale(76, 0)
2151        .unwrap();
2152        // bytes(decimal): minimal two's complement length-prefixed
2153        let plan_bytes = FieldPlan::Decimal { size: None };
2154        let got_bytes = encode_all(&dec, &plan_bytes, None);
2155        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
2156        let mut expected_bytes = Vec::new();
2157        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2158        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2159        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2160        assert_bytes_eq(&got_bytes, &expected_bytes);
2161
2162        // fixed(32): 32-byte big-endian two's complement
2163        let plan_fixed = FieldPlan::Decimal { size: Some(32) };
2164        let got_fixed = encode_all(&dec, &plan_fixed, None);
2165        let mut expected_fixed = Vec::new();
2166        expected_fixed.extend_from_slice(&i256::from_i128(1).to_be_bytes());
2167        expected_fixed.extend_from_slice(&i256::from_i128(-1).to_be_bytes());
2168        expected_fixed.extend_from_slice(&i256::from_i128(0).to_be_bytes());
2169        assert_bytes_eq(&got_fixed, &expected_fixed);
2170    }
2171
2172    #[cfg(feature = "small_decimals")]
2173    #[test]
2174    fn decimal_bytes_and_fixed_32() {
2175        // Use Decimal32 with small positives and negatives
2176        let dec = Decimal32Array::from(vec![1i32, -1i32, 0i32])
2177            .with_precision_and_scale(9, 0)
2178            .unwrap();
2179        // bytes(decimal)
2180        let plan_bytes = FieldPlan::Decimal { size: None };
2181        let got_bytes = encode_all(&dec, &plan_bytes, None);
2182        let mut expected_bytes = Vec::new();
2183        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2184        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2185        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2186        assert_bytes_eq(&got_bytes, &expected_bytes);
2187        // fixed(4)
2188        let plan_fixed = FieldPlan::Decimal { size: Some(4) };
2189        let got_fixed = encode_all(&dec, &plan_fixed, None);
2190        let mut expected_fixed = Vec::new();
2191        expected_fixed.extend_from_slice(&1i32.to_be_bytes());
2192        expected_fixed.extend_from_slice(&(-1i32).to_be_bytes());
2193        expected_fixed.extend_from_slice(&0i32.to_be_bytes());
2194        assert_bytes_eq(&got_fixed, &expected_fixed);
2195    }
2196
2197    #[cfg(feature = "small_decimals")]
2198    #[test]
2199    fn decimal_bytes_and_fixed_64() {
2200        // Use Decimal64 with small positives and negatives
2201        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
2202            .with_precision_and_scale(18, 0)
2203            .unwrap();
2204        // bytes(decimal)
2205        let plan_bytes = FieldPlan::Decimal { size: None };
2206        let got_bytes = encode_all(&dec, &plan_bytes, None);
2207        let mut expected_bytes = Vec::new();
2208        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
2209        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
2210        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
2211        assert_bytes_eq(&got_bytes, &expected_bytes);
2212        // fixed(8)
2213        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
2214        let got_fixed = encode_all(&dec, &plan_fixed, None);
2215        let mut expected_fixed = Vec::new();
2216        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
2217        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
2218        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
2219        assert_bytes_eq(&got_fixed, &expected_fixed);
2220    }
2221
2222    #[test]
2223    fn float32_and_float64_encoders() {
2224        let f32a = Float32Array::from(vec![0.0f32, -1.5f32, f32::from_bits(0x7fc00000)]); // includes a quiet NaN bit pattern
2225        let f64a = Float64Array::from(vec![0.0f64, -2.25f64]);
2226        // f32 expected
2227        let mut expected32 = Vec::new();
2228        for v in [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)] {
2229            expected32.extend_from_slice(&v.to_bits().to_le_bytes());
2230        }
2231        let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
2232        assert_bytes_eq(&got32, &expected32);
2233        // f64 expected
2234        let mut expected64 = Vec::new();
2235        for v in [0.0f64, -2.25f64] {
2236            expected64.extend_from_slice(&v.to_bits().to_le_bytes());
2237        }
2238        let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
2239        assert_bytes_eq(&got64, &expected64);
2240    }
2241
2242    #[test]
2243    fn long_encoder_int64() {
2244        let arr = Int64Array::from(vec![0i64, 1i64, -1i64, 2i64, -2i64, i64::MIN + 1]);
2245        let mut expected = Vec::new();
2246        for v in [0, 1, -1, 2, -2, i64::MIN + 1] {
2247            expected.extend(avro_long_bytes(v));
2248        }
2249        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2250        assert_bytes_eq(&got, &expected);
2251    }
2252
2253    #[test]
2254    fn fixed_encoder_plain() {
2255        // Two values of width 4
2256        let data = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
2257        let values: Vec<Vec<u8>> = data.iter().map(|x| x.to_vec()).collect();
2258        let arr = FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap();
2259        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2260        let mut expected = Vec::new();
2261        expected.extend_from_slice(&data[0]);
2262        expected.extend_from_slice(&data[1]);
2263        assert_bytes_eq(&got, &expected);
2264    }
2265
2266    #[test]
2267    fn uuid_encoder_test() {
2268        // Happy path
2269        let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
2270        let bytes = *u.as_bytes();
2271        let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
2272        // Expected: length 36 (0x48) followed by hyphenated lowercase text
2273        let mut expected = Vec::new();
2274        expected.push(0x48);
2275        expected.extend_from_slice(u.hyphenated().to_string().as_bytes());
2276        let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
2277        assert_bytes_eq(&got, &expected);
2278    }
2279
2280    #[test]
2281    fn uuid_encoder_error() {
2282        // Invalid UUID bytes: wrong length
2283        let arr =
2284            FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
2285                .unwrap();
2286        let plan = FieldPlan::Uuid;
2287        let mut enc = FieldEncoder::make_encoder(&arr, &plan, None).unwrap();
2288        let mut out = Vec::new();
2289        let err = enc.encode(&mut out, 0).unwrap_err();
2290        match err {
2291            ArrowError::InvalidArgumentError(msg) => {
2292                assert!(msg.contains("Invalid UUID bytes"))
2293            }
2294            other => panic!("expected InvalidArgumentError, got {other:?}"),
2295        }
2296    }
2297
2298    fn test_scalar_primitive_encoding<T>(
2299        non_nullable_data: &[T::Native],
2300        nullable_data: &[Option<T::Native>],
2301    ) where
2302        T: ArrowPrimitiveType,
2303        T::Native: Into<i64> + Copy,
2304        PrimitiveArray<T>: From<Vec<<T as ArrowPrimitiveType>::Native>>,
2305    {
2306        let plan = FieldPlan::Scalar;
2307
2308        let array = PrimitiveArray::<T>::from(non_nullable_data.to_vec());
2309        let got = encode_all(&array, &plan, None);
2310
2311        let mut expected = Vec::new();
2312        for &value in non_nullable_data {
2313            expected.extend(avro_long_bytes(value.into()));
2314        }
2315        assert_bytes_eq(&got, &expected);
2316
2317        let array_nullable: PrimitiveArray<T> = nullable_data.iter().copied().collect();
2318        let got_nullable = encode_all(&array_nullable, &plan, Some(Nullability::NullFirst));
2319
2320        let mut expected_nullable = Vec::new();
2321        for &opt_value in nullable_data {
2322            match opt_value {
2323                Some(value) => {
2324                    // Union index 1 for the value, then the value itself
2325                    expected_nullable.extend(avro_long_bytes(1));
2326                    expected_nullable.extend(avro_long_bytes(value.into()));
2327                }
2328                None => {
2329                    // Union index 0 for the null
2330                    expected_nullable.extend(avro_long_bytes(0));
2331                }
2332            }
2333        }
2334        assert_bytes_eq(&got_nullable, &expected_nullable);
2335    }
2336
2337    #[test]
2338    fn date32_encoder() {
2339        test_scalar_primitive_encoding::<Date32Type>(
2340            &[
2341                19345, // 2022-12-20
2342                0,     // 1970-01-01 (epoch)
2343                -1,    // 1969-12-31 (pre-epoch)
2344            ],
2345            &[Some(19345), None],
2346        );
2347    }
2348
2349    #[test]
2350    fn time32_millis_encoder() {
2351        test_scalar_primitive_encoding::<Time32MillisecondType>(
2352            &[
2353                0,        // Midnight
2354                49530123, // 13:45:30.123
2355                86399999, // 23:59:59.999
2356            ],
2357            &[None, Some(49530123)],
2358        );
2359    }
2360
2361    #[test]
2362    fn time64_micros_encoder() {
2363        test_scalar_primitive_encoding::<Time64MicrosecondType>(
2364            &[
2365                0,           // Midnight
2366                86399999999, // 23:59:59.999999
2367            ],
2368            &[Some(86399999999), None],
2369        );
2370    }
2371
2372    #[test]
2373    fn timestamp_millis_encoder() {
2374        test_scalar_primitive_encoding::<TimestampMillisecondType>(
2375            &[
2376                1704067200000, // 2024-01-01T00:00:00Z
2377                0,             // 1970-01-01T00:00:00Z (epoch)
2378                -123456789,    // Pre-epoch timestamp
2379            ],
2380            &[None, Some(1704067200000)],
2381        );
2382    }
2383
2384    #[test]
2385    fn map_encoder_string_keys_int_values() {
2386        // Build MapArray with two rows
2387        // Row0: {"k1":1, "k2":2}
2388        // Row1: {}
2389        let keys = StringArray::from(vec!["k1", "k2"]);
2390        let values = Int32Array::from(vec![1, 2]);
2391        let entries_fields = Fields::from(vec![
2392            Field::new("key", DataType::Utf8, false),
2393            Field::new("value", DataType::Int32, true),
2394        ]);
2395        let entries = StructArray::new(
2396            entries_fields,
2397            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2398            None,
2399        );
2400        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
2401        let map = MapArray::new(
2402            Field::new("entries", entries.data_type().clone(), false).into(),
2403            offsets,
2404            entries,
2405            None,
2406            false,
2407        );
2408        let plan = FieldPlan::Map {
2409            values_nullability: None,
2410            value_plan: Box::new(FieldPlan::Scalar),
2411        };
2412        let got = encode_all(&map, &plan, None);
2413        let mut expected = Vec::new();
2414        // Row0: block 2 then pairs
2415        expected.extend(avro_long_bytes(2));
2416        expected.extend(avro_len_prefixed_bytes(b"k1"));
2417        expected.extend(avro_long_bytes(1));
2418        expected.extend(avro_len_prefixed_bytes(b"k2"));
2419        expected.extend(avro_long_bytes(2));
2420        expected.extend(avro_long_bytes(0));
2421        // Row1: empty
2422        expected.extend(avro_long_bytes(0));
2423        assert_bytes_eq(&got, &expected);
2424    }
2425
2426    #[test]
2427    fn union_encoder_string_int() {
2428        let strings = StringArray::from(vec!["hello", "world"]);
2429        let ints = Int32Array::from(vec![10, 20, 30]);
2430
2431        let union_fields = UnionFields::try_new(
2432            vec![0, 1],
2433            vec![
2434                Field::new("v_str", DataType::Utf8, true),
2435                Field::new("v_int", DataType::Int32, true),
2436            ],
2437        )
2438        .unwrap();
2439
2440        let type_ids = Buffer::from_slice_ref([0_i8, 1, 1, 0, 1]);
2441        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
2442
2443        let union_array = UnionArray::try_new(
2444            union_fields,
2445            type_ids.into(),
2446            Some(offsets.into()),
2447            vec![Arc::new(strings), Arc::new(ints)],
2448        )
2449        .unwrap();
2450
2451        let plan = FieldPlan::Union {
2452            bindings: vec![
2453                FieldBinding {
2454                    arrow_index: 0,
2455                    nullability: None,
2456                    plan: FieldPlan::Scalar,
2457                },
2458                FieldBinding {
2459                    arrow_index: 1,
2460                    nullability: None,
2461                    plan: FieldPlan::Scalar,
2462                },
2463            ],
2464        };
2465
2466        let got = encode_all(&union_array, &plan, None);
2467
2468        let mut expected = Vec::new();
2469        expected.extend(avro_long_bytes(0));
2470        expected.extend(avro_len_prefixed_bytes(b"hello"));
2471        expected.extend(avro_long_bytes(1));
2472        expected.extend(avro_long_bytes(10));
2473        expected.extend(avro_long_bytes(1));
2474        expected.extend(avro_long_bytes(20));
2475        expected.extend(avro_long_bytes(0));
2476        expected.extend(avro_len_prefixed_bytes(b"world"));
2477        expected.extend(avro_long_bytes(1));
2478        expected.extend(avro_long_bytes(30));
2479
2480        assert_bytes_eq(&got, &expected);
2481    }
2482
2483    #[test]
2484    fn union_encoder_null_string_int() {
2485        let nulls = NullArray::new(1);
2486        let strings = StringArray::from(vec!["hello"]);
2487        let ints = Int32Array::from(vec![10]);
2488
2489        let union_fields = UnionFields::try_new(
2490            vec![0, 1, 2],
2491            vec![
2492                Field::new("v_null", DataType::Null, true),
2493                Field::new("v_str", DataType::Utf8, true),
2494                Field::new("v_int", DataType::Int32, true),
2495            ],
2496        )
2497        .unwrap();
2498
2499        let type_ids = Buffer::from_slice_ref([0_i8, 1, 2]);
2500        // For a null value in a dense union, no value is added to a child array.
2501        // The offset points to the last value of that type. Since there's only one
2502        // null, and one of each other type, all offsets are 0.
2503        let offsets = Buffer::from_slice_ref([0_i32, 0, 0]);
2504
2505        let union_array = UnionArray::try_new(
2506            union_fields,
2507            type_ids.into(),
2508            Some(offsets.into()),
2509            vec![Arc::new(nulls), Arc::new(strings), Arc::new(ints)],
2510        )
2511        .unwrap();
2512
2513        let plan = FieldPlan::Union {
2514            bindings: vec![
2515                FieldBinding {
2516                    arrow_index: 0,
2517                    nullability: None,
2518                    plan: FieldPlan::Scalar,
2519                },
2520                FieldBinding {
2521                    arrow_index: 1,
2522                    nullability: None,
2523                    plan: FieldPlan::Scalar,
2524                },
2525                FieldBinding {
2526                    arrow_index: 2,
2527                    nullability: None,
2528                    plan: FieldPlan::Scalar,
2529                },
2530            ],
2531        };
2532
2533        let got = encode_all(&union_array, &plan, None);
2534
2535        let mut expected = Vec::new();
2536        expected.extend(avro_long_bytes(0));
2537        expected.extend(avro_long_bytes(1));
2538        expected.extend(avro_len_prefixed_bytes(b"hello"));
2539        expected.extend(avro_long_bytes(2));
2540        expected.extend(avro_long_bytes(10));
2541
2542        assert_bytes_eq(&got, &expected);
2543    }
2544
2545    #[test]
2546    fn list64_encoder_int32() {
2547        // LargeList [[1,2,3], []]
2548        let values = Int32Array::from(vec![1, 2, 3]);
2549        let offsets: Vec<i64> = vec![0, 3, 3];
2550        let list = LargeListArray::new(
2551            Field::new("item", DataType::Int32, true).into(),
2552            arrow_buffer::OffsetBuffer::new(offsets.into()),
2553            Arc::new(values) as ArrayRef,
2554            None,
2555        );
2556        let plan = FieldPlan::List {
2557            items_nullability: None,
2558            item_plan: Box::new(FieldPlan::Scalar),
2559        };
2560        let got = encode_all(&list, &plan, None);
2561        // Expected one block of 3 and then 0, then empty 0
2562        let mut expected = Vec::new();
2563        expected.extend(avro_long_bytes(3));
2564        expected.extend(avro_long_bytes(1));
2565        expected.extend(avro_long_bytes(2));
2566        expected.extend(avro_long_bytes(3));
2567        expected.extend(avro_long_bytes(0));
2568        expected.extend(avro_long_bytes(0));
2569        assert_bytes_eq(&got, &expected);
2570    }
2571
2572    #[test]
2573    fn int_encoder_test() {
2574        let ints = Int32Array::from(vec![0, -1, 2]);
2575        let mut expected_i = Vec::new();
2576        for v in [0i32, -1, 2] {
2577            expected_i.extend(avro_long_bytes(v as i64));
2578        }
2579        let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
2580        assert_bytes_eq(&got_i, &expected_i);
2581    }
2582
2583    #[test]
2584    fn boolean_encoder_test() {
2585        let bools = BooleanArray::from(vec![true, false]);
2586        let mut expected_b = Vec::new();
2587        expected_b.extend_from_slice(&[1]);
2588        expected_b.extend_from_slice(&[0]);
2589        let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
2590        assert_bytes_eq(&got_b, &expected_b);
2591    }
2592
2593    #[test]
2594    #[cfg(feature = "avro_custom_types")]
2595    fn duration_encoding_seconds() {
2596        let arr: PrimitiveArray<DurationSecondType> = vec![0i64, -1, 2].into();
2597        let mut expected = Vec::new();
2598        for v in [0i64, -1, 2] {
2599            expected.extend_from_slice(&avro_long_bytes(v));
2600        }
2601        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2602        assert_bytes_eq(&got, &expected);
2603    }
2604
2605    #[test]
2606    #[cfg(feature = "avro_custom_types")]
2607    fn duration_encoding_milliseconds() {
2608        let arr: PrimitiveArray<DurationMillisecondType> = vec![1i64, 0, -2].into();
2609        let mut expected = Vec::new();
2610        for v in [1i64, 0, -2] {
2611            expected.extend_from_slice(&avro_long_bytes(v));
2612        }
2613        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2614        assert_bytes_eq(&got, &expected);
2615    }
2616
2617    #[test]
2618    #[cfg(feature = "avro_custom_types")]
2619    fn duration_encoding_microseconds() {
2620        let arr: PrimitiveArray<DurationMicrosecondType> = vec![5i64, -6, 7].into();
2621        let mut expected = Vec::new();
2622        for v in [5i64, -6, 7] {
2623            expected.extend_from_slice(&avro_long_bytes(v));
2624        }
2625        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2626        assert_bytes_eq(&got, &expected);
2627    }
2628
2629    #[test]
2630    #[cfg(feature = "avro_custom_types")]
2631    fn duration_encoding_nanoseconds() {
2632        let arr: PrimitiveArray<DurationNanosecondType> = vec![8i64, 9, -10].into();
2633        let mut expected = Vec::new();
2634        for v in [8i64, 9, -10] {
2635            expected.extend_from_slice(&avro_long_bytes(v));
2636        }
2637        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2638        assert_bytes_eq(&got, &expected);
2639    }
2640
2641    #[test]
2642    fn duration_encoder_year_month_happy_path() {
2643        let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
2644        let mut expected = Vec::new();
2645        for m in [0u32, 1u32, 25u32] {
2646            expected.extend_from_slice(&duration_fixed12(m, 0, 0));
2647        }
2648        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2649        assert_bytes_eq(&got, &expected);
2650    }
2651
2652    #[test]
2653    fn duration_encoder_year_month_rejects_negative() {
2654        let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
2655        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2656        let mut out = Vec::new();
2657        let err = enc.encode(&mut out, 0).unwrap_err();
2658        match err {
2659            ArrowError::InvalidArgumentError(msg) => {
2660                assert!(msg.contains("cannot encode negative months"))
2661            }
2662            other => panic!("expected InvalidArgumentError, got {other:?}"),
2663        }
2664    }
2665
2666    #[test]
2667    fn duration_encoder_day_time_happy_path() {
2668        let v0 = IntervalDayTimeType::make_value(2, 500); // days=2, millis=500
2669        let v1 = IntervalDayTimeType::make_value(0, 0);
2670        let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
2671        let mut expected = Vec::new();
2672        expected.extend_from_slice(&duration_fixed12(0, 2, 500));
2673        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
2674        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2675        assert_bytes_eq(&got, &expected);
2676    }
2677
2678    #[test]
2679    fn duration_encoder_day_time_rejects_negative() {
2680        let bad = IntervalDayTimeType::make_value(-1, 0);
2681        let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
2682        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2683        let mut out = Vec::new();
2684        let err = enc.encode(&mut out, 0).unwrap_err();
2685        match err {
2686            ArrowError::InvalidArgumentError(msg) => {
2687                assert!(msg.contains("cannot encode negative days"))
2688            }
2689            other => panic!("expected InvalidArgumentError, got {other:?}"),
2690        }
2691    }
2692
2693    #[test]
2694    fn duration_encoder_month_day_nano_happy_path() {
2695        let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); // -> millis = 3
2696        let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
2697        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
2698        let mut expected = Vec::new();
2699        expected.extend_from_slice(&duration_fixed12(1, 2, 3));
2700        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
2701        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2702        assert_bytes_eq(&got, &expected);
2703    }
2704
2705    #[test]
2706    fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
2707        let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
2708        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
2709        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2710        let mut out = Vec::new();
2711        let err = enc.encode(&mut out, 0).unwrap_err();
2712        match err {
2713            ArrowError::InvalidArgumentError(msg) => {
2714                assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
2715            }
2716            other => panic!("expected InvalidArgumentError, got {other:?}"),
2717        }
2718    }
2719
2720    #[test]
2721    fn minimal_twos_complement_test() {
2722        let pos = [0x00, 0x00, 0x01];
2723        assert_eq!(minimal_twos_complement(&pos), &pos[2..]);
2724        let neg = [0xFF, 0xFF, 0x80]; // negative minimal is 0x80
2725        assert_eq!(minimal_twos_complement(&neg), &neg[2..]);
2726        let zero = [0x00, 0x00, 0x00];
2727        assert_eq!(minimal_twos_complement(&zero), &zero[2..]);
2728    }
2729
2730    #[test]
2731    fn write_sign_extend_test() {
2732        let mut out = Vec::new();
2733        write_sign_extended(&mut out, &[0x01], 4).unwrap();
2734        assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
2735        out.clear();
2736        write_sign_extended(&mut out, &[0xFF], 4).unwrap();
2737        assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
2738        out.clear();
2739        // truncation success (sign bytes only removed)
2740        write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
2741        assert_eq!(out, vec![0xFF, 0x80]);
2742        out.clear();
2743        // truncation overflow
2744        let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
2745        match err {
2746            ArrowError::InvalidArgumentError(_) => {}
2747            _ => panic!("expected InvalidArgumentError"),
2748        }
2749    }
2750
2751    #[test]
2752    fn duration_month_day_nano_overflow_millis() {
2753        // nanos leading to millis > u32::MAX
2754        let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
2755        let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
2756        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
2757        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2758        let mut out = Vec::new();
2759        let err = enc.encode(&mut out, 0).unwrap_err();
2760        match err {
2761            ArrowError::InvalidArgumentError(msg) => assert!(msg.contains("exceed u32::MAX")),
2762            _ => panic!("expected InvalidArgumentError"),
2763        }
2764    }
2765
2766    #[test]
2767    fn fieldplan_decimal_precision_scale_mismatch_errors() {
2768        // Avro expects (10,2), Arrow has (12,2)
2769        use crate::codec::Codec;
2770        use std::collections::HashMap;
2771        let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
2772        let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
2773        let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
2774        match err {
2775            ArrowError::SchemaError(msg) => {
2776                assert!(msg.contains("Decimal precision/scale mismatch"))
2777            }
2778            _ => panic!("expected SchemaError"),
2779        }
2780    }
2781
2782    #[test]
2783    fn timestamp_micros_encoder() {
2784        // Mirrors the style used by `timestamp_millis_encoder`
2785        test_scalar_primitive_encoding::<TimestampMicrosecondType>(
2786            &[
2787                1_704_067_200_000_000, // 2024-01-01T00:00:00Z in micros
2788                0,                     // epoch
2789                -123_456_789,          // pre-epoch
2790            ],
2791            &[None, Some(1_704_067_200_000_000)],
2792        );
2793    }
2794
2795    #[test]
2796    fn list_encoder_nullable_items_null_first() {
2797        // One List row with three elements: [Some(1), None, Some(2)]
2798        let values = Int32Array::from(vec![Some(1), None, Some(2)]);
2799        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
2800        let list = ListArray::new(
2801            Field::new("item", DataType::Int32, true).into(),
2802            offsets,
2803            Arc::new(values) as ArrayRef,
2804            None,
2805        );
2806
2807        let plan = FieldPlan::List {
2808            items_nullability: Some(Nullability::NullFirst),
2809            item_plan: Box::new(FieldPlan::Scalar),
2810        };
2811
2812        // Avro array encoding per row: one positive block, then 0 terminator.
2813        // For NullFirst: Some(v) => branch 1 (0x02) then the value; None => branch 0 (0x00)
2814        let mut expected = Vec::new();
2815        expected.extend(avro_long_bytes(3)); // block of 3
2816        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2817        expected.extend(avro_long_bytes(1)); // value 1
2818        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
2819        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2820        expected.extend(avro_long_bytes(2)); // value 2
2821        expected.extend(avro_long_bytes(0)); // block terminator
2822
2823        let got = encode_all(&list, &plan, None);
2824        assert_bytes_eq(&got, &expected);
2825    }
2826
2827    #[test]
2828    fn large_list_encoder_nullable_items_null_first() {
2829        // LargeList single row: [Some(10), None]
2830        let values = Int32Array::from(vec![Some(10), None]);
2831        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
2832        let list = LargeListArray::new(
2833            Field::new("item", DataType::Int32, true).into(),
2834            offsets,
2835            Arc::new(values) as ArrayRef,
2836            None,
2837        );
2838
2839        let plan = FieldPlan::List {
2840            items_nullability: Some(Nullability::NullFirst),
2841            item_plan: Box::new(FieldPlan::Scalar),
2842        };
2843
2844        let mut expected = Vec::new();
2845        expected.extend(avro_long_bytes(2)); // block of 2
2846        expected.extend(avro_long_bytes(1)); // union branch=1 (value)
2847        expected.extend(avro_long_bytes(10)); // value 10
2848        expected.extend(avro_long_bytes(0)); // union branch=0 (null)
2849        expected.extend(avro_long_bytes(0)); // block terminator
2850
2851        let got = encode_all(&list, &plan, None);
2852        assert_bytes_eq(&got, &expected);
2853    }
2854
2855    #[test]
2856    fn map_encoder_string_keys_nullable_int_values_null_first() {
2857        // One map row: {"k1": Some(7), "k2": None}
2858        let keys = StringArray::from(vec!["k1", "k2"]);
2859        let values = Int32Array::from(vec![Some(7), None]);
2860
2861        let entries_fields = Fields::from(vec![
2862            Field::new("key", DataType::Utf8, false),
2863            Field::new("value", DataType::Int32, true),
2864        ]);
2865        let entries = StructArray::new(
2866            entries_fields,
2867            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
2868            None,
2869        );
2870
2871        // Single row -> offsets [0, 2]
2872        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
2873        let map = MapArray::new(
2874            Field::new("entries", entries.data_type().clone(), false).into(),
2875            offsets,
2876            entries,
2877            None,
2878            false,
2879        );
2880
2881        let plan = FieldPlan::Map {
2882            values_nullability: Some(Nullability::NullFirst),
2883            value_plan: Box::new(FieldPlan::Scalar),
2884        };
2885
2886        // Expected:
2887        // - one positive block (len=2)
2888        // - "k1", branch=1 + value=7
2889        // - "k2", branch=0 (null)
2890        // - end-of-block marker 0
2891        let mut expected = Vec::new();
2892        expected.extend(avro_long_bytes(2)); // block length 2
2893        expected.extend(avro_len_prefixed_bytes(b"k1")); // key "k1"
2894        expected.extend(avro_long_bytes(1)); // union branch 1 (value)
2895        expected.extend(avro_long_bytes(7)); // value 7
2896        expected.extend(avro_len_prefixed_bytes(b"k2")); // key "k2"
2897        expected.extend(avro_long_bytes(0)); // union branch 0 (null)
2898        expected.extend(avro_long_bytes(0)); // block terminator
2899
2900        let got = encode_all(&map, &plan, None);
2901        assert_bytes_eq(&got, &expected);
2902    }
2903
2904    #[test]
2905    fn time32_seconds_to_millis_encoder() {
2906        // Time32(Second) must encode as Avro time-millis (ms since midnight).
2907        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
2908            vec![0i32, 1, -2, 12_345].into();
2909        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2910        let mut expected = Vec::new();
2911        for secs in [0i32, 1, -2, 12_345] {
2912            let millis = (secs as i64) * 1000;
2913            expected.extend_from_slice(&avro_long_bytes(millis));
2914        }
2915        assert_bytes_eq(&got, &expected);
2916    }
2917
2918    #[test]
2919    fn time32_seconds_to_millis_overflow() {
2920        // Choose a value that will overflow i32 when multiplied by 1000.
2921        let overflow_secs: i32 = i32::MAX / 1000 + 1;
2922        let arr: PrimitiveArray<Time32SecondType> = vec![overflow_secs].into();
2923        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2924        let mut out = Vec::new();
2925        let err = enc.encode(&mut out, 0).unwrap_err();
2926        match err {
2927            arrow_schema::ArrowError::InvalidArgumentError(msg) => {
2928                assert!(
2929                    msg.contains("overflowed") || msg.contains("overflow"),
2930                    "unexpected message: {msg}"
2931                )
2932            }
2933            other => panic!("expected InvalidArgumentError, got {other:?}"),
2934        }
2935    }
2936
2937    #[test]
2938    fn timestamp_seconds_to_millis_encoder() {
2939        // Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
2940        let arr: PrimitiveArray<TimestampSecondType> = vec![0i64, 1, -1, 1_234_567_890].into();
2941        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2942        let mut expected = Vec::new();
2943        for secs in [0i64, 1, -1, 1_234_567_890] {
2944            let millis = secs * 1000;
2945            expected.extend_from_slice(&avro_long_bytes(millis));
2946        }
2947        assert_bytes_eq(&got, &expected);
2948    }
2949
2950    #[test]
2951    fn timestamp_seconds_to_millis_overflow() {
2952        // Overflow i64 when multiplied by 1000.
2953        let overflow_secs: i64 = i64::MAX / 1000 + 1;
2954        let arr: PrimitiveArray<TimestampSecondType> = vec![overflow_secs].into();
2955        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
2956        let mut out = Vec::new();
2957        let err = enc.encode(&mut out, 0).unwrap_err();
2958        match err {
2959            arrow_schema::ArrowError::InvalidArgumentError(msg) => {
2960                assert!(
2961                    msg.contains("overflowed") || msg.contains("overflow"),
2962                    "unexpected message: {msg}"
2963                )
2964            }
2965            other => panic!("expected InvalidArgumentError, got {other:?}"),
2966        }
2967    }
2968
2969    #[test]
2970    fn timestamp_nanos_encoder() {
2971        let arr: PrimitiveArray<TimestampNanosecondType> = vec![0i64, 1, -1, 123].into();
2972        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2973        let mut expected = Vec::new();
2974        for ns in [0i64, 1, -1, 123] {
2975            expected.extend_from_slice(&avro_long_bytes(ns));
2976        }
2977        assert_bytes_eq(&got, &expected);
2978    }
2979
2980    #[test]
2981    fn union_encoder_string_int_nonzero_type_ids() {
2982        let strings = StringArray::from(vec!["hello", "world"]);
2983        let ints = Int32Array::from(vec![10, 20, 30]);
2984        let union_fields = UnionFields::try_new(
2985            vec![2, 5],
2986            vec![
2987                Field::new("v_str", DataType::Utf8, true),
2988                Field::new("v_int", DataType::Int32, true),
2989            ],
2990        )
2991        .unwrap();
2992        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
2993        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
2994        let union_array = UnionArray::try_new(
2995            union_fields,
2996            type_ids.into(),
2997            Some(offsets.into()),
2998            vec![Arc::new(strings), Arc::new(ints)],
2999        )
3000        .unwrap();
3001        let plan = FieldPlan::Union {
3002            bindings: vec![
3003                FieldBinding {
3004                    arrow_index: 0,
3005                    nullability: None,
3006                    plan: FieldPlan::Scalar,
3007                },
3008                FieldBinding {
3009                    arrow_index: 1,
3010                    nullability: None,
3011                    plan: FieldPlan::Scalar,
3012                },
3013            ],
3014        };
3015        let got = encode_all(&union_array, &plan, None);
3016        let mut expected = Vec::new();
3017        expected.extend(avro_long_bytes(0));
3018        expected.extend(avro_len_prefixed_bytes(b"hello"));
3019        expected.extend(avro_long_bytes(1));
3020        expected.extend(avro_long_bytes(10));
3021        expected.extend(avro_long_bytes(1));
3022        expected.extend(avro_long_bytes(20));
3023        expected.extend(avro_long_bytes(0));
3024        expected.extend(avro_len_prefixed_bytes(b"world"));
3025        expected.extend(avro_long_bytes(1));
3026        expected.extend(avro_long_bytes(30));
3027        assert_bytes_eq(&got, &expected);
3028    }
3029
3030    #[test]
3031    fn nullable_state_with_null_buffer_and_zero_nulls() {
3032        let values = vec![1i32, 2, 3];
3033        let arr = Int32Array::from_iter_values_with_nulls(values, Some(NullBuffer::new_valid(3)));
3034        assert_eq!(arr.null_count(), 0);
3035        assert!(arr.nulls().is_some());
3036        let plan = FieldPlan::Scalar;
3037        let enc = FieldEncoder::make_encoder(&arr, &plan, Some(Nullability::NullFirst)).unwrap();
3038        match enc.null_state {
3039            NullState::NullableNoNulls { union_value_byte } => {
3040                assert_eq!(
3041                    union_value_byte,
3042                    union_value_branch_byte(Nullability::NullFirst, false)
3043                );
3044            }
3045            other => panic!("expected NullableNoNulls, got {other:?}"),
3046        }
3047    }
3048}