arrow_avro/writer/
encoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Avro Encoder for Arrow types.
19
20use crate::codec::{AvroDataType, AvroField, Codec};
21use crate::schema::{Fingerprint, Nullability, Prefix};
22use arrow_array::cast::AsArray;
23use arrow_array::types::{
24    ArrowPrimitiveType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
25    DurationSecondType, Float32Type, Float64Type, Int32Type, Int64Type, IntervalDayTimeType,
26    IntervalMonthDayNanoType, IntervalYearMonthType, TimestampMicrosecondType,
27};
28use arrow_array::{
29    Array, Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray,
30    GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, MapArray,
31    OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
32};
33#[cfg(feature = "small_decimals")]
34use arrow_array::{Decimal32Array, Decimal64Array};
35use arrow_buffer::NullBuffer;
36use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit};
37use serde::Serialize;
38use std::io::Write;
39use std::sync::Arc;
40use uuid::Uuid;
41
42/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
43///
44/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
45#[inline]
46pub fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), ArrowError> {
47    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
48    // At most 10 bytes for 64-bit varint
49    let mut buf = [0u8; 10];
50    let mut i = 0;
51    while (zz & !0x7F) != 0 {
52        buf[i] = ((zz & 0x7F) as u8) | 0x80;
53        i += 1;
54        zz >>= 7;
55    }
56    buf[i] = (zz & 0x7F) as u8;
57    i += 1;
58    out.write_all(&buf[..i])
59        .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
60}
61
62#[inline]
63fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), ArrowError> {
64    write_long(out, value as i64)
65}
66
67#[inline]
68fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
69    write_long(out, bytes.len() as i64)?;
70    out.write_all(bytes)
71        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
72}
73
74#[inline]
75fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> {
76    out.write_all(&[if v { 1 } else { 0 }])
77        .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
78}
79
/// Minimal two's-complement big-endian representation helper for Avro decimal (bytes).
///
/// For non-negative numbers, leading `0x00` bytes are trimmed until an essential
/// byte is reached; for negative numbers, leading `0xFF` bytes are trimmed.
/// One sign byte is kept whenever dropping it would flip the sign bit of the
/// result, so the returned slice still encodes the same signed value.
///
/// An empty input is returned unchanged. An input made up solely of sign bytes
/// (the value `0` or `-1`) is reduced to its last byte.
///
/// See Avro spec: decimal over `bytes` uses two's-complement big-endian
/// representation of the unscaled integer value. 1.11.1 specification.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&first) = be.first() else {
        return be;
    };
    // The byte value that carries no information beyond the sign.
    let sign_byte: u8 = if (first & 0x80) != 0 { 0xFF } else { 0x00 };
    match be.iter().position(|&b| b != sign_byte) {
        // Every byte is a sign byte: a single one is enough.
        None => &be[be.len() - 1..],
        // The first byte already carries payload: nothing to trim.
        Some(0) => be,
        Some(k) => {
            // `be[k]` is the first payload byte. If its MSB agrees with the
            // sign, all `k` sign bytes are redundant; otherwise keep one so
            // the sign bit survives.
            let start = if ((be[k] ^ sign_byte) & 0x80) == 0 {
                k
            } else {
                k - 1
            };
            &be[start..]
        }
    }
}
111
112/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n` bytes.
113///
114///
115/// - If shorter than `n`, the slice is sign-extended by left-padding with the
116///   sign byte (`0x00` for positive, `0xFF` for negative).
117/// - If longer than `n`, the slice is truncated from the left. An overflow error
118///   is returned if any of the truncated bytes are not redundant sign bytes,
119///   or if the resulting value's sign bit would differ from the original.
120/// - If the slice is already `n` bytes long, it is copied.
121///
122/// Used for encoding Avro decimal values into `fixed(N)` fields.
123#[inline]
124fn write_sign_extended<W: Write + ?Sized>(
125    out: &mut W,
126    src_be: &[u8],
127    n: usize,
128) -> Result<(), ArrowError> {
129    let len = src_be.len();
130    if len == n {
131        return out
132            .write_all(src_be)
133            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
134    }
135    let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
136        0xFF
137    } else {
138        0x00
139    };
140    if len > n {
141        let extra = len - n;
142        if n == 0 && src_be.iter().all(|&b| b == sign_byte) {
143            return Ok(());
144        }
145        // All truncated bytes must equal the sign byte, and the MSB of the first
146        // retained byte must match the sign (otherwise overflow).
147        if src_be[..extra].iter().any(|&b| b != sign_byte)
148            || ((src_be[extra] ^ sign_byte) & 0x80) != 0
149        {
150            return Err(ArrowError::InvalidArgumentError(format!(
151                "Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
152            )));
153        }
154        return out
155            .write_all(&src_be[extra..])
156            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
157    }
158    // len < n: prepend sign bytes (sign extension) then the payload
159    let pad_len = n - len;
160    // Fixed-size stack pads to avoid heap allocation on the hot path
161    const ZPAD: [u8; 64] = [0x00; 64];
162    const FPAD: [u8; 64] = [0xFF; 64];
163    let pad = if sign_byte == 0x00 {
164        &ZPAD[..]
165    } else {
166        &FPAD[..]
167    };
168    // Emit padding in 64‑byte chunks (minimizes write calls without allocating),
169    // then write the original bytes.
170    let mut rem = pad_len;
171    while rem >= pad.len() {
172        out.write_all(pad)
173            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
174        rem -= pad.len();
175    }
176    if rem > 0 {
177        out.write_all(&pad[..rem])
178            .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
179    }
180    out.write_all(src_be)
181        .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
182}
183
184/// Write the union branch index for an optional field.
185///
186/// Branch index is 0-based per Avro unions:
187/// - Null-first (default): null => 0, value => 1
188/// - Null-second (Impala): value => 0, null => 1
189fn write_optional_index<W: Write + ?Sized>(
190    out: &mut W,
191    is_null: bool,
192    null_order: Nullability,
193) -> Result<(), ArrowError> {
194    let byte = union_value_branch_byte(null_order, is_null);
195    out.write_all(&[byte])
196        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
197}
198
199#[derive(Debug, Clone)]
200enum NullState {
201    NonNullable,
202    NullableNoNulls {
203        union_value_byte: u8,
204    },
205    Nullable {
206        nulls: NullBuffer,
207        null_order: Nullability,
208    },
209}
210
211/// Arrow to Avro FieldEncoder:
212/// - Holds the inner `Encoder` (by value)
213/// - Carries the per-site nullability **state** as a single enum that enforces invariants
214pub struct FieldEncoder<'a> {
215    encoder: Encoder<'a>,
216    null_state: NullState,
217}
218
219impl<'a> FieldEncoder<'a> {
220    fn make_encoder(
221        array: &'a dyn Array,
222        field: &Field,
223        plan: &FieldPlan,
224        nullability: Option<Nullability>,
225    ) -> Result<Self, ArrowError> {
226        let encoder = match plan {
227            FieldPlan::Scalar => match array.data_type() {
228                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
229                DataType::Utf8 => {
230                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
231                }
232                DataType::LargeUtf8 => {
233                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
234                }
235                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
236                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
237                DataType::Float32 => {
238                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
239                }
240                DataType::Float64 => {
241                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
242                }
243                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
244                DataType::LargeBinary => {
245                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
246                }
247                DataType::FixedSizeBinary(len) => {
248                    let arr = array
249                        .as_any()
250                        .downcast_ref::<FixedSizeBinaryArray>()
251                        .ok_or_else(|| {
252                            ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
253                        })?;
254                    Encoder::Fixed(FixedEncoder(arr))
255                }
256                DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder(
257                    array.as_primitive::<TimestampMicrosecondType>(),
258                )),
259                DataType::Interval(unit) => match unit {
260                    IntervalUnit::MonthDayNano => {
261                        Encoder::IntervalMonthDayNano(DurationEncoder(
262                            array.as_primitive::<IntervalMonthDayNanoType>(),
263                        ))
264                    }
265                    IntervalUnit::YearMonth => {
266                        Encoder::IntervalYearMonth(DurationEncoder(
267                            array.as_primitive::<IntervalYearMonthType>(),
268                        ))
269                    }
270                    IntervalUnit::DayTime => Encoder::IntervalDayTime(DurationEncoder(
271                        array.as_primitive::<IntervalDayTimeType>(),
272                    )),
273                }
274                DataType::Duration(tu) => {
275                    match tu {
276                        TimeUnit::Second => Encoder::DurationSeconds(LongEncoder(
277                            array.as_primitive::<DurationSecondType>(),
278                        )),
279                        TimeUnit::Millisecond => Encoder::DurationMillis(LongEncoder(
280                            array.as_primitive::<DurationMillisecondType>(),
281                        )),
282                        TimeUnit::Microsecond => Encoder::DurationMicros(LongEncoder(
283                            array.as_primitive::<DurationMicrosecondType>(),
284                        )),
285                        TimeUnit::Nanosecond => Encoder::DurationNanos(LongEncoder(
286                            array.as_primitive::<DurationNanosecondType>(),
287                        )),
288                    }
289                }
290                other => {
291                    return Err(ArrowError::NotYetImplemented(format!(
292                        "Avro scalar type not yet supported: {other:?}"
293                    )));
294                }
295            },
296            FieldPlan::Struct { encoders } => {
297                let arr = array
298                    .as_any()
299                    .downcast_ref::<StructArray>()
300                    .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
301                Encoder::Struct(Box::new(StructEncoder::try_new(arr, encoders)?))
302            }
303            FieldPlan::List {
304                items_nullability,
305                item_plan,
306            } => match array.data_type() {
307                DataType::List(_) => {
308                    let arr = array
309                        .as_any()
310                        .downcast_ref::<ListArray>()
311                        .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
312                    Encoder::List(Box::new(ListEncoder32::try_new(
313                        arr,
314                        *items_nullability,
315                        item_plan.as_ref(),
316                    )?))
317                }
318                DataType::LargeList(_) => {
319                    let arr = array
320                        .as_any()
321                        .downcast_ref::<LargeListArray>()
322                        .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
323                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
324                        arr,
325                        *items_nullability,
326                        item_plan.as_ref(),
327                    )?))
328                }
329                other => {
330                    return Err(ArrowError::SchemaError(format!(
331                        "Avro array site requires Arrow List/LargeList, found: {other:?}"
332                    )))
333                }
334            },
335            FieldPlan::Decimal {size} => match array.data_type() {
336                #[cfg(feature = "small_decimals")]
337                DataType::Decimal32(_,_) => {
338                    let arr = array
339                        .as_any()
340                        .downcast_ref::<Decimal32Array>()
341                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
342                    Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
343                }
344                #[cfg(feature = "small_decimals")]
345                DataType::Decimal64(_,_) => {
346                    let arr = array
347                        .as_any()
348                        .downcast_ref::<Decimal64Array>()
349                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
350                    Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
351                }
352                DataType::Decimal128(_,_) => {
353                    let arr = array
354                        .as_any()
355                        .downcast_ref::<Decimal128Array>()
356                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal128Array".into()))?;
357                    Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
358                }
359                DataType::Decimal256(_,_) => {
360                    let arr = array
361                        .as_any()
362                        .downcast_ref::<Decimal256Array>()
363                        .ok_or_else(|| ArrowError::SchemaError("Expected Decimal256Array".into()))?;
364                    Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
365                }
366                other => {
367                    return Err(ArrowError::SchemaError(format!(
368                        "Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
369                    )))
370                }
371            },
372            FieldPlan::Uuid => {
373                let arr = array
374                    .as_any()
375                    .downcast_ref::<FixedSizeBinaryArray>()
376                    .ok_or_else(|| ArrowError::SchemaError("Expected FixedSizeBinaryArray".into()))?;
377                Encoder::Uuid(UuidEncoder(arr))
378            }
379            FieldPlan::Map { values_nullability,
380                value_plan } => {
381                let arr = array
382                    .as_any()
383                    .downcast_ref::<MapArray>()
384                    .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
385                Encoder::Map(Box::new(MapEncoder::try_new(arr, *values_nullability, value_plan.as_ref())?))
386            }
387            FieldPlan::Enum { symbols} => match array.data_type() {
388                DataType::Dictionary(key_dt, value_dt) => {
389                    if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
390                        return Err(ArrowError::SchemaError(
391                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
392                        ));
393                    }
394                    let dict = array
395                        .as_any()
396                        .downcast_ref::<DictionaryArray<Int32Type>>()
397                        .ok_or_else(|| {
398                            ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
399                        })?;
400
401                    let values = dict
402                        .values()
403                        .as_any()
404                        .downcast_ref::<StringArray>()
405                        .ok_or_else(|| {
406                            ArrowError::SchemaError("Dictionary values must be Utf8".into())
407                        })?;
408                    if values.len() != symbols.len() {
409                        return Err(ArrowError::SchemaError(format!(
410                            "Enum symbol length {} != dictionary size {}",
411                            symbols.len(),
412                            values.len()
413                        )));
414                    }
415                    for i in 0..values.len() {
416                        if values.value(i) != symbols[i].as_str() {
417                            return Err(ArrowError::SchemaError(format!(
418                                "Enum symbol mismatch at {i}: schema='{}' dict='{}'",
419                                symbols[i],
420                                values.value(i)
421                            )));
422                        }
423                    }
424                    let keys = dict.keys();
425                    Encoder::Enum(EnumEncoder { keys })
426                }
427                other => {
428                    return Err(ArrowError::SchemaError(format!(
429                        "Avro enum site requires DataType::Dictionary, found: {other:?}"
430                    )))
431                }
432            }
433            other => {
434                return Err(ArrowError::NotYetImplemented(format!(
435                    "Avro writer: {other:?} not yet supported",
436                )));
437            }
438        };
439        // Compute the effective null state from writer-declared nullability and data nulls.
440        let null_state = match (nullability, array.null_count() > 0) {
441            (None, false) => NullState::NonNullable,
442            (None, true) => {
443                return Err(ArrowError::InvalidArgumentError(format!(
444                    "Avro site '{}' is non-nullable, but array contains nulls",
445                    field.name()
446                )));
447            }
448            (Some(order), false) => {
449                // Optimization: drop any bitmap; emit a constant "value" branch byte.
450                NullState::NullableNoNulls {
451                    union_value_byte: union_value_branch_byte(order, false),
452                }
453            }
454            (Some(null_order), true) => {
455                let Some(nulls) = array.nulls().cloned() else {
456                    return Err(ArrowError::InvalidArgumentError(format!(
457                        "Array for Avro site '{}' reports nulls but has no null buffer",
458                        field.name()
459                    )));
460                };
461                NullState::Nullable { nulls, null_order }
462            }
463        };
464        Ok(Self {
465            encoder,
466            null_state,
467        })
468    }
469
470    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
471        match &self.null_state {
472            NullState::NonNullable => {}
473            NullState::NullableNoNulls { union_value_byte } => out
474                .write_all(&[*union_value_byte])
475                .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?,
476            NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
477                return write_optional_index(out, true, *null_order); // no value to write
478            }
479            NullState::Nullable { null_order, .. } => {
480                write_optional_index(out, false, *null_order)?;
481            }
482        }
483        self.encoder.encode(out, idx)
484    }
485}
486
487fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
488    let nulls_first = null_order == Nullability::default();
489    if nulls_first == is_null {
490        0x00
491    } else {
492        0x02
493    }
494}
495
496/// Per‑site encoder plan for a field. This mirrors the Avro structure, so nested
497/// optional branch order can be honored exactly as declared by the schema.
498#[derive(Debug, Clone)]
499enum FieldPlan {
500    /// Non-nested scalar/logical type
501    Scalar,
502    /// Record/Struct with Avro‑ordered children
503    Struct { encoders: Vec<FieldBinding> },
504    /// Array with item‑site nullability and nested plan
505    List {
506        items_nullability: Option<Nullability>,
507        item_plan: Box<FieldPlan>,
508    },
509    /// Avro decimal logical type (bytes or fixed). `size=None` => bytes(decimal), `Some(n)` => fixed(n)
510    Decimal { size: Option<usize> },
511    /// Avro UUID logical type (fixed)
512    Uuid,
513    /// Avro map with value‑site nullability and nested plan
514    Map {
515        values_nullability: Option<Nullability>,
516        value_plan: Box<FieldPlan>,
517    },
518    /// Avro enum; maps to Arrow Dictionary<Int32, Utf8> with dictionary values
519    /// exactly equal and ordered as the Avro enum `symbols`.
520    Enum { symbols: Arc<[String]> },
521}
522
523#[derive(Debug, Clone)]
524struct FieldBinding {
525    /// Index of the Arrow field/column associated with this Avro field site
526    arrow_index: usize,
527    /// Nullability/order for this site (None for required fields)
528    nullability: Option<Nullability>,
529    /// Nested plan for this site
530    plan: FieldPlan,
531}
532
533/// Builder for `RecordEncoder` write plan
534#[derive(Debug)]
535pub struct RecordEncoderBuilder<'a> {
536    avro_root: &'a AvroField,
537    arrow_schema: &'a ArrowSchema,
538    fingerprint: Option<Fingerprint>,
539}
540
541impl<'a> RecordEncoderBuilder<'a> {
542    /// Create a new builder from the Avro root and Arrow schema.
543    pub fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self {
544        Self {
545            avro_root,
546            arrow_schema,
547            fingerprint: None,
548        }
549    }
550
551    pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
552        self.fingerprint = fingerprint;
553        self
554    }
555
556    /// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
557    /// resolving each field to an Arrow index by name.
558    pub fn build(self) -> Result<RecordEncoder, ArrowError> {
559        let avro_root_dt = self.avro_root.data_type();
560        let Codec::Struct(root_fields) = avro_root_dt.codec() else {
561            return Err(ArrowError::SchemaError(
562                "Top-level Avro schema must be a record/struct".into(),
563            ));
564        };
565        let mut columns = Vec::with_capacity(root_fields.len());
566        for root_field in root_fields.as_ref() {
567            let name = root_field.name();
568            let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
569                ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
570            })?;
571            columns.push(FieldBinding {
572                arrow_index,
573                nullability: root_field.data_type().nullability(),
574                plan: FieldPlan::build(
575                    root_field.data_type(),
576                    self.arrow_schema.field(arrow_index),
577                )?,
578            });
579        }
580        Ok(RecordEncoder {
581            columns,
582            prefix: self.fingerprint.map(|fp| fp.make_prefix()),
583        })
584    }
585}
586
587/// A pre-computed plan for encoding a `RecordBatch` to Avro.
588///
589/// Derived from an Avro schema and an Arrow schema. It maps
590/// top-level Avro fields to Arrow columns and contains a nested encoding plan
591/// for each column.
592#[derive(Debug, Clone)]
593pub struct RecordEncoder {
594    columns: Vec<FieldBinding>,
595    /// Optional pre-built, variable-length prefix written before each record.
596    prefix: Option<Prefix>,
597}
598
599impl RecordEncoder {
600    fn prepare_for_batch<'a>(
601        &'a self,
602        batch: &'a RecordBatch,
603    ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
604        let schema_binding = batch.schema();
605        let fields = schema_binding.fields();
606        let arrays = batch.columns();
607        let mut out = Vec::with_capacity(self.columns.len());
608        for col_plan in self.columns.iter() {
609            let arrow_index = col_plan.arrow_index;
610            let array = arrays.get(arrow_index).ok_or_else(|| {
611                ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
612            })?;
613            let field = fields[arrow_index].as_ref();
614            let encoder = prepare_value_site_encoder(
615                array.as_ref(),
616                field,
617                col_plan.nullability,
618                &col_plan.plan,
619            )?;
620            out.push(encoder);
621        }
622        Ok(out)
623    }
624
625    /// Encode a `RecordBatch` using this encoder plan.
626    ///
627    /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes.
628    pub fn encode<W: Write>(&self, out: &mut W, batch: &RecordBatch) -> Result<(), ArrowError> {
629        let mut column_encoders = self.prepare_for_batch(batch)?;
630        let n = batch.num_rows();
631        match self.prefix {
632            Some(prefix) => {
633                for row in 0..n {
634                    out.write_all(prefix.as_slice())
635                        .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
636                    for enc in column_encoders.iter_mut() {
637                        enc.encode(out, row)?;
638                    }
639                }
640            }
641            None => {
642                for row in 0..n {
643                    for enc in column_encoders.iter_mut() {
644                        enc.encode(out, row)?;
645                    }
646                }
647            }
648        }
649        Ok(())
650    }
651}
652
653fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
654    fields.iter().position(|f| f.name() == name)
655}
656
657fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
658    // Prefer common Arrow field names; fall back to second child if exactly two
659    find_struct_child_index(fields, "value")
660        .or_else(|| find_struct_child_index(fields, "values"))
661        .or_else(|| if fields.len() == 2 { Some(1) } else { None })
662}
663
664impl FieldPlan {
665    fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
666        if let DataType::FixedSizeBinary(len) = arrow_field.data_type() {
667            // Extension-based detection (only when the feature is enabled)
668            let ext_is_uuid = {
669                #[cfg(feature = "canonical_extension_types")]
670                {
671                    matches!(
672                        arrow_field.extension_type_name(),
673                        Some("arrow.uuid") | Some("uuid")
674                    )
675                }
676                #[cfg(not(feature = "canonical_extension_types"))]
677                {
678                    false
679                }
680            };
681            let md_is_uuid = arrow_field
682                .metadata()
683                .get("logicalType")
684                .map(|s| s.as_str())
685                == Some("uuid");
686            if ext_is_uuid || md_is_uuid {
687                if *len != 16 {
688                    return Err(ArrowError::InvalidArgumentError(
689                        "logicalType=uuid requires FixedSizeBinary(16)".into(),
690                    ));
691                }
692                return Ok(FieldPlan::Uuid);
693            }
694        }
695        match avro_dt.codec() {
696            Codec::Struct(avro_fields) => {
697                let fields = match arrow_field.data_type() {
698                    DataType::Struct(struct_fields) => struct_fields,
699                    other => {
700                        return Err(ArrowError::SchemaError(format!(
701                            "Avro struct maps to Arrow Struct, found: {other:?}"
702                        )))
703                    }
704                };
705                let mut encoders = Vec::with_capacity(avro_fields.len());
706                for avro_field in avro_fields.iter() {
707                    let name = avro_field.name().to_string();
708                    let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
709                        ArrowError::SchemaError(format!(
710                            "Struct field '{name}' not present in Arrow field '{}'",
711                            arrow_field.name()
712                        ))
713                    })?;
714                    encoders.push(FieldBinding {
715                        arrow_index: idx,
716                        nullability: avro_field.data_type().nullability(),
717                        plan: FieldPlan::build(avro_field.data_type(), fields[idx].as_ref())?,
718                    });
719                }
720                Ok(FieldPlan::Struct { encoders })
721            }
722            Codec::List(items_dt) => match arrow_field.data_type() {
723                DataType::List(field_ref) => Ok(FieldPlan::List {
724                    items_nullability: items_dt.nullability(),
725                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
726                }),
727                DataType::LargeList(field_ref) => Ok(FieldPlan::List {
728                    items_nullability: items_dt.nullability(),
729                    item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
730                }),
731                other => Err(ArrowError::SchemaError(format!(
732                    "Avro array maps to Arrow List/LargeList, found: {other:?}"
733                ))),
734            },
735            Codec::Map(values_dt) => {
736                let entries_field = match arrow_field.data_type() {
737                    DataType::Map(entries, _sorted) => entries.as_ref(),
738                    other => {
739                        return Err(ArrowError::SchemaError(format!(
740                            "Avro map maps to Arrow DataType::Map, found: {other:?}"
741                        )))
742                    }
743                };
744                let entries_struct_fields = match entries_field.data_type() {
745                    DataType::Struct(fs) => fs,
746                    other => {
747                        return Err(ArrowError::SchemaError(format!(
748                            "Arrow Map entries must be Struct, found: {other:?}"
749                        )))
750                    }
751                };
752                let value_idx =
753                    find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
754                        ArrowError::SchemaError("Map entries struct missing value field".into())
755                    })?;
756                let value_field = entries_struct_fields[value_idx].as_ref();
757                let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
758                Ok(FieldPlan::Map {
759                    values_nullability: values_dt.nullability(),
760                    value_plan: Box::new(value_plan),
761                })
762            }
763            Codec::Enum(symbols) => match arrow_field.data_type() {
764                DataType::Dictionary(key_dt, value_dt) => {
765                    if **key_dt != DataType::Int32 {
766                        return Err(ArrowError::SchemaError(
767                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
768                        ));
769                    }
770                    if **value_dt != DataType::Utf8 {
771                        return Err(ArrowError::SchemaError(
772                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
773                        ));
774                    }
775                    Ok(FieldPlan::Enum {
776                        symbols: symbols.clone(),
777                    })
778                }
779                other => Err(ArrowError::SchemaError(format!(
780                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
781                ))),
782            },
783            // decimal site (bytes or fixed(N)) with precision/scale validation
784            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
785                let (ap, as_) = match arrow_field.data_type() {
786                    #[cfg(feature = "small_decimals")]
787                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
788                    #[cfg(feature = "small_decimals")]
789                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
790                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
791                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
792                    other => {
793                        return Err(ArrowError::SchemaError(format!(
794                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
795                            arrow_field.name()
796                        )))
797                    }
798                };
799                let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent
800                if ap != *precision || as_ != sc {
801                    return Err(ArrowError::SchemaError(format!(
802                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
803                        arrow_field.name()
804                    )));
805                }
806                Ok(FieldPlan::Decimal {
807                    size: *fixed_size_opt,
808                })
809            }
810            Codec::Interval => match arrow_field.data_type() {
811                DataType::Interval(IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime
812                ) => Ok(FieldPlan::Scalar),
813                other => Err(ArrowError::SchemaError(format!(
814                    "Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
815                ))),
816            }
817            _ => Ok(FieldPlan::Scalar),
818        }
819    }
820}
821
822enum Encoder<'a> {
823    Boolean(BooleanEncoder<'a>),
824    Int(IntEncoder<'a, Int32Type>),
825    Long(LongEncoder<'a, Int64Type>),
826    Timestamp(LongEncoder<'a, TimestampMicrosecondType>),
827    DurationSeconds(LongEncoder<'a, DurationSecondType>),
828    DurationMillis(LongEncoder<'a, DurationMillisecondType>),
829    DurationMicros(LongEncoder<'a, DurationMicrosecondType>),
830    DurationNanos(LongEncoder<'a, DurationNanosecondType>),
831    Float32(F32Encoder<'a>),
832    Float64(F64Encoder<'a>),
833    Binary(BinaryEncoder<'a, i32>),
834    LargeBinary(BinaryEncoder<'a, i64>),
835    Utf8(Utf8Encoder<'a>),
836    Utf8Large(Utf8LargeEncoder<'a>),
837    List(Box<ListEncoder32<'a>>),
838    LargeList(Box<ListEncoder64<'a>>),
839    Struct(Box<StructEncoder<'a>>),
840    /// Avro `fixed` encoder (raw bytes, no length)
841    Fixed(FixedEncoder<'a>),
842    /// Avro `uuid` logical type encoder (string with RFC‑4122 hyphenated text)
843    Uuid(UuidEncoder<'a>),
844    /// Avro `duration` logical type (Arrow Interval(MonthDayNano)) encoder
845    IntervalMonthDayNano(DurationEncoder<'a, IntervalMonthDayNanoType>),
846    /// Avro `duration` logical type (Arrow Interval(YearMonth)) encoder
847    IntervalYearMonth(DurationEncoder<'a, IntervalYearMonthType>),
848    /// Avro `duration` logical type (Arrow Interval(DayTime)) encoder
849    IntervalDayTime(DurationEncoder<'a, IntervalDayTimeType>),
850    #[cfg(feature = "small_decimals")]
851    Decimal32(Decimal32Encoder<'a>),
852    #[cfg(feature = "small_decimals")]
853    Decimal64(Decimal64Encoder<'a>),
854    Decimal128(Decimal128Encoder<'a>),
855    Decimal256(Decimal256Encoder<'a>),
856    /// Avro `enum` encoder: writes the key (int) as the enum index.
857    Enum(EnumEncoder<'a>),
858    Map(Box<MapEncoder<'a>>),
859}
860
861impl<'a> Encoder<'a> {
862    /// Encode the value at `idx`.
863    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
864        match self {
865            Encoder::Boolean(e) => e.encode(out, idx),
866            Encoder::Int(e) => e.encode(out, idx),
867            Encoder::Long(e) => e.encode(out, idx),
868            Encoder::Timestamp(e) => e.encode(out, idx),
869            Encoder::DurationSeconds(e) => e.encode(out, idx),
870            Encoder::DurationMicros(e) => e.encode(out, idx),
871            Encoder::DurationMillis(e) => e.encode(out, idx),
872            Encoder::DurationNanos(e) => e.encode(out, idx),
873            Encoder::Float32(e) => e.encode(out, idx),
874            Encoder::Float64(e) => e.encode(out, idx),
875            Encoder::Binary(e) => e.encode(out, idx),
876            Encoder::LargeBinary(e) => e.encode(out, idx),
877            Encoder::Utf8(e) => e.encode(out, idx),
878            Encoder::Utf8Large(e) => e.encode(out, idx),
879            Encoder::List(e) => e.encode(out, idx),
880            Encoder::LargeList(e) => e.encode(out, idx),
881            Encoder::Struct(e) => e.encode(out, idx),
882            Encoder::Fixed(e) => (e).encode(out, idx),
883            Encoder::Uuid(e) => (e).encode(out, idx),
884            Encoder::IntervalMonthDayNano(e) => (e).encode(out, idx),
885            Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
886            Encoder::IntervalDayTime(e) => (e).encode(out, idx),
887            #[cfg(feature = "small_decimals")]
888            Encoder::Decimal32(e) => (e).encode(out, idx),
889            #[cfg(feature = "small_decimals")]
890            Encoder::Decimal64(e) => (e).encode(out, idx),
891            Encoder::Decimal128(e) => (e).encode(out, idx),
892            Encoder::Decimal256(e) => (e).encode(out, idx),
893            Encoder::Map(e) => (e).encode(out, idx),
894            Encoder::Enum(e) => (e).encode(out, idx),
895        }
896    }
897}
898
899struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
900impl BooleanEncoder<'_> {
901    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
902        write_bool(out, self.0.value(idx))
903    }
904}
905
906/// Generic Avro `int` encoder for primitive arrays with `i32` native values.
907struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
908impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
909    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
910        write_int(out, self.0.value(idx))
911    }
912}
913
914/// Generic Avro `long` encoder for primitive arrays with `i64` native values.
915struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
916impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
917    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
918        write_long(out, self.0.value(idx))
919    }
920}
921
922/// Unified binary encoder generic over offset size (i32/i64).
923struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
924impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
925    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
926        write_len_prefixed(out, self.0.value(idx))
927    }
928}
929
930struct F32Encoder<'a>(&'a arrow_array::Float32Array);
931impl F32Encoder<'_> {
932    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
933        // Avro float: 4 bytes, IEEE-754 little-endian
934        let bits = self.0.value(idx).to_bits();
935        out.write_all(&bits.to_le_bytes())
936            .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
937    }
938}
939
940struct F64Encoder<'a>(&'a arrow_array::Float64Array);
941impl F64Encoder<'_> {
942    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
943        // Avro double: 8 bytes, IEEE-754 little-endian
944        let bits = self.0.value(idx).to_bits();
945        out.write_all(&bits.to_le_bytes())
946            .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
947    }
948}
949
950struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
951
952impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
953    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
954        write_len_prefixed(out, self.0.value(idx).as_bytes())
955    }
956}
957
958type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
959type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
960
961/// Internal key array kind used by Map encoder.
962enum KeyKind<'a> {
963    Utf8(&'a GenericStringArray<i32>),
964    LargeUtf8(&'a GenericStringArray<i64>),
965}
966struct MapEncoder<'a> {
967    map: &'a MapArray,
968    keys: KeyKind<'a>,
969    values: FieldEncoder<'a>,
970    keys_offset: usize,
971    values_offset: usize,
972}
973
974impl<'a> MapEncoder<'a> {
975    fn try_new(
976        map: &'a MapArray,
977        values_nullability: Option<Nullability>,
978        value_plan: &FieldPlan,
979    ) -> Result<Self, ArrowError> {
980        let keys_arr = map.keys();
981        let keys_kind = match keys_arr.data_type() {
982            DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
983            DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
984            other => {
985                return Err(ArrowError::SchemaError(format!(
986                    "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
987                )))
988            }
989        };
990
991        let entries_struct_fields = match map.data_type() {
992            DataType::Map(entries, _) => match entries.data_type() {
993                DataType::Struct(fs) => fs,
994                other => {
995                    return Err(ArrowError::SchemaError(format!(
996                        "Arrow Map entries must be Struct, found: {other:?}"
997                    )))
998                }
999            },
1000            _ => {
1001                return Err(ArrowError::SchemaError(
1002                    "Expected MapArray with DataType::Map".into(),
1003                ))
1004            }
1005        };
1006
1007        let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
1008            ArrowError::SchemaError("Map entries struct missing value field".into())
1009        })?;
1010        let value_field = entries_struct_fields[v_idx].as_ref();
1011
1012        let values_enc = prepare_value_site_encoder(
1013            map.values().as_ref(),
1014            value_field,
1015            values_nullability,
1016            value_plan,
1017        )?;
1018
1019        Ok(Self {
1020            map,
1021            keys: keys_kind,
1022            values: values_enc,
1023            keys_offset: keys_arr.offset(),
1024            values_offset: map.values().offset(),
1025        })
1026    }
1027
1028    fn encode_map_entries<W, O>(
1029        out: &mut W,
1030        keys: &GenericStringArray<O>,
1031        keys_offset: usize,
1032        start: usize,
1033        end: usize,
1034        mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>,
1035    ) -> Result<(), ArrowError>
1036    where
1037        W: Write + ?Sized,
1038        O: OffsetSizeTrait,
1039    {
1040        encode_blocked_range(out, start, end, |out, j| {
1041            let j_key = j.saturating_sub(keys_offset);
1042            write_len_prefixed(out, keys.value(j_key).as_bytes())?;
1043            write_item(out, j)
1044        })
1045    }
1046
1047    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1048        let offsets = self.map.offsets();
1049        let start = offsets[idx] as usize;
1050        let end = offsets[idx + 1] as usize;
1051
1052        let mut write_item = |out: &mut W, j: usize| {
1053            let j_val = j.saturating_sub(self.values_offset);
1054            self.values.encode(out, j_val)
1055        };
1056
1057        match self.keys {
1058            KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
1059                out,
1060                arr,
1061                self.keys_offset,
1062                start,
1063                end,
1064                write_item,
1065            ),
1066            KeyKind::LargeUtf8(arr) => MapEncoder::<'a>::encode_map_entries(
1067                out,
1068                arr,
1069                self.keys_offset,
1070                start,
1071                end,
1072                write_item,
1073            ),
1074        }
1075    }
1076}
1077
1078/// Avro `enum` encoder for Arrow `DictionaryArray<Int32, Utf8>`.
1079///
1080/// Per Avro spec, an enum is encoded as an **int** equal to the
1081/// zero-based position of the symbol in the schema’s `symbols` list.
1082/// We validate at construction that the dictionary values equal the symbols,
1083/// so we can directly write the key value here.
1084struct EnumEncoder<'a> {
1085    keys: &'a PrimitiveArray<Int32Type>,
1086}
1087impl EnumEncoder<'_> {
1088    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> {
1089        write_int(out, self.keys.value(row))
1090    }
1091}
1092
1093struct StructEncoder<'a> {
1094    encoders: Vec<FieldEncoder<'a>>,
1095}
1096
1097impl<'a> StructEncoder<'a> {
1098    fn try_new(
1099        array: &'a StructArray,
1100        field_bindings: &[FieldBinding],
1101    ) -> Result<Self, ArrowError> {
1102        let DataType::Struct(fields) = array.data_type() else {
1103            return Err(ArrowError::SchemaError("Expected Struct".into()));
1104        };
1105        let mut encoders = Vec::with_capacity(field_bindings.len());
1106        for field_binding in field_bindings {
1107            let idx = field_binding.arrow_index;
1108            let column = array.columns().get(idx).ok_or_else(|| {
1109                ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1110            })?;
1111            let field = fields.get(idx).ok_or_else(|| {
1112                ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
1113            })?;
1114            let encoder = prepare_value_site_encoder(
1115                column.as_ref(),
1116                field,
1117                field_binding.nullability,
1118                &field_binding.plan,
1119            )?;
1120            encoders.push(encoder);
1121        }
1122        Ok(Self { encoders })
1123    }
1124
1125    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1126        for encoder in self.encoders.iter_mut() {
1127            encoder.encode(out, idx)?;
1128        }
1129        Ok(())
1130    }
1131}
1132
1133/// Encode a blocked range of items with Avro array block framing.
1134///
1135/// `write_item` must take `(out, index)` to maintain the "out-first" convention.
1136fn encode_blocked_range<W: Write + ?Sized, F>(
1137    out: &mut W,
1138    start: usize,
1139    end: usize,
1140    mut write_item: F,
1141) -> Result<(), ArrowError>
1142where
1143    F: FnMut(&mut W, usize) -> Result<(), ArrowError>,
1144{
1145    let len = end.saturating_sub(start);
1146    if len == 0 {
1147        // Zero-length terminator per Avro spec.
1148        write_long(out, 0)?;
1149        return Ok(());
1150    }
1151    // Emit a single positive block for performance, then the end marker.
1152    write_long(out, len as i64)?;
1153    for row in start..end {
1154        write_item(out, row)?;
1155    }
1156    write_long(out, 0)?;
1157    Ok(())
1158}
1159
1160struct ListEncoder<'a, O: OffsetSizeTrait> {
1161    list: &'a GenericListArray<O>,
1162    values: FieldEncoder<'a>,
1163    values_offset: usize,
1164}
1165
1166type ListEncoder32<'a> = ListEncoder<'a, i32>;
1167type ListEncoder64<'a> = ListEncoder<'a, i64>;
1168
1169impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
1170    fn try_new(
1171        list: &'a GenericListArray<O>,
1172        items_nullability: Option<Nullability>,
1173        item_plan: &FieldPlan,
1174    ) -> Result<Self, ArrowError> {
1175        let child_field = match list.data_type() {
1176            DataType::List(field) => field.as_ref(),
1177            DataType::LargeList(field) => field.as_ref(),
1178            _ => {
1179                return Err(ArrowError::SchemaError(
1180                    "Expected List or LargeList for ListEncoder".into(),
1181                ))
1182            }
1183        };
1184        let values_enc = prepare_value_site_encoder(
1185            list.values().as_ref(),
1186            child_field,
1187            items_nullability,
1188            item_plan,
1189        )?;
1190        Ok(Self {
1191            list,
1192            values: values_enc,
1193            values_offset: list.values().offset(),
1194        })
1195    }
1196
1197    fn encode_list_range<W: Write + ?Sized>(
1198        &mut self,
1199        out: &mut W,
1200        start: usize,
1201        end: usize,
1202    ) -> Result<(), ArrowError> {
1203        encode_blocked_range(out, start, end, |out, row| {
1204            self.values
1205                .encode(out, row.saturating_sub(self.values_offset))
1206        })
1207    }
1208
1209    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1210        let offsets = self.list.offsets();
1211        let start = offsets[idx].to_usize().ok_or_else(|| {
1212            ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize"))
1213        })?;
1214        let end = offsets[idx + 1].to_usize().ok_or_else(|| {
1215            ArrowError::InvalidArgumentError(format!(
1216                "Error converting offset[{}] to usize",
1217                idx + 1
1218            ))
1219        })?;
1220        self.encode_list_range(out, start, end)
1221    }
1222}
1223
1224fn prepare_value_site_encoder<'a>(
1225    values_array: &'a dyn Array,
1226    value_field: &Field,
1227    nullability: Option<Nullability>,
1228    plan: &FieldPlan,
1229) -> Result<FieldEncoder<'a>, ArrowError> {
1230    // Effective nullability is computed here from the writer-declared site nullability and data.
1231    FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
1232}
1233
1234/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
1235/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
1236struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
1237impl FixedEncoder<'_> {
1238    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1239        let v = self.0.value(idx); // &[u8] of fixed width
1240        out.write_all(v)
1241            .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e))
1242    }
1243}
1244
1245/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) → Avro string (UUID).
1246/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
1247struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
1248impl UuidEncoder<'_> {
1249    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1250        let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
1251        buf[0] = 0x48;
1252        let v = self.0.value(idx);
1253        let u = Uuid::from_slice(v)
1254            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
1255        let _ = u.hyphenated().encode_lower(&mut buf[1..]);
1256        out.write_all(&buf)
1257            .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
1258    }
1259}
1260
/// The three unsigned components of an Avro `duration` value.
#[derive(Copy, Clone)]
struct DurationParts {
    /// Number of months.
    months: u32,
    /// Number of days.
    days: u32,
    /// Number of milliseconds.
    millis: u32,
}
1267/// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`.
1268trait IntervalToDurationParts: ArrowPrimitiveType {
1269    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>;
1270}
1271impl IntervalToDurationParts for IntervalMonthDayNanoType {
1272    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1273        let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
1274        if months < 0 || days < 0 || nanos < 0 {
1275            return Err(ArrowError::InvalidArgumentError(
1276                "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
1277            ));
1278        }
1279        if nanos % 1_000_000 != 0 {
1280            return Err(ArrowError::InvalidArgumentError(
1281                "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
1282                    .into(),
1283            ));
1284        }
1285        let millis = nanos / 1_000_000;
1286        if millis > u32::MAX as i64 {
1287            return Err(ArrowError::InvalidArgumentError(
1288                "Avro 'duration' milliseconds exceed u32::MAX".into(),
1289            ));
1290        }
1291        Ok(DurationParts {
1292            months: months as u32,
1293            days: days as u32,
1294            millis: millis as u32,
1295        })
1296    }
1297}
1298impl IntervalToDurationParts for IntervalYearMonthType {
1299    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1300        if native < 0 {
1301            return Err(ArrowError::InvalidArgumentError(
1302                "Avro 'duration' cannot encode negative months".into(),
1303            ));
1304        }
1305        Ok(DurationParts {
1306            months: native as u32,
1307            days: 0,
1308            millis: 0,
1309        })
1310    }
1311}
1312impl IntervalToDurationParts for IntervalDayTimeType {
1313    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
1314        let (days, millis) = IntervalDayTimeType::to_parts(native);
1315        if days < 0 || millis < 0 {
1316            return Err(ArrowError::InvalidArgumentError(
1317                "Avro 'duration' cannot encode negative days or milliseconds".into(),
1318            ));
1319        }
1320        Ok(DurationParts {
1321            months: 0,
1322            days: days as u32,
1323            millis: millis as u32,
1324        })
1325    }
1326}
1327/// Single generic encoder used for all three interval units.
1328/// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
1329struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
1330impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
1331    #[inline(always)]
1332    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1333        let parts = P::duration_parts(self.0.value(idx))?;
1334        let months = parts.months.to_le_bytes();
1335        let days = parts.days.to_le_bytes();
1336        let ms = parts.millis.to_le_bytes();
1337        // SAFETY
1338        // - Endianness & layout: Avro's `duration` logical type is encoded as fixed(12)
1339        //   with three *little-endian* unsigned 32-bit integers in order: (months, days, millis).
1340        //   We explicitly materialize exactly those 12 bytes.
1341        // - In-bounds indexing: `to_le_bytes()` on `u32` returns `[u8; 4]` by contract,
1342        //   therefore, the constant indices 0..=3 used below are *always* in-bounds.
1343        //   Rust will panic on out-of-bounds indexing, but there is no such path here;
1344        //   the compiler can also elide the bound checks for constant, provably in-range
1345        //   indices. [std docs; Rust Performance Book on bounds-check elimination]
1346        // - Memory safety: The `[u8; 12]` array is built on the stack by value, with no
1347        //   aliasing and no uninitialized memory. There is no `unsafe`.
1348        // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated and mapped
1349        //   into `ArrowError`, so I/O errors are reported, not panicked.
1350        // Consequently, constructing `buf` with the constant indices below is safe and
1351        // panic-free under these validated preconditions.
1352        let buf = [
1353            months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
1354            ms[1], ms[2], ms[3],
1355        ];
1356        out.write_all(&buf)
1357            .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
1358    }
1359}
1360
1361/// Minimal trait to obtain a big-endian fixed-size byte array for a decimal's
1362/// unscaled integer value at `idx`.
1363trait DecimalBeBytes<const N: usize> {
1364    fn value_be_bytes(&self, idx: usize) -> [u8; N];
1365}
1366#[cfg(feature = "small_decimals")]
1367impl DecimalBeBytes<4> for Decimal32Array {
1368    fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
1369        self.value(idx).to_be_bytes()
1370    }
1371}
1372#[cfg(feature = "small_decimals")]
1373impl DecimalBeBytes<8> for Decimal64Array {
1374    fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
1375        self.value(idx).to_be_bytes()
1376    }
1377}
1378impl DecimalBeBytes<16> for Decimal128Array {
1379    fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
1380        self.value(idx).to_be_bytes()
1381    }
1382}
1383impl DecimalBeBytes<32> for Decimal256Array {
1384    fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
1385        // Arrow i256 → [u8; 32] big-endian
1386        self.value(idx).to_be_bytes()
1387    }
1388}
1389
1390/// Generic Avro decimal encoder over Arrow decimal arrays.
1391/// - When `fixed_size` is `None` → Avro `bytes(decimal)`; writes the minimal
1392///   two's-complement representation with a length prefix.
1393/// - When `Some(n)` → Avro `fixed(n, decimal)`; sign-extends (or validates)
1394///   to exactly `n` bytes and writes them directly.
1395struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
1396    arr: &'a A,
1397    fixed_size: Option<usize>,
1398}
1399
1400impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
1401    fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
1402        Self { arr, fixed_size }
1403    }
1404
1405    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
1406        let be = self.arr.value_be_bytes(idx);
1407        match self.fixed_size {
1408            Some(n) => write_sign_extended(out, &be, n),
1409            None => write_len_prefixed(out, minimal_twos_complement(&be)),
1410        }
1411    }
1412}
1413
1414#[cfg(feature = "small_decimals")]
1415type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
1416#[cfg(feature = "small_decimals")]
1417type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
1418type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
1419type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
1420
1421#[cfg(test)]
1422mod tests {
1423    use super::*;
1424    use arrow_array::types::Int32Type;
1425    use arrow_array::{
1426        Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
1427        Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, StringArray,
1428        TimestampMicrosecondArray,
1429    };
1430    use arrow_schema::{DataType, Field, Fields};
1431
1432    fn zigzag_i64(v: i64) -> u64 {
1433        ((v << 1) ^ (v >> 63)) as u64
1434    }
1435
1436    fn varint(mut x: u64) -> Vec<u8> {
1437        let mut out = Vec::new();
1438        while (x & !0x7f) != 0 {
1439            out.push(((x & 0x7f) as u8) | 0x80);
1440            x >>= 7;
1441        }
1442        out.push((x & 0x7f) as u8);
1443        out
1444    }
1445
1446    fn avro_long_bytes(v: i64) -> Vec<u8> {
1447        varint(zigzag_i64(v))
1448    }
1449
1450    fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
1451        let mut out = avro_long_bytes(payload.len() as i64);
1452        out.extend_from_slice(payload);
1453        out
1454    }
1455
1456    fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
1457        let m = months.to_le_bytes();
1458        let d = days.to_le_bytes();
1459        let ms = millis.to_le_bytes();
1460        [
1461            m[0], m[1], m[2], m[3], d[0], d[1], d[2], d[3], ms[0], ms[1], ms[2], ms[3],
1462        ]
1463    }
1464
1465    fn encode_all(
1466        array: &dyn Array,
1467        plan: &FieldPlan,
1468        nullability: Option<Nullability>,
1469    ) -> Vec<u8> {
1470        let field = Field::new("f", array.data_type().clone(), true);
1471        let mut enc = FieldEncoder::make_encoder(array, &field, plan, nullability).unwrap();
1472        let mut out = Vec::new();
1473        for i in 0..array.len() {
1474            enc.encode(&mut out, i).unwrap();
1475        }
1476        out
1477    }
1478
1479    fn assert_bytes_eq(actual: &[u8], expected: &[u8]) {
1480        if actual != expected {
1481            let to_hex = |b: &[u8]| {
1482                b.iter()
1483                    .map(|x| format!("{:02X}", x))
1484                    .collect::<Vec<_>>()
1485                    .join(" ")
1486            };
1487            panic!(
1488                "mismatch\n  expected: [{}]\n    actual: [{}]",
1489                to_hex(expected),
1490                to_hex(actual)
1491            );
1492        }
1493    }
1494
1495    #[test]
1496    fn binary_encoder() {
1497        let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
1498        let arr = BinaryArray::from_vec(values);
1499        let mut expected = Vec::new();
1500        for payload in [b"" as &[u8], b"ab", b"\x00\xFF"] {
1501            expected.extend(avro_len_prefixed_bytes(payload));
1502        }
1503        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1504        assert_bytes_eq(&got, &expected);
1505    }
1506
1507    #[test]
1508    fn large_binary_encoder() {
1509        let values: Vec<&[u8]> = vec![b"xyz", b""];
1510        let arr = LargeBinaryArray::from_vec(values);
1511        let mut expected = Vec::new();
1512        for payload in [b"xyz" as &[u8], b""] {
1513            expected.extend(avro_len_prefixed_bytes(payload));
1514        }
1515        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1516        assert_bytes_eq(&got, &expected);
1517    }
1518
1519    #[test]
1520    fn utf8_encoder() {
1521        let arr = StringArray::from(vec!["", "A", "BC"]);
1522        let mut expected = Vec::new();
1523        for s in ["", "A", "BC"] {
1524            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
1525        }
1526        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1527        assert_bytes_eq(&got, &expected);
1528    }
1529
1530    #[test]
1531    fn large_utf8_encoder() {
1532        let arr = LargeStringArray::from(vec!["hello", ""]);
1533        let mut expected = Vec::new();
1534        for s in ["hello", ""] {
1535            expected.extend(avro_len_prefixed_bytes(s.as_bytes()));
1536        }
1537        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1538        assert_bytes_eq(&got, &expected);
1539    }
1540
1541    #[test]
1542    fn list_encoder_int32() {
1543        // Build ListArray [[1,2], [], [3]]
1544        let values = Int32Array::from(vec![1, 2, 3]);
1545        let offsets = vec![0, 2, 2, 3];
1546        let list = ListArray::new(
1547            Field::new("item", DataType::Int32, true).into(),
1548            arrow_buffer::OffsetBuffer::new(offsets.into()),
1549            Arc::new(values) as ArrayRef,
1550            None,
1551        );
1552        // Avro array encoding per row
1553        let mut expected = Vec::new();
1554        // row 0: block len 2, items 1,2 then 0
1555        expected.extend(avro_long_bytes(2));
1556        expected.extend(avro_long_bytes(1));
1557        expected.extend(avro_long_bytes(2));
1558        expected.extend(avro_long_bytes(0));
1559        // row 1: empty
1560        expected.extend(avro_long_bytes(0));
1561        // row 2: one item 3
1562        expected.extend(avro_long_bytes(1));
1563        expected.extend(avro_long_bytes(3));
1564        expected.extend(avro_long_bytes(0));
1565
1566        let plan = FieldPlan::List {
1567            items_nullability: None,
1568            item_plan: Box::new(FieldPlan::Scalar),
1569        };
1570        let got = encode_all(&list, &plan, None);
1571        assert_bytes_eq(&got, &expected);
1572    }
1573
1574    #[test]
1575    fn struct_encoder_two_fields() {
1576        // Struct { a: Int32, b: Utf8 }
1577        let a = Int32Array::from(vec![1, 2]);
1578        let b = StringArray::from(vec!["x", "y"]);
1579        let fields = Fields::from(vec![
1580            Field::new("a", DataType::Int32, true),
1581            Field::new("b", DataType::Utf8, true),
1582        ]);
1583        let struct_arr = StructArray::new(
1584            fields.clone(),
1585            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
1586            None,
1587        );
1588        let plan = FieldPlan::Struct {
1589            encoders: vec![
1590                FieldBinding {
1591                    arrow_index: 0,
1592                    nullability: None,
1593                    plan: FieldPlan::Scalar,
1594                },
1595                FieldBinding {
1596                    arrow_index: 1,
1597                    nullability: None,
1598                    plan: FieldPlan::Scalar,
1599                },
1600            ],
1601        };
1602        let got = encode_all(&struct_arr, &plan, None);
1603        // Expected: rows concatenated: a then b
1604        let mut expected = Vec::new();
1605        expected.extend(avro_long_bytes(1)); // a=1
1606        expected.extend(avro_len_prefixed_bytes(b"x")); // b="x"
1607        expected.extend(avro_long_bytes(2)); // a=2
1608        expected.extend(avro_len_prefixed_bytes(b"y")); // b="y"
1609        assert_bytes_eq(&got, &expected);
1610    }
1611
1612    #[test]
1613    fn enum_encoder_dictionary() {
1614        // symbols: ["A","B","C"], keys [2,0,1]
1615        let dict_values = StringArray::from(vec!["A", "B", "C"]);
1616        let keys = Int32Array::from(vec![2, 0, 1]);
1617        let dict =
1618            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
1619        let symbols = Arc::<[String]>::from(
1620            vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
1621        );
1622        let plan = FieldPlan::Enum { symbols };
1623        let got = encode_all(&dict, &plan, None);
1624        let mut expected = Vec::new();
1625        expected.extend(avro_long_bytes(2));
1626        expected.extend(avro_long_bytes(0));
1627        expected.extend(avro_long_bytes(1));
1628        assert_bytes_eq(&got, &expected);
1629    }
1630
1631    #[test]
1632    fn decimal_bytes_and_fixed() {
1633        // Use Decimal128 with small positives and negatives
1634        let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
1635            .with_precision_and_scale(20, 0)
1636            .unwrap();
1637        // bytes(decimal): minimal two's complement length-prefixed
1638        let plan_bytes = FieldPlan::Decimal { size: None };
1639        let got_bytes = encode_all(&dec, &plan_bytes, None);
1640        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
1641        let mut expected_bytes = Vec::new();
1642        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
1643        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
1644        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
1645        assert_bytes_eq(&got_bytes, &expected_bytes);
1646
1647        let plan_fixed = FieldPlan::Decimal { size: Some(16) };
1648        let got_fixed = encode_all(&dec, &plan_fixed, None);
1649        let mut expected_fixed = Vec::new();
1650        expected_fixed.extend_from_slice(&1i128.to_be_bytes());
1651        expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
1652        expected_fixed.extend_from_slice(&0i128.to_be_bytes());
1653        assert_bytes_eq(&got_fixed, &expected_fixed);
1654    }
1655
1656    #[test]
1657    fn decimal_bytes_256() {
1658        use arrow_buffer::i256;
1659        // Use Decimal256 with small positives and negatives
1660        let dec = Decimal256Array::from(vec![
1661            i256::from_i128(1),
1662            i256::from_i128(-1),
1663            i256::from_i128(0),
1664        ])
1665        .with_precision_and_scale(76, 0)
1666        .unwrap();
1667        // bytes(decimal): minimal two's complement length-prefixed
1668        let plan_bytes = FieldPlan::Decimal { size: None };
1669        let got_bytes = encode_all(&dec, &plan_bytes, None);
1670        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
1671        let mut expected_bytes = Vec::new();
1672        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
1673        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
1674        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
1675        assert_bytes_eq(&got_bytes, &expected_bytes);
1676
1677        // fixed(32): 32-byte big-endian two's complement
1678        let plan_fixed = FieldPlan::Decimal { size: Some(32) };
1679        let got_fixed = encode_all(&dec, &plan_fixed, None);
1680        let mut expected_fixed = Vec::new();
1681        expected_fixed.extend_from_slice(&i256::from_i128(1).to_be_bytes());
1682        expected_fixed.extend_from_slice(&i256::from_i128(-1).to_be_bytes());
1683        expected_fixed.extend_from_slice(&i256::from_i128(0).to_be_bytes());
1684        assert_bytes_eq(&got_fixed, &expected_fixed);
1685    }
1686
1687    #[cfg(feature = "small_decimals")]
1688    #[test]
1689    fn decimal_bytes_and_fixed_32() {
1690        // Use Decimal32 with small positives and negatives
1691        let dec = Decimal32Array::from(vec![1i32, -1i32, 0i32])
1692            .with_precision_and_scale(9, 0)
1693            .unwrap();
1694        // bytes(decimal)
1695        let plan_bytes = FieldPlan::Decimal { size: None };
1696        let got_bytes = encode_all(&dec, &plan_bytes, None);
1697        let mut expected_bytes = Vec::new();
1698        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
1699        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
1700        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
1701        assert_bytes_eq(&got_bytes, &expected_bytes);
1702        // fixed(4)
1703        let plan_fixed = FieldPlan::Decimal { size: Some(4) };
1704        let got_fixed = encode_all(&dec, &plan_fixed, None);
1705        let mut expected_fixed = Vec::new();
1706        expected_fixed.extend_from_slice(&1i32.to_be_bytes());
1707        expected_fixed.extend_from_slice(&(-1i32).to_be_bytes());
1708        expected_fixed.extend_from_slice(&0i32.to_be_bytes());
1709        assert_bytes_eq(&got_fixed, &expected_fixed);
1710    }
1711
1712    #[cfg(feature = "small_decimals")]
1713    #[test]
1714    fn decimal_bytes_and_fixed_64() {
1715        // Use Decimal64 with small positives and negatives
1716        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
1717            .with_precision_and_scale(18, 0)
1718            .unwrap();
1719        // bytes(decimal)
1720        let plan_bytes = FieldPlan::Decimal { size: None };
1721        let got_bytes = encode_all(&dec, &plan_bytes, None);
1722        let mut expected_bytes = Vec::new();
1723        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
1724        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
1725        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
1726        assert_bytes_eq(&got_bytes, &expected_bytes);
1727        // fixed(8)
1728        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
1729        let got_fixed = encode_all(&dec, &plan_fixed, None);
1730        let mut expected_fixed = Vec::new();
1731        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
1732        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
1733        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
1734        assert_bytes_eq(&got_fixed, &expected_fixed);
1735    }
1736
1737    #[test]
1738    fn float32_and_float64_encoders() {
1739        let f32a = Float32Array::from(vec![0.0f32, -1.5f32, f32::from_bits(0x7fc00000)]); // includes a quiet NaN bit pattern
1740        let f64a = Float64Array::from(vec![0.0f64, -2.25f64]);
1741        // f32 expected
1742        let mut expected32 = Vec::new();
1743        for v in [0.0f32, -1.5f32, f32::from_bits(0x7fc00000)] {
1744            expected32.extend_from_slice(&v.to_bits().to_le_bytes());
1745        }
1746        let got32 = encode_all(&f32a, &FieldPlan::Scalar, None);
1747        assert_bytes_eq(&got32, &expected32);
1748        // f64 expected
1749        let mut expected64 = Vec::new();
1750        for v in [0.0f64, -2.25f64] {
1751            expected64.extend_from_slice(&v.to_bits().to_le_bytes());
1752        }
1753        let got64 = encode_all(&f64a, &FieldPlan::Scalar, None);
1754        assert_bytes_eq(&got64, &expected64);
1755    }
1756
1757    #[test]
1758    fn long_encoder_int64() {
1759        let arr = Int64Array::from(vec![0i64, 1i64, -1i64, 2i64, -2i64, i64::MIN + 1]);
1760        let mut expected = Vec::new();
1761        for v in [0, 1, -1, 2, -2, i64::MIN + 1] {
1762            expected.extend(avro_long_bytes(v));
1763        }
1764        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1765        assert_bytes_eq(&got, &expected);
1766    }
1767
1768    #[test]
1769    fn fixed_encoder_plain() {
1770        // Two values of width 4
1771        let data = [[0xDE, 0xAD, 0xBE, 0xEF], [0x00, 0x01, 0x02, 0x03]];
1772        let values: Vec<Vec<u8>> = data.iter().map(|x| x.to_vec()).collect();
1773        let arr = FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap();
1774        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1775        let mut expected = Vec::new();
1776        expected.extend_from_slice(&data[0]);
1777        expected.extend_from_slice(&data[1]);
1778        assert_bytes_eq(&got, &expected);
1779    }
1780
1781    #[test]
1782    fn uuid_encoder_test() {
1783        // Happy path
1784        let u = Uuid::parse_str("00112233-4455-6677-8899-aabbccddeeff").unwrap();
1785        let bytes = *u.as_bytes();
1786        let arr_ok = FixedSizeBinaryArray::try_from_iter(vec![bytes.to_vec()].into_iter()).unwrap();
1787        // Expected: length 36 (0x48) followed by hyphenated lowercase text
1788        let mut expected = Vec::new();
1789        expected.push(0x48);
1790        expected.extend_from_slice(u.hyphenated().to_string().as_bytes());
1791        let got = encode_all(&arr_ok, &FieldPlan::Uuid, None);
1792        assert_bytes_eq(&got, &expected);
1793    }
1794
1795    #[test]
1796    fn uuid_encoder_error() {
1797        // Invalid UUID bytes: wrong length
1798        let arr =
1799            FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
1800                .unwrap();
1801        let plan = FieldPlan::Uuid;
1802
1803        let field = Field::new("f", arr.data_type().clone(), true);
1804        let mut enc = FieldEncoder::make_encoder(&arr, &field, &plan, None).unwrap();
1805        let mut out = Vec::new();
1806        let err = enc.encode(&mut out, 0).unwrap_err();
1807        match err {
1808            ArrowError::InvalidArgumentError(msg) => {
1809                assert!(msg.contains("Invalid UUID bytes"))
1810            }
1811            other => panic!("expected InvalidArgumentError, got {other:?}"),
1812        }
1813    }
1814
1815    #[test]
1816    fn map_encoder_string_keys_int_values() {
1817        // Build MapArray with two rows
1818        // Row0: {"k1":1, "k2":2}
1819        // Row1: {}
1820        let keys = StringArray::from(vec!["k1", "k2"]);
1821        let values = Int32Array::from(vec![1, 2]);
1822        let entries_fields = Fields::from(vec![
1823            Field::new("key", DataType::Utf8, false),
1824            Field::new("value", DataType::Int32, true),
1825        ]);
1826        let entries = StructArray::new(
1827            entries_fields,
1828            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
1829            None,
1830        );
1831        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
1832        let map = MapArray::new(
1833            Field::new("entries", entries.data_type().clone(), false).into(),
1834            offsets,
1835            entries,
1836            None,
1837            false,
1838        );
1839        let plan = FieldPlan::Map {
1840            values_nullability: None,
1841            value_plan: Box::new(FieldPlan::Scalar),
1842        };
1843        let got = encode_all(&map, &plan, None);
1844        let mut expected = Vec::new();
1845        // Row0: block 2 then pairs
1846        expected.extend(avro_long_bytes(2));
1847        expected.extend(avro_len_prefixed_bytes(b"k1"));
1848        expected.extend(avro_long_bytes(1));
1849        expected.extend(avro_len_prefixed_bytes(b"k2"));
1850        expected.extend(avro_long_bytes(2));
1851        expected.extend(avro_long_bytes(0));
1852        // Row1: empty
1853        expected.extend(avro_long_bytes(0));
1854        assert_bytes_eq(&got, &expected);
1855    }
1856
1857    #[test]
1858    fn list64_encoder_int32() {
1859        // LargeList [[1,2,3], []]
1860        let values = Int32Array::from(vec![1, 2, 3]);
1861        let offsets: Vec<i64> = vec![0, 3, 3];
1862        let list = LargeListArray::new(
1863            Field::new("item", DataType::Int32, true).into(),
1864            arrow_buffer::OffsetBuffer::new(offsets.into()),
1865            Arc::new(values) as ArrayRef,
1866            None,
1867        );
1868        let plan = FieldPlan::List {
1869            items_nullability: None,
1870            item_plan: Box::new(FieldPlan::Scalar),
1871        };
1872        let got = encode_all(&list, &plan, None);
1873        // Expected one block of 3 and then 0, then empty 0
1874        let mut expected = Vec::new();
1875        expected.extend(avro_long_bytes(3));
1876        expected.extend(avro_long_bytes(1));
1877        expected.extend(avro_long_bytes(2));
1878        expected.extend(avro_long_bytes(3));
1879        expected.extend(avro_long_bytes(0));
1880        expected.extend(avro_long_bytes(0));
1881        assert_bytes_eq(&got, &expected);
1882    }
1883
1884    #[test]
1885    fn int_encoder_test() {
1886        let ints = Int32Array::from(vec![0, -1, 2]);
1887        let mut expected_i = Vec::new();
1888        for v in [0i32, -1, 2] {
1889            expected_i.extend(avro_long_bytes(v as i64));
1890        }
1891        let got_i = encode_all(&ints, &FieldPlan::Scalar, None);
1892        assert_bytes_eq(&got_i, &expected_i);
1893    }
1894
1895    #[test]
1896    fn boolean_encoder_test() {
1897        let bools = BooleanArray::from(vec![true, false]);
1898        let mut expected_b = Vec::new();
1899        expected_b.extend_from_slice(&[1]);
1900        expected_b.extend_from_slice(&[0]);
1901        let got_b = encode_all(&bools, &FieldPlan::Scalar, None);
1902        assert_bytes_eq(&got_b, &expected_b);
1903    }
1904
1905    #[test]
1906    #[cfg(feature = "avro_custom_types")]
1907    fn duration_encoding_seconds() {
1908        let arr: PrimitiveArray<DurationSecondType> = vec![0i64, -1, 2].into();
1909        let mut expected = Vec::new();
1910        for v in [0i64, -1, 2] {
1911            expected.extend_from_slice(&avro_long_bytes(v));
1912        }
1913        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1914        assert_bytes_eq(&got, &expected);
1915    }
1916
1917    #[test]
1918    #[cfg(feature = "avro_custom_types")]
1919    fn duration_encoding_milliseconds() {
1920        let arr: PrimitiveArray<DurationMillisecondType> = vec![1i64, 0, -2].into();
1921        let mut expected = Vec::new();
1922        for v in [1i64, 0, -2] {
1923            expected.extend_from_slice(&avro_long_bytes(v));
1924        }
1925        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1926        assert_bytes_eq(&got, &expected);
1927    }
1928
1929    #[test]
1930    #[cfg(feature = "avro_custom_types")]
1931    fn duration_encoding_microseconds() {
1932        let arr: PrimitiveArray<DurationMicrosecondType> = vec![5i64, -6, 7].into();
1933        let mut expected = Vec::new();
1934        for v in [5i64, -6, 7] {
1935            expected.extend_from_slice(&avro_long_bytes(v));
1936        }
1937        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1938        assert_bytes_eq(&got, &expected);
1939    }
1940
1941    #[test]
1942    #[cfg(feature = "avro_custom_types")]
1943    fn duration_encoding_nanoseconds() {
1944        let arr: PrimitiveArray<DurationNanosecondType> = vec![8i64, 9, -10].into();
1945        let mut expected = Vec::new();
1946        for v in [8i64, 9, -10] {
1947            expected.extend_from_slice(&avro_long_bytes(v));
1948        }
1949        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1950        assert_bytes_eq(&got, &expected);
1951    }
1952
1953    #[test]
1954    fn duration_encoder_year_month_happy_path() {
1955        let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32, 25i32].into();
1956        let mut expected = Vec::new();
1957        for m in [0u32, 1u32, 25u32] {
1958            expected.extend_from_slice(&duration_fixed12(m, 0, 0));
1959        }
1960        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1961        assert_bytes_eq(&got, &expected);
1962    }
1963
1964    #[test]
1965    fn duration_encoder_year_month_rejects_negative() {
1966        let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
1967        let field = Field::new("f", DataType::Interval(IntervalUnit::YearMonth), true);
1968        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
1969        let mut out = Vec::new();
1970        let err = enc.encode(&mut out, 0).unwrap_err();
1971        match err {
1972            ArrowError::InvalidArgumentError(msg) => {
1973                assert!(msg.contains("cannot encode negative months"))
1974            }
1975            other => panic!("expected InvalidArgumentError, got {other:?}"),
1976        }
1977    }
1978
1979    #[test]
1980    fn duration_encoder_day_time_happy_path() {
1981        let v0 = IntervalDayTimeType::make_value(2, 500); // days=2, millis=500
1982        let v1 = IntervalDayTimeType::make_value(0, 0);
1983        let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
1984        let mut expected = Vec::new();
1985        expected.extend_from_slice(&duration_fixed12(0, 2, 500));
1986        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
1987        let got = encode_all(&arr, &FieldPlan::Scalar, None);
1988        assert_bytes_eq(&got, &expected);
1989    }
1990
1991    #[test]
1992    fn duration_encoder_day_time_rejects_negative() {
1993        let bad = IntervalDayTimeType::make_value(-1, 0);
1994        let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
1995        let field = Field::new("f", DataType::Interval(IntervalUnit::DayTime), true);
1996        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
1997        let mut out = Vec::new();
1998        let err = enc.encode(&mut out, 0).unwrap_err();
1999        match err {
2000            ArrowError::InvalidArgumentError(msg) => {
2001                assert!(msg.contains("cannot encode negative days"))
2002            }
2003            other => panic!("expected InvalidArgumentError, got {other:?}"),
2004        }
2005    }
2006
2007    #[test]
2008    fn duration_encoder_month_day_nano_happy_path() {
2009        let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); // -> millis = 3
2010        let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
2011        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0, v1].into();
2012        let mut expected = Vec::new();
2013        expected.extend_from_slice(&duration_fixed12(1, 2, 3));
2014        expected.extend_from_slice(&duration_fixed12(0, 0, 0));
2015        let got = encode_all(&arr, &FieldPlan::Scalar, None);
2016        assert_bytes_eq(&got, &expected);
2017    }
2018
2019    #[test]
2020    fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
2021        let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
2022        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
2023        let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
2024        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
2025        let mut out = Vec::new();
2026        let err = enc.encode(&mut out, 0).unwrap_err();
2027        match err {
2028            ArrowError::InvalidArgumentError(msg) => {
2029                assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
2030            }
2031            other => panic!("expected InvalidArgumentError, got {other:?}"),
2032        }
2033    }
2034
2035    #[test]
2036    fn minimal_twos_complement_test() {
2037        let pos = [0x00, 0x00, 0x01];
2038        assert_eq!(minimal_twos_complement(&pos), &pos[2..]);
2039        let neg = [0xFF, 0xFF, 0x80]; // negative minimal is 0x80
2040        assert_eq!(minimal_twos_complement(&neg), &neg[2..]);
2041        let zero = [0x00, 0x00, 0x00];
2042        assert_eq!(minimal_twos_complement(&zero), &zero[2..]);
2043    }
2044
2045    #[test]
2046    fn write_sign_extend_test() {
2047        let mut out = Vec::new();
2048        write_sign_extended(&mut out, &[0x01], 4).unwrap();
2049        assert_eq!(out, vec![0x00, 0x00, 0x00, 0x01]);
2050        out.clear();
2051        write_sign_extended(&mut out, &[0xFF], 4).unwrap();
2052        assert_eq!(out, vec![0xFF, 0xFF, 0xFF, 0xFF]);
2053        out.clear();
2054        // truncation success (sign bytes only removed)
2055        write_sign_extended(&mut out, &[0xFF, 0xFF, 0x80], 2).unwrap();
2056        assert_eq!(out, vec![0xFF, 0x80]);
2057        out.clear();
2058        // truncation overflow
2059        let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
2060        match err {
2061            ArrowError::InvalidArgumentError(_) => {}
2062            _ => panic!("expected InvalidArgumentError"),
2063        }
2064    }
2065
2066    #[test]
2067    fn duration_month_day_nano_overflow_millis() {
2068        // nanos leading to millis > u32::MAX
2069        let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
2070        let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
2071        let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
2072        let field = Field::new("f", DataType::Interval(IntervalUnit::MonthDayNano), true);
2073        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
2074        let mut out = Vec::new();
2075        let err = enc.encode(&mut out, 0).unwrap_err();
2076        match err {
2077            ArrowError::InvalidArgumentError(msg) => assert!(msg.contains("exceed u32::MAX")),
2078            _ => panic!("expected InvalidArgumentError"),
2079        }
2080    }
2081
2082    #[test]
2083    fn fieldplan_decimal_precision_scale_mismatch_errors() {
2084        // Avro expects (10,2), Arrow has (12,2)
2085        use crate::codec::Codec;
2086        use std::collections::HashMap;
2087        let arrow_field = Field::new("d", DataType::Decimal128(12, 2), true);
2088        let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
2089        let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
2090        match err {
2091            ArrowError::SchemaError(msg) => {
2092                assert!(msg.contains("Decimal precision/scale mismatch"))
2093            }
2094            _ => panic!("expected SchemaError"),
2095        }
2096    }
2097}