Skip to main content

arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22    ResolvedUnion,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37    Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use std::cmp::Ordering;
42use std::sync::Arc;
43use strum_macros::AsRefStr;
44use uuid::Uuid;
45
/// Initial capacity used when pre-allocating the value buffers, offset builders and
/// null builders of a freshly created decoder.
const DEFAULT_CAPACITY: usize = 1024;
47
48/// Runtime plan for decoding reader-side `["null", T]` types.
49#[derive(Clone, Copy, Debug)]
50enum NullablePlan {
51    /// Writer actually wrote a union (branch tag present).
52    ReadTag,
53    /// Writer wrote a single (non-union) value resolved to the non-null branch
54    /// of the reader union; do NOT read a branch tag, but apply any promotion.
55    FromSingle { promotion: Promotion },
56}
57
/// Decodes one decimal payload and appends it to a decimal builder.
///
/// Arguments, in order: the size hint forwarded to `read_decimal_bytes_be`, the input
/// to read from, the builder to append to, the byte width of the target integer, and
/// the target integer type. The bytes returned by `read_decimal_bytes_be` are
/// interpreted as big-endian.
///
/// The expansion uses `?`, so the macro must be invoked inside a function whose
/// return type can carry the error of `read_decimal_bytes_be`.
macro_rules! decode_decimal {
    ($len:expr, $cursor:expr, $sink:expr, $width:expr, $Native:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $width }>($cursor, $len)?;
        let value = <$Native>::from_be_bytes(be_bytes);
        $sink.append_value(value);
    }};
}
65
/// Finishes a decimal builder and turns it into an [`ArrayRef`].
///
/// Arguments, in order: the builder, a reference to the precision, the optional scale
/// (a missing scale means `0`), the null buffer to attach, and the concrete array type
/// to construct. Only the values of the finished builder are kept; they are re-wrapped
/// with the supplied nulls and then tagged with precision and scale.
///
/// The expansion uses `?` twice, so it must be invoked inside a function returning a
/// compatible `Result`.
macro_rules! flush_decimal {
    ($pending:expr, $digits:expr, $exponent:expr, $validity:expr, $Array:ty) => {{
        let (_, values, _) = $pending.finish().into_parts();
        let array = <$Array>::try_new(values, $validity)?;
        let array =
            array.with_precision_and_scale(*$digits as u8, $exponent.unwrap_or(0) as i8)?;
        Arc::new(array) as ArrayRef
    }};
}
75
/// Appends a default decimal value to the matching decimal builder.
///
/// Arguments, in order: the default literal, the builder, the byte width of the target
/// integer, the target integer type, and the type name spliced into the error text
/// (the message is assembled at compile time with `concat!`).
///
/// Only `AvroLiteral::Bytes` is accepted: the bytes are passed through `sign_cast_to`
/// to obtain exactly `$WIDTH` bytes, which are then read as a big-endian integer.
/// Any other literal evaluates to `Err(AvroError::InvalidArgument(..))`. The expansion
/// is a `Result<(), AvroError>` expression and uses `?` internally.
macro_rules! append_decimal_default {
    ($literal:expr, $sink:expr, $WIDTH:literal, $Native:ty, $label:literal) => {{
        if let AvroLiteral::Bytes(raw) = $literal {
            let widened = sign_cast_to::<$WIDTH>(raw)?;
            $sink.append_value(<$Native>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(AvroError::InvalidArgument(
                concat!(
                    "Default for ",
                    $label,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
98
99/// Decodes avro encoded data into [`RecordBatch`]
100#[derive(Debug)]
101pub(crate) struct RecordDecoder {
102    schema: SchemaRef,
103    fields: Vec<Decoder>,
104    projector: Option<Projector>,
105}
106
107impl RecordDecoder {
108    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
109    ///
110    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
111    ///
112    /// # Arguments
113    /// * `data_type` - The Avro data type to decode.
114    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
115    ///
116    /// # Errors
117    /// This function will return an error if the provided `data_type` is not a `Record`.
118    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
119        match data_type.codec() {
120            Codec::Struct(reader_fields) => {
121                // Build Arrow schema fields and per-child decoders
122                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123                let mut encodings = Vec::with_capacity(reader_fields.len());
124                for avro_field in reader_fields.iter() {
125                    arrow_fields.push(avro_field.field());
126                    encodings.push(Decoder::try_new(avro_field.data_type())?);
127                }
128                let projector = match data_type.resolution.as_ref() {
129                    Some(ResolutionInfo::Record(rec)) => {
130                        Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131                    }
132                    _ => None,
133                };
134                Ok(Self {
135                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
136                    fields: encodings,
137                    projector,
138                })
139            }
140            other => Err(AvroError::ParseError(format!(
141                "Expected record got {other:?}"
142            ))),
143        }
144    }
145
146    /// Returns the decoder's `SchemaRef`
147    pub(crate) fn schema(&self) -> &SchemaRef {
148        &self.schema
149    }
150
151    /// Decode `count` records from `buf`
152    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
153        let mut cursor = AvroCursor::new(buf);
154        match self.projector.as_mut() {
155            Some(proj) => {
156                for _ in 0..count {
157                    proj.project_record(&mut cursor, &mut self.fields)?;
158                }
159            }
160            None => {
161                for _ in 0..count {
162                    for field in &mut self.fields {
163                        field.decode(&mut cursor)?;
164                    }
165                }
166            }
167        }
168        Ok(cursor.position())
169    }
170
171    /// Flush the decoded records into a [`RecordBatch`]
172    pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
173        let arrays = self
174            .fields
175            .iter_mut()
176            .map(|x| x.flush(None))
177            .collect::<Result<Vec<_>, _>>()?;
178        RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
179    }
180}
181
182#[derive(Debug)]
183struct EnumResolution {
184    mapping: Arc<[i32]>,
185    default_index: i32,
186}
187
188#[derive(Debug, AsRefStr)]
189enum Decoder {
190    Null(usize),
191    Boolean(BooleanBufferBuilder),
192    Int32(Vec<i32>),
193    Int64(Vec<i64>),
194    #[cfg(feature = "avro_custom_types")]
195    DurationSecond(Vec<i64>),
196    #[cfg(feature = "avro_custom_types")]
197    DurationMillisecond(Vec<i64>),
198    #[cfg(feature = "avro_custom_types")]
199    DurationMicrosecond(Vec<i64>),
200    #[cfg(feature = "avro_custom_types")]
201    DurationNanosecond(Vec<i64>),
202    #[cfg(feature = "avro_custom_types")]
203    Int8(Vec<i8>),
204    #[cfg(feature = "avro_custom_types")]
205    Int16(Vec<i16>),
206    #[cfg(feature = "avro_custom_types")]
207    UInt8(Vec<u8>),
208    #[cfg(feature = "avro_custom_types")]
209    UInt16(Vec<u16>),
210    #[cfg(feature = "avro_custom_types")]
211    UInt32(Vec<u32>),
212    #[cfg(feature = "avro_custom_types")]
213    UInt64(Vec<u64>),
214    #[cfg(feature = "avro_custom_types")]
215    Float16(Vec<u16>), // Stored as raw IEEE-754 f16 bits
216    #[cfg(feature = "avro_custom_types")]
217    Date64(Vec<i64>),
218    #[cfg(feature = "avro_custom_types")]
219    TimeNanos(Vec<i64>),
220    #[cfg(feature = "avro_custom_types")]
221    Time32Secs(Vec<i32>),
222    #[cfg(feature = "avro_custom_types")]
223    TimestampSecs(bool, Vec<i64>),
224    #[cfg(feature = "avro_custom_types")]
225    IntervalYearMonth(Vec<i32>),
226    #[cfg(feature = "avro_custom_types")]
227    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
228    #[cfg(feature = "avro_custom_types")]
229    IntervalDayTime(Vec<IntervalDayTime>),
230    Float32(Vec<f32>),
231    Float64(Vec<f64>),
232    Date32(Vec<i32>),
233    TimeMillis(Vec<i32>),
234    TimeMicros(Vec<i64>),
235    TimestampMillis(bool, Vec<i64>),
236    TimestampMicros(bool, Vec<i64>),
237    TimestampNanos(bool, Vec<i64>),
238    Int32ToInt64(Vec<i64>),
239    Int32ToFloat32(Vec<f32>),
240    Int32ToFloat64(Vec<f64>),
241    Int64ToFloat32(Vec<f32>),
242    Int64ToFloat64(Vec<f64>),
243    Float32ToFloat64(Vec<f64>),
244    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
245    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
246    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
247    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
248    String(OffsetBufferBuilder<i32>, Vec<u8>),
249    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
250    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
251    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
252    Record(Fields, Vec<Decoder>, Option<Projector>),
253    Map(
254        FieldRef,
255        OffsetBufferBuilder<i32>,
256        OffsetBufferBuilder<i32>,
257        Vec<u8>,
258        Box<Decoder>,
259    ),
260    Fixed(i32, Vec<u8>),
261    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
262    Duration(IntervalMonthDayNanoBuilder),
263    Uuid(Vec<u8>),
264    #[cfg(feature = "small_decimals")]
265    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
266    #[cfg(feature = "small_decimals")]
267    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
268    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
269    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
270    #[cfg(feature = "avro_custom_types")]
271    RunEndEncoded(u8, usize, Box<Decoder>),
272    Union(UnionDecoder),
273    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
274}
275
276impl Decoder {
277    fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
278        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
279            if info.writer_is_union && !info.reader_is_union {
280                let mut clone = data_type.clone();
281                clone.resolution = None; // Build target base decoder without Union resolution
282                let target = Box::new(Self::try_new_internal(&clone)?);
283                let decoder = Self::Union(
284                    UnionDecoderBuilder::new()
285                        .with_resolved_union(info.clone())
286                        .with_target(target)
287                        .build()?,
288                );
289                return Ok(decoder);
290            }
291        }
292        Self::try_new_internal(data_type)
293    }
294
295    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
296        // Extract just the Promotion (if any) to simplify pattern matching
297        let promotion = match data_type.resolution.as_ref() {
298            Some(ResolutionInfo::Promotion(p)) => Some(p),
299            _ => None,
300        };
301        let decoder = match (data_type.codec(), promotion) {
302            (Codec::Int64, Some(Promotion::IntToLong)) => {
303                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
304            }
305            (Codec::Float32, Some(Promotion::IntToFloat)) => {
306                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
307            }
308            (Codec::Float64, Some(Promotion::IntToDouble)) => {
309                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
310            }
311            (Codec::Float32, Some(Promotion::LongToFloat)) => {
312                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
313            }
314            (Codec::Float64, Some(Promotion::LongToDouble)) => {
315                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
316            }
317            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
318                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
319            }
320            (Codec::Utf8, Some(Promotion::BytesToString))
321            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
322                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
323                Vec::with_capacity(DEFAULT_CAPACITY),
324            ),
325            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
326                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
327                Vec::with_capacity(DEFAULT_CAPACITY),
328            ),
329            (Codec::Null, _) => Self::Null(0),
330            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
331            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
332            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
333            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
334            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
335            (Codec::Binary, _) => Self::Binary(
336                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
337                Vec::with_capacity(DEFAULT_CAPACITY),
338            ),
339            (Codec::Utf8, _) => Self::String(
340                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
341                Vec::with_capacity(DEFAULT_CAPACITY),
342            ),
343            (Codec::Utf8View, _) => Self::StringView(
344                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
345                Vec::with_capacity(DEFAULT_CAPACITY),
346            ),
347            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
348            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
349            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
350            (Codec::TimestampMillis(is_utc), _) => {
351                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
352            }
353            (Codec::TimestampMicros(is_utc), _) => {
354                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
355            }
356            (Codec::TimestampNanos(is_utc), _) => {
357                Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
358            }
359            #[cfg(feature = "avro_custom_types")]
360            (Codec::DurationNanos, _) => {
361                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
362            }
363            #[cfg(feature = "avro_custom_types")]
364            (Codec::DurationMicros, _) => {
365                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
366            }
367            #[cfg(feature = "avro_custom_types")]
368            (Codec::DurationMillis, _) => {
369                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
370            }
371            #[cfg(feature = "avro_custom_types")]
372            (Codec::DurationSeconds, _) => {
373                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
374            }
375            #[cfg(feature = "avro_custom_types")]
376            (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
377            #[cfg(feature = "avro_custom_types")]
378            (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
379            #[cfg(feature = "avro_custom_types")]
380            (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
381            #[cfg(feature = "avro_custom_types")]
382            (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
383            #[cfg(feature = "avro_custom_types")]
384            (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
385            #[cfg(feature = "avro_custom_types")]
386            (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
387            #[cfg(feature = "avro_custom_types")]
388            (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
389            #[cfg(feature = "avro_custom_types")]
390            (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
391            #[cfg(feature = "avro_custom_types")]
392            (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
393            #[cfg(feature = "avro_custom_types")]
394            (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
395            #[cfg(feature = "avro_custom_types")]
396            (Codec::TimestampSecs(is_utc), _) => {
397                Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
398            }
399            #[cfg(feature = "avro_custom_types")]
400            (Codec::IntervalYearMonth, _) => {
401                Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
402            }
403            #[cfg(feature = "avro_custom_types")]
404            (Codec::IntervalMonthDayNano, _) => {
405                Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
406            }
407            #[cfg(feature = "avro_custom_types")]
408            (Codec::IntervalDayTime, _) => {
409                Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
410            }
411            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
412            (Codec::Decimal(precision, scale, size), _) => {
413                let p = *precision;
414                let s = *scale;
415                let prec = p as u8;
416                let scl = s.unwrap_or(0) as i8;
417                #[cfg(feature = "small_decimals")]
418                {
419                    if p <= DECIMAL32_MAX_PRECISION as usize {
420                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
421                            .with_precision_and_scale(prec, scl)?;
422                        Self::Decimal32(p, s, *size, builder)
423                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
424                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
425                            .with_precision_and_scale(prec, scl)?;
426                        Self::Decimal64(p, s, *size, builder)
427                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
428                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
429                            .with_precision_and_scale(prec, scl)?;
430                        Self::Decimal128(p, s, *size, builder)
431                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
432                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
433                            .with_precision_and_scale(prec, scl)?;
434                        Self::Decimal256(p, s, *size, builder)
435                    } else {
436                        return Err(AvroError::ParseError(format!(
437                            "Decimal precision {p} exceeds maximum supported"
438                        )));
439                    }
440                }
441                #[cfg(not(feature = "small_decimals"))]
442                {
443                    if p <= DECIMAL128_MAX_PRECISION as usize {
444                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
445                            .with_precision_and_scale(prec, scl)?;
446                        Self::Decimal128(p, s, *size, builder)
447                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
448                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
449                            .with_precision_and_scale(prec, scl)?;
450                        Self::Decimal256(p, s, *size, builder)
451                    } else {
452                        return Err(AvroError::ParseError(format!(
453                            "Decimal precision {p} exceeds maximum supported"
454                        )));
455                    }
456                }
457            }
458            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
459            (Codec::List(item), _) => {
460                let decoder = Self::try_new(item)?;
461                Self::Array(
462                    Arc::new(item.field_with_name("item")),
463                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
464                    Box::new(decoder),
465                )
466            }
467            (Codec::Enum(symbols), _) => {
468                let res = match data_type.resolution.as_ref() {
469                    Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
470                        mapping: mapping.mapping.clone(),
471                        default_index: mapping.default_index,
472                    }),
473                    _ => None,
474                };
475                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
476            }
477            (Codec::Struct(fields), _) => {
478                let mut arrow_fields = Vec::with_capacity(fields.len());
479                let mut encodings = Vec::with_capacity(fields.len());
480                for avro_field in fields.iter() {
481                    let encoding = Self::try_new(avro_field.data_type())?;
482                    arrow_fields.push(avro_field.field());
483                    encodings.push(encoding);
484                }
485                let projector =
486                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
487                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
488                    } else {
489                        None
490                    };
491                Self::Record(arrow_fields.into(), encodings, projector)
492            }
493            (Codec::Map(child), _) => {
494                let val_field = child.field_with_name("value");
495                let map_field = Arc::new(ArrowField::new(
496                    "entries",
497                    DataType::Struct(Fields::from(vec![
498                        ArrowField::new("key", DataType::Utf8, false),
499                        val_field,
500                    ])),
501                    false,
502                ));
503                let val_dec = Self::try_new(child)?;
504                Self::Map(
505                    map_field,
506                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
507                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
508                    Vec::with_capacity(DEFAULT_CAPACITY),
509                    Box::new(val_dec),
510                )
511            }
512            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
513            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
514                let decoders = encodings
515                    .iter()
516                    .map(Self::try_new_internal)
517                    .collect::<Result<Vec<_>, _>>()?;
518                if fields.len() != decoders.len() {
519                    return Err(AvroError::SchemaError(format!(
520                        "Union has {} fields but {} decoders",
521                        fields.len(),
522                        decoders.len()
523                    )));
524                }
525                // Proactive guard: if a user provides a union with more branches than
526                // a 32-bit Avro index can address, fail fast with a clear message.
527                let branch_count = decoders.len();
528                let max_addr = (i32::MAX as usize) + 1;
529                if branch_count > max_addr {
530                    return Err(AvroError::SchemaError(format!(
531                        "Union has {branch_count} branches, which exceeds the maximum addressable \
532                         branches by an Avro int tag ({} + 1).",
533                        i32::MAX
534                    )));
535                }
536                let mut builder = UnionDecoderBuilder::new()
537                    .with_fields(fields.clone())
538                    .with_branches(decoders);
539                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
540                    if info.reader_is_union {
541                        builder = builder.with_resolved_union(info.clone());
542                    }
543                }
544                Self::Union(builder.build()?)
545            }
546            (Codec::Union(_, _, _), _) => {
547                return Err(AvroError::NYI(
548                    "Sparse Arrow unions are not yet supported".to_string(),
549                ));
550            }
551            #[cfg(feature = "avro_custom_types")]
552            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
553                let inner = Self::try_new(values_dt)?;
554                let byte_width: u8 = match *width_bits_or_bytes {
555                    2 | 4 | 8 => *width_bits_or_bytes,
556                    16 => 2,
557                    32 => 4,
558                    64 => 8,
559                    other => {
560                        return Err(AvroError::InvalidArgument(format!(
561                            "Unsupported run-end width {other} for RunEndEncoded; \
562                             expected 16/32/64 bits or 2/4/8 bytes"
563                        )));
564                    }
565                };
566                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
567            }
568        };
569        Ok(match data_type.nullability() {
570            Some(nullability) => {
571                // Default to reading a union branch tag unless the resolution proves otherwise.
572                let mut plan = NullablePlan::ReadTag;
573                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
574                    if !info.writer_is_union && info.reader_is_union {
575                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
576                            plan = NullablePlan::FromSingle { promotion: *promo };
577                        }
578                    }
579                }
580                Self::Nullable(
581                    nullability,
582                    NullBufferBuilder::new(DEFAULT_CAPACITY),
583                    Box::new(decoder),
584                    plan,
585                )
586            }
587            None => decoder,
588        })
589    }
590
591    /// Append a null record
592    fn append_null(&mut self) -> Result<(), AvroError> {
593        match self {
594            Self::Null(count) => *count += 1,
595            Self::Boolean(b) => b.append(false),
596            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
597            Self::Int64(v)
598            | Self::Int32ToInt64(v)
599            | Self::TimeMicros(v)
600            | Self::TimestampMillis(_, v)
601            | Self::TimestampMicros(_, v)
602            | Self::TimestampNanos(_, v) => v.push(0),
603            #[cfg(feature = "avro_custom_types")]
604            Self::DurationSecond(v)
605            | Self::DurationMillisecond(v)
606            | Self::DurationMicrosecond(v)
607            | Self::DurationNanosecond(v) => v.push(0),
608            #[cfg(feature = "avro_custom_types")]
609            Self::Int8(v) => v.push(0),
610            #[cfg(feature = "avro_custom_types")]
611            Self::Int16(v) => v.push(0),
612            #[cfg(feature = "avro_custom_types")]
613            Self::UInt8(v) => v.push(0),
614            #[cfg(feature = "avro_custom_types")]
615            Self::UInt16(v) => v.push(0),
616            #[cfg(feature = "avro_custom_types")]
617            Self::UInt32(v) => v.push(0),
618            #[cfg(feature = "avro_custom_types")]
619            Self::UInt64(v) => v.push(0),
620            #[cfg(feature = "avro_custom_types")]
621            Self::Float16(v) => v.push(0),
622            #[cfg(feature = "avro_custom_types")]
623            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
624            #[cfg(feature = "avro_custom_types")]
625            Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
626            #[cfg(feature = "avro_custom_types")]
627            Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
628            #[cfg(feature = "avro_custom_types")]
629            Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
630            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
631            Self::Float64(v)
632            | Self::Int32ToFloat64(v)
633            | Self::Int64ToFloat64(v)
634            | Self::Float32ToFloat64(v) => v.push(0.),
635            Self::Binary(offsets, _)
636            | Self::String(offsets, _)
637            | Self::StringView(offsets, _)
638            | Self::BytesToString(offsets, _)
639            | Self::StringToBytes(offsets, _) => {
640                offsets.push_length(0);
641            }
642            Self::Uuid(v) => {
643                v.extend([0; 16]);
644            }
645            Self::Array(_, offsets, _) => {
646                offsets.push_length(0);
647            }
648            Self::Record(_, e, _) => {
649                for encoding in e.iter_mut() {
650                    encoding.append_null()?;
651                }
652            }
653            Self::Map(_, _koff, moff, _, _) => {
654                moff.push_length(0);
655            }
656            Self::Fixed(sz, accum) => {
657                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
658            }
659            #[cfg(feature = "small_decimals")]
660            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
661            #[cfg(feature = "small_decimals")]
662            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
663            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
664            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
665            Self::Enum(indices, _, _) => indices.push(0),
666            Self::Duration(builder) => builder.append_null(),
667            #[cfg(feature = "avro_custom_types")]
668            Self::RunEndEncoded(_, len, inner) => {
669                *len += 1;
670                inner.append_null()?;
671            }
672            Self::Union(u) => u.append_null()?,
673            Self::Nullable(_, null_buffer, inner, _) => {
674                null_buffer.append(false);
675                inner.append_null()?;
676            }
677        }
678        Ok(())
679    }
680
681    /// Append a single default literal into the decoder's buffers
682    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
683        match self {
684            Self::Nullable(_, nb, inner, _) => {
685                if matches!(lit, AvroLiteral::Null) {
686                    nb.append(false);
687                    inner.append_null()
688                } else {
689                    nb.append(true);
690                    inner.append_default(lit)
691                }
692            }
693            Self::Null(count) => match lit {
694                AvroLiteral::Null => {
695                    *count += 1;
696                    Ok(())
697                }
698                _ => Err(AvroError::InvalidArgument(
699                    "Non-null default for null type".to_string(),
700                )),
701            },
702            Self::Boolean(b) => match lit {
703                AvroLiteral::Boolean(v) => {
704                    b.append(*v);
705                    Ok(())
706                }
707                _ => Err(AvroError::InvalidArgument(
708                    "Default for boolean must be boolean".to_string(),
709                )),
710            },
711            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
712                AvroLiteral::Int(i) => {
713                    v.push(*i);
714                    Ok(())
715                }
716                _ => Err(AvroError::InvalidArgument(
717                    "Default for int32/date32/time-millis must be int".to_string(),
718                )),
719            },
720            #[cfg(feature = "avro_custom_types")]
721            Self::DurationSecond(v)
722            | Self::DurationMillisecond(v)
723            | Self::DurationMicrosecond(v)
724            | Self::DurationNanosecond(v) => match lit {
725                AvroLiteral::Long(i) => {
726                    v.push(*i);
727                    Ok(())
728                }
729                _ => Err(AvroError::InvalidArgument(
730                    "Default for duration long must be long".to_string(),
731                )),
732            },
733            #[cfg(feature = "avro_custom_types")]
734            Self::Int8(v) => match lit {
735                AvroLiteral::Int(i) => {
736                    let x = i8::try_from(*i).map_err(|_| {
737                        AvroError::InvalidArgument(format!(
738                            "Default for int8 out of range for i8: {i}"
739                        ))
740                    })?;
741                    v.push(x);
742                    Ok(())
743                }
744                _ => Err(AvroError::InvalidArgument(
745                    "Default for int8 must be int".to_string(),
746                )),
747            },
748            #[cfg(feature = "avro_custom_types")]
749            Self::Int16(v) => match lit {
750                AvroLiteral::Int(i) => {
751                    let x = i16::try_from(*i).map_err(|_| {
752                        AvroError::InvalidArgument(format!(
753                            "Default for int16 out of range for i16: {i}"
754                        ))
755                    })?;
756                    v.push(x);
757                    Ok(())
758                }
759                _ => Err(AvroError::InvalidArgument(
760                    "Default for int16 must be int".to_string(),
761                )),
762            },
763            #[cfg(feature = "avro_custom_types")]
764            Self::UInt8(v) => match lit {
765                AvroLiteral::Int(i) => {
766                    let x = u8::try_from(*i).map_err(|_| {
767                        AvroError::InvalidArgument(format!(
768                            "Default for uint8 out of range for u8: {i}"
769                        ))
770                    })?;
771                    v.push(x);
772                    Ok(())
773                }
774                _ => Err(AvroError::InvalidArgument(
775                    "Default for uint8 must be int".to_string(),
776                )),
777            },
778            #[cfg(feature = "avro_custom_types")]
779            Self::UInt16(v) => match lit {
780                AvroLiteral::Int(i) => {
781                    let x = u16::try_from(*i).map_err(|_| {
782                        AvroError::InvalidArgument(format!(
783                            "Default for uint16 out of range for u16: {i}"
784                        ))
785                    })?;
786                    v.push(x);
787                    Ok(())
788                }
789                _ => Err(AvroError::InvalidArgument(
790                    "Default for uint16 must be int".to_string(),
791                )),
792            },
793            #[cfg(feature = "avro_custom_types")]
794            Self::UInt32(v) => match lit {
795                AvroLiteral::Long(i) => {
796                    let x = u32::try_from(*i).map_err(|_| {
797                        AvroError::InvalidArgument(format!(
798                            "Default for uint32 out of range for u32: {i}"
799                        ))
800                    })?;
801                    v.push(x);
802                    Ok(())
803                }
804                _ => Err(AvroError::InvalidArgument(
805                    "Default for uint32 must be long".to_string(),
806                )),
807            },
808            #[cfg(feature = "avro_custom_types")]
809            Self::UInt64(v) => match lit {
810                AvroLiteral::Bytes(b) => {
811                    if b.len() != 8 {
812                        return Err(AvroError::InvalidArgument(format!(
813                            "uint64 default must be exactly 8 bytes, got {}",
814                            b.len()
815                        )));
816                    }
817                    v.push(u64::from_le_bytes([
818                        b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
819                    ]));
820                    Ok(())
821                }
822                _ => Err(AvroError::InvalidArgument(
823                    "Default for uint64 must be bytes (8-byte LE)".to_string(),
824                )),
825            },
826            #[cfg(feature = "avro_custom_types")]
827            Self::Float16(v) => match lit {
828                AvroLiteral::Bytes(b) => {
829                    if b.len() != 2 {
830                        return Err(AvroError::InvalidArgument(format!(
831                            "float16 default must be exactly 2 bytes, got {}",
832                            b.len()
833                        )));
834                    }
835                    v.push(u16::from_le_bytes([b[0], b[1]]));
836                    Ok(())
837                }
838                _ => Err(AvroError::InvalidArgument(
839                    "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
840                )),
841            },
842            #[cfg(feature = "avro_custom_types")]
843            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
844                AvroLiteral::Long(i) => {
845                    v.push(*i);
846                    Ok(())
847                }
848                _ => Err(AvroError::InvalidArgument(
849                    "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
850                )),
851            },
852            #[cfg(feature = "avro_custom_types")]
853            Self::Time32Secs(v) => match lit {
854                AvroLiteral::Int(i) => {
855                    v.push(*i);
856                    Ok(())
857                }
858                _ => Err(AvroError::InvalidArgument(
859                    "Default for time32-secs must be int".to_string(),
860                )),
861            },
862            #[cfg(feature = "avro_custom_types")]
863            Self::IntervalYearMonth(v) => match lit {
864                AvroLiteral::Bytes(b) => {
865                    if b.len() != 4 {
866                        return Err(AvroError::InvalidArgument(format!(
867                            "interval-year-month default must be exactly 4 bytes, got {}",
868                            b.len()
869                        )));
870                    }
871                    v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
872                    Ok(())
873                }
874                _ => Err(AvroError::InvalidArgument(
875                    "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
876                )),
877            },
878            #[cfg(feature = "avro_custom_types")]
879            Self::IntervalMonthDayNano(v) => match lit {
880                AvroLiteral::Bytes(b) => {
881                    if b.len() != 16 {
882                        return Err(AvroError::InvalidArgument(format!(
883                            "interval-month-day-nano default must be exactly 16 bytes, got {}",
884                            b.len()
885                        )));
886                    }
887                    let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
888                    let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
889                    let nanos =
890                        i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
891                    v.push(IntervalMonthDayNano::new(months, days, nanos));
892                    Ok(())
893                }
894                _ => Err(AvroError::InvalidArgument(
895                    "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
896                )),
897            },
898            #[cfg(feature = "avro_custom_types")]
899            Self::IntervalDayTime(v) => match lit {
900                AvroLiteral::Bytes(b) => {
901                    if b.len() != 8 {
902                        return Err(AvroError::InvalidArgument(format!(
903                            "interval-day-time default must be exactly 8 bytes, got {}",
904                            b.len()
905                        )));
906                    }
907                    let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
908                    let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
909                    v.push(IntervalDayTime::new(days, milliseconds));
910                    Ok(())
911                }
912                _ => Err(AvroError::InvalidArgument(
913                    "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
914                )),
915            },
916            Self::Int64(v)
917            | Self::Int32ToInt64(v)
918            | Self::TimeMicros(v)
919            | Self::TimestampMillis(_, v)
920            | Self::TimestampMicros(_, v)
921            | Self::TimestampNanos(_, v) => match lit {
922                AvroLiteral::Long(i) => {
923                    v.push(*i);
924                    Ok(())
925                }
926                AvroLiteral::Int(i) => {
927                    v.push(*i as i64);
928                    Ok(())
929                }
930                _ => Err(AvroError::InvalidArgument(
931                    "Default for long/time-micros/timestamp must be long or int".to_string(),
932                )),
933            },
934            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
935                AvroLiteral::Float(f) => {
936                    v.push(*f);
937                    Ok(())
938                }
939                _ => Err(AvroError::InvalidArgument(
940                    "Default for float must be float".to_string(),
941                )),
942            },
943            Self::Float64(v)
944            | Self::Int32ToFloat64(v)
945            | Self::Int64ToFloat64(v)
946            | Self::Float32ToFloat64(v) => match lit {
947                AvroLiteral::Double(f) => {
948                    v.push(*f);
949                    Ok(())
950                }
951                _ => Err(AvroError::InvalidArgument(
952                    "Default for double must be double".to_string(),
953                )),
954            },
955            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
956                AvroLiteral::Bytes(b) => {
957                    offsets.push_length(b.len());
958                    values.extend_from_slice(b);
959                    Ok(())
960                }
961                _ => Err(AvroError::InvalidArgument(
962                    "Default for bytes must be bytes".to_string(),
963                )),
964            },
965            Self::BytesToString(offsets, values)
966            | Self::String(offsets, values)
967            | Self::StringView(offsets, values) => match lit {
968                AvroLiteral::String(s) => {
969                    let b = s.as_bytes();
970                    offsets.push_length(b.len());
971                    values.extend_from_slice(b);
972                    Ok(())
973                }
974                _ => Err(AvroError::InvalidArgument(
975                    "Default for string must be string".to_string(),
976                )),
977            },
978            Self::Uuid(values) => match lit {
979                AvroLiteral::String(s) => {
980                    let uuid = Uuid::try_parse(s).map_err(|e| {
981                        AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
982                    })?;
983                    values.extend_from_slice(uuid.as_bytes());
984                    Ok(())
985                }
986                _ => Err(AvroError::InvalidArgument(
987                    "Default for uuid must be string".to_string(),
988                )),
989            },
990            Self::Fixed(sz, accum) => match lit {
991                AvroLiteral::Bytes(b) => {
992                    if b.len() != *sz as usize {
993                        return Err(AvroError::InvalidArgument(format!(
994                            "Fixed default length {} does not match size {sz}",
995                            b.len(),
996                        )));
997                    }
998                    accum.extend_from_slice(b);
999                    Ok(())
1000                }
1001                _ => Err(AvroError::InvalidArgument(
1002                    "Default for fixed must be bytes".to_string(),
1003                )),
1004            },
1005            #[cfg(feature = "small_decimals")]
1006            Self::Decimal32(_, _, _, builder) => {
1007                append_decimal_default!(lit, builder, 4, i32, "decimal32")
1008            }
1009            #[cfg(feature = "small_decimals")]
1010            Self::Decimal64(_, _, _, builder) => {
1011                append_decimal_default!(lit, builder, 8, i64, "decimal64")
1012            }
1013            Self::Decimal128(_, _, _, builder) => {
1014                append_decimal_default!(lit, builder, 16, i128, "decimal128")
1015            }
1016            Self::Decimal256(_, _, _, builder) => {
1017                append_decimal_default!(lit, builder, 32, i256, "decimal256")
1018            }
1019            Self::Duration(builder) => match lit {
1020                AvroLiteral::Bytes(b) => {
1021                    if b.len() != 12 {
1022                        return Err(AvroError::InvalidArgument(format!(
1023                            "Duration default must be exactly 12 bytes, got {}",
1024                            b.len()
1025                        )));
1026                    }
1027                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1028                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1029                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1030                    let nanos = (millis as i64) * 1_000_000;
1031                    builder.append_value(IntervalMonthDayNano::new(
1032                        months as i32,
1033                        days as i32,
1034                        nanos,
1035                    ));
1036                    Ok(())
1037                }
1038                _ => Err(AvroError::InvalidArgument(
1039                    "Default for duration must be 12-byte little-endian months/days/millis"
1040                        .to_string(),
1041                )),
1042            },
1043            Self::Array(_, offsets, inner) => match lit {
1044                AvroLiteral::Array(items) => {
1045                    offsets.push_length(items.len());
1046                    for item in items {
1047                        inner.append_default(item)?;
1048                    }
1049                    Ok(())
1050                }
1051                _ => Err(AvroError::InvalidArgument(
1052                    "Default for array must be an array literal".to_string(),
1053                )),
1054            },
1055            Self::Map(_, koff, moff, kdata, valdec) => match lit {
1056                AvroLiteral::Map(entries) => {
1057                    moff.push_length(entries.len());
1058                    for (k, v) in entries {
1059                        let kb = k.as_bytes();
1060                        koff.push_length(kb.len());
1061                        kdata.extend_from_slice(kb);
1062                        valdec.append_default(v)?;
1063                    }
1064                    Ok(())
1065                }
1066                _ => Err(AvroError::InvalidArgument(
1067                    "Default for map must be a map/object literal".to_string(),
1068                )),
1069            },
1070            Self::Enum(indices, symbols, _) => match lit {
1071                AvroLiteral::Enum(sym) => {
1072                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1073                        AvroError::InvalidArgument(format!(
1074                            "Enum default symbol {sym:?} not in reader symbols"
1075                        ))
1076                    })?;
1077                    indices.push(pos as i32);
1078                    Ok(())
1079                }
1080                _ => Err(AvroError::InvalidArgument(
1081                    "Default for enum must be a symbol".to_string(),
1082                )),
1083            },
1084            #[cfg(feature = "avro_custom_types")]
1085            Self::RunEndEncoded(_, len, inner) => {
1086                *len += 1;
1087                inner.append_default(lit)
1088            }
1089            Self::Union(u) => u.append_default(lit),
1090            Self::Record(field_meta, decoders, projector) => match lit {
1091                AvroLiteral::Map(entries) => {
1092                    for (i, dec) in decoders.iter_mut().enumerate() {
1093                        let name = field_meta[i].name();
1094                        if let Some(sub) = entries.get(name) {
1095                            dec.append_default(sub)?;
1096                        } else if let Some(proj) = projector.as_ref() {
1097                            proj.project_default(dec, i)?;
1098                        } else {
1099                            dec.append_null()?;
1100                        }
1101                    }
1102                    Ok(())
1103                }
1104                AvroLiteral::Null => {
1105                    for (i, dec) in decoders.iter_mut().enumerate() {
1106                        if let Some(proj) = projector.as_ref() {
1107                            proj.project_default(dec, i)?;
1108                        } else {
1109                            dec.append_null()?;
1110                        }
1111                    }
1112                    Ok(())
1113                }
1114                _ => Err(AvroError::InvalidArgument(
1115                    "Default for record must be a map/object or null".to_string(),
1116                )),
1117            },
1118        }
1119    }
1120
1121    /// Decode a single record from `buf`
1122    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1123        match self {
1124            Self::Null(x) => *x += 1,
1125            Self::Boolean(values) => values.append(buf.get_bool()?),
1126            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
1127                values.push(buf.get_int()?)
1128            }
1129            Self::Int64(values)
1130            | Self::TimeMicros(values)
1131            | Self::TimestampMillis(_, values)
1132            | Self::TimestampMicros(_, values)
1133            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
1134            #[cfg(feature = "avro_custom_types")]
1135            Self::DurationSecond(values)
1136            | Self::DurationMillisecond(values)
1137            | Self::DurationMicrosecond(values)
1138            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
1139            #[cfg(feature = "avro_custom_types")]
1140            Self::Int8(values) => {
1141                let raw = buf.get_int()?;
1142                let x = i8::try_from(raw).map_err(|_| {
1143                    AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
1144                })?;
1145                values.push(x);
1146            }
1147            #[cfg(feature = "avro_custom_types")]
1148            Self::Int16(values) => {
1149                let raw = buf.get_int()?;
1150                let x = i16::try_from(raw).map_err(|_| {
1151                    AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
1152                })?;
1153                values.push(x);
1154            }
1155            #[cfg(feature = "avro_custom_types")]
1156            Self::UInt8(values) => {
1157                let raw = buf.get_int()?;
1158                let x = u8::try_from(raw).map_err(|_| {
1159                    AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
1160                })?;
1161                values.push(x);
1162            }
1163            #[cfg(feature = "avro_custom_types")]
1164            Self::UInt16(values) => {
1165                let raw = buf.get_int()?;
1166                let x = u16::try_from(raw).map_err(|_| {
1167                    AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
1168                })?;
1169                values.push(x);
1170            }
1171            #[cfg(feature = "avro_custom_types")]
1172            Self::UInt32(values) => {
1173                let raw = buf.get_long()?;
1174                let x = u32::try_from(raw).map_err(|_| {
1175                    AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
1176                })?;
1177                values.push(x);
1178            }
1179            #[cfg(feature = "avro_custom_types")]
1180            Self::UInt64(values) => {
1181                let b = buf.get_fixed(8)?;
1182                values.push(u64::from_le_bytes([
1183                    b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
1184                ]));
1185            }
1186            #[cfg(feature = "avro_custom_types")]
1187            Self::Float16(values) => {
1188                let b = buf.get_fixed(2)?;
1189                values.push(u16::from_le_bytes([b[0], b[1]]));
1190            }
1191            #[cfg(feature = "avro_custom_types")]
1192            Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
1193                values.push(buf.get_long()?)
1194            }
1195            #[cfg(feature = "avro_custom_types")]
1196            Self::Time32Secs(values) => values.push(buf.get_int()?),
1197            #[cfg(feature = "avro_custom_types")]
1198            Self::IntervalYearMonth(values) => {
1199                let b = buf.get_fixed(4)?;
1200                values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
1201            }
1202            #[cfg(feature = "avro_custom_types")]
1203            Self::IntervalMonthDayNano(values) => {
1204                let b = buf.get_fixed(16)?;
1205                let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1206                let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1207                let nanos =
1208                    i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
1209                values.push(IntervalMonthDayNano::new(months, days, nanos));
1210            }
1211            #[cfg(feature = "avro_custom_types")]
1212            Self::IntervalDayTime(values) => {
1213                let b = buf.get_fixed(8)?;
1214                // Read as two i32s: days (4 bytes) and milliseconds (4 bytes)
1215                let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1216                let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1217                values.push(IntervalDayTime::new(days, milliseconds));
1218            }
1219            Self::Float32(values) => values.push(buf.get_float()?),
1220            Self::Float64(values) => values.push(buf.get_double()?),
1221            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
1222            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
1223            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
1224            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
1225            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
1226            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
1227            Self::StringToBytes(offsets, values)
1228            | Self::BytesToString(offsets, values)
1229            | Self::Binary(offsets, values)
1230            | Self::String(offsets, values)
1231            | Self::StringView(offsets, values) => {
1232                let data = buf.get_bytes()?;
1233                offsets.push_length(data.len());
1234                values.extend_from_slice(data);
1235            }
1236            Self::Uuid(values) => {
1237                let s_bytes = buf.get_bytes()?;
1238                let s = std::str::from_utf8(s_bytes).map_err(|e| {
1239                    AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
1240                })?;
1241                let uuid = Uuid::try_parse(s)
1242                    .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
1243                values.extend_from_slice(uuid.as_bytes());
1244            }
1245            Self::Array(_, off, encoding) => {
1246                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
1247                off.push_length(total_items);
1248            }
1249            Self::Record(_, encodings, None) => {
1250                for encoding in encodings {
1251                    encoding.decode(buf)?;
1252                }
1253            }
1254            Self::Record(_, encodings, Some(proj)) => {
1255                proj.project_record(buf, encodings)?;
1256            }
1257            Self::Map(_, koff, moff, kdata, valdec) => {
1258                let newly_added = read_blocks(buf, |cur| {
1259                    let kb = cur.get_bytes()?;
1260                    koff.push_length(kb.len());
1261                    kdata.extend_from_slice(kb);
1262                    valdec.decode(cur)
1263                })?;
1264                moff.push_length(newly_added);
1265            }
1266            Self::Fixed(sz, accum) => {
1267                let fx = buf.get_fixed(*sz as usize)?;
1268                accum.extend_from_slice(fx);
1269            }
1270            #[cfg(feature = "small_decimals")]
1271            Self::Decimal32(_, _, size, builder) => {
1272                decode_decimal!(size, buf, builder, 4, i32);
1273            }
1274            #[cfg(feature = "small_decimals")]
1275            Self::Decimal64(_, _, size, builder) => {
1276                decode_decimal!(size, buf, builder, 8, i64);
1277            }
1278            Self::Decimal128(_, _, size, builder) => {
1279                decode_decimal!(size, buf, builder, 16, i128);
1280            }
1281            Self::Decimal256(_, _, size, builder) => {
1282                decode_decimal!(size, buf, builder, 32, i256);
1283            }
1284            Self::Enum(indices, _, None) => {
1285                indices.push(buf.get_int()?);
1286            }
1287            Self::Enum(indices, _, Some(res)) => {
1288                let raw = buf.get_int()?;
1289                let resolved = usize::try_from(raw)
1290                    .ok()
1291                    .and_then(|idx| res.mapping.get(idx).copied())
1292                    .filter(|&idx| idx >= 0)
1293                    .unwrap_or(res.default_index);
1294                if resolved >= 0 {
1295                    indices.push(resolved);
1296                } else {
1297                    return Err(AvroError::ParseError(format!(
1298                        "Enum symbol index {raw} not resolvable and no default provided",
1299                    )));
1300                }
1301            }
1302            Self::Duration(builder) => {
1303                let b = buf.get_fixed(12)?;
1304                let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1305                let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1306                let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1307                let nanos = (millis as i64) * 1_000_000;
1308                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
1309            }
1310            #[cfg(feature = "avro_custom_types")]
1311            Self::RunEndEncoded(_, len, inner) => {
1312                *len += 1;
1313                inner.decode(buf)?;
1314            }
1315            Self::Union(u) => u.decode(buf)?,
1316            Self::Nullable(order, nb, encoding, plan) => match *plan {
1317                NullablePlan::FromSingle { promotion } => {
1318                    encoding.decode_with_promotion(buf, promotion)?;
1319                    nb.append(true);
1320                }
1321                NullablePlan::ReadTag => {
1322                    let branch = buf.read_vlq()?;
1323                    let is_not_null = match *order {
1324                        Nullability::NullFirst => branch != 0,
1325                        Nullability::NullSecond => branch == 0,
1326                    };
1327                    if is_not_null {
1328                        // It is important to decode before appending to null buffer in case of decode error
1329                        encoding.decode(buf)?;
1330                    } else {
1331                        encoding.append_null()?;
1332                    }
1333                    nb.append(is_not_null);
1334                }
1335            },
1336        }
1337        Ok(())
1338    }
1339
1340    fn decode_with_promotion(
1341        &mut self,
1342        buf: &mut AvroCursor<'_>,
1343        promotion: Promotion,
1344    ) -> Result<(), AvroError> {
1345        #[cfg(feature = "avro_custom_types")]
1346        if let Self::RunEndEncoded(_, len, inner) = self {
1347            *len += 1;
1348            return inner.decode_with_promotion(buf, promotion);
1349        }
1350
1351        macro_rules! promote_numeric_to {
1352            ($variant:ident, $getter:ident, $to:ty) => {{
1353                match self {
1354                    Self::$variant(v) => {
1355                        let x = buf.$getter()?;
1356                        v.push(x as $to);
1357                        Ok(())
1358                    }
1359                    other => Err(AvroError::ParseError(format!(
1360                        "Promotion {promotion} target mismatch: expected {}, got {}",
1361                        stringify!($variant),
1362                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1363                    ))),
1364                }
1365            }};
1366        }
1367        match promotion {
1368            Promotion::Direct => self.decode(buf),
1369            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1370            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1371            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1372            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1373            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1374            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1375            Promotion::StringToBytes => match self {
1376                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1377                    let data = buf.get_bytes()?;
1378                    offsets.push_length(data.len());
1379                    values.extend_from_slice(data);
1380                    Ok(())
1381                }
1382                other => Err(AvroError::ParseError(format!(
1383                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1384                    <Self as AsRef<str>>::as_ref(other)
1385                ))),
1386            },
1387            Promotion::BytesToString => match self {
1388                Self::String(offsets, values)
1389                | Self::StringView(offsets, values)
1390                | Self::BytesToString(offsets, values) => {
1391                    let data = buf.get_bytes()?;
1392                    offsets.push_length(data.len());
1393                    values.extend_from_slice(data);
1394                    Ok(())
1395                }
1396                other => Err(AvroError::ParseError(format!(
1397                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1398                    <Self as AsRef<str>>::as_ref(other)
1399                ))),
1400            },
1401        }
1402    }
1403
1404    /// Flush decoded records to an [`ArrayRef`]
1405    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1406        Ok(match self {
1407            Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1408            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1409            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1410            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1411            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1412            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1413            Self::TimeMillis(values) => {
1414                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1415            }
1416            Self::TimeMicros(values) => {
1417                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1418            }
1419            Self::TimestampMillis(is_utc, values) => Arc::new(
1420                flush_primitive::<TimestampMillisecondType>(values, nulls)
1421                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1422            ),
1423            Self::TimestampMicros(is_utc, values) => Arc::new(
1424                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1425                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1426            ),
1427            Self::TimestampNanos(is_utc, values) => Arc::new(
1428                flush_primitive::<TimestampNanosecondType>(values, nulls)
1429                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1430            ),
1431            #[cfg(feature = "avro_custom_types")]
1432            Self::DurationSecond(values) => {
1433                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1434            }
1435            #[cfg(feature = "avro_custom_types")]
1436            Self::DurationMillisecond(values) => {
1437                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1438            }
1439            #[cfg(feature = "avro_custom_types")]
1440            Self::DurationMicrosecond(values) => {
1441                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1442            }
1443            #[cfg(feature = "avro_custom_types")]
1444            Self::DurationNanosecond(values) => {
1445                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1446            }
1447            #[cfg(feature = "avro_custom_types")]
1448            Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1449            #[cfg(feature = "avro_custom_types")]
1450            Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1451            #[cfg(feature = "avro_custom_types")]
1452            Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1453            #[cfg(feature = "avro_custom_types")]
1454            Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1455            #[cfg(feature = "avro_custom_types")]
1456            Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1457            #[cfg(feature = "avro_custom_types")]
1458            Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1459            #[cfg(feature = "avro_custom_types")]
1460            Self::Float16(values) => {
1461                // Convert Vec<u16> to Float16Array by reinterpreting the raw bytes.
1462                // This is safe because f16 and u16 have the same size and alignment.
1463                let len = values.len();
1464                let buf: Buffer = std::mem::take(values).into();
1465                let scalar_buf = ScalarBuffer::new(buf, 0, len);
1466                Arc::new(Float16Array::new(scalar_buf, nulls))
1467            }
1468            #[cfg(feature = "avro_custom_types")]
1469            Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1470            #[cfg(feature = "avro_custom_types")]
1471            Self::TimeNanos(values) => {
1472                Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1473            }
1474            #[cfg(feature = "avro_custom_types")]
1475            Self::Time32Secs(values) => {
1476                Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1477            }
1478            #[cfg(feature = "avro_custom_types")]
1479            Self::TimestampSecs(is_utc, values) => Arc::new(
1480                flush_primitive::<TimestampSecondType>(values, nulls)
1481                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1482            ),
1483            #[cfg(feature = "avro_custom_types")]
1484            Self::IntervalYearMonth(values) => {
1485                Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1486            }
1487            #[cfg(feature = "avro_custom_types")]
1488            Self::IntervalMonthDayNano(values) => {
1489                Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1490            }
1491            #[cfg(feature = "avro_custom_types")]
1492            Self::IntervalDayTime(values) => {
1493                Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1494            }
1495            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1496            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1497            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1498            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1499                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1500            }
1501            Self::Int32ToFloat64(values)
1502            | Self::Int64ToFloat64(values)
1503            | Self::Float32ToFloat64(values) => {
1504                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1505            }
1506            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1507                let offsets = flush_offsets(offsets);
1508                let values = flush_values(values).into();
1509                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1510            }
1511            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1512                let offsets = flush_offsets(offsets);
1513                let values = flush_values(values).into();
1514                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1515            }
1516            Self::StringView(offsets, values) => {
1517                let offsets = flush_offsets(offsets);
1518                let values = flush_values(values);
1519                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1520                let values: Vec<&str> = (0..array.len())
1521                    .map(|i| {
1522                        if array.is_valid(i) {
1523                            array.value(i)
1524                        } else {
1525                            ""
1526                        }
1527                    })
1528                    .collect();
1529                Arc::new(StringViewArray::from(values))
1530            }
1531            Self::Array(field, offsets, values) => {
1532                let values = values.flush(None)?;
1533                let offsets = flush_offsets(offsets);
1534                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1535            }
1536            Self::Record(fields, encodings, _) => {
1537                let arrays = encodings
1538                    .iter_mut()
1539                    .map(|x| x.flush(None))
1540                    .collect::<Result<Vec<_>, _>>()?;
1541                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1542            }
1543            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1544                let moff = flush_offsets(m_off);
1545                let koff = flush_offsets(k_off);
1546                let kd = flush_values(kdata).into();
1547                let val_arr = valdec.flush(None)?;
1548                let key_arr = StringArray::try_new(koff, kd, None)?;
1549                if key_arr.len() != val_arr.len() {
1550                    return Err(AvroError::InvalidArgument(format!(
1551                        "Map keys length ({}) != map values length ({})",
1552                        key_arr.len(),
1553                        val_arr.len()
1554                    )));
1555                }
1556                let final_len = moff.len() - 1;
1557                if let Some(n) = &nulls {
1558                    if n.len() != final_len {
1559                        return Err(AvroError::InvalidArgument(format!(
1560                            "Map array null buffer length {} != final map length {final_len}",
1561                            n.len()
1562                        )));
1563                    }
1564                }
1565                let entries_fields = match map_field.data_type() {
1566                    DataType::Struct(fields) => fields.clone(),
1567                    other => {
1568                        return Err(AvroError::InvalidArgument(format!(
1569                            "Map entries field must be a Struct, got {other:?}"
1570                        )));
1571                    }
1572                };
1573                let entries_struct =
1574                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1575                let map_arr =
1576                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1577                Arc::new(map_arr)
1578            }
1579            Self::Fixed(sz, accum) => {
1580                let b: Buffer = flush_values(accum).into();
1581                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1582                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1583                Arc::new(arr)
1584            }
1585            Self::Uuid(values) => {
1586                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1587                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1588                Arc::new(arr)
1589            }
1590            #[cfg(feature = "small_decimals")]
1591            Self::Decimal32(precision, scale, _, builder) => {
1592                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1593            }
1594            #[cfg(feature = "small_decimals")]
1595            Self::Decimal64(precision, scale, _, builder) => {
1596                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1597            }
1598            Self::Decimal128(precision, scale, _, builder) => {
1599                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1600            }
1601            Self::Decimal256(precision, scale, _, builder) => {
1602                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1603            }
1604            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1605            Self::Duration(builder) => {
1606                let (_, vals, _) = builder.finish().into_parts();
1607                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1608                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1609                Arc::new(vals)
1610            }
1611            #[cfg(feature = "avro_custom_types")]
1612            Self::RunEndEncoded(width, len, inner) => {
1613                let values = inner.flush(nulls)?;
1614                let n = *len;
1615                let arr = values.as_ref();
1616                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1617                if n > 0 {
1618                    run_starts.push(0);
1619                    for i in 1..n {
1620                        if !values_equal_at(arr, i - 1, i) {
1621                            run_starts.push(i);
1622                        }
1623                    }
1624                }
1625                if n > (u32::MAX as usize) {
1626                    return Err(AvroError::InvalidArgument(format!(
1627                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1628                    )));
1629                }
1630                let run_count = run_starts.len();
1631                let take_idx: PrimitiveArray<UInt32Type> =
1632                    run_starts.iter().map(|&s| s as u32).collect();
1633                let per_run_values = if run_count == 0 {
1634                    values.slice(0, 0)
1635                } else {
1636                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1637                        AvroError::ParseError(format!("take() for REE values failed: {e}"))
1638                    })?
1639                };
1640
1641                macro_rules! build_run_array {
1642                    ($Native:ty, $ArrowTy:ty) => {{
1643                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1644                        for (idx, &_start) in run_starts.iter().enumerate() {
1645                            let end = if idx + 1 < run_count {
1646                                run_starts[idx + 1]
1647                            } else {
1648                                n
1649                            };
1650                            ends.push(end as $Native);
1651                        }
1652                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1653                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1654                            .map_err(|e| AvroError::ParseError(e.to_string()))?;
1655                        Arc::new(run_arr) as ArrayRef
1656                    }};
1657                }
1658                match *width {
1659                    2 => {
1660                        if n > i16::MAX as usize {
1661                            return Err(AvroError::InvalidArgument(format!(
1662                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1663                            )));
1664                        }
1665                        build_run_array!(i16, Int16Type)
1666                    }
1667                    4 => build_run_array!(i32, Int32Type),
1668                    8 => build_run_array!(i64, Int64Type),
1669                    other => {
1670                        return Err(AvroError::InvalidArgument(format!(
1671                            "Unsupported run-end width {other} for RunEndEncoded"
1672                        )));
1673                    }
1674                }
1675            }
1676            Self::Union(u) => u.flush(nulls)?,
1677        })
1678    }
1679}
1680
// A lookup table that resolves a writer-side index to the matching reader-side index,
// together with the promotion to apply to the decoded value.
#[derive(Debug)]
struct DispatchLookupTable {
    // Maps each writer index `w` to the corresponding reader index.
    //
    // Semantics:
    // - `to_reader[w] >= 0`: The value is the reader-side index the writer entry resolves
    //   to. The writer value is decoded and `promotion[w]` is applied.
    // - `to_reader[w] == NO_SOURCE` (-1): The writer entry has no reader-side counterpart;
    //   `resolve` returns `None` for it.
    //
    // Representation (`i8`):
    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
    // `i8` for union type IDs. This requires that reader indices do not exceed `i8::MAX`
    // (enforced in `from_writer_to_reader`).
    //
    // Invariants:
    // - `to_reader.len() == promotion.len()`; both have one entry per writer-side index.
    // - If `to_reader[w] == NO_SOURCE`, `promotion[w]` is ignored.
    to_reader: Box<[i8]>,
    // For each writer index `w`, specifies the `Promotion` to apply to the writer's value.
    //
    // This is used when the writer's type can be promoted to the reader's type
    // (e.g., `Int` to `Long`). It is ignored if `to_reader[w] == NO_SOURCE`.
    promotion: Box<[Promotion]>,
}
1706
// Sentinel stored in `DispatchLookupTable::to_reader` for a writer index that has
// no matching reader-side index.
const NO_SOURCE: i8 = -1;
1710
1711impl DispatchLookupTable {
1712    fn from_writer_to_reader(
1713        promotion_map: &[Option<(usize, Promotion)>],
1714    ) -> Result<Self, AvroError> {
1715        let mut to_reader = Vec::with_capacity(promotion_map.len());
1716        let mut promotion = Vec::with_capacity(promotion_map.len());
1717        for map in promotion_map {
1718            match *map {
1719                Some((idx, promo)) => {
1720                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1721                        AvroError::SchemaError(format!(
1722                            "Reader branch index {idx} exceeds i8 range (max {})",
1723                            i8::MAX
1724                        ))
1725                    })?;
1726                    to_reader.push(idx_i8);
1727                    promotion.push(promo);
1728                }
1729                None => {
1730                    to_reader.push(NO_SOURCE);
1731                    promotion.push(Promotion::Direct);
1732                }
1733            }
1734        }
1735        Ok(Self {
1736            to_reader: to_reader.into_boxed_slice(),
1737            promotion: promotion.into_boxed_slice(),
1738        })
1739    }
1740
1741    // Resolve a writer branch index to (reader_idx, promotion)
1742    #[inline]
1743    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1744        let reader_index = *self.to_reader.get(writer_index)?;
1745        (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1746    }
1747}
1748
// Decoder state for union values. Depending on `plan` it either accumulates the pieces
// of a union array (type ids, offsets and one child decoder per branch) or forwards
// everything to a single target decoder (`UnionReadPlan::ToSingle`).
#[derive(Debug)]
struct UnionDecoder {
    // Union fields handed to `UnionArray::try_new` on flush.
    fields: UnionFields,
    // Type id recorded for every emitted row (taken from `reader_type_codes`).
    type_ids: Vec<i8>,
    // For every emitted row, the position of that row inside its branch's child.
    offsets: Vec<i32>,
    // One child decoder per reader branch.
    branches: Vec<Decoder>,
    // Number of rows emitted so far into each branch; source of the next offset.
    counts: Vec<i32>,
    // Type ids collected from `fields`, in field order; indexed by reader branch index.
    reader_type_codes: Vec<i8>,
    // Branch used by `append_default` when the plan does not dictate one.
    default_emit_idx: usize,
    // Branch used by `append_null` when the plan does not dictate one: the first
    // `Decoder::Null` branch if present, otherwise `default_emit_idx`.
    null_emit_idx: usize,
    // How an incoming value is routed to a branch (or to a single target).
    plan: UnionReadPlan,
}
1761
1762impl Default for UnionDecoder {
1763    fn default() -> Self {
1764        Self {
1765            fields: UnionFields::empty(),
1766            type_ids: Vec::new(),
1767            offsets: Vec::new(),
1768            branches: Vec::new(),
1769            counts: Vec::new(),
1770            reader_type_codes: Vec::new(),
1771            default_emit_idx: 0,
1772            null_emit_idx: 0,
1773            plan: UnionReadPlan::Passthrough,
1774        }
1775    }
1776}
1777
// Strategy used by `UnionDecoder` to route each incoming value.
#[derive(Debug)]
enum UnionReadPlan {
    // Writer and reader are both unions: the writer's branch tag is read and translated
    // to a reader branch (plus promotion) through `lookup_table`.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    // Writer is not a union but the reader is: no tag is read; every value goes to
    // `reader_idx` with `promotion` applied.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    // Writer is a union but the reader is not: the tag is read, resolved through
    // `lookup_table`, and the value is decoded into `target`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    // No resolution info: the tag read from the data is used directly as the reader
    // branch index, without promotion.
    Passthrough,
}
1793
1794impl UnionDecoder {
1795    fn try_new(
1796        fields: UnionFields,
1797        branches: Vec<Decoder>,
1798        resolved: Option<ResolvedUnion>,
1799    ) -> Result<Self, AvroError> {
1800        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1801        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1802        let default_emit_idx = 0;
1803        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1804        let branch_len = branches.len().max(reader_type_codes.len());
1805        // Guard against impractically large unions that cannot be indexed by an Avro int
1806        let max_addr = (i32::MAX as usize) + 1;
1807        if branches.len() > max_addr {
1808            return Err(AvroError::SchemaError(format!(
1809                "Reader union has {} branches, which exceeds the maximum addressable \
1810                 branches by an Avro int tag ({} + 1).",
1811                branches.len(),
1812                i32::MAX
1813            )));
1814        }
1815        Ok(Self {
1816            fields,
1817            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1818            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1819            branches,
1820            counts: vec![0; branch_len],
1821            reader_type_codes,
1822            default_emit_idx,
1823            null_emit_idx,
1824            plan: Self::plan_from_resolved(resolved)?,
1825        })
1826    }
1827
1828    fn try_new_from_writer_union(
1829        info: ResolvedUnion,
1830        target: Box<Decoder>,
1831    ) -> Result<Self, AvroError> {
1832        // This constructor is only for writer-union to single-type resolution
1833        debug_assert!(info.writer_is_union && !info.reader_is_union);
1834        let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1835        Ok(Self {
1836            plan: UnionReadPlan::ToSingle {
1837                target,
1838                lookup_table,
1839            },
1840            ..Self::default()
1841        })
1842    }
1843
1844    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, AvroError> {
1845        let Some(info) = resolved else {
1846            return Ok(UnionReadPlan::Passthrough);
1847        };
1848        match (info.writer_is_union, info.reader_is_union) {
1849            (true, true) => {
1850                let lookup_table =
1851                    DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1852                Ok(UnionReadPlan::ReaderUnion { lookup_table })
1853            }
1854            (false, true) => {
1855                let Some(&(reader_idx, promotion)) =
1856                    info.writer_to_reader.first().and_then(Option::as_ref)
1857                else {
1858                    return Err(AvroError::SchemaError(
1859                        "Writer type does not match any reader union branch".to_string(),
1860                    ));
1861                };
1862                Ok(UnionReadPlan::FromSingle {
1863                    reader_idx,
1864                    promotion,
1865                })
1866            }
1867            (true, false) => Err(AvroError::InvalidArgument(
1868                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1869                    .to_string(),
1870            )),
1871            // (false, false) is invalid and should never be constructed by the resolver.
1872            _ => Err(AvroError::SchemaError(
1873                "ResolvedUnion constructed for non-union sides; resolver should return None"
1874                    .to_string(),
1875            )),
1876        }
1877    }
1878
1879    #[inline]
1880    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
1881        // Avro unions are encoded by first writing the zero-based branch index.
1882        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
1883        // but both use zig-zag varint encoding, so decoding as long is compatible
1884        // with either form and widely used in practice.
1885        let raw = buf.get_long()?;
1886        if raw < 0 {
1887            return Err(AvroError::ParseError(format!(
1888                "Negative union branch index {raw}"
1889            )));
1890        }
1891        usize::try_from(raw).map_err(|_| {
1892            AvroError::ParseError(format!(
1893                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1894                (usize::BITS as usize)
1895            ))
1896        })
1897    }
1898
1899    #[inline]
1900    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1901        let branches_len = self.branches.len();
1902        let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1903            return Err(AvroError::ParseError(format!(
1904                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1905            )));
1906        };
1907        self.type_ids.push(self.reader_type_codes[reader_idx]);
1908        self.offsets.push(self.counts[reader_idx]);
1909        self.counts[reader_idx] += 1;
1910        Ok(reader_branch)
1911    }
1912
1913    #[inline]
1914    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
1915    where
1916        F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
1917    {
1918        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1919            return action(target);
1920        }
1921        let reader_idx = match &self.plan {
1922            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1923            _ => fallback_idx,
1924        };
1925        self.emit_to(reader_idx).and_then(action)
1926    }
1927
1928    fn append_null(&mut self) -> Result<(), AvroError> {
1929        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1930    }
1931
1932    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
1933        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1934    }
1935
1936    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1937        let (reader_idx, promotion) = match &mut self.plan {
1938            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1939            UnionReadPlan::ReaderUnion { lookup_table } => {
1940                let idx = Self::read_tag(buf)?;
1941                lookup_table.resolve(idx).ok_or_else(|| {
1942                    AvroError::ParseError(format!(
1943                        "Union branch index {idx} not resolvable by reader schema"
1944                    ))
1945                })?
1946            }
1947            UnionReadPlan::FromSingle {
1948                reader_idx,
1949                promotion,
1950            } => (*reader_idx, *promotion),
1951            UnionReadPlan::ToSingle {
1952                target,
1953                lookup_table,
1954            } => {
1955                let idx = Self::read_tag(buf)?;
1956                return match lookup_table.resolve(idx) {
1957                    Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1958                    None => Err(AvroError::ParseError(format!(
1959                        "Writer union branch {idx} does not resolve to reader type"
1960                    ))),
1961                };
1962            }
1963        };
1964        let decoder = self.emit_to(reader_idx)?;
1965        decoder.decode_with_promotion(buf, promotion)
1966    }
1967
1968    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1969        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1970            return target.flush(nulls);
1971        }
1972        debug_assert!(
1973            nulls.is_none(),
1974            "UnionArray does not accept a validity bitmap; \
1975                     nulls should have been materialized as a Null child during decode"
1976        );
1977        let children = self
1978            .branches
1979            .iter_mut()
1980            .map(|d| d.flush(None))
1981            .collect::<Result<Vec<_>, _>>()?;
1982        let arr = UnionArray::try_new(
1983            self.fields.clone(),
1984            flush_values(&mut self.type_ids).into_iter().collect(),
1985            Some(flush_values(&mut self.offsets).into_iter().collect()),
1986            children,
1987        )
1988        .map_err(|e| AvroError::ParseError(e.to_string()))?;
1989        Ok(Arc::new(arr))
1990    }
1991}
1992
// Builder for `UnionDecoder`. `build` accepts exactly two configurations:
// fields + branches (with optional resolution info) and no target, or
// resolution info + target with no fields/branches.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    // Union fields of the reader-side union.
    fields: Option<UnionFields>,
    // Child decoders, one per reader branch.
    branches: Option<Vec<Decoder>>,
    // Writer/reader resolution info, if any.
    resolved: Option<ResolvedUnion>,
    // Single target decoder for the writer-union to single-type case.
    target: Option<Box<Decoder>>,
}
2000
2001impl UnionDecoderBuilder {
2002    fn new() -> Self {
2003        Self::default()
2004    }
2005
2006    fn with_fields(mut self, fields: UnionFields) -> Self {
2007        self.fields = Some(fields);
2008        self
2009    }
2010
2011    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2012        self.branches = Some(branches);
2013        self
2014    }
2015
2016    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2017        self.resolved = Some(resolved_union);
2018        self
2019    }
2020
2021    fn with_target(mut self, target: Box<Decoder>) -> Self {
2022        self.target = Some(target);
2023        self
2024    }
2025
2026    fn build(self) -> Result<UnionDecoder, AvroError> {
2027        match (self.resolved, self.fields, self.branches, self.target) {
2028            (resolved, Some(fields), Some(branches), None) => {
2029                UnionDecoder::try_new(fields, branches, resolved)
2030            }
2031            (Some(info), None, None, Some(target))
2032                if info.writer_is_union && !info.reader_is_union =>
2033            {
2034                UnionDecoder::try_new_from_writer_union(info, target)
2035            }
2036            _ => Err(AvroError::InvalidArgument(
2037                "Invalid UnionDecoderBuilder configuration: expected either \
2038                 (fields + branches + resolved) with no target for reader-unions, or \
2039                 (resolved + target) with no fields/branches for writer-union to single."
2040                    .to_string(),
2041            )),
2042        }
2043    }
2044}
2045
/// Selects how `process_blockwise` handles a block whose item count is negative,
/// i.e. a block whose count is followed by the block's size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size, then still visit every item of the block one by one.
    ProcessItems,
    /// Read the byte size and step over the whole block payload in a single read.
    SkipBySize,
}
2051
2052#[inline]
2053fn skip_blocks(
2054    buf: &mut AvroCursor,
2055    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2056) -> Result<usize, AvroError> {
2057    process_blockwise(
2058        buf,
2059        move |c| skip_item(c),
2060        NegativeBlockBehavior::SkipBySize,
2061    )
2062}
2063
2064#[inline]
2065fn flush_dict(
2066    indices: &mut Vec<i32>,
2067    symbols: &[String],
2068    nulls: Option<NullBuffer>,
2069) -> Result<ArrayRef, AvroError> {
2070    let keys = flush_primitive::<Int32Type>(indices, nulls);
2071    let values = Arc::new(StringArray::from_iter_values(
2072        symbols.iter().map(|s| s.as_str()),
2073    ));
2074    DictionaryArray::try_new(keys, values)
2075        .map_err(Into::into)
2076        .map(|arr| Arc::new(arr) as ArrayRef)
2077}
2078
2079#[inline]
2080fn read_blocks(
2081    buf: &mut AvroCursor,
2082    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2083) -> Result<usize, AvroError> {
2084    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2085}
2086
2087#[inline]
2088fn process_blockwise(
2089    buf: &mut AvroCursor,
2090    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2091    negative_behavior: NegativeBlockBehavior,
2092) -> Result<usize, AvroError> {
2093    let mut total = 0usize;
2094    loop {
2095        // Read the block count
2096        //  positive = that many items
2097        //  negative = that many items + read block size
2098        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
2099        let block_count = buf.get_long()?;
2100        match block_count.cmp(&0) {
2101            Ordering::Equal => break,
2102            Ordering::Less => {
2103                let count = (-block_count) as usize;
2104                // A negative count is followed by a long of the size in bytes
2105                let size_in_bytes = buf.get_long()? as usize;
2106                match negative_behavior {
2107                    NegativeBlockBehavior::ProcessItems => {
2108                        // Process items one-by-one after reading size
2109                        for _ in 0..count {
2110                            on_item(buf)?;
2111                        }
2112                    }
2113                    NegativeBlockBehavior::SkipBySize => {
2114                        // Skip the entire block payload at once
2115                        let _ = buf.get_fixed(size_in_bytes)?;
2116                    }
2117                }
2118                total += count;
2119            }
2120            Ordering::Greater => {
2121                let count = block_count as usize;
2122                for _ in 0..count {
2123                    on_item(buf)?;
2124                }
2125                total += count;
2126            }
2127        }
2128    }
2129    Ok(total)
2130}
2131
2132#[inline]
2133fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2134    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2135}
2136
2137#[inline]
2138fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2139    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2140}
2141
2142#[inline]
2143fn flush_primitive<T: ArrowPrimitiveType>(
2144    values: &mut Vec<T::Native>,
2145    nulls: Option<NullBuffer>,
2146) -> PrimitiveArray<T> {
2147    PrimitiveArray::new(flush_values(values).into(), nulls)
2148}
2149
2150#[inline]
2151fn read_decimal_bytes_be<const N: usize>(
2152    buf: &mut AvroCursor<'_>,
2153    size: &Option<usize>,
2154) -> Result<[u8; N], AvroError> {
2155    match size {
2156        Some(n) if *n == N => {
2157            let raw = buf.get_fixed(N)?;
2158            let mut arr = [0u8; N];
2159            arr.copy_from_slice(raw);
2160            Ok(arr)
2161        }
2162        Some(n) => {
2163            let raw = buf.get_fixed(*n)?;
2164            sign_cast_to::<N>(raw)
2165        }
2166        None => {
2167            let raw = buf.get_bytes()?;
2168            sign_cast_to::<N>(raw)
2169        }
2170    }
2171}
2172
2173/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
2174/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
2175/// the payload is a big-endian two's-complement integer, and when narrowing it must
2176/// be representable without changing sign or value.
2177///
2178/// If `raw.len() < N`, the value is sign-extended.
2179/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
2180/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
2181#[inline]
2182fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2183    let len = raw.len();
2184    // Fast path: exact width, just copy
2185    if len == N {
2186        let mut out = [0u8; N];
2187        out.copy_from_slice(raw);
2188        return Ok(out);
2189    }
2190    // Determine sign byte from MSB of first byte (empty => positive)
2191    let first = raw.first().copied().unwrap_or(0u8);
2192    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2193    // Pre-fill with sign byte to support sign extension
2194    let mut out = [sign_byte; N];
2195    if len > N {
2196        // Validate truncation: all dropped leading bytes must equal sign_byte,
2197        // and the MSB of the first kept byte must match the sign.
2198        let extra = len - N;
2199        // Any non-sign byte in the truncated prefix indicates overflow
2200        if raw[..extra].iter().any(|&b| b != sign_byte) {
2201            return Err(AvroError::ParseError(format!(
2202                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2203                len, N
2204            )));
2205        }
2206        if N > 0 {
2207            let first_kept = raw[extra];
2208            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2209            if sign_bit_mismatch {
2210                return Err(AvroError::ParseError(format!(
2211                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2212                    len, N
2213                )));
2214            }
2215        }
2216        out.copy_from_slice(&raw[extra..]);
2217        return Ok(out);
2218    }
2219    out[N - len..].copy_from_slice(raw);
2220    Ok(out)
2221}
2222
/// Compares the elements at positions `i` and `j` of `arr`.
///
/// Two nulls are equal, a null never equals a non-null value, and two non-null
/// values are compared by equality of their one-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        return left_null && right_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
2236
/// Per-record plan used to decode a writer record into the reader's field decoders.
#[derive(Debug)]
struct Projector {
    /// For each writer field, in order: the index of the reader decoder that consumes
    /// it, or `None` when the field has no reader counterpart and must be skipped.
    writer_to_reader: Arc<[Option<usize>]>,
    /// For each writer field, in order: the skipper used when the field is not mapped
    /// to a reader decoder. Must have the same length as `writer_to_reader`.
    skip_decoders: Vec<Option<Skipper>>,
    /// One entry per reader field: the default literal taken from the field's
    /// resolution info, or `None` when the field has no default-value resolution.
    field_defaults: Vec<Option<AvroLiteral>>,
    /// `(reader index, literal)` pairs appended to the reader decoders after the
    /// writer fields of each record have been processed.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
2244
2245#[derive(Debug)]
2246struct ProjectorBuilder<'a> {
2247    rec: &'a ResolvedRecord,
2248    reader_fields: Arc<[AvroField]>,
2249}
2250
2251impl<'a> ProjectorBuilder<'a> {
2252    #[inline]
2253    fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
2254        Self {
2255            rec,
2256            reader_fields: reader_fields.clone(),
2257        }
2258    }
2259
2260    #[inline]
2261    fn build(self) -> Result<Projector, AvroError> {
2262        let reader_fields = self.reader_fields;
2263        let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
2264        for avro_field in reader_fields.as_ref() {
2265            if let Some(ResolutionInfo::DefaultValue(lit)) =
2266                avro_field.data_type().resolution.as_ref()
2267            {
2268                field_defaults.push(Some(lit.clone()));
2269            } else {
2270                field_defaults.push(None);
2271            }
2272        }
2273        let mut default_injections: Vec<(usize, AvroLiteral)> =
2274            Vec::with_capacity(self.rec.default_fields.len());
2275        for &idx in self.rec.default_fields.as_ref() {
2276            let lit = field_defaults
2277                .get(idx)
2278                .and_then(|lit| lit.clone())
2279                .unwrap_or(AvroLiteral::Null);
2280            default_injections.push((idx, lit));
2281        }
2282        let mut skip_decoders: Vec<Option<Skipper>> =
2283            Vec::with_capacity(self.rec.skip_fields.len());
2284        for datatype in self.rec.skip_fields.as_ref() {
2285            let skipper = match datatype {
2286                Some(datatype) => Some(Skipper::from_avro(datatype)?),
2287                None => None,
2288            };
2289            skip_decoders.push(skipper);
2290        }
2291        Ok(Projector {
2292            writer_to_reader: self.rec.writer_to_reader.clone(),
2293            skip_decoders,
2294            field_defaults,
2295            default_injections: default_injections.into(),
2296        })
2297    }
2298}
2299
2300impl Projector {
2301    #[inline]
2302    fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), AvroError> {
2303        // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from
2304        // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in
2305        // `ProjectorBuilder::build` to have exactly one element per reader field.
2306        // Therefore, `index < self.field_defaults.len()` always holds here, so
2307        // `self.field_defaults[index]` cannot panic. We only take an immutable reference
2308        // via `.as_ref()`, and `self` is borrowed immutably.
2309        if let Some(default_literal) = self.field_defaults[index].as_ref() {
2310            decoder.append_default(default_literal)
2311        } else {
2312            decoder.append_null()
2313        }
2314    }
2315
2316    #[inline]
2317    fn project_record(
2318        &mut self,
2319        buf: &mut AvroCursor<'_>,
2320        encodings: &mut [Decoder],
2321    ) -> Result<(), AvroError> {
2322        debug_assert_eq!(
2323            self.writer_to_reader.len(),
2324            self.skip_decoders.len(),
2325            "internal invariant: mapping and skipper lists must have equal length"
2326        );
2327        for (i, (mapping, skipper_opt)) in self
2328            .writer_to_reader
2329            .iter()
2330            .zip(self.skip_decoders.iter_mut())
2331            .enumerate()
2332        {
2333            match (mapping, skipper_opt.as_mut()) {
2334                (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
2335                (None, Some(skipper)) => skipper.skip(buf)?,
2336                (None, None) => {
2337                    return Err(AvroError::SchemaError(format!(
2338                        "No skipper available for writer-only field at index {i}",
2339                    )));
2340                }
2341            }
2342        }
2343        for (reader_index, lit) in self.default_injections.as_ref() {
2344            encodings[*reader_index].append_default(lit)?;
2345        }
2346        Ok(())
2347    }
2348}
2349
2350/// Lightweight skipper for non‑projected writer fields
2351/// (fields present in the writer schema but omitted by the reader/projection);
2352/// per Avro 1.11.1 schema resolution these fields are ignored.
2353///
2354/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
2355#[derive(Debug)]
2356enum Skipper {
2357    Null,
2358    Boolean,
2359    Int32,
2360    Int64,
2361    Float32,
2362    Float64,
2363    Bytes,
2364    String,
2365    TimeMicros,
2366    TimestampMillis,
2367    TimestampMicros,
2368    TimestampNanos,
2369    Fixed(usize),
2370    Decimal(Option<usize>),
2371    UuidString,
2372    Enum,
2373    DurationFixed12,
2374    List(Box<Skipper>),
2375    Map(Box<Skipper>),
2376    Struct(Vec<Skipper>),
2377    Union(Vec<Skipper>),
2378    Nullable(Nullability, Box<Skipper>),
2379    #[cfg(feature = "avro_custom_types")]
2380    RunEndEncoded(Box<Skipper>),
2381}
2382
2383impl Skipper {
2384    fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
2385        let mut base = match dt.codec() {
2386            Codec::Null => Self::Null,
2387            Codec::Boolean => Self::Boolean,
2388            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
2389            Codec::Int64 => Self::Int64,
2390            Codec::TimeMicros => Self::TimeMicros,
2391            Codec::TimestampMillis(_) => Self::TimestampMillis,
2392            Codec::TimestampMicros(_) => Self::TimestampMicros,
2393            Codec::TimestampNanos(_) => Self::TimestampNanos,
2394            #[cfg(feature = "avro_custom_types")]
2395            Codec::DurationNanos
2396            | Codec::DurationMicros
2397            | Codec::DurationMillis
2398            | Codec::DurationSeconds => Self::Int64,
2399            #[cfg(feature = "avro_custom_types")]
2400            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
2401                Self::Int32
2402            }
2403            #[cfg(feature = "avro_custom_types")]
2404            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
2405                Self::Int64
2406            }
2407            #[cfg(feature = "avro_custom_types")]
2408            Codec::UInt64 => Self::Fixed(8),
2409            #[cfg(feature = "avro_custom_types")]
2410            Codec::Float16 => Self::Fixed(2),
2411            #[cfg(feature = "avro_custom_types")]
2412            Codec::IntervalYearMonth => Self::Fixed(4),
2413            #[cfg(feature = "avro_custom_types")]
2414            Codec::IntervalMonthDayNano => Self::Fixed(16),
2415            #[cfg(feature = "avro_custom_types")]
2416            Codec::IntervalDayTime => Self::Fixed(8),
2417            Codec::Float32 => Self::Float32,
2418            Codec::Float64 => Self::Float64,
2419            Codec::Binary => Self::Bytes,
2420            Codec::Utf8 | Codec::Utf8View => Self::String,
2421            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2422            Codec::Decimal(_, _, size) => Self::Decimal(*size),
2423            Codec::Uuid => Self::UuidString, // encoded as string
2424            Codec::Enum(_) => Self::Enum,
2425            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2426            Codec::Struct(fields) => Self::Struct(
2427                fields
2428                    .iter()
2429                    .map(|f| Skipper::from_avro(f.data_type()))
2430                    .collect::<Result<_, _>>()?,
2431            ),
2432            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2433            Codec::Interval => Self::DurationFixed12,
2434            Codec::Union(encodings, _, _) => {
2435                let max_addr = (i32::MAX as usize) + 1;
2436                if encodings.len() > max_addr {
2437                    return Err(AvroError::SchemaError(format!(
2438                        "Writer union has {} branches, which exceeds the maximum addressable \
2439                         branches by an Avro int tag ({} + 1).",
2440                        encodings.len(),
2441                        i32::MAX
2442                    )));
2443                }
2444                Self::Union(
2445                    encodings
2446                        .iter()
2447                        .map(Skipper::from_avro)
2448                        .collect::<Result<_, _>>()?,
2449                )
2450            }
2451            #[cfg(feature = "avro_custom_types")]
2452            Codec::RunEndEncoded(inner, _w) => {
2453                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2454            }
2455        };
2456        if let Some(n) = dt.nullability() {
2457            base = Self::Nullable(n, Box::new(base));
2458        }
2459        Ok(base)
2460    }
2461
2462    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2463        match self {
2464            Self::Null => Ok(()),
2465            Self::Boolean => {
2466                buf.get_bool()?;
2467                Ok(())
2468            }
2469            Self::Int32 => {
2470                buf.skip_int()?;
2471                Ok(())
2472            }
2473            Self::Int64
2474            | Self::TimeMicros
2475            | Self::TimestampMillis
2476            | Self::TimestampMicros
2477            | Self::TimestampNanos => {
2478                buf.skip_long()?;
2479                Ok(())
2480            }
2481            Self::Float32 => {
2482                buf.get_float()?;
2483                Ok(())
2484            }
2485            Self::Float64 => {
2486                buf.get_double()?;
2487                Ok(())
2488            }
2489            Self::Bytes | Self::String | Self::UuidString => {
2490                buf.get_bytes()?;
2491                Ok(())
2492            }
2493            Self::Fixed(sz) => {
2494                buf.get_fixed(*sz)?;
2495                Ok(())
2496            }
2497            Self::Decimal(size) => {
2498                if let Some(s) = size {
2499                    buf.get_fixed(*s)
2500                } else {
2501                    buf.get_bytes()
2502                }?;
2503                Ok(())
2504            }
2505            Self::Enum => {
2506                buf.skip_int()?;
2507                Ok(())
2508            }
2509            Self::DurationFixed12 => {
2510                buf.get_fixed(12)?;
2511                Ok(())
2512            }
2513            Self::List(item) => {
2514                skip_blocks(buf, |c| item.skip(c))?;
2515                Ok(())
2516            }
2517            Self::Map(value) => {
2518                skip_blocks(buf, |c| {
2519                    c.get_bytes()?; // key
2520                    value.skip(c)
2521                })?;
2522                Ok(())
2523            }
2524            Self::Struct(fields) => {
2525                for f in fields.iter_mut() {
2526                    f.skip(buf)?
2527                }
2528                Ok(())
2529            }
2530            Self::Union(encodings) => {
2531                // Union tag must be ZigZag-decoded
2532                let raw = buf.get_long()?;
2533                if raw < 0 {
2534                    return Err(AvroError::ParseError(format!(
2535                        "Negative union branch index {raw}"
2536                    )));
2537                }
2538                let idx: usize = usize::try_from(raw).map_err(|_| {
2539                    AvroError::ParseError(format!(
2540                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2541                        (usize::BITS as usize)
2542                    ))
2543                })?;
2544                let Some(encoding) = encodings.get_mut(idx) else {
2545                    return Err(AvroError::ParseError(format!(
2546                        "Union branch index {idx} out of range for skipper ({} branches)",
2547                        encodings.len()
2548                    )));
2549                };
2550                encoding.skip(buf)
2551            }
2552            Self::Nullable(order, inner) => {
2553                let branch = buf.read_vlq()?;
2554                let is_not_null = match *order {
2555                    Nullability::NullFirst => branch != 0,
2556                    Nullability::NullSecond => branch == 0,
2557                };
2558                if is_not_null {
2559                    inner.skip(buf)?;
2560                }
2561                Ok(())
2562            }
2563            #[cfg(feature = "avro_custom_types")]
2564            Self::RunEndEncoded(inner) => inner.skip(buf),
2565        }
2566    }
2567}
2568
2569#[cfg(test)]
2570mod tests {
2571    use super::*;
2572    use crate::codec::AvroFieldBuilder;
2573    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2574    use arrow_array::cast::AsArray;
2575    use indexmap::IndexMap;
2576    use std::collections::HashMap;
2577
2578    fn encode_avro_int(value: i32) -> Vec<u8> {
2579        let mut buf = Vec::new();
2580        let mut v = (value << 1) ^ (value >> 31);
2581        while v & !0x7F != 0 {
2582            buf.push(((v & 0x7F) | 0x80) as u8);
2583            v >>= 7;
2584        }
2585        buf.push(v as u8);
2586        buf
2587    }
2588
2589    fn encode_avro_long(value: i64) -> Vec<u8> {
2590        let mut buf = Vec::new();
2591        let mut v = (value << 1) ^ (value >> 63);
2592        while v & !0x7F != 0 {
2593            buf.push(((v & 0x7F) | 0x80) as u8);
2594            v >>= 7;
2595        }
2596        buf.push(v as u8);
2597        buf
2598    }
2599
2600    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2601        let mut buf = encode_avro_long(bytes.len() as i64);
2602        buf.extend_from_slice(bytes);
2603        buf
2604    }
2605
2606    fn avro_from_codec(codec: Codec) -> AvroDataType {
2607        AvroDataType::new(codec, Default::default(), None)
2608    }
2609
2610    fn resolved_root_datatype(
2611        writer: Schema<'static>,
2612        reader: Schema<'static>,
2613        use_utf8view: bool,
2614        strict_mode: bool,
2615    ) -> AvroDataType {
2616        // Wrap writer schema in a single-field record
2617        let writer_record = Schema::Complex(ComplexType::Record(Record {
2618            name: "Root",
2619            namespace: None,
2620            doc: None,
2621            aliases: vec![],
2622            fields: vec![Field {
2623                name: "v",
2624                r#type: writer,
2625                default: None,
2626                doc: None,
2627                aliases: vec![],
2628            }],
2629            attributes: Attributes::default(),
2630        }));
2631
2632        // Wrap reader schema in a single-field record
2633        let reader_record = Schema::Complex(ComplexType::Record(Record {
2634            name: "Root",
2635            namespace: None,
2636            doc: None,
2637            aliases: vec![],
2638            fields: vec![Field {
2639                name: "v",
2640                r#type: reader,
2641                default: None,
2642                doc: None,
2643                aliases: vec![],
2644            }],
2645            attributes: Attributes::default(),
2646        }));
2647
2648        // Build resolved record, then extract the inner field's resolved AvroDataType
2649        let field = AvroFieldBuilder::new(&writer_record)
2650            .with_reader_schema(&reader_record)
2651            .with_utf8view(use_utf8view)
2652            .with_strict_mode(strict_mode)
2653            .build()
2654            .expect("schema resolution should succeed");
2655
2656        match field.data_type().codec() {
2657            Codec::Struct(fields) => fields[0].data_type().clone(),
2658            other => panic!("expected wrapper struct, got {other:?}"),
2659        }
2660    }
2661
2662    fn decoder_for_promotion(
2663        writer: PrimitiveType,
2664        reader: PrimitiveType,
2665        use_utf8view: bool,
2666    ) -> Decoder {
2667        let ws = Schema::TypeName(TypeName::Primitive(writer));
2668        let rs = Schema::TypeName(TypeName::Primitive(reader));
2669        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2670        Decoder::try_new(&dt).unwrap()
2671    }
2672
2673    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2674        AvroDataType::new(codec, HashMap::new(), nullability)
2675    }
2676
2677    #[cfg(feature = "avro_custom_types")]
2678    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2679        let mut out = Vec::with_capacity(10);
2680        while x >= 0x80 {
2681            out.push((x as u8) | 0x80);
2682            x >>= 7;
2683        }
2684        out.push(x as u8);
2685        out
2686    }
2687
2688    #[test]
2689    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2690        let ws = Schema::Union(vec![
2691            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2692            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2693        ]);
2694        let rs = Schema::Union(vec![
2695            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2696            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2697        ]);
2698
2699        let dt = resolved_root_datatype(ws, rs, false, false);
2700        let mut dec = Decoder::try_new(&dt).unwrap();
2701
2702        let mut rec1 = encode_avro_long(0);
2703        rec1.extend(encode_avro_int(7));
2704        let mut cur1 = AvroCursor::new(&rec1);
2705        dec.decode(&mut cur1).unwrap();
2706
2707        let mut rec2 = encode_avro_long(1);
2708        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2709        let mut cur2 = AvroCursor::new(&rec2);
2710        dec.decode(&mut cur2).unwrap();
2711
2712        let arr = dec.flush(None).unwrap();
2713        let ua = arr
2714            .as_any()
2715            .downcast_ref::<UnionArray>()
2716            .expect("dense union output");
2717
2718        assert_eq!(
2719            ua.type_id(0),
2720            1,
2721            "first value must select reader 'long' branch"
2722        );
2723        assert_eq!(ua.value_offset(0), 0);
2724
2725        assert_eq!(
2726            ua.type_id(1),
2727            0,
2728            "second value must select reader 'string' branch"
2729        );
2730        assert_eq!(ua.value_offset(1), 0);
2731
2732        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2733        assert_eq!(long_child.len(), 1);
2734        assert_eq!(long_child.value(0), 7);
2735
2736        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2737        assert_eq!(str_child.len(), 1);
2738        assert_eq!(str_child.value(0), "abc");
2739    }
2740
2741    #[test]
2742    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2743        let ws = Schema::Union(vec![
2744            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2745            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2746        ]);
2747        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2748
2749        let dt = resolved_root_datatype(ws, rs, false, false);
2750        let mut dec = Decoder::try_new(&dt).unwrap();
2751
2752        let mut data = encode_avro_long(0);
2753        data.extend(encode_avro_int(5));
2754        let mut cur = AvroCursor::new(&data);
2755        dec.decode(&mut cur).unwrap();
2756
2757        let arr = dec.flush(None).unwrap();
2758        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2759        assert_eq!(out.len(), 1);
2760        assert_eq!(out.value(0), 5);
2761    }
2762
2763    #[test]
2764    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2765        let ws = Schema::Union(vec![
2766            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2767            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2768        ]);
2769        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2770
2771        let dt = resolved_root_datatype(ws, rs, false, false);
2772        let mut dec = Decoder::try_new(&dt).unwrap();
2773
2774        let mut data = encode_avro_long(1);
2775        data.extend(encode_avro_bytes("z".as_bytes()));
2776        let mut cur = AvroCursor::new(&data);
2777        let res = dec.decode(&mut cur);
2778        assert!(
2779            res.is_err(),
2780            "expected error when writer union branch does not resolve to reader non-union type"
2781        );
2782    }
2783
2784    #[test]
2785    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2786        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2787        let rs = Schema::Union(vec![
2788            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2789            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2790        ]);
2791
2792        let dt = resolved_root_datatype(ws, rs, false, false);
2793        let mut dec = Decoder::try_new(&dt).unwrap();
2794
2795        let data = encode_avro_int(6);
2796        let mut cur = AvroCursor::new(&data);
2797        dec.decode(&mut cur).unwrap();
2798
2799        let arr = dec.flush(None).unwrap();
2800        let ua = arr
2801            .as_any()
2802            .downcast_ref::<UnionArray>()
2803            .expect("dense union output");
2804        assert_eq!(ua.len(), 1);
2805        assert_eq!(
2806            ua.type_id(0),
2807            1,
2808            "must resolve to reader 'long' branch (type_id 1)"
2809        );
2810        assert_eq!(ua.value_offset(0), 0);
2811
2812        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2813        assert_eq!(long_child.len(), 1);
2814        assert_eq!(long_child.value(0), 6);
2815
2816        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2817        assert_eq!(str_child.len(), 0, "string branch must be empty");
2818    }
2819
2820    #[test]
2821    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2822        let ws = Schema::Union(vec![
2823            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2824            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2825        ]);
2826        let rs = Schema::Union(vec![
2827            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2828            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2829        ]);
2830
2831        let dt = resolved_root_datatype(ws, rs, false, false);
2832        let mut dec = Decoder::try_new(&dt).unwrap();
2833
2834        let mut data = encode_avro_long(1);
2835        data.push(1);
2836        let mut cur = AvroCursor::new(&data);
2837        let res = dec.decode(&mut cur);
2838        assert!(
2839            res.is_err(),
2840            "expected error for unmapped writer 'boolean' branch"
2841        );
2842    }
2843
2844    #[test]
2845    fn test_schema_resolution_promotion_int_to_long() {
2846        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2847        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2848        for v in [0, 1, -2, 123456] {
2849            let data = encode_avro_int(v);
2850            let mut cur = AvroCursor::new(&data);
2851            dec.decode(&mut cur).unwrap();
2852        }
2853        let arr = dec.flush(None).unwrap();
2854        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2855        assert_eq!(a.value(0), 0);
2856        assert_eq!(a.value(1), 1);
2857        assert_eq!(a.value(2), -2);
2858        assert_eq!(a.value(3), 123456);
2859    }
2860
2861    #[test]
2862    fn test_schema_resolution_promotion_int_to_float() {
2863        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2864        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2865        for v in [0, 42, -7] {
2866            let data = encode_avro_int(v);
2867            let mut cur = AvroCursor::new(&data);
2868            dec.decode(&mut cur).unwrap();
2869        }
2870        let arr = dec.flush(None).unwrap();
2871        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2872        assert_eq!(a.value(0), 0.0);
2873        assert_eq!(a.value(1), 42.0);
2874        assert_eq!(a.value(2), -7.0);
2875    }
2876
2877    #[test]
2878    fn test_schema_resolution_promotion_int_to_double() {
2879        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2880        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2881        for v in [1, -1, 10_000] {
2882            let data = encode_avro_int(v);
2883            let mut cur = AvroCursor::new(&data);
2884            dec.decode(&mut cur).unwrap();
2885        }
2886        let arr = dec.flush(None).unwrap();
2887        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2888        assert_eq!(a.value(0), 1.0);
2889        assert_eq!(a.value(1), -1.0);
2890        assert_eq!(a.value(2), 10_000.0);
2891    }
2892
2893    #[test]
2894    fn test_schema_resolution_promotion_long_to_float() {
2895        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2896        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2897        for v in [0_i64, 1_000_000_i64, -123_i64] {
2898            let data = encode_avro_long(v);
2899            let mut cur = AvroCursor::new(&data);
2900            dec.decode(&mut cur).unwrap();
2901        }
2902        let arr = dec.flush(None).unwrap();
2903        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2904        assert_eq!(a.value(0), 0.0);
2905        assert_eq!(a.value(1), 1_000_000.0);
2906        assert_eq!(a.value(2), -123.0);
2907    }
2908
2909    #[test]
2910    fn test_schema_resolution_promotion_long_to_double() {
2911        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2912        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2913        for v in [2_i64, -2_i64, 9_223_372_i64] {
2914            let data = encode_avro_long(v);
2915            let mut cur = AvroCursor::new(&data);
2916            dec.decode(&mut cur).unwrap();
2917        }
2918        let arr = dec.flush(None).unwrap();
2919        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2920        assert_eq!(a.value(0), 2.0);
2921        assert_eq!(a.value(1), -2.0);
2922        assert_eq!(a.value(2), 9_223_372.0);
2923    }
2924
2925    #[test]
2926    fn test_schema_resolution_promotion_float_to_double() {
2927        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2928        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2929        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2930            let data = v.to_le_bytes().to_vec();
2931            let mut cur = AvroCursor::new(&data);
2932            dec.decode(&mut cur).unwrap();
2933        }
2934        let arr = dec.flush(None).unwrap();
2935        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2936        assert_eq!(a.value(0), 0.5_f64);
2937        assert_eq!(a.value(1), -3.25_f64);
2938        assert_eq!(a.value(2), 1.0e6_f64);
2939    }
2940
2941    #[test]
2942    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2943        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2944        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2945        for s in ["hello", "world", "héllo"] {
2946            let data = encode_avro_bytes(s.as_bytes());
2947            let mut cur = AvroCursor::new(&data);
2948            dec.decode(&mut cur).unwrap();
2949        }
2950        let arr = dec.flush(None).unwrap();
2951        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2952        assert_eq!(a.value(0), "hello");
2953        assert_eq!(a.value(1), "world");
2954        assert_eq!(a.value(2), "héllo");
2955    }
2956
2957    #[test]
2958    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2959        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2960        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2961        let data = encode_avro_bytes("abc".as_bytes());
2962        let mut cur = AvroCursor::new(&data);
2963        dec.decode(&mut cur).unwrap();
2964        let arr = dec.flush(None).unwrap();
2965        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2966        assert_eq!(a.value(0), "abc");
2967    }
2968
2969    #[test]
2970    fn test_schema_resolution_promotion_string_to_bytes() {
2971        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2972        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2973        for s in ["", "abc", "data"] {
2974            let data = encode_avro_bytes(s.as_bytes());
2975            let mut cur = AvroCursor::new(&data);
2976            dec.decode(&mut cur).unwrap();
2977        }
2978        let arr = dec.flush(None).unwrap();
2979        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2980        assert_eq!(a.value(0), b"");
2981        assert_eq!(a.value(1), b"abc");
2982        assert_eq!(a.value(2), "data".as_bytes());
2983    }
2984
2985    #[test]
2986    fn test_schema_resolution_no_promotion_passthrough_int() {
2987        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2988        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2989        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
2990        let writer_record = Schema::Complex(ComplexType::Record(Record {
2991            name: "Root",
2992            namespace: None,
2993            doc: None,
2994            aliases: vec![],
2995            fields: vec![Field {
2996                name: "v",
2997                r#type: ws,
2998                default: None,
2999                doc: None,
3000                aliases: vec![],
3001            }],
3002            attributes: Attributes::default(),
3003        }));
3004        let reader_record = Schema::Complex(ComplexType::Record(Record {
3005            name: "Root",
3006            namespace: None,
3007            doc: None,
3008            aliases: vec![],
3009            fields: vec![Field {
3010                name: "v",
3011                r#type: rs,
3012                default: None,
3013                doc: None,
3014                aliases: vec![],
3015            }],
3016            attributes: Attributes::default(),
3017        }));
3018        let field = AvroFieldBuilder::new(&writer_record)
3019            .with_reader_schema(&reader_record)
3020            .with_utf8view(false)
3021            .with_strict_mode(false)
3022            .build()
3023            .unwrap();
3024        // Extract the resolved inner field's AvroDataType
3025        let dt = match field.data_type().codec() {
3026            Codec::Struct(fields) => fields[0].data_type().clone(),
3027            other => panic!("expected wrapper struct, got {other:?}"),
3028        };
3029        let mut dec = Decoder::try_new(&dt).unwrap();
3030        assert!(matches!(dec, Decoder::Int32(_)));
3031        for v in [7, -9] {
3032            let data = encode_avro_int(v);
3033            let mut cur = AvroCursor::new(&data);
3034            dec.decode(&mut cur).unwrap();
3035        }
3036        let arr = dec.flush(None).unwrap();
3037        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3038        assert_eq!(a.value(0), 7);
3039        assert_eq!(a.value(1), -9);
3040    }
3041
3042    #[test]
3043    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
3044        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3045        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
3046        let writer_record = Schema::Complex(ComplexType::Record(Record {
3047            name: "Root",
3048            namespace: None,
3049            doc: None,
3050            aliases: vec![],
3051            fields: vec![Field {
3052                name: "v",
3053                r#type: ws,
3054                default: None,
3055                doc: None,
3056                aliases: vec![],
3057            }],
3058            attributes: Attributes::default(),
3059        }));
3060        let reader_record = Schema::Complex(ComplexType::Record(Record {
3061            name: "Root",
3062            namespace: None,
3063            doc: None,
3064            aliases: vec![],
3065            fields: vec![Field {
3066                name: "v",
3067                r#type: rs,
3068                default: None,
3069                doc: None,
3070                aliases: vec![],
3071            }],
3072            attributes: Attributes::default(),
3073        }));
3074        let res = AvroFieldBuilder::new(&writer_record)
3075            .with_reader_schema(&reader_record)
3076            .with_utf8view(false)
3077            .with_strict_mode(false)
3078            .build();
3079        assert!(res.is_err(), "expected error for illegal promotion");
3080    }
3081
3082    #[test]
3083    fn test_map_decoding_one_entry() {
3084        let value_type = avro_from_codec(Codec::Utf8);
3085        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3086        let mut decoder = Decoder::try_new(&map_type).unwrap();
3087        // Encode a single map with one entry: {"hello": "world"}
3088        let mut data = Vec::new();
3089        data.extend_from_slice(&encode_avro_long(1));
3090        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
3091        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
3092        data.extend_from_slice(&encode_avro_long(0));
3093        let mut cursor = AvroCursor::new(&data);
3094        decoder.decode(&mut cursor).unwrap();
3095        let array = decoder.flush(None).unwrap();
3096        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3097        assert_eq!(map_arr.len(), 1); // one map
3098        assert_eq!(map_arr.value_length(0), 1);
3099        let entries = map_arr.value(0);
3100        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
3101        assert_eq!(struct_entries.len(), 1);
3102        let key_arr = struct_entries
3103            .column_by_name("key")
3104            .unwrap()
3105            .as_any()
3106            .downcast_ref::<StringArray>()
3107            .unwrap();
3108        let val_arr = struct_entries
3109            .column_by_name("value")
3110            .unwrap()
3111            .as_any()
3112            .downcast_ref::<StringArray>()
3113            .unwrap();
3114        assert_eq!(key_arr.value(0), "hello");
3115        assert_eq!(val_arr.value(0), "world");
3116    }
3117
3118    #[test]
3119    fn test_map_decoding_empty() {
3120        let value_type = avro_from_codec(Codec::Utf8);
3121        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3122        let mut decoder = Decoder::try_new(&map_type).unwrap();
3123        let data = encode_avro_long(0);
3124        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3125        let array = decoder.flush(None).unwrap();
3126        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3127        assert_eq!(map_arr.len(), 1);
3128        assert_eq!(map_arr.value_length(0), 0);
3129    }
3130
3131    #[test]
3132    fn test_fixed_decoding() {
3133        let avro_type = avro_from_codec(Codec::Fixed(3));
3134        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3135
3136        let data1 = [1u8, 2, 3];
3137        let mut cursor1 = AvroCursor::new(&data1);
3138        decoder
3139            .decode(&mut cursor1)
3140            .expect("Failed to decode data1");
3141        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
3142        let data2 = [4u8, 5, 6];
3143        let mut cursor2 = AvroCursor::new(&data2);
3144        decoder
3145            .decode(&mut cursor2)
3146            .expect("Failed to decode data2");
3147        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
3148        let array = decoder.flush(None).expect("Failed to flush decoder");
3149        assert_eq!(array.len(), 2, "Array should contain two items");
3150        let fixed_size_binary_array = array
3151            .as_any()
3152            .downcast_ref::<FixedSizeBinaryArray>()
3153            .expect("Failed to downcast to FixedSizeBinaryArray");
3154        assert_eq!(
3155            fixed_size_binary_array.value_length(),
3156            3,
3157            "Fixed size of binary values should be 3"
3158        );
3159        assert_eq!(
3160            fixed_size_binary_array.value(0),
3161            &[1, 2, 3],
3162            "First item mismatch"
3163        );
3164        assert_eq!(
3165            fixed_size_binary_array.value(1),
3166            &[4, 5, 6],
3167            "Second item mismatch"
3168        );
3169    }
3170
3171    #[test]
3172    fn test_fixed_decoding_empty() {
3173        let avro_type = avro_from_codec(Codec::Fixed(5));
3174        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3175
3176        let array = decoder
3177            .flush(None)
3178            .expect("Failed to flush decoder for empty input");
3179
3180        assert_eq!(array.len(), 0, "Array should be empty");
3181        let fixed_size_binary_array = array
3182            .as_any()
3183            .downcast_ref::<FixedSizeBinaryArray>()
3184            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
3185
3186        assert_eq!(
3187            fixed_size_binary_array.value_length(),
3188            5,
3189            "Fixed size of binary values should be 5 as per type"
3190        );
3191    }
3192
3193    #[test]
3194    fn test_uuid_decoding() {
3195        let avro_type = avro_from_codec(Codec::Uuid);
3196        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3197        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
3198        let data = encode_avro_bytes(uuid_str.as_bytes());
3199        let mut cursor = AvroCursor::new(&data);
3200        decoder.decode(&mut cursor).expect("Failed to decode data");
3201        assert_eq!(
3202            cursor.position(),
3203            data.len(),
3204            "Cursor should advance by varint size + data size"
3205        );
3206        let array = decoder.flush(None).expect("Failed to flush decoder");
3207        let fixed_size_binary_array = array
3208            .as_any()
3209            .downcast_ref::<FixedSizeBinaryArray>()
3210            .expect("Array should be a FixedSizeBinaryArray");
3211        assert_eq!(fixed_size_binary_array.len(), 1);
3212        assert_eq!(fixed_size_binary_array.value_length(), 16);
3213        let expected_bytes = [
3214            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
3215            0x6b, 0xf6,
3216        ];
3217        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
3218    }
3219
3220    #[test]
3221    fn test_array_decoding() {
3222        let item_dt = avro_from_codec(Codec::Int32);
3223        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3224        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3225        let mut row1 = Vec::new();
3226        row1.extend_from_slice(&encode_avro_long(2));
3227        row1.extend_from_slice(&encode_avro_int(10));
3228        row1.extend_from_slice(&encode_avro_int(20));
3229        row1.extend_from_slice(&encode_avro_long(0));
3230        let row2 = encode_avro_long(0);
3231        let mut cursor = AvroCursor::new(&row1);
3232        decoder.decode(&mut cursor).unwrap();
3233        let mut cursor2 = AvroCursor::new(&row2);
3234        decoder.decode(&mut cursor2).unwrap();
3235        let array = decoder.flush(None).unwrap();
3236        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3237        assert_eq!(list_arr.len(), 2);
3238        let offsets = list_arr.value_offsets();
3239        assert_eq!(offsets, &[0, 2, 2]);
3240        let values = list_arr.values();
3241        let int_arr = values.as_primitive::<Int32Type>();
3242        assert_eq!(int_arr.len(), 2);
3243        assert_eq!(int_arr.value(0), 10);
3244        assert_eq!(int_arr.value(1), 20);
3245    }
3246
3247    #[test]
3248    fn test_array_decoding_with_negative_block_count() {
3249        let item_dt = avro_from_codec(Codec::Int32);
3250        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3251        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3252        let mut data = encode_avro_long(-3);
3253        data.extend_from_slice(&encode_avro_long(12));
3254        data.extend_from_slice(&encode_avro_int(1));
3255        data.extend_from_slice(&encode_avro_int(2));
3256        data.extend_from_slice(&encode_avro_int(3));
3257        data.extend_from_slice(&encode_avro_long(0));
3258        let mut cursor = AvroCursor::new(&data);
3259        decoder.decode(&mut cursor).unwrap();
3260        let array = decoder.flush(None).unwrap();
3261        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3262        assert_eq!(list_arr.len(), 1);
3263        assert_eq!(list_arr.value_length(0), 3);
3264        let values = list_arr.values().as_primitive::<Int32Type>();
3265        assert_eq!(values.len(), 3);
3266        assert_eq!(values.value(0), 1);
3267        assert_eq!(values.value(1), 2);
3268        assert_eq!(values.value(2), 3);
3269    }
3270
3271    #[test]
3272    fn test_nested_array_decoding() {
3273        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3274        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
3275        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
3276        let mut buf = Vec::new();
3277        buf.extend(encode_avro_long(1));
3278        buf.extend(encode_avro_long(2));
3279        buf.extend(encode_avro_int(5));
3280        buf.extend(encode_avro_int(6));
3281        buf.extend(encode_avro_long(0));
3282        buf.extend(encode_avro_long(0));
3283        let mut cursor = AvroCursor::new(&buf);
3284        decoder.decode(&mut cursor).unwrap();
3285        let arr = decoder.flush(None).unwrap();
3286        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
3287        assert_eq!(outer.len(), 1);
3288        assert_eq!(outer.value_length(0), 1);
3289        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
3290        assert_eq!(inner.len(), 1);
3291        assert_eq!(inner.value_length(0), 2);
3292        let values = inner
3293            .values()
3294            .as_any()
3295            .downcast_ref::<Int32Array>()
3296            .unwrap();
3297        assert_eq!(values.values(), &[5, 6]);
3298    }
3299
3300    #[test]
3301    fn test_array_decoding_empty_array() {
3302        let value_type = avro_from_codec(Codec::Utf8);
3303        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
3304        let mut decoder = Decoder::try_new(&map_type).unwrap();
3305        let data = encode_avro_long(0);
3306        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3307        let array = decoder.flush(None).unwrap();
3308        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3309        assert_eq!(list_arr.len(), 1);
3310        assert_eq!(list_arr.value_length(0), 0);
3311    }
3312
3313    #[test]
3314    fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
3315        use crate::schema::Array;
3316        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3317            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3318            attributes: Attributes::default(),
3319        }));
3320        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3321            items: Box::new(Schema::Union(vec![
3322                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3323                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3324            ])),
3325            attributes: Attributes::default(),
3326        }));
3327        let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
3328        if let Codec::List(inner) = dt.codec() {
3329            assert_eq!(
3330                inner.nullability(),
3331                Some(Nullability::NullFirst),
3332                "items should be nullable"
3333            );
3334        } else {
3335            panic!("expected List codec");
3336        }
3337        let mut decoder = Decoder::try_new(&dt).unwrap();
3338        let mut data = encode_avro_long(2);
3339        data.extend(encode_avro_int(10));
3340        data.extend(encode_avro_int(20));
3341        data.extend(encode_avro_long(0));
3342        let mut cursor = AvroCursor::new(&data);
3343        decoder.decode(&mut cursor).unwrap();
3344        assert_eq!(
3345            cursor.position(),
3346            data.len(),
3347            "all bytes should be consumed"
3348        );
3349        let array = decoder.flush(None).unwrap();
3350        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3351        assert_eq!(list_arr.len(), 1, "one list/row");
3352        assert_eq!(list_arr.value_length(0), 2, "two items in the list");
3353        let values = list_arr.values().as_primitive::<Int32Type>();
3354        assert_eq!(values.len(), 2);
3355        assert_eq!(values.value(0), 10);
3356        assert_eq!(values.value(1), 20);
3357        assert!(!values.is_null(0));
3358        assert!(!values.is_null(1));
3359    }
3360
3361    #[test]
3362    fn test_decimal_decoding_fixed256() {
3363        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
3364        let mut decoder = Decoder::try_new(&dt).unwrap();
3365        let row1 = [
3366            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3367            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3368            0x00, 0x00, 0x30, 0x39,
3369        ];
3370        let row2 = [
3371            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3372            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3373            0xFF, 0xFF, 0xFF, 0x85,
3374        ];
3375        let mut data = Vec::new();
3376        data.extend_from_slice(&row1);
3377        data.extend_from_slice(&row2);
3378        let mut cursor = AvroCursor::new(&data);
3379        decoder.decode(&mut cursor).unwrap();
3380        decoder.decode(&mut cursor).unwrap();
3381        let arr = decoder.flush(None).unwrap();
3382        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
3383        assert_eq!(dec.len(), 2);
3384        assert_eq!(dec.value_as_string(0), "123.45");
3385        assert_eq!(dec.value_as_string(1), "-1.23");
3386    }
3387
3388    #[test]
3389    fn test_decimal_decoding_fixed128() {
3390        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
3391        let mut decoder = Decoder::try_new(&dt).unwrap();
3392        let row1 = [
3393            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3394            0x30, 0x39,
3395        ];
3396        let row2 = [
3397            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3398            0xFF, 0x85,
3399        ];
3400        let mut data = Vec::new();
3401        data.extend_from_slice(&row1);
3402        data.extend_from_slice(&row2);
3403        let mut cursor = AvroCursor::new(&data);
3404        decoder.decode(&mut cursor).unwrap();
3405        decoder.decode(&mut cursor).unwrap();
3406        let arr = decoder.flush(None).unwrap();
3407        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3408        assert_eq!(dec.len(), 2);
3409        assert_eq!(dec.value_as_string(0), "123.45");
3410        assert_eq!(dec.value_as_string(1), "-1.23");
3411    }
3412
3413    #[test]
3414    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
3415        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
3416        let mut decoder = Decoder::try_new(&dt).unwrap();
3417        let row1 = [
3418            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3419            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3420            0x00, 0x00, 0x30, 0x39,
3421        ];
3422        let row2 = [
3423            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3424            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3425            0xFF, 0xFF, 0xFF, 0x85,
3426        ];
3427        let mut data = Vec::new();
3428        data.extend_from_slice(&row1);
3429        data.extend_from_slice(&row2);
3430        let mut cursor = AvroCursor::new(&data);
3431        decoder.decode(&mut cursor).unwrap();
3432        decoder.decode(&mut cursor).unwrap();
3433        let arr = decoder.flush(None).unwrap();
3434        #[cfg(feature = "small_decimals")]
3435        {
3436            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3437            assert_eq!(dec.len(), 2);
3438            assert_eq!(dec.value_as_string(0), "123.45");
3439            assert_eq!(dec.value_as_string(1), "-1.23");
3440        }
3441        #[cfg(not(feature = "small_decimals"))]
3442        {
3443            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3444            assert_eq!(dec.len(), 2);
3445            assert_eq!(dec.value_as_string(0), "123.45");
3446            assert_eq!(dec.value_as_string(1), "-1.23");
3447        }
3448    }
3449
3450    #[test]
3451    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
3452        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
3453        let mut decoder = Decoder::try_new(&dt).unwrap();
3454        let row1 = [
3455            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3456            0x30, 0x39,
3457        ];
3458        let row2 = [
3459            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3460            0xFF, 0x85,
3461        ];
3462        let mut data = Vec::new();
3463        data.extend_from_slice(&row1);
3464        data.extend_from_slice(&row2);
3465        let mut cursor = AvroCursor::new(&data);
3466        decoder.decode(&mut cursor).unwrap();
3467        decoder.decode(&mut cursor).unwrap();
3468
3469        let arr = decoder.flush(None).unwrap();
3470        #[cfg(feature = "small_decimals")]
3471        {
3472            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3473            assert_eq!(dec.len(), 2);
3474            assert_eq!(dec.value_as_string(0), "123.45");
3475            assert_eq!(dec.value_as_string(1), "-1.23");
3476        }
3477        #[cfg(not(feature = "small_decimals"))]
3478        {
3479            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3480            assert_eq!(dec.len(), 2);
3481            assert_eq!(dec.value_as_string(0), "123.45");
3482            assert_eq!(dec.value_as_string(1), "-1.23");
3483        }
3484    }
3485
3486    #[test]
3487    fn test_decimal_decoding_bytes_with_nulls() {
3488        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
3489        let inner = Decoder::try_new(&dt).unwrap();
3490        let mut decoder = Decoder::Nullable(
3491            Nullability::NullSecond,
3492            NullBufferBuilder::new(DEFAULT_CAPACITY),
3493            Box::new(inner),
3494            NullablePlan::ReadTag,
3495        );
3496        let mut data = Vec::new();
3497        data.extend_from_slice(&encode_avro_int(0));
3498        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
3499        data.extend_from_slice(&encode_avro_int(1));
3500        data.extend_from_slice(&encode_avro_int(0));
3501        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
3502        let mut cursor = AvroCursor::new(&data);
3503        decoder.decode(&mut cursor).unwrap();
3504        decoder.decode(&mut cursor).unwrap();
3505        decoder.decode(&mut cursor).unwrap();
3506        let arr = decoder.flush(None).unwrap();
3507        #[cfg(feature = "small_decimals")]
3508        {
3509            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3510            assert_eq!(dec_arr.len(), 3);
3511            assert!(dec_arr.is_valid(0));
3512            assert!(!dec_arr.is_valid(1));
3513            assert!(dec_arr.is_valid(2));
3514            assert_eq!(dec_arr.value_as_string(0), "123.4");
3515            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3516        }
3517        #[cfg(not(feature = "small_decimals"))]
3518        {
3519            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3520            assert_eq!(dec_arr.len(), 3);
3521            assert!(dec_arr.is_valid(0));
3522            assert!(!dec_arr.is_valid(1));
3523            assert!(dec_arr.is_valid(2));
3524            assert_eq!(dec_arr.value_as_string(0), "123.4");
3525            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3526        }
3527    }
3528
3529    #[test]
3530    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
3531        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
3532        let inner = Decoder::try_new(&dt).unwrap();
3533        let mut decoder = Decoder::Nullable(
3534            Nullability::NullSecond,
3535            NullBufferBuilder::new(DEFAULT_CAPACITY),
3536            Box::new(inner),
3537            NullablePlan::ReadTag,
3538        );
3539        let row1 = [
3540            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
3541            0xE2, 0x40,
3542        ];
3543        let row3 = [
3544            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
3545            0x1D, 0xC0,
3546        ];
3547        let mut data = Vec::new();
3548        data.extend_from_slice(&encode_avro_int(0));
3549        data.extend_from_slice(&row1);
3550        data.extend_from_slice(&encode_avro_int(1));
3551        data.extend_from_slice(&encode_avro_int(0));
3552        data.extend_from_slice(&row3);
3553        let mut cursor = AvroCursor::new(&data);
3554        decoder.decode(&mut cursor).unwrap();
3555        decoder.decode(&mut cursor).unwrap();
3556        decoder.decode(&mut cursor).unwrap();
3557        let arr = decoder.flush(None).unwrap();
3558        #[cfg(feature = "small_decimals")]
3559        {
3560            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3561            assert_eq!(dec_arr.len(), 3);
3562            assert!(dec_arr.is_valid(0));
3563            assert!(!dec_arr.is_valid(1));
3564            assert!(dec_arr.is_valid(2));
3565            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3566            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3567        }
3568        #[cfg(not(feature = "small_decimals"))]
3569        {
3570            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3571            assert_eq!(dec_arr.len(), 3);
3572            assert!(dec_arr.is_valid(0));
3573            assert!(!dec_arr.is_valid(1));
3574            assert!(dec_arr.is_valid(2));
3575            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3576            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3577        }
3578    }
3579
3580    #[test]
3581    fn test_enum_decoding() {
3582        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
3583        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
3584        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3585        let mut data = Vec::new();
3586        data.extend_from_slice(&encode_avro_int(2));
3587        data.extend_from_slice(&encode_avro_int(0));
3588        data.extend_from_slice(&encode_avro_int(1));
3589        let mut cursor = AvroCursor::new(&data);
3590        decoder.decode(&mut cursor).unwrap();
3591        decoder.decode(&mut cursor).unwrap();
3592        decoder.decode(&mut cursor).unwrap();
3593        let array = decoder.flush(None).unwrap();
3594        let dict_array = array
3595            .as_any()
3596            .downcast_ref::<DictionaryArray<Int32Type>>()
3597            .unwrap();
3598        assert_eq!(dict_array.len(), 3);
3599        let values = dict_array
3600            .values()
3601            .as_any()
3602            .downcast_ref::<StringArray>()
3603            .unwrap();
3604        assert_eq!(values.value(0), "A");
3605        assert_eq!(values.value(1), "B");
3606        assert_eq!(values.value(2), "C");
3607        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
3608    }
3609
3610    #[test]
3611    fn test_enum_decoding_with_nulls() {
3612        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
3613        let enum_codec = Codec::Enum(symbols.clone());
3614        let avro_type =
3615            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
3616        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3617        let mut data = Vec::new();
3618        data.extend_from_slice(&encode_avro_long(1));
3619        data.extend_from_slice(&encode_avro_int(1));
3620        data.extend_from_slice(&encode_avro_long(0));
3621        data.extend_from_slice(&encode_avro_long(1));
3622        data.extend_from_slice(&encode_avro_int(0));
3623        let mut cursor = AvroCursor::new(&data);
3624        decoder.decode(&mut cursor).unwrap();
3625        decoder.decode(&mut cursor).unwrap();
3626        decoder.decode(&mut cursor).unwrap();
3627        let array = decoder.flush(None).unwrap();
3628        let dict_array = array
3629            .as_any()
3630            .downcast_ref::<DictionaryArray<Int32Type>>()
3631            .unwrap();
3632        assert_eq!(dict_array.len(), 3);
3633        assert!(dict_array.is_valid(0));
3634        assert!(dict_array.is_null(1));
3635        assert!(dict_array.is_valid(2));
3636        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
3637        assert_eq!(dict_array.keys(), &expected_keys);
3638        let values = dict_array
3639            .values()
3640            .as_any()
3641            .downcast_ref::<StringArray>()
3642            .unwrap();
3643        assert_eq!(values.value(0), "X");
3644        assert_eq!(values.value(1), "Y");
3645    }
3646
3647    #[test]
3648    fn test_duration_decoding_with_nulls() {
3649        let duration_codec = Codec::Interval;
3650        let avro_type = AvroDataType::new(
3651            duration_codec,
3652            Default::default(),
3653            Some(Nullability::NullFirst),
3654        );
3655        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3656        let mut data = Vec::new();
3657        // First value: 1 month, 2 days, 3 millis
3658        data.extend_from_slice(&encode_avro_long(1)); // not null
3659        let mut duration1 = Vec::new();
3660        duration1.extend_from_slice(&1u32.to_le_bytes());
3661        duration1.extend_from_slice(&2u32.to_le_bytes());
3662        duration1.extend_from_slice(&3u32.to_le_bytes());
3663        data.extend_from_slice(&duration1);
3664        // Second value: null
3665        data.extend_from_slice(&encode_avro_long(0)); // null
3666        data.extend_from_slice(&encode_avro_long(1)); // not null
3667        let mut duration2 = Vec::new();
3668        duration2.extend_from_slice(&4u32.to_le_bytes());
3669        duration2.extend_from_slice(&5u32.to_le_bytes());
3670        duration2.extend_from_slice(&6u32.to_le_bytes());
3671        data.extend_from_slice(&duration2);
3672        let mut cursor = AvroCursor::new(&data);
3673        decoder.decode(&mut cursor).unwrap();
3674        decoder.decode(&mut cursor).unwrap();
3675        decoder.decode(&mut cursor).unwrap();
3676        let array = decoder.flush(None).unwrap();
3677        let interval_array = array
3678            .as_any()
3679            .downcast_ref::<IntervalMonthDayNanoArray>()
3680            .unwrap();
3681        assert_eq!(interval_array.len(), 3);
3682        assert!(interval_array.is_valid(0));
3683        assert!(interval_array.is_null(1));
3684        assert!(interval_array.is_valid(2));
3685        let expected = IntervalMonthDayNanoArray::from(vec![
3686            Some(IntervalMonthDayNano {
3687                months: 1,
3688                days: 2,
3689                nanoseconds: 3_000_000,
3690            }),
3691            None,
3692            Some(IntervalMonthDayNano {
3693                months: 4,
3694                days: 5,
3695                nanoseconds: 6_000_000,
3696            }),
3697        ]);
3698        assert_eq!(interval_array, &expected);
3699    }
3700
3701    #[cfg(feature = "avro_custom_types")]
3702    #[test]
3703    fn test_interval_month_day_nano_custom_decoding_with_nulls() {
3704        let avro_type = AvroDataType::new(
3705            Codec::IntervalMonthDayNano,
3706            Default::default(),
3707            Some(Nullability::NullFirst),
3708        );
3709        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3710        let mut data = Vec::new();
3711        // First value: months=1, days=-2, nanos=3
3712        data.extend_from_slice(&encode_avro_long(1));
3713        data.extend_from_slice(&1i32.to_le_bytes());
3714        data.extend_from_slice(&(-2i32).to_le_bytes());
3715        data.extend_from_slice(&3i64.to_le_bytes());
3716        // Second value: null
3717        data.extend_from_slice(&encode_avro_long(0));
3718        // Third value: months=-4, days=5, nanos=-6
3719        data.extend_from_slice(&encode_avro_long(1));
3720        data.extend_from_slice(&(-4i32).to_le_bytes());
3721        data.extend_from_slice(&5i32.to_le_bytes());
3722        data.extend_from_slice(&(-6i64).to_le_bytes());
3723        let mut cursor = AvroCursor::new(&data);
3724        decoder.decode(&mut cursor).unwrap();
3725        decoder.decode(&mut cursor).unwrap();
3726        decoder.decode(&mut cursor).unwrap();
3727        let array = decoder.flush(None).unwrap();
3728        let interval_array = array
3729            .as_any()
3730            .downcast_ref::<IntervalMonthDayNanoArray>()
3731            .unwrap();
3732        assert_eq!(interval_array.len(), 3);
3733        let expected = IntervalMonthDayNanoArray::from(vec![
3734            Some(IntervalMonthDayNano::new(1, -2, 3)),
3735            None,
3736            Some(IntervalMonthDayNano::new(-4, 5, -6)),
3737        ]);
3738        assert_eq!(interval_array, &expected);
3739    }
3740
3741    #[test]
3742    fn test_duration_decoding_empty() {
3743        let duration_codec = Codec::Interval;
3744        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
3745        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3746        let array = decoder.flush(None).unwrap();
3747        assert_eq!(array.len(), 0);
3748    }
3749
3750    #[test]
3751    #[cfg(feature = "avro_custom_types")]
3752    fn test_duration_seconds_decoding() {
3753        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
3754        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3755        let mut data = Vec::new();
3756        // Three values: 0, -1, 2
3757        data.extend_from_slice(&encode_avro_long(0));
3758        data.extend_from_slice(&encode_avro_long(-1));
3759        data.extend_from_slice(&encode_avro_long(2));
3760        let mut cursor = AvroCursor::new(&data);
3761        decoder.decode(&mut cursor).unwrap();
3762        decoder.decode(&mut cursor).unwrap();
3763        decoder.decode(&mut cursor).unwrap();
3764        let array = decoder.flush(None).unwrap();
3765        let dur = array
3766            .as_any()
3767            .downcast_ref::<DurationSecondArray>()
3768            .unwrap();
3769        assert_eq!(dur.values(), &[0, -1, 2]);
3770    }
3771
3772    #[test]
3773    #[cfg(feature = "avro_custom_types")]
3774    fn test_duration_milliseconds_decoding() {
3775        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3776        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3777        let mut data = Vec::new();
3778        for v in [1i64, 0, -2] {
3779            data.extend_from_slice(&encode_avro_long(v));
3780        }
3781        let mut cursor = AvroCursor::new(&data);
3782        for _ in 0..3 {
3783            decoder.decode(&mut cursor).unwrap();
3784        }
3785        let array = decoder.flush(None).unwrap();
3786        let dur = array
3787            .as_any()
3788            .downcast_ref::<DurationMillisecondArray>()
3789            .unwrap();
3790        assert_eq!(dur.values(), &[1, 0, -2]);
3791    }
3792
3793    #[test]
3794    #[cfg(feature = "avro_custom_types")]
3795    fn test_duration_microseconds_decoding() {
3796        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3797        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3798        let mut data = Vec::new();
3799        for v in [5i64, -6, 7] {
3800            data.extend_from_slice(&encode_avro_long(v));
3801        }
3802        let mut cursor = AvroCursor::new(&data);
3803        for _ in 0..3 {
3804            decoder.decode(&mut cursor).unwrap();
3805        }
3806        let array = decoder.flush(None).unwrap();
3807        let dur = array
3808            .as_any()
3809            .downcast_ref::<DurationMicrosecondArray>()
3810            .unwrap();
3811        assert_eq!(dur.values(), &[5, -6, 7]);
3812    }
3813
3814    #[test]
3815    #[cfg(feature = "avro_custom_types")]
3816    fn test_duration_nanoseconds_decoding() {
3817        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3818        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3819        let mut data = Vec::new();
3820        for v in [8i64, 9, -10] {
3821            data.extend_from_slice(&encode_avro_long(v));
3822        }
3823        let mut cursor = AvroCursor::new(&data);
3824        for _ in 0..3 {
3825            decoder.decode(&mut cursor).unwrap();
3826        }
3827        let array = decoder.flush(None).unwrap();
3828        let dur = array
3829            .as_any()
3830            .downcast_ref::<DurationNanosecondArray>()
3831            .unwrap();
3832        assert_eq!(dur.values(), &[8, 9, -10]);
3833    }
3834
3835    #[test]
3836    fn test_nullable_decode_error_bitmap_corruption() {
3837        // Nullable Int32 with ['T','null'] encoding (NullSecond)
3838        let avro_type = AvroDataType::new(
3839            Codec::Int32,
3840            Default::default(),
3841            Some(Nullability::NullSecond),
3842        );
3843        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3844
3845        // Row 1: union branch 1 (null)
3846        let mut row1 = Vec::new();
3847        row1.extend_from_slice(&encode_avro_int(1));
3848
3849        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
3850        let mut row2 = Vec::new();
3851        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
3852
3853        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
3854        let mut row3 = Vec::new();
3855        row3.extend_from_slice(&encode_avro_int(0)); // branch
3856        row3.extend_from_slice(&encode_avro_int(42)); // actual value
3857
3858        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
3859        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
3860        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
3861
3862        let array = decoder.flush(None).unwrap();
3863
3864        // Should contain 2 elements: row1 (null) and row3 (42)
3865        assert_eq!(array.len(), 2);
3866        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
3867        assert!(int_array.is_null(0)); // row1 is null
3868        assert_eq!(int_array.value(1), 42); // row3 value is 42
3869    }
3870
3871    #[test]
3872    fn test_enum_mapping_reordered_symbols() {
3873        let reader_symbols: Arc<[String]> =
3874            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
3875        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
3876        let default_index: i32 = -1;
3877        let mut dec = Decoder::Enum(
3878            Vec::with_capacity(DEFAULT_CAPACITY),
3879            reader_symbols.clone(),
3880            Some(EnumResolution {
3881                mapping,
3882                default_index,
3883            }),
3884        );
3885        let mut data = Vec::new();
3886        data.extend_from_slice(&encode_avro_int(0));
3887        data.extend_from_slice(&encode_avro_int(1));
3888        data.extend_from_slice(&encode_avro_int(2));
3889        let mut cur = AvroCursor::new(&data);
3890        dec.decode(&mut cur).unwrap();
3891        dec.decode(&mut cur).unwrap();
3892        dec.decode(&mut cur).unwrap();
3893        let arr = dec.flush(None).unwrap();
3894        let dict = arr
3895            .as_any()
3896            .downcast_ref::<DictionaryArray<Int32Type>>()
3897            .unwrap();
3898        let expected_keys = Int32Array::from(vec![2, 0, 1]);
3899        assert_eq!(dict.keys(), &expected_keys);
3900        let values = dict
3901            .values()
3902            .as_any()
3903            .downcast_ref::<StringArray>()
3904            .unwrap();
3905        assert_eq!(values.value(0), "B");
3906        assert_eq!(values.value(1), "C");
3907        assert_eq!(values.value(2), "A");
3908    }
3909
3910    #[test]
3911    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
3912        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
3913        let default_index: i32 = 1;
3914        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
3915        let mut dec = Decoder::Enum(
3916            Vec::with_capacity(DEFAULT_CAPACITY),
3917            reader_symbols.clone(),
3918            Some(EnumResolution {
3919                mapping,
3920                default_index,
3921            }),
3922        );
3923        let mut data = Vec::new();
3924        data.extend_from_slice(&encode_avro_int(0));
3925        data.extend_from_slice(&encode_avro_int(1));
3926        data.extend_from_slice(&encode_avro_int(99));
3927        let mut cur = AvroCursor::new(&data);
3928        dec.decode(&mut cur).unwrap();
3929        dec.decode(&mut cur).unwrap();
3930        dec.decode(&mut cur).unwrap();
3931        let arr = dec.flush(None).unwrap();
3932        let dict = arr
3933            .as_any()
3934            .downcast_ref::<DictionaryArray<Int32Type>>()
3935            .unwrap();
3936        let expected_keys = Int32Array::from(vec![0, 1, 1]);
3937        assert_eq!(dict.keys(), &expected_keys);
3938        let values = dict
3939            .values()
3940            .as_any()
3941            .downcast_ref::<StringArray>()
3942            .unwrap();
3943        assert_eq!(values.value(0), "A");
3944        assert_eq!(values.value(1), "B");
3945    }
3946
3947    #[test]
3948    fn test_enum_mapping_unknown_symbol_without_default_errors() {
3949        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
3950        let default_index: i32 = -1; // indicates no default at type-level
3951        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
3952        let mut dec = Decoder::Enum(
3953            Vec::with_capacity(DEFAULT_CAPACITY),
3954            reader_symbols,
3955            Some(EnumResolution {
3956                mapping,
3957                default_index,
3958            }),
3959        );
3960        let data = encode_avro_int(0);
3961        let mut cur = AvroCursor::new(&data);
3962        let err = dec
3963            .decode(&mut cur)
3964            .expect_err("expected decode error for unresolved enum without default");
3965        let msg = err.to_string();
3966        assert!(
3967            msg.contains("not resolvable") && msg.contains("no default"),
3968            "unexpected error message: {msg}"
3969        );
3970    }
3971
3972    fn make_record_resolved_decoder(
3973        reader_fields: &[(&str, DataType, bool)],
3974        writer_to_reader: Vec<Option<usize>>,
3975        skip_decoders: Vec<Option<Skipper>>,
3976    ) -> Decoder {
3977        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3978        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3979        for (name, dt, nullable) in reader_fields {
3980            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3981            let enc = match dt {
3982                DataType::Int32 => Decoder::Int32(Vec::new()),
3983                DataType::Int64 => Decoder::Int64(Vec::new()),
3984                DataType::Utf8 => {
3985                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3986                }
3987                other => panic!("Unsupported test reader field type: {other:?}"),
3988            };
3989            encodings.push(enc);
3990        }
3991        let fields: Fields = field_refs.into();
3992        Decoder::Record(
3993            fields,
3994            encodings,
3995            Some(Projector {
3996                writer_to_reader: Arc::from(writer_to_reader),
3997                skip_decoders,
3998                field_defaults: vec![None; reader_fields.len()],
3999                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4000            }),
4001        )
4002    }
4003
4004    #[test]
4005    fn test_skip_writer_trailing_field_int32() {
4006        let mut dec = make_record_resolved_decoder(
4007            &[("id", arrow_schema::DataType::Int32, false)],
4008            vec![Some(0), None],
4009            vec![None, Some(super::Skipper::Int32)],
4010        );
4011        let mut data = Vec::new();
4012        data.extend_from_slice(&encode_avro_int(7));
4013        data.extend_from_slice(&encode_avro_int(999));
4014        let mut cur = AvroCursor::new(&data);
4015        dec.decode(&mut cur).unwrap();
4016        assert_eq!(cur.position(), data.len());
4017        let arr = dec.flush(None).unwrap();
4018        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
4019        assert_eq!(struct_arr.len(), 1);
4020        let id = struct_arr
4021            .column_by_name("id")
4022            .unwrap()
4023            .as_any()
4024            .downcast_ref::<Int32Array>()
4025            .unwrap();
4026        assert_eq!(id.value(0), 7);
4027    }
4028
4029    #[test]
4030    fn test_skip_writer_middle_field_string() {
4031        let mut dec = make_record_resolved_decoder(
4032            &[
4033                ("id", DataType::Int32, false),
4034                ("score", DataType::Int64, false),
4035            ],
4036            vec![Some(0), None, Some(1)],
4037            vec![None, Some(Skipper::String), None],
4038        );
4039        let mut data = Vec::new();
4040        data.extend_from_slice(&encode_avro_int(42));
4041        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
4042        data.extend_from_slice(&encode_avro_long(1000));
4043        let mut cur = AvroCursor::new(&data);
4044        dec.decode(&mut cur).unwrap();
4045        assert_eq!(cur.position(), data.len());
4046        let arr = dec.flush(None).unwrap();
4047        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4048        let id = s
4049            .column_by_name("id")
4050            .unwrap()
4051            .as_any()
4052            .downcast_ref::<Int32Array>()
4053            .unwrap();
4054        let score = s
4055            .column_by_name("score")
4056            .unwrap()
4057            .as_any()
4058            .downcast_ref::<Int64Array>()
4059            .unwrap();
4060        assert_eq!(id.value(0), 42);
4061        assert_eq!(score.value(0), 1000);
4062    }
4063
4064    #[test]
4065    fn test_skip_writer_array_with_negative_block_count_fast() {
4066        let mut dec = make_record_resolved_decoder(
4067            &[("id", DataType::Int32, false)],
4068            vec![None, Some(0)],
4069            vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
4070        );
4071        let mut array_payload = Vec::new();
4072        array_payload.extend_from_slice(&encode_avro_int(1));
4073        array_payload.extend_from_slice(&encode_avro_int(2));
4074        array_payload.extend_from_slice(&encode_avro_int(3));
4075        let mut data = Vec::new();
4076        data.extend_from_slice(&encode_avro_long(-3));
4077        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
4078        data.extend_from_slice(&array_payload);
4079        data.extend_from_slice(&encode_avro_long(0));
4080        data.extend_from_slice(&encode_avro_int(5));
4081        let mut cur = AvroCursor::new(&data);
4082        dec.decode(&mut cur).unwrap();
4083        assert_eq!(cur.position(), data.len());
4084        let arr = dec.flush(None).unwrap();
4085        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4086        let id = s
4087            .column_by_name("id")
4088            .unwrap()
4089            .as_any()
4090            .downcast_ref::<Int32Array>()
4091            .unwrap();
4092        assert_eq!(id.len(), 1);
4093        assert_eq!(id.value(0), 5);
4094    }
4095
4096    #[test]
4097    fn test_skip_writer_map_with_negative_block_count_fast() {
4098        let mut dec = make_record_resolved_decoder(
4099            &[("id", DataType::Int32, false)],
4100            vec![None, Some(0)],
4101            vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
4102        );
4103        let mut entries = Vec::new();
4104        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
4105        entries.extend_from_slice(&encode_avro_int(10));
4106        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
4107        entries.extend_from_slice(&encode_avro_int(20));
4108        let mut data = Vec::new();
4109        data.extend_from_slice(&encode_avro_long(-2));
4110        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
4111        data.extend_from_slice(&entries);
4112        data.extend_from_slice(&encode_avro_long(0));
4113        data.extend_from_slice(&encode_avro_int(123));
4114        let mut cur = AvroCursor::new(&data);
4115        dec.decode(&mut cur).unwrap();
4116        assert_eq!(cur.position(), data.len());
4117        let arr = dec.flush(None).unwrap();
4118        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4119        let id = s
4120            .column_by_name("id")
4121            .unwrap()
4122            .as_any()
4123            .downcast_ref::<Int32Array>()
4124            .unwrap();
4125        assert_eq!(id.len(), 1);
4126        assert_eq!(id.value(0), 123);
4127    }
4128
4129    #[test]
4130    fn test_skip_writer_nullable_field_union_nullfirst() {
4131        let mut dec = make_record_resolved_decoder(
4132            &[("id", DataType::Int32, false)],
4133            vec![None, Some(0)],
4134            vec![
4135                Some(super::Skipper::Nullable(
4136                    Nullability::NullFirst,
4137                    Box::new(super::Skipper::Int32),
4138                )),
4139                None,
4140            ],
4141        );
4142        let mut row1 = Vec::new();
4143        row1.extend_from_slice(&encode_avro_long(0));
4144        row1.extend_from_slice(&encode_avro_int(5));
4145        let mut row2 = Vec::new();
4146        row2.extend_from_slice(&encode_avro_long(1));
4147        row2.extend_from_slice(&encode_avro_int(123));
4148        row2.extend_from_slice(&encode_avro_int(7));
4149        let mut cur1 = AvroCursor::new(&row1);
4150        let mut cur2 = AvroCursor::new(&row2);
4151        dec.decode(&mut cur1).unwrap();
4152        dec.decode(&mut cur2).unwrap();
4153        assert_eq!(cur1.position(), row1.len());
4154        assert_eq!(cur2.position(), row2.len());
4155        let arr = dec.flush(None).unwrap();
4156        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4157        let id = s
4158            .column_by_name("id")
4159            .unwrap()
4160            .as_any()
4161            .downcast_ref::<Int32Array>()
4162            .unwrap();
4163        assert_eq!(id.len(), 2);
4164        assert_eq!(id.value(0), 5);
4165        assert_eq!(id.value(1), 7);
4166    }
4167
4168    fn make_dense_union_avro(
4169        children: Vec<(Codec, &'_ str, DataType)>,
4170        type_ids: Vec<i8>,
4171    ) -> AvroDataType {
4172        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4173        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4174        for (codec, name, dt) in children.into_iter() {
4175            avro_children.push(AvroDataType::new(codec, Default::default(), None));
4176            fields.push(arrow_schema::Field::new(name, dt, true));
4177        }
4178        let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4179        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4180        AvroDataType::new(union_codec, Default::default(), None)
4181    }
4182
4183    #[test]
4184    fn test_union_dense_two_children_custom_type_ids() {
4185        let union_dt = make_dense_union_avro(
4186            vec![
4187                (Codec::Int32, "i", DataType::Int32),
4188                (Codec::Utf8, "s", DataType::Utf8),
4189            ],
4190            vec![2, 5],
4191        );
4192        let mut dec = Decoder::try_new(&union_dt).unwrap();
4193        let mut r1 = Vec::new();
4194        r1.extend_from_slice(&encode_avro_long(0));
4195        r1.extend_from_slice(&encode_avro_int(7));
4196        let mut r2 = Vec::new();
4197        r2.extend_from_slice(&encode_avro_long(1));
4198        r2.extend_from_slice(&encode_avro_bytes(b"x"));
4199        let mut r3 = Vec::new();
4200        r3.extend_from_slice(&encode_avro_long(0));
4201        r3.extend_from_slice(&encode_avro_int(-1));
4202        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4203        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4204        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4205        let array = dec.flush(None).unwrap();
4206        let ua = array
4207            .as_any()
4208            .downcast_ref::<UnionArray>()
4209            .expect("expected UnionArray");
4210        assert_eq!(ua.len(), 3);
4211        assert_eq!(ua.type_id(0), 2);
4212        assert_eq!(ua.type_id(1), 5);
4213        assert_eq!(ua.type_id(2), 2);
4214        assert_eq!(ua.value_offset(0), 0);
4215        assert_eq!(ua.value_offset(1), 0);
4216        assert_eq!(ua.value_offset(2), 1);
4217        let int_child = ua
4218            .child(2)
4219            .as_any()
4220            .downcast_ref::<Int32Array>()
4221            .expect("int child");
4222        assert_eq!(int_child.len(), 2);
4223        assert_eq!(int_child.value(0), 7);
4224        assert_eq!(int_child.value(1), -1);
4225        let str_child = ua
4226            .child(5)
4227            .as_any()
4228            .downcast_ref::<StringArray>()
4229            .expect("string child");
4230        assert_eq!(str_child.len(), 1);
4231        assert_eq!(str_child.value(0), "x");
4232    }
4233
4234    #[test]
4235    fn test_union_dense_with_null_and_string_children() {
4236        let union_dt = make_dense_union_avro(
4237            vec![
4238                (Codec::Null, "n", DataType::Null),
4239                (Codec::Utf8, "s", DataType::Utf8),
4240            ],
4241            vec![42, 7],
4242        );
4243        let mut dec = Decoder::try_new(&union_dt).unwrap();
4244        let r1 = encode_avro_long(0);
4245        let mut r2 = Vec::new();
4246        r2.extend_from_slice(&encode_avro_long(1));
4247        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
4248        let r3 = encode_avro_long(0);
4249        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4250        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4251        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4252        let array = dec.flush(None).unwrap();
4253        let ua = array
4254            .as_any()
4255            .downcast_ref::<UnionArray>()
4256            .expect("expected UnionArray");
4257        assert_eq!(ua.len(), 3);
4258        assert_eq!(ua.type_id(0), 42);
4259        assert_eq!(ua.type_id(1), 7);
4260        assert_eq!(ua.type_id(2), 42);
4261        assert_eq!(ua.value_offset(0), 0);
4262        assert_eq!(ua.value_offset(1), 0);
4263        assert_eq!(ua.value_offset(2), 1);
4264        let null_child = ua
4265            .child(42)
4266            .as_any()
4267            .downcast_ref::<NullArray>()
4268            .expect("null child");
4269        assert_eq!(null_child.len(), 2);
4270        let str_child = ua
4271            .child(7)
4272            .as_any()
4273            .downcast_ref::<StringArray>()
4274            .expect("string child");
4275        assert_eq!(str_child.len(), 1);
4276        assert_eq!(str_child.value(0), "abc");
4277    }
4278
4279    #[test]
4280    fn test_union_decode_negative_branch_index_errors() {
4281        let union_dt = make_dense_union_avro(
4282            vec![
4283                (Codec::Int32, "i", DataType::Int32),
4284                (Codec::Utf8, "s", DataType::Utf8),
4285            ],
4286            vec![0, 1],
4287        );
4288        let mut dec = Decoder::try_new(&union_dt).unwrap();
4289        let row = encode_avro_long(-1); // decodes back to -1
4290        let err = dec
4291            .decode(&mut AvroCursor::new(&row))
4292            .expect_err("expected error for negative branch index");
4293        let msg = err.to_string();
4294        assert!(
4295            msg.contains("Negative union branch index"),
4296            "unexpected error message: {msg}"
4297        );
4298    }
4299
4300    #[test]
4301    fn test_union_decode_out_of_range_branch_index_errors() {
4302        let union_dt = make_dense_union_avro(
4303            vec![
4304                (Codec::Int32, "i", DataType::Int32),
4305                (Codec::Utf8, "s", DataType::Utf8),
4306            ],
4307            vec![10, 11],
4308        );
4309        let mut dec = Decoder::try_new(&union_dt).unwrap();
4310        let row = encode_avro_long(2);
4311        let err = dec
4312            .decode(&mut AvroCursor::new(&row))
4313            .expect_err("expected error for out-of-range branch index");
4314        let msg = err.to_string();
4315        assert!(
4316            msg.contains("out of range"),
4317            "unexpected error message: {msg}"
4318        );
4319    }
4320
4321    #[test]
4322    fn test_union_sparse_mode_not_supported() {
4323        let children: Vec<AvroDataType> = vec![
4324            AvroDataType::new(Codec::Int32, Default::default(), None),
4325            AvroDataType::new(Codec::Utf8, Default::default(), None),
4326        ];
4327        let uf = UnionFields::try_new(
4328            vec![1, 3],
4329            vec![
4330                arrow_schema::Field::new("i", DataType::Int32, true),
4331                arrow_schema::Field::new("s", DataType::Utf8, true),
4332            ],
4333        )
4334        .unwrap();
4335        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
4336        let dt = AvroDataType::new(codec, Default::default(), None);
4337        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
4338        let msg = err.to_string();
4339        assert!(
4340            msg.contains("Sparse Arrow unions are not yet supported"),
4341            "unexpected error message: {msg}"
4342        );
4343    }
4344
4345    fn make_record_decoder_with_projector_defaults(
4346        reader_fields: &[(&str, DataType, bool)],
4347        field_defaults: Vec<Option<AvroLiteral>>,
4348        default_injections: Vec<(usize, AvroLiteral)>,
4349        writer_to_reader_len: usize,
4350    ) -> Decoder {
4351        assert_eq!(
4352            field_defaults.len(),
4353            reader_fields.len(),
4354            "field_defaults must have one entry per reader field"
4355        );
4356        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4357        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4358        for (name, dt, nullable) in reader_fields {
4359            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4360            let enc = match dt {
4361                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4362                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4363                DataType::Utf8 => Decoder::String(
4364                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4365                    Vec::with_capacity(DEFAULT_CAPACITY),
4366                ),
4367                other => panic!("Unsupported test field type in helper: {other:?}"),
4368            };
4369            encodings.push(enc);
4370        }
4371        let fields: Fields = field_refs.into();
4372        let skip_decoders: Vec<Option<Skipper>> =
4373            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
4374        let projector = Projector {
4375            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
4376            skip_decoders,
4377            field_defaults,
4378            default_injections: Arc::from(default_injections),
4379        };
4380        Decoder::Record(fields, encodings, Some(projector))
4381    }
4382
4383    #[cfg(feature = "avro_custom_types")]
4384    #[test]
4385    fn test_default_append_custom_integer_range_validation() {
4386        let mut d_i8 = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
4387        d_i8.append_default(&AvroLiteral::Int(i8::MIN as i32))
4388            .unwrap();
4389        d_i8.append_default(&AvroLiteral::Int(i8::MAX as i32))
4390            .unwrap();
4391        let err_i8_high = d_i8
4392            .append_default(&AvroLiteral::Int(i8::MAX as i32 + 1))
4393            .unwrap_err();
4394        assert!(err_i8_high.to_string().contains("out of range for i8"));
4395        let err_i8_low = d_i8
4396            .append_default(&AvroLiteral::Int(i8::MIN as i32 - 1))
4397            .unwrap_err();
4398        assert!(err_i8_low.to_string().contains("out of range for i8"));
4399        let arr_i8 = d_i8.flush(None).unwrap();
4400        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4401        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4402
4403        let mut d_i16 = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
4404        d_i16
4405            .append_default(&AvroLiteral::Int(i16::MIN as i32))
4406            .unwrap();
4407        d_i16
4408            .append_default(&AvroLiteral::Int(i16::MAX as i32))
4409            .unwrap();
4410        let err_i16_high = d_i16
4411            .append_default(&AvroLiteral::Int(i16::MAX as i32 + 1))
4412            .unwrap_err();
4413        assert!(err_i16_high.to_string().contains("out of range for i16"));
4414        let err_i16_low = d_i16
4415            .append_default(&AvroLiteral::Int(i16::MIN as i32 - 1))
4416            .unwrap_err();
4417        assert!(err_i16_low.to_string().contains("out of range for i16"));
4418        let arr_i16 = d_i16.flush(None).unwrap();
4419        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4420        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4421
4422        let mut d_u8 = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
4423        d_u8.append_default(&AvroLiteral::Int(0)).unwrap();
4424        d_u8.append_default(&AvroLiteral::Int(u8::MAX as i32))
4425            .unwrap();
4426        let err_u8_neg = d_u8.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4427        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4428        let err_u8_high = d_u8
4429            .append_default(&AvroLiteral::Int(u8::MAX as i32 + 1))
4430            .unwrap_err();
4431        assert!(err_u8_high.to_string().contains("out of range for u8"));
4432        let arr_u8 = d_u8.flush(None).unwrap();
4433        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4434        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4435
4436        let mut d_u16 = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
4437        d_u16.append_default(&AvroLiteral::Int(0)).unwrap();
4438        d_u16
4439            .append_default(&AvroLiteral::Int(u16::MAX as i32))
4440            .unwrap();
4441        let err_u16_neg = d_u16.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4442        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4443        let err_u16_high = d_u16
4444            .append_default(&AvroLiteral::Int(u16::MAX as i32 + 1))
4445            .unwrap_err();
4446        assert!(err_u16_high.to_string().contains("out of range for u16"));
4447        let arr_u16 = d_u16.flush(None).unwrap();
4448        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4449        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4450
4451        let mut d_u32 = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
4452        d_u32.append_default(&AvroLiteral::Long(0)).unwrap();
4453        d_u32
4454            .append_default(&AvroLiteral::Long(u32::MAX as i64))
4455            .unwrap();
4456        let err_u32_neg = d_u32.append_default(&AvroLiteral::Long(-1)).unwrap_err();
4457        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4458        let err_u32_high = d_u32
4459            .append_default(&AvroLiteral::Long(u32::MAX as i64 + 1))
4460            .unwrap_err();
4461        assert!(err_u32_high.to_string().contains("out of range for u32"));
4462        let arr_u32 = d_u32.flush(None).unwrap();
4463        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4464        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4465    }
4466
4467    #[cfg(feature = "avro_custom_types")]
4468    #[test]
4469    fn test_decode_custom_integer_range_validation() {
4470        let mut d_i8 = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
4471        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32)))
4472            .unwrap();
4473        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32)))
4474            .unwrap();
4475        let err_i8_high = d_i8
4476            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32 + 1)))
4477            .unwrap_err();
4478        assert!(err_i8_high.to_string().contains("out of range for i8"));
4479        let err_i8_low = d_i8
4480            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32 - 1)))
4481            .unwrap_err();
4482        assert!(err_i8_low.to_string().contains("out of range for i8"));
4483        let arr_i8 = d_i8.flush(None).unwrap();
4484        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4485        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4486
4487        let mut d_i16 = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
4488        d_i16
4489            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32)))
4490            .unwrap();
4491        d_i16
4492            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32)))
4493            .unwrap();
4494        let err_i16_high = d_i16
4495            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32 + 1)))
4496            .unwrap_err();
4497        assert!(err_i16_high.to_string().contains("out of range for i16"));
4498        let err_i16_low = d_i16
4499            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32 - 1)))
4500            .unwrap_err();
4501        assert!(err_i16_low.to_string().contains("out of range for i16"));
4502        let arr_i16 = d_i16.flush(None).unwrap();
4503        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4504        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4505
4506        let mut d_u8 = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
4507        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(0)))
4508            .unwrap();
4509        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32)))
4510            .unwrap();
4511        let err_u8_neg = d_u8
4512            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4513            .unwrap_err();
4514        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4515        let err_u8_high = d_u8
4516            .decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32 + 1)))
4517            .unwrap_err();
4518        assert!(err_u8_high.to_string().contains("out of range for u8"));
4519        let arr_u8 = d_u8.flush(None).unwrap();
4520        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4521        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4522
4523        let mut d_u16 = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
4524        d_u16
4525            .decode(&mut AvroCursor::new(&encode_avro_int(0)))
4526            .unwrap();
4527        d_u16
4528            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32)))
4529            .unwrap();
4530        let err_u16_neg = d_u16
4531            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4532            .unwrap_err();
4533        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4534        let err_u16_high = d_u16
4535            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32 + 1)))
4536            .unwrap_err();
4537        assert!(err_u16_high.to_string().contains("out of range for u16"));
4538        let arr_u16 = d_u16.flush(None).unwrap();
4539        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4540        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4541
4542        let mut d_u32 = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
4543        d_u32
4544            .decode(&mut AvroCursor::new(&encode_avro_long(0)))
4545            .unwrap();
4546        d_u32
4547            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64)))
4548            .unwrap();
4549        let err_u32_neg = d_u32
4550            .decode(&mut AvroCursor::new(&encode_avro_long(-1)))
4551            .unwrap_err();
4552        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4553        let err_u32_high = d_u32
4554            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64 + 1)))
4555            .unwrap_err();
4556        assert!(err_u32_high.to_string().contains("out of range for u32"));
4557        let arr_u32 = d_u32.flush(None).unwrap();
4558        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4559        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4560    }
4561
4562    #[test]
4563    fn test_default_append_int32_and_int64_from_int_and_long() {
4564        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4565        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
4566        let arr = d_i32.flush(None).unwrap();
4567        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4568        assert_eq!(a.len(), 1);
4569        assert_eq!(a.value(0), 42);
4570        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
4571        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
4572        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
4573        let arr64 = d_i64.flush(None).unwrap();
4574        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
4575        assert_eq!(a64.len(), 2);
4576        assert_eq!(a64.value(0), 5);
4577        assert_eq!(a64.value(1), 7);
4578    }
4579
4580    #[test]
4581    fn test_default_append_floats_and_doubles() {
4582        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
4583        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
4584        let arr32 = d_f32.flush(None).unwrap();
4585        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
4586        assert_eq!(a.value(0), 1.5);
4587        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4588        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
4589        let arr64 = d_f64.flush(None).unwrap();
4590        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
4591        assert_eq!(b.value(0), 2.25);
4592    }
4593
4594    #[test]
4595    fn test_default_append_string_and_bytes() {
4596        let mut d_str = Decoder::String(
4597            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4598            Vec::with_capacity(DEFAULT_CAPACITY),
4599        );
4600        d_str
4601            .append_default(&AvroLiteral::String("hi".into()))
4602            .unwrap();
4603        let s_arr = d_str.flush(None).unwrap();
4604        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
4605        assert_eq!(arr.value(0), "hi");
4606        let mut d_bytes = Decoder::Binary(
4607            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4608            Vec::with_capacity(DEFAULT_CAPACITY),
4609        );
4610        d_bytes
4611            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4612            .unwrap();
4613        let b_arr = d_bytes.flush(None).unwrap();
4614        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
4615        assert_eq!(barr.value(0), &[1, 2, 3]);
4616        let mut d_str_err = Decoder::String(
4617            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4618            Vec::with_capacity(DEFAULT_CAPACITY),
4619        );
4620        let err = d_str_err
4621            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
4622            .unwrap_err();
4623        assert!(
4624            err.to_string()
4625                .contains("Default for string must be string"),
4626            "unexpected error: {err:?}"
4627        );
4628    }
4629
4630    #[test]
4631    fn test_default_append_nullable_int32_null_and_value() {
4632        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4633        let mut dec = Decoder::Nullable(
4634            Nullability::NullFirst,
4635            NullBufferBuilder::new(DEFAULT_CAPACITY),
4636            Box::new(inner),
4637            NullablePlan::ReadTag,
4638        );
4639        dec.append_default(&AvroLiteral::Null).unwrap();
4640        dec.append_default(&AvroLiteral::Int(11)).unwrap();
4641        let arr = dec.flush(None).unwrap();
4642        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4643        assert_eq!(a.len(), 2);
4644        assert!(a.is_null(0));
4645        assert_eq!(a.value(1), 11);
4646    }
4647
4648    #[test]
4649    fn test_default_append_array_of_ints() {
4650        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
4651        let mut d = Decoder::try_new(&list_dt).unwrap();
4652        let items = vec![
4653            AvroLiteral::Int(1),
4654            AvroLiteral::Int(2),
4655            AvroLiteral::Int(3),
4656        ];
4657        d.append_default(&AvroLiteral::Array(items)).unwrap();
4658        let arr = d.flush(None).unwrap();
4659        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
4660        assert_eq!(list.len(), 1);
4661        assert_eq!(list.value_length(0), 3);
4662        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
4663        assert_eq!(vals.values(), &[1, 2, 3]);
4664    }
4665
4666    #[test]
4667    fn test_default_append_map_string_to_int() {
4668        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
4669        let mut d = Decoder::try_new(&map_dt).unwrap();
4670        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
4671        m.insert("k1".to_string(), AvroLiteral::Int(10));
4672        m.insert("k2".to_string(), AvroLiteral::Int(20));
4673        d.append_default(&AvroLiteral::Map(m)).unwrap();
4674        let arr = d.flush(None).unwrap();
4675        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
4676        assert_eq!(map.len(), 1);
4677        assert_eq!(map.value_length(0), 2);
4678        let binding = map.value(0);
4679        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
4680        let k = entries
4681            .column_by_name("key")
4682            .unwrap()
4683            .as_any()
4684            .downcast_ref::<StringArray>()
4685            .unwrap();
4686        let v = entries
4687            .column_by_name("value")
4688            .unwrap()
4689            .as_any()
4690            .downcast_ref::<Int32Array>()
4691            .unwrap();
4692        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4693        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4694        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4695        assert_eq!(vals, [10, 20].into_iter().collect());
4696    }
4697
4698    #[test]
4699    fn test_default_append_enum_by_symbol() {
4700        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4701        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4702        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4703        let arr = d.flush(None).unwrap();
4704        let dict = arr
4705            .as_any()
4706            .downcast_ref::<DictionaryArray<Int32Type>>()
4707            .unwrap();
4708        assert_eq!(dict.len(), 1);
4709        let expected = Int32Array::from(vec![1]);
4710        assert_eq!(dict.keys(), &expected);
4711        let values = dict
4712            .values()
4713            .as_any()
4714            .downcast_ref::<StringArray>()
4715            .unwrap();
4716        assert_eq!(values.value(1), "B");
4717    }
4718
4719    #[test]
4720    fn test_default_append_uuid_and_type_error() {
4721        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4722        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4723        d.append_default(&AvroLiteral::String(uuid_str.into()))
4724            .unwrap();
4725        let arr_ref = d.flush(None).unwrap();
4726        let arr = arr_ref
4727            .as_any()
4728            .downcast_ref::<FixedSizeBinaryArray>()
4729            .unwrap();
4730        assert_eq!(arr.value_length(), 16);
4731        assert_eq!(arr.len(), 1);
4732        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4733        let err = d2
4734            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4735            .unwrap_err();
4736        assert!(
4737            err.to_string().contains("Default for uuid must be string"),
4738            "unexpected error: {err:?}"
4739        );
4740    }
4741
4742    #[test]
4743    fn test_default_append_fixed_and_length_mismatch() {
4744        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4745        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4746            .unwrap();
4747        let arr_ref = d.flush(None).unwrap();
4748        let arr = arr_ref
4749            .as_any()
4750            .downcast_ref::<FixedSizeBinaryArray>()
4751            .unwrap();
4752        assert_eq!(arr.value_length(), 4);
4753        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4754        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4755        let err = d_err
4756            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4757            .unwrap_err();
4758        assert!(
4759            err.to_string().contains("Fixed default length"),
4760            "unexpected error: {err:?}"
4761        );
4762    }
4763
4764    #[test]
4765    fn test_default_append_duration_and_length_validation() {
4766        let dt = avro_from_codec(Codec::Interval);
4767        let mut d = Decoder::try_new(&dt).unwrap();
4768        let mut bytes = Vec::with_capacity(12);
4769        bytes.extend_from_slice(&1u32.to_le_bytes());
4770        bytes.extend_from_slice(&2u32.to_le_bytes());
4771        bytes.extend_from_slice(&3u32.to_le_bytes());
4772        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4773        let arr_ref = d.flush(None).unwrap();
4774        let arr = arr_ref
4775            .as_any()
4776            .downcast_ref::<IntervalMonthDayNanoArray>()
4777            .unwrap();
4778        assert_eq!(arr.len(), 1);
4779        let v = arr.value(0);
4780        assert_eq!(v.months, 1);
4781        assert_eq!(v.days, 2);
4782        assert_eq!(v.nanoseconds, 3_000_000);
4783        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4784        let err = d_err
4785            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4786            .unwrap_err();
4787        assert!(
4788            err.to_string()
4789                .contains("Duration default must be exactly 12 bytes"),
4790            "unexpected error: {err:?}"
4791        );
4792    }
4793
4794    #[test]
4795    fn test_default_append_decimal256_from_bytes() {
4796        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4797        let mut d = Decoder::try_new(&dt).unwrap();
4798        let pos: [u8; 32] = [
4799            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4800            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4801            0x00, 0x00, 0x30, 0x39,
4802        ];
4803        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4804        let neg: [u8; 32] = [
4805            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4806            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4807            0xFF, 0xFF, 0xFF, 0x85,
4808        ];
4809        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4810        let arr = d.flush(None).unwrap();
4811        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4812        assert_eq!(dec.len(), 2);
4813        assert_eq!(dec.value_as_string(0), "123.45");
4814        assert_eq!(dec.value_as_string(1), "-1.23");
4815    }
4816
4817    #[test]
4818    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4819        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4820        let mut rec = make_record_decoder_with_projector_defaults(
4821            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4822            field_defaults,
4823            vec![],
4824            0,
4825        );
4826        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4827        map.insert("a".to_string(), AvroLiteral::Int(7));
4828        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4829        let arr = rec.flush(None).unwrap();
4830        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4831        let a = s
4832            .column_by_name("a")
4833            .unwrap()
4834            .as_any()
4835            .downcast_ref::<Int32Array>()
4836            .unwrap();
4837        let b = s
4838            .column_by_name("b")
4839            .unwrap()
4840            .as_any()
4841            .downcast_ref::<StringArray>()
4842            .unwrap();
4843        assert_eq!(a.value(0), 7);
4844        assert_eq!(b.value(0), "hi");
4845    }
4846
4847    #[test]
4848    fn test_record_append_default_null_uses_projector_field_defaults() {
4849        let field_defaults = vec![
4850            Some(AvroLiteral::Int(5)),
4851            Some(AvroLiteral::String("x".into())),
4852        ];
4853        let mut rec = make_record_decoder_with_projector_defaults(
4854            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4855            field_defaults,
4856            vec![],
4857            0,
4858        );
4859        rec.append_default(&AvroLiteral::Null).unwrap();
4860        let arr = rec.flush(None).unwrap();
4861        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4862        let a = s
4863            .column_by_name("a")
4864            .unwrap()
4865            .as_any()
4866            .downcast_ref::<Int32Array>()
4867            .unwrap();
4868        let b = s
4869            .column_by_name("b")
4870            .unwrap()
4871            .as_any()
4872            .downcast_ref::<StringArray>()
4873            .unwrap();
4874        assert_eq!(a.value(0), 5);
4875        assert_eq!(b.value(0), "x");
4876    }
4877
4878    #[test]
4879    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
4880     {
4881        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
4882        let mut field_refs: Vec<FieldRef> = Vec::new();
4883        let mut encoders: Vec<Decoder> = Vec::new();
4884        for (name, dt, nullable) in &fields {
4885            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4886        }
4887        let enc_a = Decoder::Nullable(
4888            Nullability::NullSecond,
4889            NullBufferBuilder::new(DEFAULT_CAPACITY),
4890            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
4891            NullablePlan::ReadTag,
4892        );
4893        let enc_b = Decoder::Nullable(
4894            Nullability::NullSecond,
4895            NullBufferBuilder::new(DEFAULT_CAPACITY),
4896            Box::new(Decoder::String(
4897                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4898                Vec::with_capacity(DEFAULT_CAPACITY),
4899            )),
4900            NullablePlan::ReadTag,
4901        );
4902        encoders.push(enc_a);
4903        encoders.push(enc_b);
4904        let projector = Projector {
4905            writer_to_reader: Arc::from(vec![]),
4906            skip_decoders: vec![],
4907            field_defaults: vec![None, None], // no defaults -> append_null
4908            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4909        };
4910        let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector));
4911        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4912        map.insert("a".to_string(), AvroLiteral::Int(9));
4913        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4914        let arr = rec.flush(None).unwrap();
4915        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4916        let a = s
4917            .column_by_name("a")
4918            .unwrap()
4919            .as_any()
4920            .downcast_ref::<Int32Array>()
4921            .unwrap();
4922        let b = s
4923            .column_by_name("b")
4924            .unwrap()
4925            .as_any()
4926            .downcast_ref::<StringArray>()
4927            .unwrap();
4928        assert!(a.is_valid(0));
4929        assert_eq!(a.value(0), 9);
4930        assert!(b.is_null(0));
4931    }
4932
4933    #[test]
4934    fn test_projector_default_injection_when_writer_lacks_fields() {
4935        let defaults = vec![None, None];
4936        let injections = vec![
4937            (0, AvroLiteral::Int(99)),
4938            (1, AvroLiteral::String("alice".into())),
4939        ];
4940        let mut rec = make_record_decoder_with_projector_defaults(
4941            &[
4942                ("id", DataType::Int32, false),
4943                ("name", DataType::Utf8, false),
4944            ],
4945            defaults,
4946            injections,
4947            0,
4948        );
4949        rec.decode(&mut AvroCursor::new(&[])).unwrap();
4950        let arr = rec.flush(None).unwrap();
4951        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4952        let id = s
4953            .column_by_name("id")
4954            .unwrap()
4955            .as_any()
4956            .downcast_ref::<Int32Array>()
4957            .unwrap();
4958        let name = s
4959            .column_by_name("name")
4960            .unwrap()
4961            .as_any()
4962            .downcast_ref::<StringArray>()
4963            .unwrap();
4964        assert_eq!(id.value(0), 99);
4965        assert_eq!(name.value(0), "alice");
4966    }
4967
4968    #[test]
4969    fn union_type_ids_are_not_child_indexes() {
4970        let encodings: Vec<AvroDataType> =
4971            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
4972        let fields: UnionFields = [
4973            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
4974            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
4975        ]
4976        .into_iter()
4977        .collect();
4978        let dt = avro_from_codec(Codec::Union(
4979            encodings.into(),
4980            fields.clone(),
4981            UnionMode::Dense,
4982        ));
4983        let mut dec = Decoder::try_new(&dt).expect("decoder");
4984        let mut b1 = encode_avro_long(1);
4985        b1.extend(encode_avro_bytes("hi".as_bytes()));
4986        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
4987        let mut b0 = encode_avro_long(0);
4988        b0.extend(encode_avro_int(5));
4989        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
4990        let arr = dec.flush(None).expect("flush");
4991        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
4992        assert_eq!(ua.len(), 2);
4993        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
4994        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
4995        assert_eq!(ua.value_offset(0), 0);
4996        assert_eq!(ua.value_offset(1), 0);
4997        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
4998        assert_eq!(utf8_child.len(), 1);
4999        assert_eq!(utf8_child.value(0), "hi");
5000        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
5001        assert_eq!(int_child.len(), 1);
5002        assert_eq!(int_child.value(0), 5);
5003        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
5004        assert_eq!(type_ids, vec![42_i8, 7_i8]);
5005    }
5006
5007    #[cfg(feature = "avro_custom_types")]
5008    #[test]
5009    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
5010        for codec in [
5011            Codec::DurationNanos,
5012            Codec::DurationMicros,
5013            Codec::DurationMillis,
5014            Codec::DurationSeconds,
5015        ] {
5016            let dt = make_avro_dt(codec.clone(), None);
5017            let s = Skipper::from_avro(&dt)?;
5018            match s {
5019                Skipper::Int64 => {}
5020                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
5021            }
5022        }
5023        Ok(())
5024    }
5025
5026    #[cfg(feature = "avro_custom_types")]
5027    #[test]
5028    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
5029        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
5030        for codec in [
5031            Codec::DurationNanos,
5032            Codec::DurationMicros,
5033            Codec::DurationMillis,
5034            Codec::DurationSeconds,
5035        ] {
5036            let dt = make_avro_dt(codec.clone(), None);
5037            let mut s = Skipper::from_avro(&dt)?;
5038            for &v in &values {
5039                let bytes = encode_avro_long(v);
5040                let mut cursor = AvroCursor::new(&bytes);
5041                s.skip(&mut cursor)?;
5042                assert_eq!(
5043                    cursor.position(),
5044                    bytes.len(),
5045                    "did not consume all bytes for {:?} value {}",
5046                    codec,
5047                    v
5048                );
5049            }
5050        }
5051        Ok(())
5052    }
5053
5054    #[cfg(feature = "avro_custom_types")]
5055    #[test]
5056    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
5057        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
5058        let mut s = Skipper::from_avro(&dt)?;
5059        match &s {
5060            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
5061                Skipper::Int64 => {}
5062                ref other => panic!("expected inner Int64, got {:?}", other),
5063            },
5064            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
5065        }
5066        {
5067            let buf = encode_vlq_u64(0);
5068            let mut cursor = AvroCursor::new(&buf);
5069            s.skip(&mut cursor)?;
5070            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
5071        }
5072        {
5073            let mut buf = encode_vlq_u64(1);
5074            buf.extend(encode_avro_long(0));
5075            let mut cursor = AvroCursor::new(&buf);
5076            s.skip(&mut cursor)?;
5077            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
5078        }
5079
5080        Ok(())
5081    }
5082
5083    #[cfg(feature = "avro_custom_types")]
5084    #[test]
5085    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
5086        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
5087        let mut s = Skipper::from_avro(&dt)?;
5088        match &s {
5089            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
5090                Skipper::Int64 => {}
5091                ref other => panic!("expected inner Int64, got {:?}", other),
5092            },
5093            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
5094        }
5095        {
5096            let buf = encode_vlq_u64(1);
5097            let mut cursor = AvroCursor::new(&buf);
5098            s.skip(&mut cursor)?;
5099            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
5100        }
5101        {
5102            let mut buf = encode_vlq_u64(0);
5103            buf.extend(encode_avro_long(-1));
5104            let mut cursor = AvroCursor::new(&buf);
5105            s.skip(&mut cursor)?;
5106            assert_eq!(
5107                cursor.position(),
5108                1 + encode_avro_long(-1).len(),
5109                "expected to consume tag=0 + long(-1)"
5110            );
5111        }
5112        Ok(())
5113    }
5114
5115    #[test]
5116    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
5117        let dt = make_avro_dt(Codec::Interval, None);
5118        let mut s = Skipper::from_avro(&dt)?;
5119        match s {
5120            Skipper::DurationFixed12 => {}
5121            other => panic!("expected DurationFixed12, got {:?}", other),
5122        }
5123        let payload = vec![0u8; 12];
5124        let mut cursor = AvroCursor::new(&payload);
5125        s.skip(&mut cursor)?;
5126        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
5127        Ok(())
5128    }
5129
5130    #[cfg(feature = "avro_custom_types")]
5131    #[test]
5132    fn test_run_end_encoded_width16_int32_basic_grouping() {
5133        use arrow_array::RunArray;
5134        use std::sync::Arc;
5135        let inner = avro_from_codec(Codec::Int32);
5136        let ree = AvroDataType::new(
5137            Codec::RunEndEncoded(Arc::new(inner), 16),
5138            Default::default(),
5139            None,
5140        );
5141        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5142        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
5143            let bytes = encode_avro_int(v);
5144            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5145        }
5146        let arr = dec.flush(None).expect("flush");
5147        let ra = arr
5148            .as_any()
5149            .downcast_ref::<RunArray<Int16Type>>()
5150            .expect("RunArray<Int16Type>");
5151        assert_eq!(ra.len(), 9);
5152        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
5153        let vals = ra
5154            .values()
5155            .as_ref()
5156            .as_any()
5157            .downcast_ref::<Int32Array>()
5158            .expect("values Int32");
5159        assert_eq!(vals.values(), &[1, 2, 3]);
5160    }
5161
5162    #[cfg(feature = "avro_custom_types")]
5163    #[test]
5164    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
5165        use arrow_array::RunArray;
5166        use std::sync::Arc;
5167        let inner = AvroDataType::new(
5168            Codec::Int32,
5169            Default::default(),
5170            Some(Nullability::NullSecond),
5171        );
5172        let ree = AvroDataType::new(
5173            Codec::RunEndEncoded(Arc::new(inner), 32),
5174            Default::default(),
5175            None,
5176        );
5177        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5178        let seq: [Option<i32>; 8] = [
5179            None,
5180            None,
5181            Some(7),
5182            Some(7),
5183            Some(7),
5184            None,
5185            Some(5),
5186            Some(5),
5187        ];
5188        for item in seq {
5189            let mut bytes = Vec::new();
5190            match item {
5191                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
5192                Some(v) => {
5193                    bytes.extend_from_slice(&encode_vlq_u64(0));
5194                    bytes.extend_from_slice(&encode_avro_int(v));
5195                }
5196            }
5197            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5198        }
5199        let arr = dec.flush(None).expect("flush");
5200        let ra = arr
5201            .as_any()
5202            .downcast_ref::<RunArray<Int32Type>>()
5203            .expect("RunArray<Int32Type>");
5204        assert_eq!(ra.len(), 8);
5205        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
5206        let vals = ra
5207            .values()
5208            .as_ref()
5209            .as_any()
5210            .downcast_ref::<Int32Array>()
5211            .expect("values Int32 (nullable)");
5212        assert_eq!(vals.len(), 4);
5213        assert!(vals.is_null(0));
5214        assert_eq!(vals.value(1), 7);
5215        assert!(vals.is_null(2));
5216        assert_eq!(vals.value(3), 5);
5217    }
5218
5219    #[cfg(feature = "avro_custom_types")]
5220    #[test]
5221    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
5222        use arrow_array::RunArray;
5223        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
5224        let ree = Decoder::RunEndEncoded(
5225            8, /* bytes => Int64 run-ends */
5226            0,
5227            Box::new(inner_values),
5228        );
5229        let mut dec = Decoder::Nullable(
5230            Nullability::NullSecond,
5231            NullBufferBuilder::new(DEFAULT_CAPACITY),
5232            Box::new(ree),
5233            NullablePlan::FromSingle {
5234                promotion: Promotion::IntToDouble,
5235            },
5236        );
5237        for v in [1, 1, 2, 2, 2] {
5238            let bytes = encode_avro_int(v);
5239            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5240        }
5241        let arr = dec.flush(None).expect("flush");
5242        let ra = arr
5243            .as_any()
5244            .downcast_ref::<RunArray<Int64Type>>()
5245            .expect("RunArray<Int64Type>");
5246        assert_eq!(ra.len(), 5);
5247        assert_eq!(ra.run_ends().values(), &[2, 5]);
5248        let vals = ra
5249            .values()
5250            .as_ref()
5251            .as_any()
5252            .downcast_ref::<Float64Array>()
5253            .expect("values Float64");
5254        assert_eq!(vals.values(), &[1.0, 2.0]);
5255    }
5256
5257    #[cfg(feature = "avro_custom_types")]
5258    #[test]
5259    fn test_run_end_encoded_unsupported_run_end_width_errors() {
5260        use std::sync::Arc;
5261        let inner = avro_from_codec(Codec::Int32);
5262        let dt = AvroDataType::new(
5263            Codec::RunEndEncoded(Arc::new(inner), 3),
5264            Default::default(),
5265            None,
5266        );
5267        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
5268        let msg = err.to_string();
5269        assert!(
5270            msg.contains("Unsupported run-end width")
5271                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
5272            "unexpected error message: {msg}"
5273        );
5274    }
5275
5276    #[cfg(feature = "avro_custom_types")]
5277    #[test]
5278    fn test_run_end_encoded_empty_input_is_empty_runarray() {
5279        use arrow_array::RunArray;
5280        use std::sync::Arc;
5281        let inner = avro_from_codec(Codec::Utf8);
5282        let dt = AvroDataType::new(
5283            Codec::RunEndEncoded(Arc::new(inner), 4),
5284            Default::default(),
5285            None,
5286        );
5287        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5288        let arr = dec.flush(None).expect("flush");
5289        let ra = arr
5290            .as_any()
5291            .downcast_ref::<RunArray<Int32Type>>()
5292            .expect("RunArray<Int32Type>");
5293        assert_eq!(ra.len(), 0);
5294        assert_eq!(ra.run_ends().len(), 0);
5295        assert_eq!(ra.values().len(), 0);
5296    }
5297
5298    #[cfg(feature = "avro_custom_types")]
5299    #[test]
5300    fn test_run_end_encoded_strings_grouping_width32_bits() {
5301        use arrow_array::RunArray;
5302        use std::sync::Arc;
5303        let inner = avro_from_codec(Codec::Utf8);
5304        let dt = AvroDataType::new(
5305            Codec::RunEndEncoded(Arc::new(inner), 32),
5306            Default::default(),
5307            None,
5308        );
5309        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5310        for s in ["a", "a", "bb", "bb", "bb", "a"] {
5311            let bytes = encode_avro_bytes(s.as_bytes());
5312            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5313        }
5314        let arr = dec.flush(None).expect("flush");
5315        let ra = arr
5316            .as_any()
5317            .downcast_ref::<RunArray<Int32Type>>()
5318            .expect("RunArray<Int32Type>");
5319        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
5320        let vals = ra
5321            .values()
5322            .as_ref()
5323            .as_any()
5324            .downcast_ref::<StringArray>()
5325            .expect("values String");
5326        assert_eq!(vals.len(), 3);
5327        assert_eq!(vals.value(0), "a");
5328        assert_eq!(vals.value(1), "bb");
5329        assert_eq!(vals.value(2), "a");
5330    }
5331
5332    #[cfg(not(feature = "avro_custom_types"))]
5333    #[test]
5334    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
5335        let dt = avro_from_codec(Codec::Int32);
5336        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
5337        for v in [1, 2, 3] {
5338            let bytes = encode_avro_int(v);
5339            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5340        }
5341        let arr = dec.flush(None).expect("flush");
5342        let a = arr
5343            .as_any()
5344            .downcast_ref::<Int32Array>()
5345            .expect("Int32Array");
5346        assert_eq!(a.values(), &[1, 2, 3]);
5347    }
5348
5349    #[test]
5350    fn test_timestamp_nanos_decoding_utc() {
5351        let avro_type = avro_from_codec(Codec::TimestampNanos(true));
5352        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5353        let mut data = Vec::new();
5354        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
5355            data.extend_from_slice(&encode_avro_long(v));
5356        }
5357        let mut cur = AvroCursor::new(&data);
5358        for _ in 0..4 {
5359            decoder.decode(&mut cur).expect("decode nanos ts");
5360        }
5361        let array = decoder.flush(None).expect("flush nanos ts");
5362        let ts = array
5363            .as_any()
5364            .downcast_ref::<TimestampNanosecondArray>()
5365            .expect("TimestampNanosecondArray");
5366        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
5367        match ts.data_type() {
5368            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5369                assert_eq!(tz.as_deref(), Some("+00:00"));
5370            }
5371            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
5372        }
5373    }
5374
5375    #[test]
5376    fn test_timestamp_nanos_decoding_local() {
5377        let avro_type = avro_from_codec(Codec::TimestampNanos(false));
5378        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5379        let mut data = Vec::new();
5380        for v in [10_i64, 20_i64, -30_i64] {
5381            data.extend_from_slice(&encode_avro_long(v));
5382        }
5383        let mut cur = AvroCursor::new(&data);
5384        for _ in 0..3 {
5385            decoder.decode(&mut cur).expect("decode nanos ts");
5386        }
5387        let array = decoder.flush(None).expect("flush nanos ts");
5388        let ts = array
5389            .as_any()
5390            .downcast_ref::<TimestampNanosecondArray>()
5391            .expect("TimestampNanosecondArray");
5392        assert_eq!(ts.values(), &[10, 20, -30]);
5393        match ts.data_type() {
5394            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5395                assert_eq!(tz.as_deref(), None);
5396            }
5397            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5398        }
5399    }
5400
5401    #[test]
5402    fn test_timestamp_nanos_decoding_with_nulls() {
5403        let avro_type = AvroDataType::new(
5404            Codec::TimestampNanos(false),
5405            Default::default(),
5406            Some(Nullability::NullFirst),
5407        );
5408        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
5409        let mut data = Vec::new();
5410        data.extend_from_slice(&encode_avro_long(1));
5411        data.extend_from_slice(&encode_avro_long(42));
5412        data.extend_from_slice(&encode_avro_long(0));
5413        data.extend_from_slice(&encode_avro_long(1));
5414        data.extend_from_slice(&encode_avro_long(-7));
5415        let mut cur = AvroCursor::new(&data);
5416        for _ in 0..3 {
5417            decoder.decode(&mut cur).expect("decode nullable nanos ts");
5418        }
5419        let array = decoder.flush(None).expect("flush nullable nanos ts");
5420        let ts = array
5421            .as_any()
5422            .downcast_ref::<TimestampNanosecondArray>()
5423            .expect("TimestampNanosecondArray");
5424        assert_eq!(ts.len(), 3);
5425        assert!(ts.is_valid(0));
5426        assert!(ts.is_null(1));
5427        assert!(ts.is_valid(2));
5428        assert_eq!(ts.value(0), 42);
5429        assert_eq!(ts.value(2), -7);
5430        match ts.data_type() {
5431            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5432                assert_eq!(tz.as_deref(), None);
5433            }
5434            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5435        }
5436    }
5437}