Skip to main content

arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, ResolvedField,
22    ResolvedRecord, ResolvedUnion, Tz,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37    Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use strum_macros::AsRefStr;
42use uuid::Uuid;
43
44use std::cmp::Ordering;
45use std::mem;
46use std::sync::Arc;
47
48const DEFAULT_CAPACITY: usize = 1024;
49
50/// Macro to decode a decimal payload for a given width and integer type.
51macro_rules! decode_decimal {
52    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
53        let bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
54        $builder.append_value(<$Int>::from_be_bytes(bytes));
55    }};
56}
57
58/// Macro to finish a decimal builder into an array with precision/scale and nulls.
59macro_rules! flush_decimal {
60    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
61        let (_, vals, _) = $builder.finish().into_parts();
62        let dec = <$ArrayTy>::try_new(vals, $nulls)?
63            .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)?;
64        Arc::new(dec) as ArrayRef
65    }};
66}
67
68/// Macro to append a default decimal value from two's-complement big-endian bytes
69/// into the corresponding decimal builder, with compile-time constructed error text.
70macro_rules! append_decimal_default {
71    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
72        match $lit {
73            AvroLiteral::Bytes(b) => {
74                let ext = sign_cast_to::<$N>(b)?;
75                let val = <$Int>::from_be_bytes(ext);
76                $builder.append_value(val);
77                Ok(())
78            }
79            _ => Err(AvroError::InvalidArgument(
80                concat!(
81                    "Default for ",
82                    $name,
83                    " must be bytes (two's-complement big-endian)"
84                )
85                .to_string(),
86            )),
87        }
88    }};
89}
90
91/// Decodes avro encoded data into [`RecordBatch`]
92#[derive(Debug)]
93pub(crate) struct RecordDecoder {
94    schema: SchemaRef,
95    fields: Vec<Decoder>,
96    projector: Option<Projector>,
97}
98
99impl RecordDecoder {
100    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
101    ///
102    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
103    ///
104    /// # Arguments
105    /// * `data_type` - The Avro data type to decode.
106    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
107    ///
108    /// # Errors
109    /// This function will return an error if the provided `data_type` is not a `Record`.
110    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
111        match data_type.codec() {
112            Codec::Struct(reader_fields) => {
113                // Build Arrow schema fields and per-child decoders
114                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
115                let mut encodings = Vec::with_capacity(reader_fields.len());
116                let mut field_defaults = Vec::with_capacity(reader_fields.len());
117                for avro_field in reader_fields.iter() {
118                    arrow_fields.push(avro_field.field());
119                    encodings.push(Decoder::try_new(avro_field.data_type())?);
120
121                    if let Some(ResolutionInfo::DefaultValue(lit)) =
122                        avro_field.data_type().resolution.as_ref()
123                    {
124                        field_defaults.push(Some(lit.clone()));
125                    } else {
126                        field_defaults.push(None);
127                    }
128                }
129                let projector = match data_type.resolution.as_ref() {
130                    Some(ResolutionInfo::Record(rec)) => {
131                        Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
132                    }
133                    _ => None,
134                };
135                Ok(Self {
136                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
137                    fields: encodings,
138                    projector,
139                })
140            }
141            other => Err(AvroError::ParseError(format!(
142                "Expected record got {other:?}"
143            ))),
144        }
145    }
146
147    /// Returns the decoder's `SchemaRef`
148    pub(crate) fn schema(&self) -> &SchemaRef {
149        &self.schema
150    }
151
152    /// Decode `count` records from `buf`
153    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
154        let mut cursor = AvroCursor::new(buf);
155        match self.projector.as_mut() {
156            Some(proj) => {
157                for _ in 0..count {
158                    proj.project_record(&mut cursor, &mut self.fields)?;
159                }
160            }
161            None => {
162                for _ in 0..count {
163                    for field in &mut self.fields {
164                        field.decode(&mut cursor)?;
165                    }
166                }
167            }
168        }
169        Ok(cursor.position())
170    }
171
172    /// Flush the decoded records into a [`RecordBatch`]
173    pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
174        let arrays = self
175            .fields
176            .iter_mut()
177            .map(|x| x.flush(None))
178            .collect::<Result<Vec<_>, _>>()?;
179        RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
180    }
181}
182
183#[derive(Debug, AsRefStr)]
184enum Decoder {
185    Null(usize),
186    Boolean(BooleanBufferBuilder),
187    Int32(Vec<i32>),
188    Int64(Vec<i64>),
189    #[cfg(feature = "avro_custom_types")]
190    DurationSecond(Vec<i64>),
191    #[cfg(feature = "avro_custom_types")]
192    DurationMillisecond(Vec<i64>),
193    #[cfg(feature = "avro_custom_types")]
194    DurationMicrosecond(Vec<i64>),
195    #[cfg(feature = "avro_custom_types")]
196    DurationNanosecond(Vec<i64>),
197    #[cfg(feature = "avro_custom_types")]
198    Int8(Vec<i8>),
199    #[cfg(feature = "avro_custom_types")]
200    Int16(Vec<i16>),
201    #[cfg(feature = "avro_custom_types")]
202    UInt8(Vec<u8>),
203    #[cfg(feature = "avro_custom_types")]
204    UInt16(Vec<u16>),
205    #[cfg(feature = "avro_custom_types")]
206    UInt32(Vec<u32>),
207    #[cfg(feature = "avro_custom_types")]
208    UInt64(Vec<u64>),
209    #[cfg(feature = "avro_custom_types")]
210    Float16(Vec<u16>), // Stored as raw IEEE-754 f16 bits
211    #[cfg(feature = "avro_custom_types")]
212    Date64(Vec<i64>),
213    #[cfg(feature = "avro_custom_types")]
214    TimeNanos(Vec<i64>),
215    #[cfg(feature = "avro_custom_types")]
216    Time32Secs(Vec<i32>),
217    #[cfg(feature = "avro_custom_types")]
218    TimestampSecs(bool, Vec<i64>),
219    #[cfg(feature = "avro_custom_types")]
220    IntervalYearMonth(Vec<i32>),
221    #[cfg(feature = "avro_custom_types")]
222    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
223    #[cfg(feature = "avro_custom_types")]
224    IntervalDayTime(Vec<IntervalDayTime>),
225    Float32(Vec<f32>),
226    Float64(Vec<f64>),
227    Date32(Vec<i32>),
228    TimeMillis(Vec<i32>),
229    TimeMicros(Vec<i64>),
230    TimestampMillis(Option<Tz>, Vec<i64>),
231    TimestampMicros(Option<Tz>, Vec<i64>),
232    TimestampNanos(Option<Tz>, Vec<i64>),
233    Int32ToInt64(Vec<i64>),
234    Int32ToFloat32(Vec<f32>),
235    Int32ToFloat64(Vec<f64>),
236    Int64ToFloat32(Vec<f32>),
237    Int64ToFloat64(Vec<f64>),
238    Float32ToFloat64(Vec<f64>),
239    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
240    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
241    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
242    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
243    String(OffsetBufferBuilder<i32>, Vec<u8>),
244    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
245    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
246    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
247    Record(
248        Fields,
249        Vec<Decoder>,
250        Vec<Option<AvroLiteral>>,
251        Option<Projector>,
252    ),
253    Map(
254        FieldRef,
255        OffsetBufferBuilder<i32>,
256        OffsetBufferBuilder<i32>,
257        Vec<u8>,
258        Box<Decoder>,
259    ),
260    Fixed(i32, Vec<u8>),
261    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
262    Duration(IntervalMonthDayNanoBuilder),
263    Uuid(Vec<u8>),
264    #[cfg(feature = "small_decimals")]
265    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
266    #[cfg(feature = "small_decimals")]
267    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
268    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
269    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
270    #[cfg(feature = "avro_custom_types")]
271    RunEndEncoded(u8, usize, Box<Decoder>),
272    Union(UnionDecoder),
273    Nullable(NullablePlan, NullBufferBuilder, Box<Decoder>),
274}
275
276impl Decoder {
277    fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
278        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
279            if info.writer_is_union && !info.reader_is_union {
280                let mut clone = data_type.clone();
281                clone.resolution = None; // Build target base decoder without Union resolution
282                let target = Self::try_new_internal(&clone)?;
283                let decoder = Self::Union(
284                    UnionDecoderBuilder::new()
285                        .with_resolved_union(info.clone())
286                        .with_target(target)
287                        .build()?,
288                );
289                return Ok(decoder);
290            }
291        }
292        Self::try_new_internal(data_type)
293    }
294
295    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
296        // Extract just the Promotion (if any) to simplify pattern matching
297        let promotion = match data_type.resolution.as_ref() {
298            Some(ResolutionInfo::Promotion(p)) => Some(*p),
299            _ => None,
300        };
301        let decoder = match (data_type.codec(), promotion) {
302            (Codec::Int64, Some(Promotion::IntToLong)) => {
303                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
304            }
305            (Codec::Float32, Some(Promotion::IntToFloat)) => {
306                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
307            }
308            (Codec::Float64, Some(Promotion::IntToDouble)) => {
309                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
310            }
311            (Codec::Float32, Some(Promotion::LongToFloat)) => {
312                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
313            }
314            (Codec::Float64, Some(Promotion::LongToDouble)) => {
315                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
316            }
317            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
318                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
319            }
320            (Codec::Utf8, Some(Promotion::BytesToString))
321            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
322                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
323                Vec::with_capacity(DEFAULT_CAPACITY),
324            ),
325            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
326                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
327                Vec::with_capacity(DEFAULT_CAPACITY),
328            ),
329            (Codec::Null, _) => Self::Null(0),
330            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
331            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
332            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
333            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
334            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
335            (Codec::Binary, _) => Self::Binary(
336                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
337                Vec::with_capacity(DEFAULT_CAPACITY),
338            ),
339            (Codec::Utf8, _) => Self::String(
340                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
341                Vec::with_capacity(DEFAULT_CAPACITY),
342            ),
343            (Codec::Utf8View, _) => Self::StringView(
344                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
345                Vec::with_capacity(DEFAULT_CAPACITY),
346            ),
347            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
348            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
349            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
350            (Codec::TimestampMillis(tz), _) => {
351                Self::TimestampMillis(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
352            }
353            (Codec::TimestampMicros(tz), _) => {
354                Self::TimestampMicros(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
355            }
356            (Codec::TimestampNanos(tz), _) => {
357                Self::TimestampNanos(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
358            }
359            #[cfg(feature = "avro_custom_types")]
360            (Codec::DurationNanos, _) => {
361                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
362            }
363            #[cfg(feature = "avro_custom_types")]
364            (Codec::DurationMicros, _) => {
365                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
366            }
367            #[cfg(feature = "avro_custom_types")]
368            (Codec::DurationMillis, _) => {
369                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
370            }
371            #[cfg(feature = "avro_custom_types")]
372            (Codec::DurationSeconds, _) => {
373                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
374            }
375            #[cfg(feature = "avro_custom_types")]
376            (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
377            #[cfg(feature = "avro_custom_types")]
378            (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
379            #[cfg(feature = "avro_custom_types")]
380            (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
381            #[cfg(feature = "avro_custom_types")]
382            (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
383            #[cfg(feature = "avro_custom_types")]
384            (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
385            #[cfg(feature = "avro_custom_types")]
386            (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
387            #[cfg(feature = "avro_custom_types")]
388            (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
389            #[cfg(feature = "avro_custom_types")]
390            (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
391            #[cfg(feature = "avro_custom_types")]
392            (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
393            #[cfg(feature = "avro_custom_types")]
394            (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
395            #[cfg(feature = "avro_custom_types")]
396            (Codec::TimestampSecs(is_utc), _) => {
397                Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
398            }
399            #[cfg(feature = "avro_custom_types")]
400            (Codec::IntervalYearMonth, _) => {
401                Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
402            }
403            #[cfg(feature = "avro_custom_types")]
404            (Codec::IntervalMonthDayNano, _) => {
405                Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
406            }
407            #[cfg(feature = "avro_custom_types")]
408            (Codec::IntervalDayTime, _) => {
409                Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
410            }
411            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
412            (Codec::Decimal(precision, scale, size), _) => {
413                let p = *precision;
414                let s = *scale;
415                let prec = p as u8;
416                let scl = s.unwrap_or(0) as i8;
417                #[cfg(feature = "small_decimals")]
418                {
419                    if p <= DECIMAL32_MAX_PRECISION as usize {
420                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
421                            .with_precision_and_scale(prec, scl)?;
422                        Self::Decimal32(p, s, *size, builder)
423                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
424                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
425                            .with_precision_and_scale(prec, scl)?;
426                        Self::Decimal64(p, s, *size, builder)
427                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
428                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
429                            .with_precision_and_scale(prec, scl)?;
430                        Self::Decimal128(p, s, *size, builder)
431                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
432                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
433                            .with_precision_and_scale(prec, scl)?;
434                        Self::Decimal256(p, s, *size, builder)
435                    } else {
436                        return Err(AvroError::ParseError(format!(
437                            "Decimal precision {p} exceeds maximum supported"
438                        )));
439                    }
440                }
441                #[cfg(not(feature = "small_decimals"))]
442                {
443                    if p <= DECIMAL128_MAX_PRECISION as usize {
444                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
445                            .with_precision_and_scale(prec, scl)?;
446                        Self::Decimal128(p, s, *size, builder)
447                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
448                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
449                            .with_precision_and_scale(prec, scl)?;
450                        Self::Decimal256(p, s, *size, builder)
451                    } else {
452                        return Err(AvroError::ParseError(format!(
453                            "Decimal precision {p} exceeds maximum supported"
454                        )));
455                    }
456                }
457            }
458            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
459            (Codec::List(item), _) => {
460                let decoder = Self::try_new(item)?;
461                Self::Array(
462                    Arc::new(item.field_with_name("item")),
463                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
464                    Box::new(decoder),
465                )
466            }
467            (Codec::Enum(symbols), _) => {
468                let res = match data_type.resolution.as_ref() {
469                    Some(ResolutionInfo::EnumMapping(mapping)) => {
470                        Some(EnumResolution::new(mapping))
471                    }
472                    _ => None,
473                };
474                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
475            }
476            (Codec::Struct(fields), _) => {
477                let mut arrow_fields = Vec::with_capacity(fields.len());
478                let mut encodings = Vec::with_capacity(fields.len());
479                let mut field_defaults = Vec::with_capacity(fields.len());
480                for avro_field in fields.iter() {
481                    let encoding = Self::try_new(avro_field.data_type())?;
482                    arrow_fields.push(avro_field.field());
483                    encodings.push(encoding);
484
485                    if let Some(ResolutionInfo::DefaultValue(lit)) =
486                        avro_field.data_type().resolution.as_ref()
487                    {
488                        field_defaults.push(Some(lit.clone()));
489                    } else {
490                        field_defaults.push(None);
491                    }
492                }
493                let projector =
494                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
495                        Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
496                    } else {
497                        None
498                    };
499                Self::Record(arrow_fields.into(), encodings, field_defaults, projector)
500            }
501            (Codec::Map(child), _) => {
502                let val_field = child.field_with_name("value");
503                let map_field = Arc::new(ArrowField::new(
504                    "entries",
505                    DataType::Struct(Fields::from(vec![
506                        ArrowField::new("key", DataType::Utf8, false),
507                        val_field,
508                    ])),
509                    false,
510                ));
511                let val_dec = Self::try_new(child)?;
512                Self::Map(
513                    map_field,
514                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
515                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
516                    Vec::with_capacity(DEFAULT_CAPACITY),
517                    Box::new(val_dec),
518                )
519            }
520            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
521            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
522                let decoders = encodings
523                    .iter()
524                    .map(Self::try_new_internal)
525                    .collect::<Result<Vec<_>, _>>()?;
526                if fields.len() != decoders.len() {
527                    return Err(AvroError::SchemaError(format!(
528                        "Union has {} fields but {} decoders",
529                        fields.len(),
530                        decoders.len()
531                    )));
532                }
533                // Proactive guard: if a user provides a union with more branches than
534                // a 32-bit Avro index can address, fail fast with a clear message.
535                let branch_count = decoders.len();
536                let max_addr = (i32::MAX as usize) + 1;
537                if branch_count > max_addr {
538                    return Err(AvroError::SchemaError(format!(
539                        "Union has {branch_count} branches, which exceeds the maximum addressable \
540                         branches by an Avro int tag ({} + 1).",
541                        i32::MAX
542                    )));
543                }
544                let mut builder = UnionDecoderBuilder::new()
545                    .with_fields(fields.clone())
546                    .with_branches(decoders);
547                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
548                    if info.reader_is_union {
549                        builder = builder.with_resolved_union(info.clone());
550                    }
551                }
552                Self::Union(builder.build()?)
553            }
554            (Codec::Union(_, _, _), _) => {
555                return Err(AvroError::NYI(
556                    "Sparse Arrow unions are not yet supported".to_string(),
557                ));
558            }
559            #[cfg(feature = "avro_custom_types")]
560            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
561                let inner = Self::try_new(values_dt)?;
562                let byte_width: u8 = match *width_bits_or_bytes {
563                    2 | 4 | 8 => *width_bits_or_bytes,
564                    16 => 2,
565                    32 => 4,
566                    64 => 8,
567                    other => {
568                        return Err(AvroError::InvalidArgument(format!(
569                            "Unsupported run-end width {other} for RunEndEncoded; \
570                             expected 16/32/64 bits or 2/4/8 bytes"
571                        )));
572                    }
573                };
574                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
575            }
576        };
577        Ok(match data_type.nullability() {
578            Some(nullability) => {
579                // Default to reading a union branch tag unless the resolution directs otherwise.
580                let plan = match &data_type.resolution {
581                    None => NullablePlan::ReadTag {
582                        nullability,
583                        resolution: ResolutionPlan::Promotion(Promotion::Direct),
584                    },
585                    Some(ResolutionInfo::Promotion(_)) => {
586                        // Promotions should have been incorporated
587                        // into the inner decoder.
588                        NullablePlan::FromSingle {
589                            resolution: ResolutionPlan::Promotion(Promotion::Direct),
590                        }
591                    }
592                    Some(ResolutionInfo::Union(info)) if !info.writer_is_union => {
593                        let Some(Some((_, resolution))) = info.writer_to_reader.first() else {
594                            return Err(AvroError::SchemaError(
595                                "unexpected union resolution info for non-union writer and union reader type".into(),
596                            ));
597                        };
598                        let resolution = ResolutionPlan::try_new(&decoder, resolution)?;
599                        NullablePlan::FromSingle { resolution }
600                    }
601                    Some(ResolutionInfo::Union(info)) => {
602                        let Some((_, resolution)) =
603                            info.writer_to_reader[nullability.non_null_index()].as_ref()
604                        else {
605                            return Err(AvroError::SchemaError(
606                                "unexpected union resolution info for nullable writer type".into(),
607                            ));
608                        };
609                        NullablePlan::ReadTag {
610                            nullability,
611                            resolution: ResolutionPlan::try_new(&decoder, resolution)?,
612                        }
613                    }
614                    Some(resolution) => NullablePlan::FromSingle {
615                        resolution: ResolutionPlan::try_new(&decoder, resolution)?,
616                    },
617                };
618                Self::Nullable(
619                    plan,
620                    NullBufferBuilder::new(DEFAULT_CAPACITY),
621                    Box::new(decoder),
622                )
623            }
624            None => decoder,
625        })
626    }
627
628    /// Append a null record
629    fn append_null(&mut self) -> Result<(), AvroError> {
630        match self {
631            Self::Null(count) => *count += 1,
632            Self::Boolean(b) => b.append(false),
633            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
634            Self::Int64(v)
635            | Self::Int32ToInt64(v)
636            | Self::TimeMicros(v)
637            | Self::TimestampMillis(_, v)
638            | Self::TimestampMicros(_, v)
639            | Self::TimestampNanos(_, v) => v.push(0),
640            #[cfg(feature = "avro_custom_types")]
641            Self::DurationSecond(v)
642            | Self::DurationMillisecond(v)
643            | Self::DurationMicrosecond(v)
644            | Self::DurationNanosecond(v) => v.push(0),
645            #[cfg(feature = "avro_custom_types")]
646            Self::Int8(v) => v.push(0),
647            #[cfg(feature = "avro_custom_types")]
648            Self::Int16(v) => v.push(0),
649            #[cfg(feature = "avro_custom_types")]
650            Self::UInt8(v) => v.push(0),
651            #[cfg(feature = "avro_custom_types")]
652            Self::UInt16(v) => v.push(0),
653            #[cfg(feature = "avro_custom_types")]
654            Self::UInt32(v) => v.push(0),
655            #[cfg(feature = "avro_custom_types")]
656            Self::UInt64(v) => v.push(0),
657            #[cfg(feature = "avro_custom_types")]
658            Self::Float16(v) => v.push(0),
659            #[cfg(feature = "avro_custom_types")]
660            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
661            #[cfg(feature = "avro_custom_types")]
662            Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
663            #[cfg(feature = "avro_custom_types")]
664            Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
665            #[cfg(feature = "avro_custom_types")]
666            Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
667            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
668            Self::Float64(v)
669            | Self::Int32ToFloat64(v)
670            | Self::Int64ToFloat64(v)
671            | Self::Float32ToFloat64(v) => v.push(0.),
672            Self::Binary(offsets, _)
673            | Self::String(offsets, _)
674            | Self::StringView(offsets, _)
675            | Self::BytesToString(offsets, _)
676            | Self::StringToBytes(offsets, _) => {
677                offsets.push_length(0);
678            }
679            Self::Uuid(v) => {
680                v.extend([0; 16]);
681            }
682            Self::Array(_, offsets, _) => {
683                offsets.push_length(0);
684            }
685            Self::Record(_, e, _, _) => {
686                for encoding in e.iter_mut() {
687                    encoding.append_null()?;
688                }
689            }
690            Self::Map(_, _koff, moff, _, _) => {
691                moff.push_length(0);
692            }
693            Self::Fixed(sz, accum) => {
694                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
695            }
696            #[cfg(feature = "small_decimals")]
697            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
698            #[cfg(feature = "small_decimals")]
699            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
700            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
701            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
702            Self::Enum(indices, _, _) => indices.push(0),
703            Self::Duration(builder) => builder.append_null(),
704            #[cfg(feature = "avro_custom_types")]
705            Self::RunEndEncoded(_, len, inner) => {
706                *len += 1;
707                inner.append_null()?;
708            }
709            Self::Union(u) => u.append_null()?,
710            Self::Nullable(_, null_buffer, inner) => {
711                null_buffer.append(false);
712                inner.append_null()?;
713            }
714        }
715        Ok(())
716    }
717
718    /// Append a single default literal into the decoder's buffers
719    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
720        match self {
721            Self::Nullable(_, nb, inner) => {
722                if matches!(lit, AvroLiteral::Null) {
723                    nb.append(false);
724                    inner.append_null()
725                } else {
726                    nb.append(true);
727                    inner.append_default(lit)
728                }
729            }
730            Self::Null(count) => match lit {
731                AvroLiteral::Null => {
732                    *count += 1;
733                    Ok(())
734                }
735                _ => Err(AvroError::InvalidArgument(
736                    "Non-null default for null type".to_string(),
737                )),
738            },
739            Self::Boolean(b) => match lit {
740                AvroLiteral::Boolean(v) => {
741                    b.append(*v);
742                    Ok(())
743                }
744                _ => Err(AvroError::InvalidArgument(
745                    "Default for boolean must be boolean".to_string(),
746                )),
747            },
748            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
749                AvroLiteral::Int(i) => {
750                    v.push(*i);
751                    Ok(())
752                }
753                _ => Err(AvroError::InvalidArgument(
754                    "Default for int32/date32/time-millis must be int".to_string(),
755                )),
756            },
757            #[cfg(feature = "avro_custom_types")]
758            Self::DurationSecond(v)
759            | Self::DurationMillisecond(v)
760            | Self::DurationMicrosecond(v)
761            | Self::DurationNanosecond(v) => match lit {
762                AvroLiteral::Long(i) => {
763                    v.push(*i);
764                    Ok(())
765                }
766                _ => Err(AvroError::InvalidArgument(
767                    "Default for duration long must be long".to_string(),
768                )),
769            },
770            #[cfg(feature = "avro_custom_types")]
771            Self::Int8(v) => match lit {
772                AvroLiteral::Int(i) => {
773                    let x = i8::try_from(*i).map_err(|_| {
774                        AvroError::InvalidArgument(format!(
775                            "Default for int8 out of range for i8: {i}"
776                        ))
777                    })?;
778                    v.push(x);
779                    Ok(())
780                }
781                _ => Err(AvroError::InvalidArgument(
782                    "Default for int8 must be int".to_string(),
783                )),
784            },
785            #[cfg(feature = "avro_custom_types")]
786            Self::Int16(v) => match lit {
787                AvroLiteral::Int(i) => {
788                    let x = i16::try_from(*i).map_err(|_| {
789                        AvroError::InvalidArgument(format!(
790                            "Default for int16 out of range for i16: {i}"
791                        ))
792                    })?;
793                    v.push(x);
794                    Ok(())
795                }
796                _ => Err(AvroError::InvalidArgument(
797                    "Default for int16 must be int".to_string(),
798                )),
799            },
800            #[cfg(feature = "avro_custom_types")]
801            Self::UInt8(v) => match lit {
802                AvroLiteral::Int(i) => {
803                    let x = u8::try_from(*i).map_err(|_| {
804                        AvroError::InvalidArgument(format!(
805                            "Default for uint8 out of range for u8: {i}"
806                        ))
807                    })?;
808                    v.push(x);
809                    Ok(())
810                }
811                _ => Err(AvroError::InvalidArgument(
812                    "Default for uint8 must be int".to_string(),
813                )),
814            },
815            #[cfg(feature = "avro_custom_types")]
816            Self::UInt16(v) => match lit {
817                AvroLiteral::Int(i) => {
818                    let x = u16::try_from(*i).map_err(|_| {
819                        AvroError::InvalidArgument(format!(
820                            "Default for uint16 out of range for u16: {i}"
821                        ))
822                    })?;
823                    v.push(x);
824                    Ok(())
825                }
826                _ => Err(AvroError::InvalidArgument(
827                    "Default for uint16 must be int".to_string(),
828                )),
829            },
830            #[cfg(feature = "avro_custom_types")]
831            Self::UInt32(v) => match lit {
832                AvroLiteral::Long(i) => {
833                    let x = u32::try_from(*i).map_err(|_| {
834                        AvroError::InvalidArgument(format!(
835                            "Default for uint32 out of range for u32: {i}"
836                        ))
837                    })?;
838                    v.push(x);
839                    Ok(())
840                }
841                _ => Err(AvroError::InvalidArgument(
842                    "Default for uint32 must be long".to_string(),
843                )),
844            },
845            #[cfg(feature = "avro_custom_types")]
846            Self::UInt64(v) => match lit {
847                AvroLiteral::Bytes(b) => {
848                    if b.len() != 8 {
849                        return Err(AvroError::InvalidArgument(format!(
850                            "uint64 default must be exactly 8 bytes, got {}",
851                            b.len()
852                        )));
853                    }
854                    v.push(u64::from_le_bytes([
855                        b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
856                    ]));
857                    Ok(())
858                }
859                _ => Err(AvroError::InvalidArgument(
860                    "Default for uint64 must be bytes (8-byte LE)".to_string(),
861                )),
862            },
863            #[cfg(feature = "avro_custom_types")]
864            Self::Float16(v) => match lit {
865                AvroLiteral::Bytes(b) => {
866                    if b.len() != 2 {
867                        return Err(AvroError::InvalidArgument(format!(
868                            "float16 default must be exactly 2 bytes, got {}",
869                            b.len()
870                        )));
871                    }
872                    v.push(u16::from_le_bytes([b[0], b[1]]));
873                    Ok(())
874                }
875                _ => Err(AvroError::InvalidArgument(
876                    "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
877                )),
878            },
879            #[cfg(feature = "avro_custom_types")]
880            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
881                AvroLiteral::Long(i) => {
882                    v.push(*i);
883                    Ok(())
884                }
885                _ => Err(AvroError::InvalidArgument(
886                    "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
887                )),
888            },
889            #[cfg(feature = "avro_custom_types")]
890            Self::Time32Secs(v) => match lit {
891                AvroLiteral::Int(i) => {
892                    v.push(*i);
893                    Ok(())
894                }
895                _ => Err(AvroError::InvalidArgument(
896                    "Default for time32-secs must be int".to_string(),
897                )),
898            },
899            #[cfg(feature = "avro_custom_types")]
900            Self::IntervalYearMonth(v) => match lit {
901                AvroLiteral::Bytes(b) => {
902                    if b.len() != 4 {
903                        return Err(AvroError::InvalidArgument(format!(
904                            "interval-year-month default must be exactly 4 bytes, got {}",
905                            b.len()
906                        )));
907                    }
908                    v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
909                    Ok(())
910                }
911                _ => Err(AvroError::InvalidArgument(
912                    "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
913                )),
914            },
915            #[cfg(feature = "avro_custom_types")]
916            Self::IntervalMonthDayNano(v) => match lit {
917                AvroLiteral::Bytes(b) => {
918                    if b.len() != 16 {
919                        return Err(AvroError::InvalidArgument(format!(
920                            "interval-month-day-nano default must be exactly 16 bytes, got {}",
921                            b.len()
922                        )));
923                    }
924                    let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
925                    let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
926                    let nanos =
927                        i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
928                    v.push(IntervalMonthDayNano::new(months, days, nanos));
929                    Ok(())
930                }
931                _ => Err(AvroError::InvalidArgument(
932                    "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
933                )),
934            },
935            #[cfg(feature = "avro_custom_types")]
936            Self::IntervalDayTime(v) => match lit {
937                AvroLiteral::Bytes(b) => {
938                    if b.len() != 8 {
939                        return Err(AvroError::InvalidArgument(format!(
940                            "interval-day-time default must be exactly 8 bytes, got {}",
941                            b.len()
942                        )));
943                    }
944                    let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
945                    let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
946                    v.push(IntervalDayTime::new(days, milliseconds));
947                    Ok(())
948                }
949                _ => Err(AvroError::InvalidArgument(
950                    "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
951                )),
952            },
953            Self::Int64(v)
954            | Self::Int32ToInt64(v)
955            | Self::TimeMicros(v)
956            | Self::TimestampMillis(_, v)
957            | Self::TimestampMicros(_, v)
958            | Self::TimestampNanos(_, v) => match lit {
959                AvroLiteral::Long(i) => {
960                    v.push(*i);
961                    Ok(())
962                }
963                AvroLiteral::Int(i) => {
964                    v.push(*i as i64);
965                    Ok(())
966                }
967                _ => Err(AvroError::InvalidArgument(
968                    "Default for long/time-micros/timestamp must be long or int".to_string(),
969                )),
970            },
971            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
972                AvroLiteral::Float(f) => {
973                    v.push(*f);
974                    Ok(())
975                }
976                _ => Err(AvroError::InvalidArgument(
977                    "Default for float must be float".to_string(),
978                )),
979            },
980            Self::Float64(v)
981            | Self::Int32ToFloat64(v)
982            | Self::Int64ToFloat64(v)
983            | Self::Float32ToFloat64(v) => match lit {
984                AvroLiteral::Double(f) => {
985                    v.push(*f);
986                    Ok(())
987                }
988                _ => Err(AvroError::InvalidArgument(
989                    "Default for double must be double".to_string(),
990                )),
991            },
992            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
993                AvroLiteral::Bytes(b) => {
994                    offsets.push_length(b.len());
995                    values.extend_from_slice(b);
996                    Ok(())
997                }
998                _ => Err(AvroError::InvalidArgument(
999                    "Default for bytes must be bytes".to_string(),
1000                )),
1001            },
1002            Self::BytesToString(offsets, values)
1003            | Self::String(offsets, values)
1004            | Self::StringView(offsets, values) => match lit {
1005                AvroLiteral::String(s) => {
1006                    let b = s.as_bytes();
1007                    offsets.push_length(b.len());
1008                    values.extend_from_slice(b);
1009                    Ok(())
1010                }
1011                _ => Err(AvroError::InvalidArgument(
1012                    "Default for string must be string".to_string(),
1013                )),
1014            },
1015            Self::Uuid(values) => match lit {
1016                AvroLiteral::String(s) => {
1017                    let uuid = Uuid::try_parse(s).map_err(|e| {
1018                        AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
1019                    })?;
1020                    values.extend_from_slice(uuid.as_bytes());
1021                    Ok(())
1022                }
1023                _ => Err(AvroError::InvalidArgument(
1024                    "Default for uuid must be string".to_string(),
1025                )),
1026            },
1027            Self::Fixed(sz, accum) => match lit {
1028                AvroLiteral::Bytes(b) => {
1029                    if b.len() != *sz as usize {
1030                        return Err(AvroError::InvalidArgument(format!(
1031                            "Fixed default length {} does not match size {sz}",
1032                            b.len(),
1033                        )));
1034                    }
1035                    accum.extend_from_slice(b);
1036                    Ok(())
1037                }
1038                _ => Err(AvroError::InvalidArgument(
1039                    "Default for fixed must be bytes".to_string(),
1040                )),
1041            },
1042            #[cfg(feature = "small_decimals")]
1043            Self::Decimal32(_, _, _, builder) => {
1044                append_decimal_default!(lit, builder, 4, i32, "decimal32")
1045            }
1046            #[cfg(feature = "small_decimals")]
1047            Self::Decimal64(_, _, _, builder) => {
1048                append_decimal_default!(lit, builder, 8, i64, "decimal64")
1049            }
1050            Self::Decimal128(_, _, _, builder) => {
1051                append_decimal_default!(lit, builder, 16, i128, "decimal128")
1052            }
1053            Self::Decimal256(_, _, _, builder) => {
1054                append_decimal_default!(lit, builder, 32, i256, "decimal256")
1055            }
1056            Self::Duration(builder) => match lit {
1057                AvroLiteral::Bytes(b) => {
1058                    if b.len() != 12 {
1059                        return Err(AvroError::InvalidArgument(format!(
1060                            "Duration default must be exactly 12 bytes, got {}",
1061                            b.len()
1062                        )));
1063                    }
1064                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1065                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1066                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1067                    let nanos = (millis as i64) * 1_000_000;
1068                    builder.append_value(IntervalMonthDayNano::new(
1069                        months as i32,
1070                        days as i32,
1071                        nanos,
1072                    ));
1073                    Ok(())
1074                }
1075                _ => Err(AvroError::InvalidArgument(
1076                    "Default for duration must be 12-byte little-endian months/days/millis"
1077                        .to_string(),
1078                )),
1079            },
1080            Self::Array(_, offsets, inner) => match lit {
1081                AvroLiteral::Array(items) => {
1082                    offsets.push_length(items.len());
1083                    for item in items {
1084                        inner.append_default(item)?;
1085                    }
1086                    Ok(())
1087                }
1088                _ => Err(AvroError::InvalidArgument(
1089                    "Default for array must be an array literal".to_string(),
1090                )),
1091            },
1092            Self::Map(_, koff, moff, kdata, valdec) => match lit {
1093                AvroLiteral::Map(entries) => {
1094                    moff.push_length(entries.len());
1095                    for (k, v) in entries {
1096                        let kb = k.as_bytes();
1097                        koff.push_length(kb.len());
1098                        kdata.extend_from_slice(kb);
1099                        valdec.append_default(v)?;
1100                    }
1101                    Ok(())
1102                }
1103                _ => Err(AvroError::InvalidArgument(
1104                    "Default for map must be a map/object literal".to_string(),
1105                )),
1106            },
1107            Self::Enum(indices, symbols, _) => match lit {
1108                AvroLiteral::Enum(sym) => {
1109                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1110                        AvroError::InvalidArgument(format!(
1111                            "Enum default symbol {sym:?} not in reader symbols"
1112                        ))
1113                    })?;
1114                    indices.push(pos as i32);
1115                    Ok(())
1116                }
1117                _ => Err(AvroError::InvalidArgument(
1118                    "Default for enum must be a symbol".to_string(),
1119                )),
1120            },
1121            #[cfg(feature = "avro_custom_types")]
1122            Self::RunEndEncoded(_, len, inner) => {
1123                *len += 1;
1124                inner.append_default(lit)
1125            }
1126            Self::Union(u) => u.append_default(lit),
1127            Self::Record(field_meta, decoders, field_defaults, _) => match lit {
1128                AvroLiteral::Map(entries) => {
1129                    for (i, dec) in decoders.iter_mut().enumerate() {
1130                        let name = field_meta[i].name();
1131                        if let Some(sub) = entries.get(name) {
1132                            dec.append_default(sub)?;
1133                        } else if let Some(default_literal) = field_defaults[i].as_ref() {
1134                            dec.append_default(default_literal)?;
1135                        } else {
1136                            dec.append_null()?;
1137                        }
1138                    }
1139                    Ok(())
1140                }
1141                AvroLiteral::Null => {
1142                    for (i, dec) in decoders.iter_mut().enumerate() {
1143                        if let Some(default_literal) = field_defaults[i].as_ref() {
1144                            dec.append_default(default_literal)?;
1145                        } else {
1146                            dec.append_null()?;
1147                        }
1148                    }
1149                    Ok(())
1150                }
1151                _ => Err(AvroError::InvalidArgument(
1152                    "Default for record must be a map/object or null".to_string(),
1153                )),
1154            },
1155        }
1156    }
1157
1158    /// Decode a single record from `buf`
1159    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1160        match self {
1161            Self::Null(x) => *x += 1,
1162            Self::Boolean(values) => values.append(buf.get_bool()?),
1163            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
1164                values.push(buf.get_int()?)
1165            }
1166            Self::Int64(values)
1167            | Self::TimeMicros(values)
1168            | Self::TimestampMillis(_, values)
1169            | Self::TimestampMicros(_, values)
1170            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
1171            #[cfg(feature = "avro_custom_types")]
1172            Self::DurationSecond(values)
1173            | Self::DurationMillisecond(values)
1174            | Self::DurationMicrosecond(values)
1175            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
1176            #[cfg(feature = "avro_custom_types")]
1177            Self::Int8(values) => {
1178                let raw = buf.get_int()?;
1179                let x = i8::try_from(raw).map_err(|_| {
1180                    AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
1181                })?;
1182                values.push(x);
1183            }
1184            #[cfg(feature = "avro_custom_types")]
1185            Self::Int16(values) => {
1186                let raw = buf.get_int()?;
1187                let x = i16::try_from(raw).map_err(|_| {
1188                    AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
1189                })?;
1190                values.push(x);
1191            }
1192            #[cfg(feature = "avro_custom_types")]
1193            Self::UInt8(values) => {
1194                let raw = buf.get_int()?;
1195                let x = u8::try_from(raw).map_err(|_| {
1196                    AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
1197                })?;
1198                values.push(x);
1199            }
1200            #[cfg(feature = "avro_custom_types")]
1201            Self::UInt16(values) => {
1202                let raw = buf.get_int()?;
1203                let x = u16::try_from(raw).map_err(|_| {
1204                    AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
1205                })?;
1206                values.push(x);
1207            }
1208            #[cfg(feature = "avro_custom_types")]
1209            Self::UInt32(values) => {
1210                let raw = buf.get_long()?;
1211                let x = u32::try_from(raw).map_err(|_| {
1212                    AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
1213                })?;
1214                values.push(x);
1215            }
1216            #[cfg(feature = "avro_custom_types")]
1217            Self::UInt64(values) => {
1218                let b = buf.get_fixed(8)?;
1219                values.push(u64::from_le_bytes([
1220                    b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
1221                ]));
1222            }
1223            #[cfg(feature = "avro_custom_types")]
1224            Self::Float16(values) => {
1225                let b = buf.get_fixed(2)?;
1226                values.push(u16::from_le_bytes([b[0], b[1]]));
1227            }
1228            #[cfg(feature = "avro_custom_types")]
1229            Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
1230                values.push(buf.get_long()?)
1231            }
1232            #[cfg(feature = "avro_custom_types")]
1233            Self::Time32Secs(values) => values.push(buf.get_int()?),
1234            #[cfg(feature = "avro_custom_types")]
1235            Self::IntervalYearMonth(values) => {
1236                let b = buf.get_fixed(4)?;
1237                values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
1238            }
1239            #[cfg(feature = "avro_custom_types")]
1240            Self::IntervalMonthDayNano(values) => {
1241                let b = buf.get_fixed(16)?;
1242                let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1243                let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1244                let nanos =
1245                    i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
1246                values.push(IntervalMonthDayNano::new(months, days, nanos));
1247            }
1248            #[cfg(feature = "avro_custom_types")]
1249            Self::IntervalDayTime(values) => {
1250                let b = buf.get_fixed(8)?;
1251                // Read as two i32s: days (4 bytes) and milliseconds (4 bytes)
1252                let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1253                let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1254                values.push(IntervalDayTime::new(days, milliseconds));
1255            }
1256            Self::Float32(values) => values.push(buf.get_float()?),
1257            Self::Float64(values) => values.push(buf.get_double()?),
1258            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
1259            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
1260            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
1261            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
1262            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
1263            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
1264            Self::StringToBytes(offsets, values)
1265            | Self::BytesToString(offsets, values)
1266            | Self::Binary(offsets, values)
1267            | Self::String(offsets, values)
1268            | Self::StringView(offsets, values) => {
1269                let data = buf.get_bytes()?;
1270                offsets.push_length(data.len());
1271                values.extend_from_slice(data);
1272            }
1273            Self::Uuid(values) => {
1274                let s_bytes = buf.get_bytes()?;
1275                let s = std::str::from_utf8(s_bytes).map_err(|e| {
1276                    AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
1277                })?;
1278                let uuid = Uuid::try_parse(s)
1279                    .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
1280                values.extend_from_slice(uuid.as_bytes());
1281            }
1282            Self::Array(_, off, encoding) => {
1283                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
1284                off.push_length(total_items);
1285            }
1286            Self::Record(_, encodings, _, None) => {
1287                for encoding in encodings {
1288                    encoding.decode(buf)?;
1289                }
1290            }
1291            Self::Record(_, encodings, _, Some(proj)) => {
1292                proj.project_record(buf, encodings)?;
1293            }
1294            Self::Map(_, koff, moff, kdata, valdec) => {
1295                let newly_added = read_blocks(buf, |cur| {
1296                    let kb = cur.get_bytes()?;
1297                    koff.push_length(kb.len());
1298                    kdata.extend_from_slice(kb);
1299                    valdec.decode(cur)
1300                })?;
1301                moff.push_length(newly_added);
1302            }
1303            Self::Fixed(sz, accum) => {
1304                let fx = buf.get_fixed(*sz as usize)?;
1305                accum.extend_from_slice(fx);
1306            }
1307            #[cfg(feature = "small_decimals")]
1308            Self::Decimal32(_, _, size, builder) => {
1309                decode_decimal!(size, buf, builder, 4, i32);
1310            }
1311            #[cfg(feature = "small_decimals")]
1312            Self::Decimal64(_, _, size, builder) => {
1313                decode_decimal!(size, buf, builder, 8, i64);
1314            }
1315            Self::Decimal128(_, _, size, builder) => {
1316                decode_decimal!(size, buf, builder, 16, i128);
1317            }
1318            Self::Decimal256(_, _, size, builder) => {
1319                decode_decimal!(size, buf, builder, 32, i256);
1320            }
1321            Self::Enum(indices, _, None) => {
1322                indices.push(buf.get_int()?);
1323            }
1324            Self::Enum(indices, _, Some(res)) => {
1325                let raw = buf.get_int()?;
1326                let resolved = res.resolve(raw)?;
1327                indices.push(resolved);
1328            }
1329            Self::Duration(builder) => {
1330                let b = buf.get_fixed(12)?;
1331                let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1332                let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1333                let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1334                let nanos = (millis as i64) * 1_000_000;
1335                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
1336            }
1337            #[cfg(feature = "avro_custom_types")]
1338            Self::RunEndEncoded(_, len, inner) => {
1339                *len += 1;
1340                inner.decode(buf)?;
1341            }
1342            Self::Union(u) => u.decode(buf)?,
1343            Self::Nullable(plan, nb, encoding) => {
1344                match plan {
1345                    NullablePlan::FromSingle { resolution } => {
1346                        encoding.decode_with_resolution(buf, resolution)?;
1347                        nb.append(true);
1348                    }
1349                    NullablePlan::ReadTag {
1350                        nullability,
1351                        resolution,
1352                    } => {
1353                        let branch = buf.read_vlq()?;
1354                        let is_not_null = match *nullability {
1355                            Nullability::NullFirst => branch != 0,
1356                            Nullability::NullSecond => branch == 0,
1357                        };
1358                        if is_not_null {
1359                            // It is important to decode before appending to null buffer in case of decode error
1360                            encoding.decode_with_resolution(buf, resolution)?;
1361                        } else {
1362                            encoding.append_null()?;
1363                        }
1364                        nb.append(is_not_null);
1365                    }
1366                }
1367            }
1368        }
1369        Ok(())
1370    }
1371
1372    fn decode_with_promotion(
1373        &mut self,
1374        buf: &mut AvroCursor<'_>,
1375        promotion: Promotion,
1376    ) -> Result<(), AvroError> {
1377        #[cfg(feature = "avro_custom_types")]
1378        if let Self::RunEndEncoded(_, len, inner) = self {
1379            *len += 1;
1380            return inner.decode_with_promotion(buf, promotion);
1381        }
1382
1383        macro_rules! promote_numeric_to {
1384            ($variant:ident, $getter:ident, $to:ty) => {{
1385                match self {
1386                    Self::$variant(v) => {
1387                        let x = buf.$getter()?;
1388                        v.push(x as $to);
1389                        Ok(())
1390                    }
1391                    other => Err(AvroError::ParseError(format!(
1392                        "Promotion {promotion} target mismatch: expected {}, got {}",
1393                        stringify!($variant),
1394                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1395                    ))),
1396                }
1397            }};
1398        }
1399        match promotion {
1400            Promotion::Direct => self.decode(buf),
1401            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1402            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1403            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1404            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1405            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1406            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1407            Promotion::StringToBytes => match self {
1408                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1409                    let data = buf.get_bytes()?;
1410                    offsets.push_length(data.len());
1411                    values.extend_from_slice(data);
1412                    Ok(())
1413                }
1414                other => Err(AvroError::ParseError(format!(
1415                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1416                    <Self as AsRef<str>>::as_ref(other)
1417                ))),
1418            },
1419            Promotion::BytesToString => match self {
1420                Self::String(offsets, values)
1421                | Self::StringView(offsets, values)
1422                | Self::BytesToString(offsets, values) => {
1423                    let data = buf.get_bytes()?;
1424                    offsets.push_length(data.len());
1425                    values.extend_from_slice(data);
1426                    Ok(())
1427                }
1428                other => Err(AvroError::ParseError(format!(
1429                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1430                    <Self as AsRef<str>>::as_ref(other)
1431                ))),
1432            },
1433        }
1434    }
1435
1436    fn decode_with_resolution<'d>(
1437        &'d mut self,
1438        buf: &mut AvroCursor<'_>,
1439        resolution: &'d ResolutionPlan,
1440    ) -> Result<(), AvroError> {
1441        #[cfg(feature = "avro_custom_types")]
1442        if let Self::RunEndEncoded(_, len, inner) = self {
1443            *len += 1;
1444            return inner.decode_with_resolution(buf, resolution);
1445        }
1446
1447        match resolution {
1448            ResolutionPlan::Promotion(promotion) => {
1449                let promotion = *promotion;
1450                self.decode_with_promotion(buf, promotion)
1451            }
1452            ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
1453            ResolutionPlan::EnumMapping(res) => {
1454                let Self::Enum(indices, _, _) = self else {
1455                    return Err(AvroError::SchemaError(
1456                        "enum mapping resolution provided for non-enum decoder".into(),
1457                    ));
1458                };
1459                let raw = buf.get_int()?;
1460                let resolved = res.resolve(raw)?;
1461                indices.push(resolved);
1462                Ok(())
1463            }
1464            ResolutionPlan::Record(proj) => {
1465                let Self::Record(_, encodings, _, _) = self else {
1466                    return Err(AvroError::SchemaError(
1467                        "record projection provided for non-record decoder".into(),
1468                    ));
1469                };
1470                proj.project_record(buf, encodings)
1471            }
1472        }
1473    }
1474
1475    /// Flush decoded records to an [`ArrayRef`]
1476    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1477        Ok(match self {
1478            Self::Nullable(_, n, e) => e.flush(n.finish())?,
1479            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1480            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1481            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1482            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1483            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1484            Self::TimeMillis(values) => {
1485                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1486            }
1487            Self::TimeMicros(values) => {
1488                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1489            }
1490            Self::TimestampMillis(tz, values) => Arc::new(
1491                flush_primitive::<TimestampMillisecondType>(values, nulls)
1492                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1493            ),
1494            Self::TimestampMicros(tz, values) => Arc::new(
1495                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1496                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1497            ),
1498            Self::TimestampNanos(tz, values) => Arc::new(
1499                flush_primitive::<TimestampNanosecondType>(values, nulls)
1500                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1501            ),
1502            #[cfg(feature = "avro_custom_types")]
1503            Self::DurationSecond(values) => {
1504                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1505            }
1506            #[cfg(feature = "avro_custom_types")]
1507            Self::DurationMillisecond(values) => {
1508                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1509            }
1510            #[cfg(feature = "avro_custom_types")]
1511            Self::DurationMicrosecond(values) => {
1512                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1513            }
1514            #[cfg(feature = "avro_custom_types")]
1515            Self::DurationNanosecond(values) => {
1516                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1517            }
1518            #[cfg(feature = "avro_custom_types")]
1519            Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1520            #[cfg(feature = "avro_custom_types")]
1521            Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1522            #[cfg(feature = "avro_custom_types")]
1523            Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1524            #[cfg(feature = "avro_custom_types")]
1525            Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1526            #[cfg(feature = "avro_custom_types")]
1527            Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1528            #[cfg(feature = "avro_custom_types")]
1529            Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1530            #[cfg(feature = "avro_custom_types")]
1531            Self::Float16(values) => {
1532                // Convert Vec<u16> to Float16Array by reinterpreting the raw bytes.
1533                // This is safe because f16 and u16 have the same size and alignment.
1534                let len = values.len();
1535                let buf: Buffer = std::mem::take(values).into();
1536                let scalar_buf = ScalarBuffer::new(buf, 0, len);
1537                Arc::new(Float16Array::new(scalar_buf, nulls))
1538            }
1539            #[cfg(feature = "avro_custom_types")]
1540            Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1541            #[cfg(feature = "avro_custom_types")]
1542            Self::TimeNanos(values) => {
1543                Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1544            }
1545            #[cfg(feature = "avro_custom_types")]
1546            Self::Time32Secs(values) => {
1547                Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1548            }
1549            #[cfg(feature = "avro_custom_types")]
1550            Self::TimestampSecs(is_utc, values) => Arc::new(
1551                flush_primitive::<TimestampSecondType>(values, nulls)
1552                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1553            ),
1554            #[cfg(feature = "avro_custom_types")]
1555            Self::IntervalYearMonth(values) => {
1556                Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1557            }
1558            #[cfg(feature = "avro_custom_types")]
1559            Self::IntervalMonthDayNano(values) => {
1560                Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1561            }
1562            #[cfg(feature = "avro_custom_types")]
1563            Self::IntervalDayTime(values) => {
1564                Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1565            }
1566            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1567            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1568            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1569            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1570                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1571            }
1572            Self::Int32ToFloat64(values)
1573            | Self::Int64ToFloat64(values)
1574            | Self::Float32ToFloat64(values) => {
1575                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1576            }
1577            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1578                let offsets = flush_offsets(offsets);
1579                let values = flush_values(values).into();
1580                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1581            }
1582            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1583                let offsets = flush_offsets(offsets);
1584                let values = flush_values(values).into();
1585                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1586            }
1587            Self::StringView(offsets, values) => {
1588                let offsets = flush_offsets(offsets);
1589                let values = flush_values(values);
1590                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1591                let values: Vec<&str> = (0..array.len())
1592                    .map(|i| {
1593                        if array.is_valid(i) {
1594                            array.value(i)
1595                        } else {
1596                            ""
1597                        }
1598                    })
1599                    .collect();
1600                Arc::new(StringViewArray::from(values))
1601            }
1602            Self::Array(field, offsets, values) => {
1603                let values = values.flush(None)?;
1604                let offsets = flush_offsets(offsets);
1605                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1606            }
1607            Self::Record(fields, encodings, _, _) => {
1608                let arrays = encodings
1609                    .iter_mut()
1610                    .map(|x| x.flush(None))
1611                    .collect::<Result<Vec<_>, _>>()?;
1612                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1613            }
1614            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1615                let moff = flush_offsets(m_off);
1616                let koff = flush_offsets(k_off);
1617                let kd = flush_values(kdata).into();
1618                let val_arr = valdec.flush(None)?;
1619                let key_arr = StringArray::try_new(koff, kd, None)?;
1620                if key_arr.len() != val_arr.len() {
1621                    return Err(AvroError::InvalidArgument(format!(
1622                        "Map keys length ({}) != map values length ({})",
1623                        key_arr.len(),
1624                        val_arr.len()
1625                    )));
1626                }
1627                let final_len = moff.len() - 1;
1628                if let Some(n) = &nulls {
1629                    if n.len() != final_len {
1630                        return Err(AvroError::InvalidArgument(format!(
1631                            "Map array null buffer length {} != final map length {final_len}",
1632                            n.len()
1633                        )));
1634                    }
1635                }
1636                let entries_fields = match map_field.data_type() {
1637                    DataType::Struct(fields) => fields.clone(),
1638                    other => {
1639                        return Err(AvroError::InvalidArgument(format!(
1640                            "Map entries field must be a Struct, got {other:?}"
1641                        )));
1642                    }
1643                };
1644                let entries_struct =
1645                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1646                let map_arr =
1647                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1648                Arc::new(map_arr)
1649            }
1650            Self::Fixed(sz, accum) => {
1651                let b: Buffer = flush_values(accum).into();
1652                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1653                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1654                Arc::new(arr)
1655            }
1656            Self::Uuid(values) => {
1657                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1658                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1659                Arc::new(arr)
1660            }
1661            #[cfg(feature = "small_decimals")]
1662            Self::Decimal32(precision, scale, _, builder) => {
1663                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1664            }
1665            #[cfg(feature = "small_decimals")]
1666            Self::Decimal64(precision, scale, _, builder) => {
1667                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1668            }
1669            Self::Decimal128(precision, scale, _, builder) => {
1670                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1671            }
1672            Self::Decimal256(precision, scale, _, builder) => {
1673                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1674            }
1675            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1676            Self::Duration(builder) => {
1677                let (_, vals, _) = builder.finish().into_parts();
1678                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1679                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1680                Arc::new(vals)
1681            }
1682            #[cfg(feature = "avro_custom_types")]
1683            Self::RunEndEncoded(width, len, inner) => {
1684                let values = inner.flush(nulls)?;
1685                let n = *len;
1686                let arr = values.as_ref();
1687                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1688                if n > 0 {
1689                    run_starts.push(0);
1690                    for i in 1..n {
1691                        if !values_equal_at(arr, i - 1, i) {
1692                            run_starts.push(i);
1693                        }
1694                    }
1695                }
1696                if n > (u32::MAX as usize) {
1697                    return Err(AvroError::InvalidArgument(format!(
1698                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1699                    )));
1700                }
1701                let run_count = run_starts.len();
1702                let take_idx: PrimitiveArray<UInt32Type> =
1703                    run_starts.iter().map(|&s| s as u32).collect();
1704                let per_run_values = if run_count == 0 {
1705                    values.slice(0, 0)
1706                } else {
1707                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1708                        AvroError::ParseError(format!("take() for REE values failed: {e}"))
1709                    })?
1710                };
1711
1712                macro_rules! build_run_array {
1713                    ($Native:ty, $ArrowTy:ty) => {{
1714                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1715                        for (idx, &_start) in run_starts.iter().enumerate() {
1716                            let end = if idx + 1 < run_count {
1717                                run_starts[idx + 1]
1718                            } else {
1719                                n
1720                            };
1721                            ends.push(end as $Native);
1722                        }
1723                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1724                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1725                            .map_err(|e| AvroError::ParseError(e.to_string()))?;
1726                        Arc::new(run_arr) as ArrayRef
1727                    }};
1728                }
1729                match *width {
1730                    2 => {
1731                        if n > i16::MAX as usize {
1732                            return Err(AvroError::InvalidArgument(format!(
1733                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1734                            )));
1735                        }
1736                        build_run_array!(i16, Int16Type)
1737                    }
1738                    4 => build_run_array!(i32, Int32Type),
1739                    8 => build_run_array!(i64, Int64Type),
1740                    other => {
1741                        return Err(AvroError::InvalidArgument(format!(
1742                            "Unsupported run-end width {other} for RunEndEncoded"
1743                        )));
1744                    }
1745                }
1746            }
1747            Self::Union(u) => u.flush(nulls)?,
1748        })
1749    }
1750}
1751
1752/// Runtime plan for decoding reader-side `["null", T]` types.
1753#[derive(Debug)]
1754enum NullablePlan {
1755    /// Writer actually wrote a union (branch tag present).
1756    ReadTag {
1757        nullability: Nullability,
1758        resolution: ResolutionPlan,
1759    },
1760    /// Writer wrote a single (non-union) value resolved to the non-null branch
1761    /// of the reader union; do NOT read a branch tag, but apply any resolution.
1762    FromSingle { resolution: ResolutionPlan },
1763}
1764
1765/// Runtime plan for resolving writer-reader type differences.
1766#[derive(Debug)]
1767enum ResolutionPlan {
1768    /// Indicates that the writer's type should be promoted to the reader's type.
1769    Promotion(Promotion),
1770    /// Provides a default value for the field missing in the writer type.
1771    DefaultValue(AvroLiteral),
1772    /// Provides mapping information for resolving enums.
1773    EnumMapping(EnumResolution),
1774    /// Provides projection information for record fields.
1775    Record(Projector),
1776}
1777
1778impl ResolutionPlan {
1779    fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, AvroError> {
1780        match (decoder, resolution) {
1781            (_, ResolutionInfo::Promotion(p)) => Ok(ResolutionPlan::Promotion(*p)),
1782            (_, ResolutionInfo::DefaultValue(lit)) => Ok(ResolutionPlan::DefaultValue(lit.clone())),
1783            (_, ResolutionInfo::EnumMapping(m)) => {
1784                Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
1785            }
1786            (Decoder::Record(_, _, field_defaults, _), ResolutionInfo::Record(r)) => Ok(
1787                ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?),
1788            ),
1789            (_, ResolutionInfo::Record(_)) => Err(AvroError::SchemaError(
1790                "record resolution on non-record decoder".into(),
1791            )),
1792            (_, ResolutionInfo::Union(_)) => Err(AvroError::SchemaError(
1793                "union variant cannot be resolved to a union type".into(),
1794            )),
1795        }
1796    }
1797}
1798
1799#[derive(Debug)]
1800struct EnumResolution {
1801    mapping: Arc<[i32]>,
1802    default_index: i32,
1803}
1804
1805impl EnumResolution {
1806    fn new(mapping: &EnumMapping) -> Self {
1807        EnumResolution {
1808            mapping: mapping.mapping.clone(),
1809            default_index: mapping.default_index,
1810        }
1811    }
1812
1813    fn resolve(&self, index: i32) -> Result<i32, AvroError> {
1814        let resolved = usize::try_from(index)
1815            .ok()
1816            .and_then(|idx| self.mapping.get(idx).copied())
1817            .filter(|&idx| idx >= 0)
1818            .unwrap_or(self.default_index);
1819        if resolved >= 0 {
1820            Ok(resolved)
1821        } else {
1822            Err(AvroError::ParseError(format!(
1823                "Enum symbol index {index} not resolvable and no default provided",
1824            )))
1825        }
1826    }
1827}
1828
1829// A lookup table for resolving fields between writer and reader schemas during record projection.
1830#[derive(Debug)]
1831struct DispatchLookupTable {
1832    // Maps each reader field index `r` to the corresponding writer field index.
1833    //
1834    // Semantics:
1835    // - `to_reader[r] >= 0`: The value is an index into the writer's fields. The value from
1836    //   the writer field is decoded, and `promotion[r]` is applied.
1837    // - `to_reader[r] == NO_SOURCE` (-1): No matching writer field exists. The reader field's
1838    //   default value is used.
1839    //
1840    // Representation (`i8`):
1841    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
1842    // `i8` for union type IDs. This requires that writer field indices do not exceed `i8::MAX`.
1843    //
1844    // Invariants:
1845    // - `to_reader.len() == promotion.len()` and matches the reader field count.
1846    // - If `to_reader[r] == NO_SOURCE`, `promotion[r]` is ignored.
1847    to_reader: Box<[i8]>,
1848    // For each reader field `r`, specifies the resolution to apply to the writer's value.
1849    //
1850    // This is used when a writer field's type can be promoted to a reader field's type
1851    // (e.g., `Int` to `Long`). It is ignored if `to_reader[r] == NO_SOURCE`.
1852    resolution: Box<[ResolutionPlan]>,
1853}
1854
1855// Sentinel used in `DispatchLookupTable::to_reader` to mark
1856// "no matching writer field".
1857const NO_SOURCE: i8 = -1;
1858
1859impl DispatchLookupTable {
1860    fn from_writer_to_reader(
1861        reader_branches: &[Decoder],
1862        resolution_map: &[Option<(usize, ResolutionInfo)>],
1863    ) -> Result<Self, AvroError> {
1864        let mut to_reader = Vec::with_capacity(resolution_map.len());
1865        let mut resolution = Vec::with_capacity(resolution_map.len());
1866        for map in resolution_map {
1867            match map {
1868                Some((idx, res)) => {
1869                    let idx = *idx;
1870                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1871                        AvroError::SchemaError(format!(
1872                            "Reader branch index {idx} exceeds i8 range (max {})",
1873                            i8::MAX
1874                        ))
1875                    })?;
1876                    let plan = ResolutionPlan::try_new(&reader_branches[idx], res)?;
1877                    to_reader.push(idx_i8);
1878                    resolution.push(plan);
1879                }
1880                None => {
1881                    to_reader.push(NO_SOURCE);
1882                    resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
1883                }
1884            }
1885        }
1886        Ok(Self {
1887            to_reader: to_reader.into_boxed_slice(),
1888            resolution: resolution.into_boxed_slice(),
1889        })
1890    }
1891
1892    // Resolve a writer branch index to (reader_idx, resolution)
1893    #[inline]
1894    fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)> {
1895        let reader_index = *self.to_reader.get(writer_index)?;
1896        (reader_index >= 0).then(|| (reader_index as usize, &self.resolution[writer_index]))
1897    }
1898}
1899
1900#[derive(Debug)]
1901struct UnionDecoder {
1902    fields: UnionFields,
1903    branches: UnionDecoderBranches,
1904    default_emit_idx: usize,
1905    null_emit_idx: usize,
1906    plan: UnionReadPlan,
1907}
1908
1909#[derive(Debug, Default)]
1910struct UnionDecoderBranches {
1911    decoders: Vec<Decoder>,
1912    reader_type_codes: Vec<i8>,
1913    type_ids: Vec<i8>,
1914    offsets: Vec<i32>,
1915    counts: Vec<i32>,
1916}
1917
1918impl UnionDecoderBranches {
1919    fn new(decoders: Vec<Decoder>, reader_type_codes: Vec<i8>) -> Self {
1920        let branch_len = decoders.len().max(reader_type_codes.len());
1921        Self {
1922            decoders,
1923            reader_type_codes,
1924            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1925            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1926            counts: vec![0; branch_len],
1927        }
1928    }
1929
1930    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1931        let branches_len = self.decoders.len();
1932        let Some(reader_branch) = self.decoders.get_mut(reader_idx) else {
1933            return Err(AvroError::ParseError(format!(
1934                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1935            )));
1936        };
1937        self.type_ids.push(self.reader_type_codes[reader_idx]);
1938        self.offsets.push(self.counts[reader_idx]);
1939        self.counts[reader_idx] += 1;
1940        Ok(reader_branch)
1941    }
1942}
1943
1944impl Default for UnionDecoder {
1945    fn default() -> Self {
1946        Self {
1947            fields: UnionFields::empty(),
1948            branches: Default::default(),
1949            default_emit_idx: 0,
1950            null_emit_idx: 0,
1951            plan: UnionReadPlan::Passthrough,
1952        }
1953    }
1954}
1955
1956#[derive(Debug)]
1957enum UnionReadPlan {
1958    ReaderUnion {
1959        lookup_table: DispatchLookupTable,
1960    },
1961    FromSingle {
1962        reader_idx: usize,
1963        resolution: ResolutionPlan,
1964    },
1965    ToSingle {
1966        target: Box<Decoder>,
1967        lookup_table: DispatchLookupTable,
1968    },
1969    Passthrough,
1970}
1971
1972impl UnionReadPlan {
1973    fn from_resolved(
1974        reader_branches: &[Decoder],
1975        resolved: Option<ResolvedUnion>,
1976    ) -> Result<Self, AvroError> {
1977        let Some(info) = resolved else {
1978            return Ok(Self::Passthrough);
1979        };
1980        match (info.writer_is_union, info.reader_is_union) {
1981            (true, true) => {
1982                let lookup_table =
1983                    DispatchLookupTable::from_writer_to_reader(reader_branches, &info.writer_to_reader)?;
1984                Ok(Self::ReaderUnion { lookup_table })
1985            }
1986            (false, true) => {
1987                let Some((idx, resolution)) =
1988                    info.writer_to_reader.first().and_then(Option::as_ref)
1989                else {
1990                    return Err(AvroError::SchemaError(
1991                        "Writer type does not match any reader union branch".to_string(),
1992                    ));
1993                };
1994                let reader_idx = *idx;
1995                Ok(Self::FromSingle {
1996                    reader_idx,
1997                    resolution: ResolutionPlan::try_new(&reader_branches[reader_idx], resolution)?,
1998                })
1999            }
2000            (true, false) => Err(AvroError::InvalidArgument(
2001                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
2002                    .to_string(),
2003            )),
2004            // (false, false) is invalid and should never be constructed by the resolver.
2005            _ => Err(AvroError::SchemaError(
2006                "ResolvedUnion constructed for non-union sides; resolver should return None"
2007                    .to_string(),
2008            )),
2009        }
2010    }
2011}
2012
2013impl UnionDecoder {
2014    fn try_new(
2015        fields: UnionFields,
2016        branches: Vec<Decoder>,
2017        resolved: Option<ResolvedUnion>,
2018    ) -> Result<Self, AvroError> {
2019        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
2020        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
2021        let default_emit_idx = 0;
2022        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
2023        // Guard against impractically large unions that cannot be indexed by an Avro int
2024        let max_addr = (i32::MAX as usize) + 1;
2025        if branches.len() > max_addr {
2026            return Err(AvroError::SchemaError(format!(
2027                "Reader union has {} branches, which exceeds the maximum addressable \
2028                 branches by an Avro int tag ({} + 1).",
2029                branches.len(),
2030                i32::MAX
2031            )));
2032        }
2033        let plan = UnionReadPlan::from_resolved(&branches, resolved)?;
2034        Ok(Self {
2035            fields,
2036            branches: UnionDecoderBranches::new(branches, reader_type_codes),
2037            default_emit_idx,
2038            null_emit_idx,
2039            plan,
2040        })
2041    }
2042
2043    fn with_single_target(target: Decoder, info: ResolvedUnion) -> Result<Self, AvroError> {
2044        // This constructor is only for writer-union to single-type resolution
2045        debug_assert!(info.writer_is_union && !info.reader_is_union);
2046        let mut reader_branches = [target];
2047        let lookup_table =
2048            DispatchLookupTable::from_writer_to_reader(&reader_branches, &info.writer_to_reader)?;
2049        let target = Box::new(mem::replace(&mut reader_branches[0], Decoder::Null(0)));
2050        Ok(Self {
2051            plan: UnionReadPlan::ToSingle {
2052                target,
2053                lookup_table,
2054            },
2055            ..Self::default()
2056        })
2057    }
2058
2059    #[inline]
2060    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
2061        // Avro unions are encoded by first writing the zero-based branch index.
2062        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
2063        // but both use zig-zag varint encoding, so decoding as long is compatible
2064        // with either form and widely used in practice.
2065        let raw = buf.get_long()?;
2066        if raw < 0 {
2067            return Err(AvroError::ParseError(format!(
2068                "Negative union branch index {raw}"
2069            )));
2070        }
2071        usize::try_from(raw).map_err(|_| {
2072            AvroError::ParseError(format!(
2073                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2074                (usize::BITS as usize)
2075            ))
2076        })
2077    }
2078
2079    #[inline]
2080    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
2081    where
2082        F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
2083    {
2084        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2085            return action(target);
2086        }
2087        let reader_idx = match &self.plan {
2088            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
2089            _ => fallback_idx,
2090        };
2091        self.branches.emit_to(reader_idx).and_then(action)
2092    }
2093
2094    fn append_null(&mut self) -> Result<(), AvroError> {
2095        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
2096    }
2097
2098    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
2099        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
2100    }
2101
2102    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2103        match &mut self.plan {
2104            UnionReadPlan::Passthrough => {
2105                let reader_idx = Self::read_tag(buf)?;
2106                let decoder = self.branches.emit_to(reader_idx)?;
2107                decoder.decode(buf)
2108            }
2109            UnionReadPlan::ReaderUnion { lookup_table } => {
2110                let idx = Self::read_tag(buf)?;
2111                let Some((reader_idx, resolution)) = lookup_table.resolve(idx) else {
2112                    return Err(AvroError::ParseError(format!(
2113                        "Union branch index {idx} not resolvable by reader schema"
2114                    )));
2115                };
2116                let decoder = self.branches.emit_to(reader_idx)?;
2117                decoder.decode_with_resolution(buf, resolution)
2118            }
2119            UnionReadPlan::FromSingle {
2120                reader_idx,
2121                resolution,
2122            } => {
2123                let decoder = self.branches.emit_to(*reader_idx)?;
2124                decoder.decode_with_resolution(buf, resolution)
2125            }
2126            UnionReadPlan::ToSingle {
2127                target,
2128                lookup_table,
2129            } => {
2130                let idx = Self::read_tag(buf)?;
2131                let Some((_, resolution)) = lookup_table.resolve(idx) else {
2132                    return Err(AvroError::ParseError(format!(
2133                        "Writer union branch index {idx} not resolvable by reader schema"
2134                    )));
2135                };
2136                target.decode_with_resolution(buf, resolution)
2137            }
2138        }
2139    }
2140
2141    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
2142        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2143            return target.flush(nulls);
2144        }
2145        debug_assert!(
2146            nulls.is_none(),
2147            "UnionArray does not accept a validity bitmap; \
2148                     nulls should have been materialized as a Null child during decode"
2149        );
2150        let children = self
2151            .branches
2152            .decoders
2153            .iter_mut()
2154            .map(|d| d.flush(None))
2155            .collect::<Result<Vec<_>, _>>()?;
2156        let arr = UnionArray::try_new(
2157            self.fields.clone(),
2158            flush_values(&mut self.branches.type_ids)
2159                .into_iter()
2160                .collect(),
2161            Some(
2162                flush_values(&mut self.branches.offsets)
2163                    .into_iter()
2164                    .collect(),
2165            ),
2166            children,
2167        )
2168        .map_err(|e| AvroError::ParseError(e.to_string()))?;
2169        Ok(Arc::new(arr))
2170    }
2171}
2172
2173#[derive(Debug, Default)]
2174struct UnionDecoderBuilder {
2175    fields: Option<UnionFields>,
2176    branches: Option<Vec<Decoder>>,
2177    resolved: Option<ResolvedUnion>,
2178    target: Option<Decoder>,
2179}
2180
2181impl UnionDecoderBuilder {
2182    fn new() -> Self {
2183        Self::default()
2184    }
2185
2186    fn with_fields(mut self, fields: UnionFields) -> Self {
2187        self.fields = Some(fields);
2188        self
2189    }
2190
2191    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2192        self.branches = Some(branches);
2193        self
2194    }
2195
2196    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2197        self.resolved = Some(resolved_union);
2198        self
2199    }
2200
2201    fn with_target(mut self, target: Decoder) -> Self {
2202        self.target = Some(target);
2203        self
2204    }
2205
2206    fn build(self) -> Result<UnionDecoder, AvroError> {
2207        match (self.resolved, self.fields, self.branches, self.target) {
2208            (resolved, Some(fields), Some(branches), None) => {
2209                UnionDecoder::try_new(fields, branches, resolved)
2210            }
2211            (Some(info), None, None, Some(target))
2212                if info.writer_is_union && !info.reader_is_union =>
2213            {
2214                UnionDecoder::with_single_target(target, info)
2215            }
2216            _ => Err(AvroError::InvalidArgument(
2217                "Invalid UnionDecoderBuilder configuration: expected either \
2218                 (fields + branches + resolved) with no target for reader-unions, or \
2219                 (resolved + target) with no fields/branches for writer-union to single."
2220                    .to_string(),
2221            )),
2222        }
2223    }
2224}
2225
2226#[derive(Debug, Copy, Clone)]
2227enum NegativeBlockBehavior {
2228    ProcessItems,
2229    SkipBySize,
2230}
2231
2232#[inline]
2233fn skip_blocks(
2234    buf: &mut AvroCursor,
2235    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2236) -> Result<usize, AvroError> {
2237    process_blockwise(
2238        buf,
2239        move |c| skip_item(c),
2240        NegativeBlockBehavior::SkipBySize,
2241    )
2242}
2243
2244#[inline]
2245fn flush_dict(
2246    indices: &mut Vec<i32>,
2247    symbols: &[String],
2248    nulls: Option<NullBuffer>,
2249) -> Result<ArrayRef, AvroError> {
2250    let keys = flush_primitive::<Int32Type>(indices, nulls);
2251    let values = Arc::new(StringArray::from_iter_values(
2252        symbols.iter().map(|s| s.as_str()),
2253    ));
2254    DictionaryArray::try_new(keys, values)
2255        .map_err(Into::into)
2256        .map(|arr| Arc::new(arr) as ArrayRef)
2257}
2258
2259#[inline]
2260fn read_blocks(
2261    buf: &mut AvroCursor,
2262    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2263) -> Result<usize, AvroError> {
2264    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2265}
2266
2267#[inline]
2268fn process_blockwise(
2269    buf: &mut AvroCursor,
2270    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2271    negative_behavior: NegativeBlockBehavior,
2272) -> Result<usize, AvroError> {
2273    let mut total = 0usize;
2274    loop {
2275        // Read the block count
2276        //  positive = that many items
2277        //  negative = that many items + read block size
2278        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
2279        let block_count = buf.get_long()?;
2280        match block_count.cmp(&0) {
2281            Ordering::Equal => break,
2282            Ordering::Less => {
2283                let count = (-block_count) as usize;
2284                // A negative count is followed by a long of the size in bytes
2285                let size_in_bytes = buf.get_long()? as usize;
2286                match negative_behavior {
2287                    NegativeBlockBehavior::ProcessItems => {
2288                        // Process items one-by-one after reading size
2289                        for _ in 0..count {
2290                            on_item(buf)?;
2291                        }
2292                    }
2293                    NegativeBlockBehavior::SkipBySize => {
2294                        // Skip the entire block payload at once
2295                        let _ = buf.get_fixed(size_in_bytes)?;
2296                    }
2297                }
2298                total += count;
2299            }
2300            Ordering::Greater => {
2301                let count = block_count as usize;
2302                for _ in 0..count {
2303                    on_item(buf)?;
2304                }
2305                total += count;
2306            }
2307        }
2308    }
2309    Ok(total)
2310}
2311
2312#[inline]
2313fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2314    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2315}
2316
2317#[inline]
2318fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2319    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2320}
2321
2322#[inline]
2323fn flush_primitive<T: ArrowPrimitiveType>(
2324    values: &mut Vec<T::Native>,
2325    nulls: Option<NullBuffer>,
2326) -> PrimitiveArray<T> {
2327    PrimitiveArray::new(flush_values(values).into(), nulls)
2328}
2329
2330#[inline]
2331fn read_decimal_bytes_be<const N: usize>(
2332    buf: &mut AvroCursor<'_>,
2333    size: &Option<usize>,
2334) -> Result<[u8; N], AvroError> {
2335    match size {
2336        Some(n) if *n == N => {
2337            let raw = buf.get_fixed(N)?;
2338            let mut arr = [0u8; N];
2339            arr.copy_from_slice(raw);
2340            Ok(arr)
2341        }
2342        Some(n) => {
2343            let raw = buf.get_fixed(*n)?;
2344            sign_cast_to::<N>(raw)
2345        }
2346        None => {
2347            let raw = buf.get_bytes()?;
2348            sign_cast_to::<N>(raw)
2349        }
2350    }
2351}
2352
2353/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
2354/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
2355/// the payload is a big-endian two's-complement integer, and when narrowing it must
2356/// be representable without changing sign or value.
2357///
2358/// If `raw.len() < N`, the value is sign-extended.
2359/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
2360/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
2361#[inline]
2362fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2363    let len = raw.len();
2364    // Fast path: exact width, just copy
2365    if len == N {
2366        let mut out = [0u8; N];
2367        out.copy_from_slice(raw);
2368        return Ok(out);
2369    }
2370    // Determine sign byte from MSB of first byte (empty => positive)
2371    let first = raw.first().copied().unwrap_or(0u8);
2372    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2373    // Pre-fill with sign byte to support sign extension
2374    let mut out = [sign_byte; N];
2375    if len > N {
2376        // Validate truncation: all dropped leading bytes must equal sign_byte,
2377        // and the MSB of the first kept byte must match the sign.
2378        let extra = len - N;
2379        // Any non-sign byte in the truncated prefix indicates overflow
2380        if raw[..extra].iter().any(|&b| b != sign_byte) {
2381            return Err(AvroError::ParseError(format!(
2382                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2383                len, N
2384            )));
2385        }
2386        if N > 0 {
2387            let first_kept = raw[extra];
2388            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2389            if sign_bit_mismatch {
2390                return Err(AvroError::ParseError(format!(
2391                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2392                    len, N
2393                )));
2394            }
2395        }
2396        out.copy_from_slice(&raw[extra..]);
2397        return Ok(out);
2398    }
2399    out[N - len..].copy_from_slice(raw);
2400    Ok(out)
2401}
2402
2403#[cfg(feature = "avro_custom_types")]
2404#[inline]
2405fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
2406    match (arr.is_null(i), arr.is_null(j)) {
2407        (true, true) => true,
2408        (true, false) | (false, true) => false,
2409        (false, false) => {
2410            let a = arr.slice(i, 1);
2411            let b = arr.slice(j, 1);
2412            a == b
2413        }
2414    }
2415}
2416
2417#[derive(Debug)]
2418struct Projector {
2419    writer_projections: Vec<FieldProjection>,
2420    default_injections: Arc<[(usize, AvroLiteral)]>,
2421}
2422
2423#[derive(Debug)]
2424enum FieldProjection {
2425    ToReader(usize),
2426    Skip(Skipper),
2427}
2428
2429#[derive(Debug)]
2430struct ProjectorBuilder<'a> {
2431    rec: &'a ResolvedRecord,
2432    field_defaults: &'a [Option<AvroLiteral>],
2433}
2434
2435impl<'a> ProjectorBuilder<'a> {
2436    #[inline]
2437    fn try_new(rec: &'a ResolvedRecord, field_defaults: &'a [Option<AvroLiteral>]) -> Self {
2438        Self {
2439            rec,
2440            field_defaults,
2441        }
2442    }
2443
2444    #[inline]
2445    fn build(self) -> Result<Projector, AvroError> {
2446        let mut default_injections: Vec<(usize, AvroLiteral)> =
2447            Vec::with_capacity(self.rec.default_fields.len());
2448        for &idx in self.rec.default_fields.as_ref() {
2449            let lit = self
2450                .field_defaults
2451                .get(idx)
2452                .and_then(|lit| lit.clone())
2453                .unwrap_or(AvroLiteral::Null);
2454            default_injections.push((idx, lit));
2455        }
2456        let writer_projections = self
2457            .rec
2458            .writer_fields
2459            .iter()
2460            .map(|field| match field {
2461                ResolvedField::ToReader(index, _) => Ok(FieldProjection::ToReader(*index)),
2462                ResolvedField::Skip(datatype) => {
2463                    let skipper = Skipper::from_avro(datatype)?;
2464                    Ok(FieldProjection::Skip(skipper))
2465                }
2466            })
2467            .collect::<Result<_, AvroError>>()?;
2468        Ok(Projector {
2469            writer_projections,
2470            default_injections: default_injections.into(),
2471        })
2472    }
2473}
2474
2475impl Projector {
2476    #[inline]
2477    fn project_record(
2478        &self,
2479        buf: &mut AvroCursor<'_>,
2480        encodings: &mut [Decoder],
2481    ) -> Result<(), AvroError> {
2482        for field_proj in self.writer_projections.iter() {
2483            match field_proj {
2484                FieldProjection::ToReader(index) => encodings[*index].decode(buf)?,
2485                FieldProjection::Skip(skipper) => skipper.skip(buf)?,
2486            }
2487        }
2488        for (reader_index, lit) in self.default_injections.as_ref() {
2489            encodings[*reader_index].append_default(lit)?;
2490        }
2491        Ok(())
2492    }
2493}
2494
2495/// Lightweight skipper for non‑projected writer fields
2496/// (fields present in the writer schema but omitted by the reader/projection);
2497/// per Avro 1.11.1 schema resolution these fields are ignored.
2498///
2499/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
2500#[derive(Debug)]
2501enum Skipper {
2502    Null,
2503    Boolean,
2504    Int32,
2505    Int64,
2506    Float32,
2507    Float64,
2508    Bytes,
2509    String,
2510    TimeMicros,
2511    TimestampMillis,
2512    TimestampMicros,
2513    TimestampNanos,
2514    Fixed(usize),
2515    Decimal(Option<usize>),
2516    UuidString,
2517    Enum,
2518    DurationFixed12,
2519    List(Box<Skipper>),
2520    Map(Box<Skipper>),
2521    Struct(Vec<Skipper>),
2522    Union(Vec<Skipper>),
2523    Nullable(Nullability, Box<Skipper>),
2524    #[cfg(feature = "avro_custom_types")]
2525    RunEndEncoded(Box<Skipper>),
2526}
2527
2528impl Skipper {
2529    fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
2530        let mut base = match dt.codec() {
2531            Codec::Null => Self::Null,
2532            Codec::Boolean => Self::Boolean,
2533            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
2534            Codec::Int64 => Self::Int64,
2535            Codec::TimeMicros => Self::TimeMicros,
2536            Codec::TimestampMillis(_) => Self::TimestampMillis,
2537            Codec::TimestampMicros(_) => Self::TimestampMicros,
2538            Codec::TimestampNanos(_) => Self::TimestampNanos,
2539            #[cfg(feature = "avro_custom_types")]
2540            Codec::DurationNanos
2541            | Codec::DurationMicros
2542            | Codec::DurationMillis
2543            | Codec::DurationSeconds => Self::Int64,
2544            #[cfg(feature = "avro_custom_types")]
2545            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
2546                Self::Int32
2547            }
2548            #[cfg(feature = "avro_custom_types")]
2549            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
2550                Self::Int64
2551            }
2552            #[cfg(feature = "avro_custom_types")]
2553            Codec::UInt64 => Self::Fixed(8),
2554            #[cfg(feature = "avro_custom_types")]
2555            Codec::Float16 => Self::Fixed(2),
2556            #[cfg(feature = "avro_custom_types")]
2557            Codec::IntervalYearMonth => Self::Fixed(4),
2558            #[cfg(feature = "avro_custom_types")]
2559            Codec::IntervalMonthDayNano => Self::Fixed(16),
2560            #[cfg(feature = "avro_custom_types")]
2561            Codec::IntervalDayTime => Self::Fixed(8),
2562            Codec::Float32 => Self::Float32,
2563            Codec::Float64 => Self::Float64,
2564            Codec::Binary => Self::Bytes,
2565            Codec::Utf8 | Codec::Utf8View => Self::String,
2566            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2567            Codec::Decimal(_, _, size) => Self::Decimal(*size),
2568            Codec::Uuid => Self::UuidString, // encoded as string
2569            Codec::Enum(_) => Self::Enum,
2570            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2571            Codec::Struct(fields) => {
2572                if let Some(ResolutionInfo::Record(rec)) = dt.resolution.as_ref() {
2573                    Self::Struct(
2574                        rec.writer_fields
2575                            .iter()
2576                            .map(|wf| match wf {
2577                                ResolvedField::ToReader(_, wdt) | ResolvedField::Skip(wdt) => {
2578                                    Skipper::from_avro(wdt)
2579                                }
2580                            })
2581                            .collect::<Result<_, _>>()?,
2582                    )
2583                } else {
2584                    Self::Struct(
2585                        fields
2586                            .iter()
2587                            .map(|f| Skipper::from_avro(f.data_type()))
2588                            .collect::<Result<_, _>>()?,
2589                    )
2590                }
2591            }
2592            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2593            Codec::Interval => Self::DurationFixed12,
2594            Codec::Union(encodings, _, _) => {
2595                let max_addr = (i32::MAX as usize) + 1;
2596                if encodings.len() > max_addr {
2597                    return Err(AvroError::SchemaError(format!(
2598                        "Writer union has {} branches, which exceeds the maximum addressable \
2599                         branches by an Avro int tag ({} + 1).",
2600                        encodings.len(),
2601                        i32::MAX
2602                    )));
2603                }
2604                Self::Union(
2605                    encodings
2606                        .iter()
2607                        .map(Skipper::from_avro)
2608                        .collect::<Result<_, _>>()?,
2609                )
2610            }
2611            #[cfg(feature = "avro_custom_types")]
2612            Codec::RunEndEncoded(inner, _w) => {
2613                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2614            }
2615        };
2616        if let Some(n) = dt.nullability() {
2617            base = Self::Nullable(n, Box::new(base));
2618        }
2619        Ok(base)
2620    }
2621
2622    fn skip(&self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2623        match self {
2624            Self::Null => Ok(()),
2625            Self::Boolean => {
2626                buf.get_bool()?;
2627                Ok(())
2628            }
2629            Self::Int32 => {
2630                buf.skip_int()?;
2631                Ok(())
2632            }
2633            Self::Int64
2634            | Self::TimeMicros
2635            | Self::TimestampMillis
2636            | Self::TimestampMicros
2637            | Self::TimestampNanos => {
2638                buf.skip_long()?;
2639                Ok(())
2640            }
2641            Self::Float32 => {
2642                buf.get_float()?;
2643                Ok(())
2644            }
2645            Self::Float64 => {
2646                buf.get_double()?;
2647                Ok(())
2648            }
2649            Self::Bytes | Self::String | Self::UuidString => {
2650                buf.get_bytes()?;
2651                Ok(())
2652            }
2653            Self::Fixed(sz) => {
2654                buf.get_fixed(*sz)?;
2655                Ok(())
2656            }
2657            Self::Decimal(size) => {
2658                if let Some(s) = size {
2659                    buf.get_fixed(*s)
2660                } else {
2661                    buf.get_bytes()
2662                }?;
2663                Ok(())
2664            }
2665            Self::Enum => {
2666                buf.skip_int()?;
2667                Ok(())
2668            }
2669            Self::DurationFixed12 => {
2670                buf.get_fixed(12)?;
2671                Ok(())
2672            }
2673            Self::List(item) => {
2674                skip_blocks(buf, |c| item.skip(c))?;
2675                Ok(())
2676            }
2677            Self::Map(value) => {
2678                skip_blocks(buf, |c| {
2679                    c.get_bytes()?; // key
2680                    value.skip(c)
2681                })?;
2682                Ok(())
2683            }
2684            Self::Struct(fields) => {
2685                for f in fields.iter() {
2686                    f.skip(buf)?
2687                }
2688                Ok(())
2689            }
2690            Self::Union(encodings) => {
2691                // Union tag must be ZigZag-decoded
2692                let raw = buf.get_long()?;
2693                if raw < 0 {
2694                    return Err(AvroError::ParseError(format!(
2695                        "Negative union branch index {raw}"
2696                    )));
2697                }
2698                let idx: usize = usize::try_from(raw).map_err(|_| {
2699                    AvroError::ParseError(format!(
2700                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2701                        (usize::BITS as usize)
2702                    ))
2703                })?;
2704                let Some(encoding) = encodings.get(idx) else {
2705                    return Err(AvroError::ParseError(format!(
2706                        "Union branch index {idx} out of range for skipper ({} branches)",
2707                        encodings.len()
2708                    )));
2709                };
2710                encoding.skip(buf)
2711            }
2712            Self::Nullable(order, inner) => {
2713                let branch = buf.read_vlq()?;
2714                let is_not_null = match *order {
2715                    Nullability::NullFirst => branch != 0,
2716                    Nullability::NullSecond => branch == 0,
2717                };
2718                if is_not_null {
2719                    inner.skip(buf)?;
2720                }
2721                Ok(())
2722            }
2723            #[cfg(feature = "avro_custom_types")]
2724            Self::RunEndEncoded(inner) => inner.skip(buf),
2725        }
2726    }
2727}
2728
2729#[cfg(test)]
2730mod tests {
2731    use super::*;
2732    use crate::codec::AvroFieldBuilder;
2733    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2734    use arrow_array::cast::AsArray;
2735    use indexmap::IndexMap;
2736    use std::collections::HashMap;
2737
2738    fn encode_avro_int(value: i32) -> Vec<u8> {
2739        let mut buf = Vec::new();
2740        let mut v = (value << 1) ^ (value >> 31);
2741        while v & !0x7F != 0 {
2742            buf.push(((v & 0x7F) | 0x80) as u8);
2743            v >>= 7;
2744        }
2745        buf.push(v as u8);
2746        buf
2747    }
2748
2749    fn encode_avro_long(value: i64) -> Vec<u8> {
2750        let mut buf = Vec::new();
2751        let mut v = (value << 1) ^ (value >> 63);
2752        while v & !0x7F != 0 {
2753            buf.push(((v & 0x7F) | 0x80) as u8);
2754            v >>= 7;
2755        }
2756        buf.push(v as u8);
2757        buf
2758    }
2759
2760    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2761        let mut buf = encode_avro_long(bytes.len() as i64);
2762        buf.extend_from_slice(bytes);
2763        buf
2764    }
2765
2766    fn avro_from_codec(codec: Codec) -> AvroDataType {
2767        AvroDataType::new(codec, Default::default(), None)
2768    }
2769
2770    fn resolved_root_datatype(
2771        writer: Schema<'static>,
2772        reader: Schema<'static>,
2773        use_utf8view: bool,
2774        strict_mode: bool,
2775    ) -> AvroDataType {
2776        // Wrap writer schema in a single-field record
2777        let writer_record = Schema::Complex(ComplexType::Record(Record {
2778            name: "Root",
2779            namespace: None,
2780            doc: None,
2781            aliases: vec![],
2782            fields: vec![Field {
2783                name: "v",
2784                r#type: writer,
2785                default: None,
2786                doc: None,
2787                aliases: vec![],
2788            }],
2789            attributes: Attributes::default(),
2790        }));
2791
2792        // Wrap reader schema in a single-field record
2793        let reader_record = Schema::Complex(ComplexType::Record(Record {
2794            name: "Root",
2795            namespace: None,
2796            doc: None,
2797            aliases: vec![],
2798            fields: vec![Field {
2799                name: "v",
2800                r#type: reader,
2801                default: None,
2802                doc: None,
2803                aliases: vec![],
2804            }],
2805            attributes: Attributes::default(),
2806        }));
2807
2808        // Build resolved record, then extract the inner field's resolved AvroDataType
2809        let field = AvroFieldBuilder::new(&writer_record)
2810            .with_reader_schema(&reader_record)
2811            .with_utf8view(use_utf8view)
2812            .with_strict_mode(strict_mode)
2813            .build()
2814            .expect("schema resolution should succeed");
2815
2816        match field.data_type().codec() {
2817            Codec::Struct(fields) => fields[0].data_type().clone(),
2818            other => panic!("expected wrapper struct, got {other:?}"),
2819        }
2820    }
2821
2822    fn decoder_for_promotion(
2823        writer: PrimitiveType,
2824        reader: PrimitiveType,
2825        use_utf8view: bool,
2826    ) -> Decoder {
2827        let ws = Schema::TypeName(TypeName::Primitive(writer));
2828        let rs = Schema::TypeName(TypeName::Primitive(reader));
2829        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2830        Decoder::try_new(&dt).unwrap()
2831    }
2832
2833    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2834        AvroDataType::new(codec, HashMap::new(), nullability)
2835    }
2836
2837    #[cfg(feature = "avro_custom_types")]
2838    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2839        let mut out = Vec::with_capacity(10);
2840        while x >= 0x80 {
2841            out.push((x as u8) | 0x80);
2842            x >>= 7;
2843        }
2844        out.push(x as u8);
2845        out
2846    }
2847
2848    #[test]
2849    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2850        let ws = Schema::Union(vec![
2851            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2852            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2853        ]);
2854        let rs = Schema::Union(vec![
2855            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2856            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2857        ]);
2858
2859        let dt = resolved_root_datatype(ws, rs, false, false);
2860        let mut dec = Decoder::try_new(&dt).unwrap();
2861
2862        let mut rec1 = encode_avro_long(0);
2863        rec1.extend(encode_avro_int(7));
2864        let mut cur1 = AvroCursor::new(&rec1);
2865        dec.decode(&mut cur1).unwrap();
2866
2867        let mut rec2 = encode_avro_long(1);
2868        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2869        let mut cur2 = AvroCursor::new(&rec2);
2870        dec.decode(&mut cur2).unwrap();
2871
2872        let arr = dec.flush(None).unwrap();
2873        let ua = arr
2874            .as_any()
2875            .downcast_ref::<UnionArray>()
2876            .expect("dense union output");
2877
2878        assert_eq!(
2879            ua.type_id(0),
2880            1,
2881            "first value must select reader 'long' branch"
2882        );
2883        assert_eq!(ua.value_offset(0), 0);
2884
2885        assert_eq!(
2886            ua.type_id(1),
2887            0,
2888            "second value must select reader 'string' branch"
2889        );
2890        assert_eq!(ua.value_offset(1), 0);
2891
2892        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2893        assert_eq!(long_child.len(), 1);
2894        assert_eq!(long_child.value(0), 7);
2895
2896        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2897        assert_eq!(str_child.len(), 1);
2898        assert_eq!(str_child.value(0), "abc");
2899    }
2900
2901    #[test]
2902    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2903        let ws = Schema::Union(vec![
2904            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2905            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2906        ]);
2907        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2908
2909        let dt = resolved_root_datatype(ws, rs, false, false);
2910        let mut dec = Decoder::try_new(&dt).unwrap();
2911
2912        let mut data = encode_avro_long(0);
2913        data.extend(encode_avro_int(5));
2914        let mut cur = AvroCursor::new(&data);
2915        dec.decode(&mut cur).unwrap();
2916
2917        let arr = dec.flush(None).unwrap();
2918        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2919        assert_eq!(out.len(), 1);
2920        assert_eq!(out.value(0), 5);
2921    }
2922
2923    #[test]
2924    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2925        let ws = Schema::Union(vec![
2926            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2927            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2928        ]);
2929        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2930
2931        let dt = resolved_root_datatype(ws, rs, false, false);
2932        let mut dec = Decoder::try_new(&dt).unwrap();
2933
2934        let mut data = encode_avro_long(1);
2935        data.extend(encode_avro_bytes("z".as_bytes()));
2936        let mut cur = AvroCursor::new(&data);
2937        let res = dec.decode(&mut cur);
2938        assert!(
2939            res.is_err(),
2940            "expected error when writer union branch does not resolve to reader non-union type"
2941        );
2942    }
2943
2944    #[test]
2945    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2946        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2947        let rs = Schema::Union(vec![
2948            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2949            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2950        ]);
2951
2952        let dt = resolved_root_datatype(ws, rs, false, false);
2953        let mut dec = Decoder::try_new(&dt).unwrap();
2954
2955        let data = encode_avro_int(6);
2956        let mut cur = AvroCursor::new(&data);
2957        dec.decode(&mut cur).unwrap();
2958
2959        let arr = dec.flush(None).unwrap();
2960        let ua = arr
2961            .as_any()
2962            .downcast_ref::<UnionArray>()
2963            .expect("dense union output");
2964        assert_eq!(ua.len(), 1);
2965        assert_eq!(
2966            ua.type_id(0),
2967            1,
2968            "must resolve to reader 'long' branch (type_id 1)"
2969        );
2970        assert_eq!(ua.value_offset(0), 0);
2971
2972        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2973        assert_eq!(long_child.len(), 1);
2974        assert_eq!(long_child.value(0), 6);
2975
2976        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2977        assert_eq!(str_child.len(), 0, "string branch must be empty");
2978    }
2979
2980    #[test]
2981    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2982        let ws = Schema::Union(vec![
2983            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2984            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2985        ]);
2986        let rs = Schema::Union(vec![
2987            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2988            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2989        ]);
2990
2991        let dt = resolved_root_datatype(ws, rs, false, false);
2992        let mut dec = Decoder::try_new(&dt).unwrap();
2993
2994        let mut data = encode_avro_long(1);
2995        data.push(1);
2996        let mut cur = AvroCursor::new(&data);
2997        let res = dec.decode(&mut cur);
2998        assert!(
2999            res.is_err(),
3000            "expected error for unmapped writer 'boolean' branch"
3001        );
3002    }
3003
3004    #[test]
3005    fn test_schema_resolution_promotion_int_to_long() {
3006        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
3007        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
3008        for v in [0, 1, -2, 123456] {
3009            let data = encode_avro_int(v);
3010            let mut cur = AvroCursor::new(&data);
3011            dec.decode(&mut cur).unwrap();
3012        }
3013        let arr = dec.flush(None).unwrap();
3014        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
3015        assert_eq!(a.value(0), 0);
3016        assert_eq!(a.value(1), 1);
3017        assert_eq!(a.value(2), -2);
3018        assert_eq!(a.value(3), 123456);
3019    }
3020
3021    #[test]
3022    fn test_schema_resolution_promotion_int_to_float() {
3023        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
3024        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
3025        for v in [0, 42, -7] {
3026            let data = encode_avro_int(v);
3027            let mut cur = AvroCursor::new(&data);
3028            dec.decode(&mut cur).unwrap();
3029        }
3030        let arr = dec.flush(None).unwrap();
3031        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3032        assert_eq!(a.value(0), 0.0);
3033        assert_eq!(a.value(1), 42.0);
3034        assert_eq!(a.value(2), -7.0);
3035    }
3036
3037    #[test]
3038    fn test_schema_resolution_promotion_int_to_double() {
3039        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
3040        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
3041        for v in [1, -1, 10_000] {
3042            let data = encode_avro_int(v);
3043            let mut cur = AvroCursor::new(&data);
3044            dec.decode(&mut cur).unwrap();
3045        }
3046        let arr = dec.flush(None).unwrap();
3047        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3048        assert_eq!(a.value(0), 1.0);
3049        assert_eq!(a.value(1), -1.0);
3050        assert_eq!(a.value(2), 10_000.0);
3051    }
3052
3053    #[test]
3054    fn test_schema_resolution_promotion_long_to_float() {
3055        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
3056        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
3057        for v in [0_i64, 1_000_000_i64, -123_i64] {
3058            let data = encode_avro_long(v);
3059            let mut cur = AvroCursor::new(&data);
3060            dec.decode(&mut cur).unwrap();
3061        }
3062        let arr = dec.flush(None).unwrap();
3063        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3064        assert_eq!(a.value(0), 0.0);
3065        assert_eq!(a.value(1), 1_000_000.0);
3066        assert_eq!(a.value(2), -123.0);
3067    }
3068
3069    #[test]
3070    fn test_schema_resolution_promotion_long_to_double() {
3071        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
3072        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
3073        for v in [2_i64, -2_i64, 9_223_372_i64] {
3074            let data = encode_avro_long(v);
3075            let mut cur = AvroCursor::new(&data);
3076            dec.decode(&mut cur).unwrap();
3077        }
3078        let arr = dec.flush(None).unwrap();
3079        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3080        assert_eq!(a.value(0), 2.0);
3081        assert_eq!(a.value(1), -2.0);
3082        assert_eq!(a.value(2), 9_223_372.0);
3083    }
3084
3085    #[test]
3086    fn test_schema_resolution_promotion_float_to_double() {
3087        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
3088        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
3089        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
3090            let data = v.to_le_bytes().to_vec();
3091            let mut cur = AvroCursor::new(&data);
3092            dec.decode(&mut cur).unwrap();
3093        }
3094        let arr = dec.flush(None).unwrap();
3095        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3096        assert_eq!(a.value(0), 0.5_f64);
3097        assert_eq!(a.value(1), -3.25_f64);
3098        assert_eq!(a.value(2), 1.0e6_f64);
3099    }
3100
3101    #[test]
3102    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
3103        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
3104        assert!(matches!(dec, Decoder::BytesToString(_, _)));
3105        for s in ["hello", "world", "héllo"] {
3106            let data = encode_avro_bytes(s.as_bytes());
3107            let mut cur = AvroCursor::new(&data);
3108            dec.decode(&mut cur).unwrap();
3109        }
3110        let arr = dec.flush(None).unwrap();
3111        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
3112        assert_eq!(a.value(0), "hello");
3113        assert_eq!(a.value(1), "world");
3114        assert_eq!(a.value(2), "héllo");
3115    }
3116
3117    #[test]
3118    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
3119        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
3120        assert!(matches!(dec, Decoder::BytesToString(_, _)));
3121        let data = encode_avro_bytes("abc".as_bytes());
3122        let mut cur = AvroCursor::new(&data);
3123        dec.decode(&mut cur).unwrap();
3124        let arr = dec.flush(None).unwrap();
3125        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
3126        assert_eq!(a.value(0), "abc");
3127    }
3128
3129    #[test]
3130    fn test_schema_resolution_promotion_string_to_bytes() {
3131        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
3132        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
3133        for s in ["", "abc", "data"] {
3134            let data = encode_avro_bytes(s.as_bytes());
3135            let mut cur = AvroCursor::new(&data);
3136            dec.decode(&mut cur).unwrap();
3137        }
3138        let arr = dec.flush(None).unwrap();
3139        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3140        assert_eq!(a.value(0), b"");
3141        assert_eq!(a.value(1), b"abc");
3142        assert_eq!(a.value(2), "data".as_bytes());
3143    }
3144
3145    #[test]
3146    fn test_schema_resolution_no_promotion_passthrough_int() {
3147        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3148        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3149        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
3150        let writer_record = Schema::Complex(ComplexType::Record(Record {
3151            name: "Root",
3152            namespace: None,
3153            doc: None,
3154            aliases: vec![],
3155            fields: vec![Field {
3156                name: "v",
3157                r#type: ws,
3158                default: None,
3159                doc: None,
3160                aliases: vec![],
3161            }],
3162            attributes: Attributes::default(),
3163        }));
3164        let reader_record = Schema::Complex(ComplexType::Record(Record {
3165            name: "Root",
3166            namespace: None,
3167            doc: None,
3168            aliases: vec![],
3169            fields: vec![Field {
3170                name: "v",
3171                r#type: rs,
3172                default: None,
3173                doc: None,
3174                aliases: vec![],
3175            }],
3176            attributes: Attributes::default(),
3177        }));
3178        let field = AvroFieldBuilder::new(&writer_record)
3179            .with_reader_schema(&reader_record)
3180            .with_utf8view(false)
3181            .with_strict_mode(false)
3182            .build()
3183            .unwrap();
3184        // Extract the resolved inner field's AvroDataType
3185        let dt = match field.data_type().codec() {
3186            Codec::Struct(fields) => fields[0].data_type().clone(),
3187            other => panic!("expected wrapper struct, got {other:?}"),
3188        };
3189        let mut dec = Decoder::try_new(&dt).unwrap();
3190        assert!(matches!(dec, Decoder::Int32(_)));
3191        for v in [7, -9] {
3192            let data = encode_avro_int(v);
3193            let mut cur = AvroCursor::new(&data);
3194            dec.decode(&mut cur).unwrap();
3195        }
3196        let arr = dec.flush(None).unwrap();
3197        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3198        assert_eq!(a.value(0), 7);
3199        assert_eq!(a.value(1), -9);
3200    }
3201
3202    #[test]
3203    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
3204        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3205        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
3206        let writer_record = Schema::Complex(ComplexType::Record(Record {
3207            name: "Root",
3208            namespace: None,
3209            doc: None,
3210            aliases: vec![],
3211            fields: vec![Field {
3212                name: "v",
3213                r#type: ws,
3214                default: None,
3215                doc: None,
3216                aliases: vec![],
3217            }],
3218            attributes: Attributes::default(),
3219        }));
3220        let reader_record = Schema::Complex(ComplexType::Record(Record {
3221            name: "Root",
3222            namespace: None,
3223            doc: None,
3224            aliases: vec![],
3225            fields: vec![Field {
3226                name: "v",
3227                r#type: rs,
3228                default: None,
3229                doc: None,
3230                aliases: vec![],
3231            }],
3232            attributes: Attributes::default(),
3233        }));
3234        let res = AvroFieldBuilder::new(&writer_record)
3235            .with_reader_schema(&reader_record)
3236            .with_utf8view(false)
3237            .with_strict_mode(false)
3238            .build();
3239        assert!(res.is_err(), "expected error for illegal promotion");
3240    }
3241
3242    #[test]
3243    fn test_map_decoding_one_entry() {
3244        let value_type = avro_from_codec(Codec::Utf8);
3245        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3246        let mut decoder = Decoder::try_new(&map_type).unwrap();
3247        // Encode a single map with one entry: {"hello": "world"}
3248        let mut data = Vec::new();
3249        data.extend_from_slice(&encode_avro_long(1));
3250        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
3251        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
3252        data.extend_from_slice(&encode_avro_long(0));
3253        let mut cursor = AvroCursor::new(&data);
3254        decoder.decode(&mut cursor).unwrap();
3255        let array = decoder.flush(None).unwrap();
3256        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3257        assert_eq!(map_arr.len(), 1); // one map
3258        assert_eq!(map_arr.value_length(0), 1);
3259        let entries = map_arr.value(0);
3260        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
3261        assert_eq!(struct_entries.len(), 1);
3262        let key_arr = struct_entries
3263            .column_by_name("key")
3264            .unwrap()
3265            .as_any()
3266            .downcast_ref::<StringArray>()
3267            .unwrap();
3268        let val_arr = struct_entries
3269            .column_by_name("value")
3270            .unwrap()
3271            .as_any()
3272            .downcast_ref::<StringArray>()
3273            .unwrap();
3274        assert_eq!(key_arr.value(0), "hello");
3275        assert_eq!(val_arr.value(0), "world");
3276    }
3277
3278    #[test]
3279    fn test_map_decoding_empty() {
3280        let value_type = avro_from_codec(Codec::Utf8);
3281        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3282        let mut decoder = Decoder::try_new(&map_type).unwrap();
3283        let data = encode_avro_long(0);
3284        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3285        let array = decoder.flush(None).unwrap();
3286        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3287        assert_eq!(map_arr.len(), 1);
3288        assert_eq!(map_arr.value_length(0), 0);
3289    }
3290
3291    #[test]
3292    fn test_fixed_decoding() {
3293        let avro_type = avro_from_codec(Codec::Fixed(3));
3294        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3295
3296        let data1 = [1u8, 2, 3];
3297        let mut cursor1 = AvroCursor::new(&data1);
3298        decoder
3299            .decode(&mut cursor1)
3300            .expect("Failed to decode data1");
3301        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
3302        let data2 = [4u8, 5, 6];
3303        let mut cursor2 = AvroCursor::new(&data2);
3304        decoder
3305            .decode(&mut cursor2)
3306            .expect("Failed to decode data2");
3307        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
3308        let array = decoder.flush(None).expect("Failed to flush decoder");
3309        assert_eq!(array.len(), 2, "Array should contain two items");
3310        let fixed_size_binary_array = array
3311            .as_any()
3312            .downcast_ref::<FixedSizeBinaryArray>()
3313            .expect("Failed to downcast to FixedSizeBinaryArray");
3314        assert_eq!(
3315            fixed_size_binary_array.value_length(),
3316            3,
3317            "Fixed size of binary values should be 3"
3318        );
3319        assert_eq!(
3320            fixed_size_binary_array.value(0),
3321            &[1, 2, 3],
3322            "First item mismatch"
3323        );
3324        assert_eq!(
3325            fixed_size_binary_array.value(1),
3326            &[4, 5, 6],
3327            "Second item mismatch"
3328        );
3329    }
3330
3331    #[test]
3332    fn test_fixed_decoding_empty() {
3333        let avro_type = avro_from_codec(Codec::Fixed(5));
3334        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3335
3336        let array = decoder
3337            .flush(None)
3338            .expect("Failed to flush decoder for empty input");
3339
3340        assert_eq!(array.len(), 0, "Array should be empty");
3341        let fixed_size_binary_array = array
3342            .as_any()
3343            .downcast_ref::<FixedSizeBinaryArray>()
3344            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
3345
3346        assert_eq!(
3347            fixed_size_binary_array.value_length(),
3348            5,
3349            "Fixed size of binary values should be 5 as per type"
3350        );
3351    }
3352
3353    #[test]
3354    fn test_uuid_decoding() {
3355        let avro_type = avro_from_codec(Codec::Uuid);
3356        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3357        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
3358        let data = encode_avro_bytes(uuid_str.as_bytes());
3359        let mut cursor = AvroCursor::new(&data);
3360        decoder.decode(&mut cursor).expect("Failed to decode data");
3361        assert_eq!(
3362            cursor.position(),
3363            data.len(),
3364            "Cursor should advance by varint size + data size"
3365        );
3366        let array = decoder.flush(None).expect("Failed to flush decoder");
3367        let fixed_size_binary_array = array
3368            .as_any()
3369            .downcast_ref::<FixedSizeBinaryArray>()
3370            .expect("Array should be a FixedSizeBinaryArray");
3371        assert_eq!(fixed_size_binary_array.len(), 1);
3372        assert_eq!(fixed_size_binary_array.value_length(), 16);
3373        let expected_bytes = [
3374            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
3375            0x6b, 0xf6,
3376        ];
3377        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
3378    }
3379
3380    #[test]
3381    fn test_array_decoding() {
3382        let item_dt = avro_from_codec(Codec::Int32);
3383        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3384        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3385        let mut row1 = Vec::new();
3386        row1.extend_from_slice(&encode_avro_long(2));
3387        row1.extend_from_slice(&encode_avro_int(10));
3388        row1.extend_from_slice(&encode_avro_int(20));
3389        row1.extend_from_slice(&encode_avro_long(0));
3390        let row2 = encode_avro_long(0);
3391        let mut cursor = AvroCursor::new(&row1);
3392        decoder.decode(&mut cursor).unwrap();
3393        let mut cursor2 = AvroCursor::new(&row2);
3394        decoder.decode(&mut cursor2).unwrap();
3395        let array = decoder.flush(None).unwrap();
3396        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3397        assert_eq!(list_arr.len(), 2);
3398        let offsets = list_arr.value_offsets();
3399        assert_eq!(offsets, &[0, 2, 2]);
3400        let values = list_arr.values();
3401        let int_arr = values.as_primitive::<Int32Type>();
3402        assert_eq!(int_arr.len(), 2);
3403        assert_eq!(int_arr.value(0), 10);
3404        assert_eq!(int_arr.value(1), 20);
3405    }
3406
3407    #[test]
3408    fn test_array_decoding_with_negative_block_count() {
3409        let item_dt = avro_from_codec(Codec::Int32);
3410        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3411        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3412        let mut data = encode_avro_long(-3);
3413        data.extend_from_slice(&encode_avro_long(12));
3414        data.extend_from_slice(&encode_avro_int(1));
3415        data.extend_from_slice(&encode_avro_int(2));
3416        data.extend_from_slice(&encode_avro_int(3));
3417        data.extend_from_slice(&encode_avro_long(0));
3418        let mut cursor = AvroCursor::new(&data);
3419        decoder.decode(&mut cursor).unwrap();
3420        let array = decoder.flush(None).unwrap();
3421        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3422        assert_eq!(list_arr.len(), 1);
3423        assert_eq!(list_arr.value_length(0), 3);
3424        let values = list_arr.values().as_primitive::<Int32Type>();
3425        assert_eq!(values.len(), 3);
3426        assert_eq!(values.value(0), 1);
3427        assert_eq!(values.value(1), 2);
3428        assert_eq!(values.value(2), 3);
3429    }
3430
3431    #[test]
3432    fn test_nested_array_decoding() {
3433        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3434        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
3435        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
3436        let mut buf = Vec::new();
3437        buf.extend(encode_avro_long(1));
3438        buf.extend(encode_avro_long(2));
3439        buf.extend(encode_avro_int(5));
3440        buf.extend(encode_avro_int(6));
3441        buf.extend(encode_avro_long(0));
3442        buf.extend(encode_avro_long(0));
3443        let mut cursor = AvroCursor::new(&buf);
3444        decoder.decode(&mut cursor).unwrap();
3445        let arr = decoder.flush(None).unwrap();
3446        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
3447        assert_eq!(outer.len(), 1);
3448        assert_eq!(outer.value_length(0), 1);
3449        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
3450        assert_eq!(inner.len(), 1);
3451        assert_eq!(inner.value_length(0), 2);
3452        let values = inner
3453            .values()
3454            .as_any()
3455            .downcast_ref::<Int32Array>()
3456            .unwrap();
3457        assert_eq!(values.values(), &[5, 6]);
3458    }
3459
3460    #[test]
3461    fn test_array_decoding_empty_array() {
3462        let value_type = avro_from_codec(Codec::Utf8);
3463        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
3464        let mut decoder = Decoder::try_new(&map_type).unwrap();
3465        let data = encode_avro_long(0);
3466        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3467        let array = decoder.flush(None).unwrap();
3468        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3469        assert_eq!(list_arr.len(), 1);
3470        assert_eq!(list_arr.value_length(0), 0);
3471    }
3472
3473    #[test]
3474    fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
3475        use crate::schema::Array;
3476        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3477            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3478            attributes: Attributes::default(),
3479        }));
3480        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3481            items: Box::new(Schema::Union(vec![
3482                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3483                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3484            ])),
3485            attributes: Attributes::default(),
3486        }));
3487        let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
3488        if let Codec::List(inner) = dt.codec() {
3489            assert_eq!(
3490                inner.nullability(),
3491                Some(Nullability::NullFirst),
3492                "items should be nullable"
3493            );
3494        } else {
3495            panic!("expected List codec");
3496        }
3497        let mut decoder = Decoder::try_new(&dt).unwrap();
3498        let mut data = encode_avro_long(2);
3499        data.extend(encode_avro_int(10));
3500        data.extend(encode_avro_int(20));
3501        data.extend(encode_avro_long(0));
3502        let mut cursor = AvroCursor::new(&data);
3503        decoder.decode(&mut cursor).unwrap();
3504        assert_eq!(
3505            cursor.position(),
3506            data.len(),
3507            "all bytes should be consumed"
3508        );
3509        let array = decoder.flush(None).unwrap();
3510        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3511        assert_eq!(list_arr.len(), 1, "one list/row");
3512        assert_eq!(list_arr.value_length(0), 2, "two items in the list");
3513        let values = list_arr.values().as_primitive::<Int32Type>();
3514        assert_eq!(values.len(), 2);
3515        assert_eq!(values.value(0), 10);
3516        assert_eq!(values.value(1), 20);
3517        assert!(!values.is_null(0));
3518        assert!(!values.is_null(1));
3519    }
3520
3521    #[test]
3522    fn test_decimal_decoding_fixed256() {
3523        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
3524        let mut decoder = Decoder::try_new(&dt).unwrap();
3525        let row1 = [
3526            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3527            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3528            0x00, 0x00, 0x30, 0x39,
3529        ];
3530        let row2 = [
3531            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3532            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3533            0xFF, 0xFF, 0xFF, 0x85,
3534        ];
3535        let mut data = Vec::new();
3536        data.extend_from_slice(&row1);
3537        data.extend_from_slice(&row2);
3538        let mut cursor = AvroCursor::new(&data);
3539        decoder.decode(&mut cursor).unwrap();
3540        decoder.decode(&mut cursor).unwrap();
3541        let arr = decoder.flush(None).unwrap();
3542        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
3543        assert_eq!(dec.len(), 2);
3544        assert_eq!(dec.value_as_string(0), "123.45");
3545        assert_eq!(dec.value_as_string(1), "-1.23");
3546    }
3547
3548    #[test]
3549    fn test_decimal_decoding_fixed128() {
3550        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
3551        let mut decoder = Decoder::try_new(&dt).unwrap();
3552        let row1 = [
3553            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3554            0x30, 0x39,
3555        ];
3556        let row2 = [
3557            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3558            0xFF, 0x85,
3559        ];
3560        let mut data = Vec::new();
3561        data.extend_from_slice(&row1);
3562        data.extend_from_slice(&row2);
3563        let mut cursor = AvroCursor::new(&data);
3564        decoder.decode(&mut cursor).unwrap();
3565        decoder.decode(&mut cursor).unwrap();
3566        let arr = decoder.flush(None).unwrap();
3567        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3568        assert_eq!(dec.len(), 2);
3569        assert_eq!(dec.value_as_string(0), "123.45");
3570        assert_eq!(dec.value_as_string(1), "-1.23");
3571    }
3572
3573    #[test]
3574    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
3575        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
3576        let mut decoder = Decoder::try_new(&dt).unwrap();
3577        let row1 = [
3578            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3579            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3580            0x00, 0x00, 0x30, 0x39,
3581        ];
3582        let row2 = [
3583            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3584            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3585            0xFF, 0xFF, 0xFF, 0x85,
3586        ];
3587        let mut data = Vec::new();
3588        data.extend_from_slice(&row1);
3589        data.extend_from_slice(&row2);
3590        let mut cursor = AvroCursor::new(&data);
3591        decoder.decode(&mut cursor).unwrap();
3592        decoder.decode(&mut cursor).unwrap();
3593        let arr = decoder.flush(None).unwrap();
3594        #[cfg(feature = "small_decimals")]
3595        {
3596            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3597            assert_eq!(dec.len(), 2);
3598            assert_eq!(dec.value_as_string(0), "123.45");
3599            assert_eq!(dec.value_as_string(1), "-1.23");
3600        }
3601        #[cfg(not(feature = "small_decimals"))]
3602        {
3603            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3604            assert_eq!(dec.len(), 2);
3605            assert_eq!(dec.value_as_string(0), "123.45");
3606            assert_eq!(dec.value_as_string(1), "-1.23");
3607        }
3608    }
3609
3610    #[test]
3611    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
3612        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
3613        let mut decoder = Decoder::try_new(&dt).unwrap();
3614        let row1 = [
3615            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3616            0x30, 0x39,
3617        ];
3618        let row2 = [
3619            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3620            0xFF, 0x85,
3621        ];
3622        let mut data = Vec::new();
3623        data.extend_from_slice(&row1);
3624        data.extend_from_slice(&row2);
3625        let mut cursor = AvroCursor::new(&data);
3626        decoder.decode(&mut cursor).unwrap();
3627        decoder.decode(&mut cursor).unwrap();
3628
3629        let arr = decoder.flush(None).unwrap();
3630        #[cfg(feature = "small_decimals")]
3631        {
3632            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3633            assert_eq!(dec.len(), 2);
3634            assert_eq!(dec.value_as_string(0), "123.45");
3635            assert_eq!(dec.value_as_string(1), "-1.23");
3636        }
3637        #[cfg(not(feature = "small_decimals"))]
3638        {
3639            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3640            assert_eq!(dec.len(), 2);
3641            assert_eq!(dec.value_as_string(0), "123.45");
3642            assert_eq!(dec.value_as_string(1), "-1.23");
3643        }
3644    }
3645
3646    #[test]
3647    fn test_decimal_decoding_bytes_with_nulls() {
3648        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
3649        let inner = Decoder::try_new(&dt).unwrap();
3650        let mut decoder = Decoder::Nullable(
3651            NullablePlan::ReadTag {
3652                nullability: Nullability::NullSecond,
3653                resolution: ResolutionPlan::Promotion(Promotion::Direct),
3654            },
3655            NullBufferBuilder::new(DEFAULT_CAPACITY),
3656            Box::new(inner),
3657        );
3658        let mut data = Vec::new();
3659        data.extend_from_slice(&encode_avro_int(0));
3660        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
3661        data.extend_from_slice(&encode_avro_int(1));
3662        data.extend_from_slice(&encode_avro_int(0));
3663        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
3664        let mut cursor = AvroCursor::new(&data);
3665        decoder.decode(&mut cursor).unwrap();
3666        decoder.decode(&mut cursor).unwrap();
3667        decoder.decode(&mut cursor).unwrap();
3668        let arr = decoder.flush(None).unwrap();
3669        #[cfg(feature = "small_decimals")]
3670        {
3671            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3672            assert_eq!(dec_arr.len(), 3);
3673            assert!(dec_arr.is_valid(0));
3674            assert!(!dec_arr.is_valid(1));
3675            assert!(dec_arr.is_valid(2));
3676            assert_eq!(dec_arr.value_as_string(0), "123.4");
3677            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3678        }
3679        #[cfg(not(feature = "small_decimals"))]
3680        {
3681            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3682            assert_eq!(dec_arr.len(), 3);
3683            assert!(dec_arr.is_valid(0));
3684            assert!(!dec_arr.is_valid(1));
3685            assert!(dec_arr.is_valid(2));
3686            assert_eq!(dec_arr.value_as_string(0), "123.4");
3687            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3688        }
3689    }
3690
3691    #[test]
3692    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
3693        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
3694        let inner = Decoder::try_new(&dt).unwrap();
3695        let mut decoder = Decoder::Nullable(
3696            NullablePlan::ReadTag {
3697                nullability: Nullability::NullSecond,
3698                resolution: ResolutionPlan::Promotion(Promotion::Direct),
3699            },
3700            NullBufferBuilder::new(DEFAULT_CAPACITY),
3701            Box::new(inner),
3702        );
3703        let row1 = [
3704            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
3705            0xE2, 0x40,
3706        ];
3707        let row3 = [
3708            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
3709            0x1D, 0xC0,
3710        ];
3711        let mut data = Vec::new();
3712        data.extend_from_slice(&encode_avro_int(0));
3713        data.extend_from_slice(&row1);
3714        data.extend_from_slice(&encode_avro_int(1));
3715        data.extend_from_slice(&encode_avro_int(0));
3716        data.extend_from_slice(&row3);
3717        let mut cursor = AvroCursor::new(&data);
3718        decoder.decode(&mut cursor).unwrap();
3719        decoder.decode(&mut cursor).unwrap();
3720        decoder.decode(&mut cursor).unwrap();
3721        let arr = decoder.flush(None).unwrap();
3722        #[cfg(feature = "small_decimals")]
3723        {
3724            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3725            assert_eq!(dec_arr.len(), 3);
3726            assert!(dec_arr.is_valid(0));
3727            assert!(!dec_arr.is_valid(1));
3728            assert!(dec_arr.is_valid(2));
3729            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3730            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3731        }
3732        #[cfg(not(feature = "small_decimals"))]
3733        {
3734            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3735            assert_eq!(dec_arr.len(), 3);
3736            assert!(dec_arr.is_valid(0));
3737            assert!(!dec_arr.is_valid(1));
3738            assert!(dec_arr.is_valid(2));
3739            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3740            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3741        }
3742    }
3743
3744    #[test]
3745    fn test_enum_decoding() {
3746        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
3747        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
3748        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3749        let mut data = Vec::new();
3750        data.extend_from_slice(&encode_avro_int(2));
3751        data.extend_from_slice(&encode_avro_int(0));
3752        data.extend_from_slice(&encode_avro_int(1));
3753        let mut cursor = AvroCursor::new(&data);
3754        decoder.decode(&mut cursor).unwrap();
3755        decoder.decode(&mut cursor).unwrap();
3756        decoder.decode(&mut cursor).unwrap();
3757        let array = decoder.flush(None).unwrap();
3758        let dict_array = array
3759            .as_any()
3760            .downcast_ref::<DictionaryArray<Int32Type>>()
3761            .unwrap();
3762        assert_eq!(dict_array.len(), 3);
3763        let values = dict_array
3764            .values()
3765            .as_any()
3766            .downcast_ref::<StringArray>()
3767            .unwrap();
3768        assert_eq!(values.value(0), "A");
3769        assert_eq!(values.value(1), "B");
3770        assert_eq!(values.value(2), "C");
3771        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
3772    }
3773
3774    #[test]
3775    fn test_enum_decoding_with_nulls() {
3776        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
3777        let enum_codec = Codec::Enum(symbols.clone());
3778        let avro_type =
3779            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
3780        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3781        let mut data = Vec::new();
3782        data.extend_from_slice(&encode_avro_long(1));
3783        data.extend_from_slice(&encode_avro_int(1));
3784        data.extend_from_slice(&encode_avro_long(0));
3785        data.extend_from_slice(&encode_avro_long(1));
3786        data.extend_from_slice(&encode_avro_int(0));
3787        let mut cursor = AvroCursor::new(&data);
3788        decoder.decode(&mut cursor).unwrap();
3789        decoder.decode(&mut cursor).unwrap();
3790        decoder.decode(&mut cursor).unwrap();
3791        let array = decoder.flush(None).unwrap();
3792        let dict_array = array
3793            .as_any()
3794            .downcast_ref::<DictionaryArray<Int32Type>>()
3795            .unwrap();
3796        assert_eq!(dict_array.len(), 3);
3797        assert!(dict_array.is_valid(0));
3798        assert!(dict_array.is_null(1));
3799        assert!(dict_array.is_valid(2));
3800        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
3801        assert_eq!(dict_array.keys(), &expected_keys);
3802        let values = dict_array
3803            .values()
3804            .as_any()
3805            .downcast_ref::<StringArray>()
3806            .unwrap();
3807        assert_eq!(values.value(0), "X");
3808        assert_eq!(values.value(1), "Y");
3809    }
3810
3811    #[test]
3812    fn test_duration_decoding_with_nulls() {
3813        let duration_codec = Codec::Interval;
3814        let avro_type = AvroDataType::new(
3815            duration_codec,
3816            Default::default(),
3817            Some(Nullability::NullFirst),
3818        );
3819        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3820        let mut data = Vec::new();
3821        // First value: 1 month, 2 days, 3 millis
3822        data.extend_from_slice(&encode_avro_long(1)); // not null
3823        let mut duration1 = Vec::new();
3824        duration1.extend_from_slice(&1u32.to_le_bytes());
3825        duration1.extend_from_slice(&2u32.to_le_bytes());
3826        duration1.extend_from_slice(&3u32.to_le_bytes());
3827        data.extend_from_slice(&duration1);
3828        // Second value: null
3829        data.extend_from_slice(&encode_avro_long(0)); // null
3830        data.extend_from_slice(&encode_avro_long(1)); // not null
3831        let mut duration2 = Vec::new();
3832        duration2.extend_from_slice(&4u32.to_le_bytes());
3833        duration2.extend_from_slice(&5u32.to_le_bytes());
3834        duration2.extend_from_slice(&6u32.to_le_bytes());
3835        data.extend_from_slice(&duration2);
3836        let mut cursor = AvroCursor::new(&data);
3837        decoder.decode(&mut cursor).unwrap();
3838        decoder.decode(&mut cursor).unwrap();
3839        decoder.decode(&mut cursor).unwrap();
3840        let array = decoder.flush(None).unwrap();
3841        let interval_array = array
3842            .as_any()
3843            .downcast_ref::<IntervalMonthDayNanoArray>()
3844            .unwrap();
3845        assert_eq!(interval_array.len(), 3);
3846        assert!(interval_array.is_valid(0));
3847        assert!(interval_array.is_null(1));
3848        assert!(interval_array.is_valid(2));
3849        let expected = IntervalMonthDayNanoArray::from(vec![
3850            Some(IntervalMonthDayNano {
3851                months: 1,
3852                days: 2,
3853                nanoseconds: 3_000_000,
3854            }),
3855            None,
3856            Some(IntervalMonthDayNano {
3857                months: 4,
3858                days: 5,
3859                nanoseconds: 6_000_000,
3860            }),
3861        ]);
3862        assert_eq!(interval_array, &expected);
3863    }
3864
3865    #[cfg(feature = "avro_custom_types")]
3866    #[test]
3867    fn test_interval_month_day_nano_custom_decoding_with_nulls() {
3868        let avro_type = AvroDataType::new(
3869            Codec::IntervalMonthDayNano,
3870            Default::default(),
3871            Some(Nullability::NullFirst),
3872        );
3873        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3874        let mut data = Vec::new();
3875        // First value: months=1, days=-2, nanos=3
3876        data.extend_from_slice(&encode_avro_long(1));
3877        data.extend_from_slice(&1i32.to_le_bytes());
3878        data.extend_from_slice(&(-2i32).to_le_bytes());
3879        data.extend_from_slice(&3i64.to_le_bytes());
3880        // Second value: null
3881        data.extend_from_slice(&encode_avro_long(0));
3882        // Third value: months=-4, days=5, nanos=-6
3883        data.extend_from_slice(&encode_avro_long(1));
3884        data.extend_from_slice(&(-4i32).to_le_bytes());
3885        data.extend_from_slice(&5i32.to_le_bytes());
3886        data.extend_from_slice(&(-6i64).to_le_bytes());
3887        let mut cursor = AvroCursor::new(&data);
3888        decoder.decode(&mut cursor).unwrap();
3889        decoder.decode(&mut cursor).unwrap();
3890        decoder.decode(&mut cursor).unwrap();
3891        let array = decoder.flush(None).unwrap();
3892        let interval_array = array
3893            .as_any()
3894            .downcast_ref::<IntervalMonthDayNanoArray>()
3895            .unwrap();
3896        assert_eq!(interval_array.len(), 3);
3897        let expected = IntervalMonthDayNanoArray::from(vec![
3898            Some(IntervalMonthDayNano::new(1, -2, 3)),
3899            None,
3900            Some(IntervalMonthDayNano::new(-4, 5, -6)),
3901        ]);
3902        assert_eq!(interval_array, &expected);
3903    }
3904
3905    #[test]
3906    fn test_duration_decoding_empty() {
3907        let duration_codec = Codec::Interval;
3908        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
3909        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3910        let array = decoder.flush(None).unwrap();
3911        assert_eq!(array.len(), 0);
3912    }
3913
3914    #[test]
3915    #[cfg(feature = "avro_custom_types")]
3916    fn test_duration_seconds_decoding() {
3917        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
3918        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3919        let mut data = Vec::new();
3920        // Three values: 0, -1, 2
3921        data.extend_from_slice(&encode_avro_long(0));
3922        data.extend_from_slice(&encode_avro_long(-1));
3923        data.extend_from_slice(&encode_avro_long(2));
3924        let mut cursor = AvroCursor::new(&data);
3925        decoder.decode(&mut cursor).unwrap();
3926        decoder.decode(&mut cursor).unwrap();
3927        decoder.decode(&mut cursor).unwrap();
3928        let array = decoder.flush(None).unwrap();
3929        let dur = array
3930            .as_any()
3931            .downcast_ref::<DurationSecondArray>()
3932            .unwrap();
3933        assert_eq!(dur.values(), &[0, -1, 2]);
3934    }
3935
3936    #[test]
3937    #[cfg(feature = "avro_custom_types")]
3938    fn test_duration_milliseconds_decoding() {
3939        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3940        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3941        let mut data = Vec::new();
3942        for v in [1i64, 0, -2] {
3943            data.extend_from_slice(&encode_avro_long(v));
3944        }
3945        let mut cursor = AvroCursor::new(&data);
3946        for _ in 0..3 {
3947            decoder.decode(&mut cursor).unwrap();
3948        }
3949        let array = decoder.flush(None).unwrap();
3950        let dur = array
3951            .as_any()
3952            .downcast_ref::<DurationMillisecondArray>()
3953            .unwrap();
3954        assert_eq!(dur.values(), &[1, 0, -2]);
3955    }
3956
3957    #[test]
3958    #[cfg(feature = "avro_custom_types")]
3959    fn test_duration_microseconds_decoding() {
3960        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3961        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3962        let mut data = Vec::new();
3963        for v in [5i64, -6, 7] {
3964            data.extend_from_slice(&encode_avro_long(v));
3965        }
3966        let mut cursor = AvroCursor::new(&data);
3967        for _ in 0..3 {
3968            decoder.decode(&mut cursor).unwrap();
3969        }
3970        let array = decoder.flush(None).unwrap();
3971        let dur = array
3972            .as_any()
3973            .downcast_ref::<DurationMicrosecondArray>()
3974            .unwrap();
3975        assert_eq!(dur.values(), &[5, -6, 7]);
3976    }
3977
3978    #[test]
3979    #[cfg(feature = "avro_custom_types")]
3980    fn test_duration_nanoseconds_decoding() {
3981        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3982        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3983        let mut data = Vec::new();
3984        for v in [8i64, 9, -10] {
3985            data.extend_from_slice(&encode_avro_long(v));
3986        }
3987        let mut cursor = AvroCursor::new(&data);
3988        for _ in 0..3 {
3989            decoder.decode(&mut cursor).unwrap();
3990        }
3991        let array = decoder.flush(None).unwrap();
3992        let dur = array
3993            .as_any()
3994            .downcast_ref::<DurationNanosecondArray>()
3995            .unwrap();
3996        assert_eq!(dur.values(), &[8, 9, -10]);
3997    }
3998
3999    #[test]
4000    fn test_nullable_decode_error_bitmap_corruption() {
4001        // Nullable Int32 with ['T','null'] encoding (NullSecond)
4002        let avro_type = AvroDataType::new(
4003            Codec::Int32,
4004            Default::default(),
4005            Some(Nullability::NullSecond),
4006        );
4007        let mut decoder = Decoder::try_new(&avro_type).unwrap();
4008
4009        // Row 1: union branch 1 (null)
4010        let mut row1 = Vec::new();
4011        row1.extend_from_slice(&encode_avro_int(1));
4012
4013        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
4014        let mut row2 = Vec::new();
4015        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
4016
4017        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
4018        let mut row3 = Vec::new();
4019        row3.extend_from_slice(&encode_avro_int(0)); // branch
4020        row3.extend_from_slice(&encode_avro_int(42)); // actual value
4021
4022        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
4023        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
4024        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
4025
4026        let array = decoder.flush(None).unwrap();
4027
4028        // Should contain 2 elements: row1 (null) and row3 (42)
4029        assert_eq!(array.len(), 2);
4030        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
4031        assert!(int_array.is_null(0)); // row1 is null
4032        assert_eq!(int_array.value(1), 42); // row3 value is 42
4033    }
4034
4035    #[test]
4036    fn test_enum_mapping_reordered_symbols() {
4037        let reader_symbols: Arc<[String]> =
4038            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
4039        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
4040        let default_index: i32 = -1;
4041        let mut dec = Decoder::Enum(
4042            Vec::with_capacity(DEFAULT_CAPACITY),
4043            reader_symbols.clone(),
4044            Some(EnumResolution {
4045                mapping,
4046                default_index,
4047            }),
4048        );
4049        let mut data = Vec::new();
4050        data.extend_from_slice(&encode_avro_int(0));
4051        data.extend_from_slice(&encode_avro_int(1));
4052        data.extend_from_slice(&encode_avro_int(2));
4053        let mut cur = AvroCursor::new(&data);
4054        dec.decode(&mut cur).unwrap();
4055        dec.decode(&mut cur).unwrap();
4056        dec.decode(&mut cur).unwrap();
4057        let arr = dec.flush(None).unwrap();
4058        let dict = arr
4059            .as_any()
4060            .downcast_ref::<DictionaryArray<Int32Type>>()
4061            .unwrap();
4062        let expected_keys = Int32Array::from(vec![2, 0, 1]);
4063        assert_eq!(dict.keys(), &expected_keys);
4064        let values = dict
4065            .values()
4066            .as_any()
4067            .downcast_ref::<StringArray>()
4068            .unwrap();
4069        assert_eq!(values.value(0), "B");
4070        assert_eq!(values.value(1), "C");
4071        assert_eq!(values.value(2), "A");
4072    }
4073
4074    #[test]
4075    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
4076        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
4077        let default_index: i32 = 1;
4078        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
4079        let mut dec = Decoder::Enum(
4080            Vec::with_capacity(DEFAULT_CAPACITY),
4081            reader_symbols.clone(),
4082            Some(EnumResolution {
4083                mapping,
4084                default_index,
4085            }),
4086        );
4087        let mut data = Vec::new();
4088        data.extend_from_slice(&encode_avro_int(0));
4089        data.extend_from_slice(&encode_avro_int(1));
4090        data.extend_from_slice(&encode_avro_int(99));
4091        let mut cur = AvroCursor::new(&data);
4092        dec.decode(&mut cur).unwrap();
4093        dec.decode(&mut cur).unwrap();
4094        dec.decode(&mut cur).unwrap();
4095        let arr = dec.flush(None).unwrap();
4096        let dict = arr
4097            .as_any()
4098            .downcast_ref::<DictionaryArray<Int32Type>>()
4099            .unwrap();
4100        let expected_keys = Int32Array::from(vec![0, 1, 1]);
4101        assert_eq!(dict.keys(), &expected_keys);
4102        let values = dict
4103            .values()
4104            .as_any()
4105            .downcast_ref::<StringArray>()
4106            .unwrap();
4107        assert_eq!(values.value(0), "A");
4108        assert_eq!(values.value(1), "B");
4109    }
4110
4111    #[test]
4112    fn test_enum_mapping_unknown_symbol_without_default_errors() {
4113        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
4114        let default_index: i32 = -1; // indicates no default at type-level
4115        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
4116        let mut dec = Decoder::Enum(
4117            Vec::with_capacity(DEFAULT_CAPACITY),
4118            reader_symbols,
4119            Some(EnumResolution {
4120                mapping,
4121                default_index,
4122            }),
4123        );
4124        let data = encode_avro_int(0);
4125        let mut cur = AvroCursor::new(&data);
4126        let err = dec
4127            .decode(&mut cur)
4128            .expect_err("expected decode error for unresolved enum without default");
4129        let msg = err.to_string();
4130        assert!(
4131            msg.contains("not resolvable") && msg.contains("no default"),
4132            "unexpected error message: {msg}"
4133        );
4134    }
4135
4136    fn make_record_resolved_decoder(
4137        reader_fields: &[(&str, DataType, bool)],
4138        writer_projections: Vec<FieldProjection>,
4139    ) -> Decoder {
4140        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4141        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4142        for (name, dt, nullable) in reader_fields {
4143            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4144            let enc = match dt {
4145                DataType::Int32 => Decoder::Int32(Vec::new()),
4146                DataType::Int64 => Decoder::Int64(Vec::new()),
4147                DataType::Utf8 => {
4148                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
4149                }
4150                other => panic!("Unsupported test reader field type: {other:?}"),
4151            };
4152            encodings.push(enc);
4153        }
4154        let fields: Fields = field_refs.into();
4155        Decoder::Record(
4156            fields,
4157            encodings,
4158            vec![None; reader_fields.len()],
4159            Some(Projector {
4160                writer_projections,
4161                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4162            }),
4163        )
4164    }
4165
4166    #[test]
4167    fn test_skip_writer_trailing_field_int32() {
4168        let mut dec = make_record_resolved_decoder(
4169            &[("id", arrow_schema::DataType::Int32, false)],
4170            vec![
4171                FieldProjection::ToReader(0),
4172                FieldProjection::Skip(super::Skipper::Int32),
4173            ],
4174        );
4175        let mut data = Vec::new();
4176        data.extend_from_slice(&encode_avro_int(7));
4177        data.extend_from_slice(&encode_avro_int(999));
4178        let mut cur = AvroCursor::new(&data);
4179        dec.decode(&mut cur).unwrap();
4180        assert_eq!(cur.position(), data.len());
4181        let arr = dec.flush(None).unwrap();
4182        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
4183        assert_eq!(struct_arr.len(), 1);
4184        let id = struct_arr
4185            .column_by_name("id")
4186            .unwrap()
4187            .as_any()
4188            .downcast_ref::<Int32Array>()
4189            .unwrap();
4190        assert_eq!(id.value(0), 7);
4191    }
4192
4193    #[test]
4194    fn test_skip_writer_middle_field_string() {
4195        let mut dec = make_record_resolved_decoder(
4196            &[
4197                ("id", DataType::Int32, false),
4198                ("score", DataType::Int64, false),
4199            ],
4200            vec![
4201                FieldProjection::ToReader(0),
4202                FieldProjection::Skip(Skipper::String),
4203                FieldProjection::ToReader(1),
4204            ],
4205        );
4206        let mut data = Vec::new();
4207        data.extend_from_slice(&encode_avro_int(42));
4208        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
4209        data.extend_from_slice(&encode_avro_long(1000));
4210        let mut cur = AvroCursor::new(&data);
4211        dec.decode(&mut cur).unwrap();
4212        assert_eq!(cur.position(), data.len());
4213        let arr = dec.flush(None).unwrap();
4214        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4215        let id = s
4216            .column_by_name("id")
4217            .unwrap()
4218            .as_any()
4219            .downcast_ref::<Int32Array>()
4220            .unwrap();
4221        let score = s
4222            .column_by_name("score")
4223            .unwrap()
4224            .as_any()
4225            .downcast_ref::<Int64Array>()
4226            .unwrap();
4227        assert_eq!(id.value(0), 42);
4228        assert_eq!(score.value(0), 1000);
4229    }
4230
4231    #[test]
4232    fn test_skip_writer_array_with_negative_block_count_fast() {
4233        let mut dec = make_record_resolved_decoder(
4234            &[("id", DataType::Int32, false)],
4235            vec![
4236                FieldProjection::Skip(super::Skipper::List(Box::new(Skipper::Int32))),
4237                FieldProjection::ToReader(0),
4238            ],
4239        );
4240        let mut array_payload = Vec::new();
4241        array_payload.extend_from_slice(&encode_avro_int(1));
4242        array_payload.extend_from_slice(&encode_avro_int(2));
4243        array_payload.extend_from_slice(&encode_avro_int(3));
4244        let mut data = Vec::new();
4245        data.extend_from_slice(&encode_avro_long(-3));
4246        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
4247        data.extend_from_slice(&array_payload);
4248        data.extend_from_slice(&encode_avro_long(0));
4249        data.extend_from_slice(&encode_avro_int(5));
4250        let mut cur = AvroCursor::new(&data);
4251        dec.decode(&mut cur).unwrap();
4252        assert_eq!(cur.position(), data.len());
4253        let arr = dec.flush(None).unwrap();
4254        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4255        let id = s
4256            .column_by_name("id")
4257            .unwrap()
4258            .as_any()
4259            .downcast_ref::<Int32Array>()
4260            .unwrap();
4261        assert_eq!(id.len(), 1);
4262        assert_eq!(id.value(0), 5);
4263    }
4264
4265    #[test]
4266    fn test_skip_writer_map_with_negative_block_count_fast() {
4267        let mut dec = make_record_resolved_decoder(
4268            &[("id", DataType::Int32, false)],
4269            vec![
4270                FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
4271                FieldProjection::ToReader(0),
4272            ],
4273        );
4274        let mut entries = Vec::new();
4275        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
4276        entries.extend_from_slice(&encode_avro_int(10));
4277        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
4278        entries.extend_from_slice(&encode_avro_int(20));
4279        let mut data = Vec::new();
4280        data.extend_from_slice(&encode_avro_long(-2));
4281        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
4282        data.extend_from_slice(&entries);
4283        data.extend_from_slice(&encode_avro_long(0));
4284        data.extend_from_slice(&encode_avro_int(123));
4285        let mut cur = AvroCursor::new(&data);
4286        dec.decode(&mut cur).unwrap();
4287        assert_eq!(cur.position(), data.len());
4288        let arr = dec.flush(None).unwrap();
4289        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4290        let id = s
4291            .column_by_name("id")
4292            .unwrap()
4293            .as_any()
4294            .downcast_ref::<Int32Array>()
4295            .unwrap();
4296        assert_eq!(id.len(), 1);
4297        assert_eq!(id.value(0), 123);
4298    }
4299
4300    #[test]
4301    fn test_skip_writer_nullable_field_union_nullfirst() {
4302        let mut dec = make_record_resolved_decoder(
4303            &[("id", DataType::Int32, false)],
4304            vec![
4305                FieldProjection::Skip(super::Skipper::Nullable(
4306                    Nullability::NullFirst,
4307                    Box::new(super::Skipper::Int32),
4308                )),
4309                FieldProjection::ToReader(0),
4310            ],
4311        );
4312        let mut row1 = Vec::new();
4313        row1.extend_from_slice(&encode_avro_long(0));
4314        row1.extend_from_slice(&encode_avro_int(5));
4315        let mut row2 = Vec::new();
4316        row2.extend_from_slice(&encode_avro_long(1));
4317        row2.extend_from_slice(&encode_avro_int(123));
4318        row2.extend_from_slice(&encode_avro_int(7));
4319        let mut cur1 = AvroCursor::new(&row1);
4320        let mut cur2 = AvroCursor::new(&row2);
4321        dec.decode(&mut cur1).unwrap();
4322        dec.decode(&mut cur2).unwrap();
4323        assert_eq!(cur1.position(), row1.len());
4324        assert_eq!(cur2.position(), row2.len());
4325        let arr = dec.flush(None).unwrap();
4326        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4327        let id = s
4328            .column_by_name("id")
4329            .unwrap()
4330            .as_any()
4331            .downcast_ref::<Int32Array>()
4332            .unwrap();
4333        assert_eq!(id.len(), 2);
4334        assert_eq!(id.value(0), 5);
4335        assert_eq!(id.value(1), 7);
4336    }
4337
4338    fn make_dense_union_avro(
4339        children: Vec<(Codec, &'_ str, DataType)>,
4340        type_ids: Vec<i8>,
4341    ) -> AvroDataType {
4342        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4343        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4344        for (codec, name, dt) in children.into_iter() {
4345            avro_children.push(AvroDataType::new(codec, Default::default(), None));
4346            fields.push(arrow_schema::Field::new(name, dt, true));
4347        }
4348        let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4349        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4350        AvroDataType::new(union_codec, Default::default(), None)
4351    }
4352
4353    #[test]
4354    fn test_union_dense_two_children_custom_type_ids() {
4355        let union_dt = make_dense_union_avro(
4356            vec![
4357                (Codec::Int32, "i", DataType::Int32),
4358                (Codec::Utf8, "s", DataType::Utf8),
4359            ],
4360            vec![2, 5],
4361        );
4362        let mut dec = Decoder::try_new(&union_dt).unwrap();
4363        let mut r1 = Vec::new();
4364        r1.extend_from_slice(&encode_avro_long(0));
4365        r1.extend_from_slice(&encode_avro_int(7));
4366        let mut r2 = Vec::new();
4367        r2.extend_from_slice(&encode_avro_long(1));
4368        r2.extend_from_slice(&encode_avro_bytes(b"x"));
4369        let mut r3 = Vec::new();
4370        r3.extend_from_slice(&encode_avro_long(0));
4371        r3.extend_from_slice(&encode_avro_int(-1));
4372        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4373        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4374        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4375        let array = dec.flush(None).unwrap();
4376        let ua = array
4377            .as_any()
4378            .downcast_ref::<UnionArray>()
4379            .expect("expected UnionArray");
4380        assert_eq!(ua.len(), 3);
4381        assert_eq!(ua.type_id(0), 2);
4382        assert_eq!(ua.type_id(1), 5);
4383        assert_eq!(ua.type_id(2), 2);
4384        assert_eq!(ua.value_offset(0), 0);
4385        assert_eq!(ua.value_offset(1), 0);
4386        assert_eq!(ua.value_offset(2), 1);
4387        let int_child = ua
4388            .child(2)
4389            .as_any()
4390            .downcast_ref::<Int32Array>()
4391            .expect("int child");
4392        assert_eq!(int_child.len(), 2);
4393        assert_eq!(int_child.value(0), 7);
4394        assert_eq!(int_child.value(1), -1);
4395        let str_child = ua
4396            .child(5)
4397            .as_any()
4398            .downcast_ref::<StringArray>()
4399            .expect("string child");
4400        assert_eq!(str_child.len(), 1);
4401        assert_eq!(str_child.value(0), "x");
4402    }
4403
4404    #[test]
4405    fn test_union_dense_with_null_and_string_children() {
4406        let union_dt = make_dense_union_avro(
4407            vec![
4408                (Codec::Null, "n", DataType::Null),
4409                (Codec::Utf8, "s", DataType::Utf8),
4410            ],
4411            vec![42, 7],
4412        );
4413        let mut dec = Decoder::try_new(&union_dt).unwrap();
4414        let r1 = encode_avro_long(0);
4415        let mut r2 = Vec::new();
4416        r2.extend_from_slice(&encode_avro_long(1));
4417        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
4418        let r3 = encode_avro_long(0);
4419        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4420        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4421        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4422        let array = dec.flush(None).unwrap();
4423        let ua = array
4424            .as_any()
4425            .downcast_ref::<UnionArray>()
4426            .expect("expected UnionArray");
4427        assert_eq!(ua.len(), 3);
4428        assert_eq!(ua.type_id(0), 42);
4429        assert_eq!(ua.type_id(1), 7);
4430        assert_eq!(ua.type_id(2), 42);
4431        assert_eq!(ua.value_offset(0), 0);
4432        assert_eq!(ua.value_offset(1), 0);
4433        assert_eq!(ua.value_offset(2), 1);
4434        let null_child = ua
4435            .child(42)
4436            .as_any()
4437            .downcast_ref::<NullArray>()
4438            .expect("null child");
4439        assert_eq!(null_child.len(), 2);
4440        let str_child = ua
4441            .child(7)
4442            .as_any()
4443            .downcast_ref::<StringArray>()
4444            .expect("string child");
4445        assert_eq!(str_child.len(), 1);
4446        assert_eq!(str_child.value(0), "abc");
4447    }
4448
4449    #[test]
4450    fn test_union_decode_negative_branch_index_errors() {
4451        let union_dt = make_dense_union_avro(
4452            vec![
4453                (Codec::Int32, "i", DataType::Int32),
4454                (Codec::Utf8, "s", DataType::Utf8),
4455            ],
4456            vec![0, 1],
4457        );
4458        let mut dec = Decoder::try_new(&union_dt).unwrap();
4459        let row = encode_avro_long(-1); // decodes back to -1
4460        let err = dec
4461            .decode(&mut AvroCursor::new(&row))
4462            .expect_err("expected error for negative branch index");
4463        let msg = err.to_string();
4464        assert!(
4465            msg.contains("Negative union branch index"),
4466            "unexpected error message: {msg}"
4467        );
4468    }
4469
4470    #[test]
4471    fn test_union_decode_out_of_range_branch_index_errors() {
4472        let union_dt = make_dense_union_avro(
4473            vec![
4474                (Codec::Int32, "i", DataType::Int32),
4475                (Codec::Utf8, "s", DataType::Utf8),
4476            ],
4477            vec![10, 11],
4478        );
4479        let mut dec = Decoder::try_new(&union_dt).unwrap();
4480        let row = encode_avro_long(2);
4481        let err = dec
4482            .decode(&mut AvroCursor::new(&row))
4483            .expect_err("expected error for out-of-range branch index");
4484        let msg = err.to_string();
4485        assert!(
4486            msg.contains("out of range"),
4487            "unexpected error message: {msg}"
4488        );
4489    }
4490
4491    #[test]
4492    fn test_union_sparse_mode_not_supported() {
4493        let children: Vec<AvroDataType> = vec![
4494            AvroDataType::new(Codec::Int32, Default::default(), None),
4495            AvroDataType::new(Codec::Utf8, Default::default(), None),
4496        ];
4497        let uf = UnionFields::try_new(
4498            vec![1, 3],
4499            vec![
4500                arrow_schema::Field::new("i", DataType::Int32, true),
4501                arrow_schema::Field::new("s", DataType::Utf8, true),
4502            ],
4503        )
4504        .unwrap();
4505        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
4506        let dt = AvroDataType::new(codec, Default::default(), None);
4507        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
4508        let msg = err.to_string();
4509        assert!(
4510            msg.contains("Sparse Arrow unions are not yet supported"),
4511            "unexpected error message: {msg}"
4512        );
4513    }
4514
4515    fn make_record_decoder_with_projector_defaults(
4516        reader_fields: &[(&str, DataType, bool)],
4517        field_defaults: Vec<Option<AvroLiteral>>,
4518        default_injections: Vec<(usize, AvroLiteral)>,
4519    ) -> Decoder {
4520        assert_eq!(
4521            field_defaults.len(),
4522            reader_fields.len(),
4523            "field_defaults must have one entry per reader field"
4524        );
4525        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4526        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4527        for (name, dt, nullable) in reader_fields {
4528            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4529            let enc = match dt {
4530                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4531                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4532                DataType::Utf8 => Decoder::String(
4533                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4534                    Vec::with_capacity(DEFAULT_CAPACITY),
4535                ),
4536                other => panic!("Unsupported test field type in helper: {other:?}"),
4537            };
4538            encodings.push(enc);
4539        }
4540        let fields: Fields = field_refs.into();
4541        let projector = Projector {
4542            writer_projections: vec![],
4543            default_injections: Arc::from(default_injections),
4544        };
4545        Decoder::Record(fields, encodings, field_defaults, Some(projector))
4546    }
4547
4548    #[cfg(feature = "avro_custom_types")]
4549    #[test]
4550    fn test_default_append_custom_integer_range_validation() {
4551        let mut d_i8 = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
4552        d_i8.append_default(&AvroLiteral::Int(i8::MIN as i32))
4553            .unwrap();
4554        d_i8.append_default(&AvroLiteral::Int(i8::MAX as i32))
4555            .unwrap();
4556        let err_i8_high = d_i8
4557            .append_default(&AvroLiteral::Int(i8::MAX as i32 + 1))
4558            .unwrap_err();
4559        assert!(err_i8_high.to_string().contains("out of range for i8"));
4560        let err_i8_low = d_i8
4561            .append_default(&AvroLiteral::Int(i8::MIN as i32 - 1))
4562            .unwrap_err();
4563        assert!(err_i8_low.to_string().contains("out of range for i8"));
4564        let arr_i8 = d_i8.flush(None).unwrap();
4565        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4566        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4567
4568        let mut d_i16 = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
4569        d_i16
4570            .append_default(&AvroLiteral::Int(i16::MIN as i32))
4571            .unwrap();
4572        d_i16
4573            .append_default(&AvroLiteral::Int(i16::MAX as i32))
4574            .unwrap();
4575        let err_i16_high = d_i16
4576            .append_default(&AvroLiteral::Int(i16::MAX as i32 + 1))
4577            .unwrap_err();
4578        assert!(err_i16_high.to_string().contains("out of range for i16"));
4579        let err_i16_low = d_i16
4580            .append_default(&AvroLiteral::Int(i16::MIN as i32 - 1))
4581            .unwrap_err();
4582        assert!(err_i16_low.to_string().contains("out of range for i16"));
4583        let arr_i16 = d_i16.flush(None).unwrap();
4584        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4585        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4586
4587        let mut d_u8 = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
4588        d_u8.append_default(&AvroLiteral::Int(0)).unwrap();
4589        d_u8.append_default(&AvroLiteral::Int(u8::MAX as i32))
4590            .unwrap();
4591        let err_u8_neg = d_u8.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4592        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4593        let err_u8_high = d_u8
4594            .append_default(&AvroLiteral::Int(u8::MAX as i32 + 1))
4595            .unwrap_err();
4596        assert!(err_u8_high.to_string().contains("out of range for u8"));
4597        let arr_u8 = d_u8.flush(None).unwrap();
4598        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4599        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4600
4601        let mut d_u16 = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
4602        d_u16.append_default(&AvroLiteral::Int(0)).unwrap();
4603        d_u16
4604            .append_default(&AvroLiteral::Int(u16::MAX as i32))
4605            .unwrap();
4606        let err_u16_neg = d_u16.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4607        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4608        let err_u16_high = d_u16
4609            .append_default(&AvroLiteral::Int(u16::MAX as i32 + 1))
4610            .unwrap_err();
4611        assert!(err_u16_high.to_string().contains("out of range for u16"));
4612        let arr_u16 = d_u16.flush(None).unwrap();
4613        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4614        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4615
4616        let mut d_u32 = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
4617        d_u32.append_default(&AvroLiteral::Long(0)).unwrap();
4618        d_u32
4619            .append_default(&AvroLiteral::Long(u32::MAX as i64))
4620            .unwrap();
4621        let err_u32_neg = d_u32.append_default(&AvroLiteral::Long(-1)).unwrap_err();
4622        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4623        let err_u32_high = d_u32
4624            .append_default(&AvroLiteral::Long(u32::MAX as i64 + 1))
4625            .unwrap_err();
4626        assert!(err_u32_high.to_string().contains("out of range for u32"));
4627        let arr_u32 = d_u32.flush(None).unwrap();
4628        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4629        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4630    }
4631
4632    #[cfg(feature = "avro_custom_types")]
4633    #[test]
4634    fn test_decode_custom_integer_range_validation() {
4635        let mut d_i8 = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
4636        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32)))
4637            .unwrap();
4638        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32)))
4639            .unwrap();
4640        let err_i8_high = d_i8
4641            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32 + 1)))
4642            .unwrap_err();
4643        assert!(err_i8_high.to_string().contains("out of range for i8"));
4644        let err_i8_low = d_i8
4645            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32 - 1)))
4646            .unwrap_err();
4647        assert!(err_i8_low.to_string().contains("out of range for i8"));
4648        let arr_i8 = d_i8.flush(None).unwrap();
4649        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4650        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4651
4652        let mut d_i16 = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
4653        d_i16
4654            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32)))
4655            .unwrap();
4656        d_i16
4657            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32)))
4658            .unwrap();
4659        let err_i16_high = d_i16
4660            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32 + 1)))
4661            .unwrap_err();
4662        assert!(err_i16_high.to_string().contains("out of range for i16"));
4663        let err_i16_low = d_i16
4664            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32 - 1)))
4665            .unwrap_err();
4666        assert!(err_i16_low.to_string().contains("out of range for i16"));
4667        let arr_i16 = d_i16.flush(None).unwrap();
4668        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4669        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4670
4671        let mut d_u8 = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
4672        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(0)))
4673            .unwrap();
4674        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32)))
4675            .unwrap();
4676        let err_u8_neg = d_u8
4677            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4678            .unwrap_err();
4679        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4680        let err_u8_high = d_u8
4681            .decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32 + 1)))
4682            .unwrap_err();
4683        assert!(err_u8_high.to_string().contains("out of range for u8"));
4684        let arr_u8 = d_u8.flush(None).unwrap();
4685        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4686        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4687
4688        let mut d_u16 = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
4689        d_u16
4690            .decode(&mut AvroCursor::new(&encode_avro_int(0)))
4691            .unwrap();
4692        d_u16
4693            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32)))
4694            .unwrap();
4695        let err_u16_neg = d_u16
4696            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4697            .unwrap_err();
4698        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4699        let err_u16_high = d_u16
4700            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32 + 1)))
4701            .unwrap_err();
4702        assert!(err_u16_high.to_string().contains("out of range for u16"));
4703        let arr_u16 = d_u16.flush(None).unwrap();
4704        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4705        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4706
4707        let mut d_u32 = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
4708        d_u32
4709            .decode(&mut AvroCursor::new(&encode_avro_long(0)))
4710            .unwrap();
4711        d_u32
4712            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64)))
4713            .unwrap();
4714        let err_u32_neg = d_u32
4715            .decode(&mut AvroCursor::new(&encode_avro_long(-1)))
4716            .unwrap_err();
4717        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4718        let err_u32_high = d_u32
4719            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64 + 1)))
4720            .unwrap_err();
4721        assert!(err_u32_high.to_string().contains("out of range for u32"));
4722        let arr_u32 = d_u32.flush(None).unwrap();
4723        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4724        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4725    }
4726
4727    #[test]
4728    fn test_default_append_int32_and_int64_from_int_and_long() {
4729        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4730        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
4731        let arr = d_i32.flush(None).unwrap();
4732        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4733        assert_eq!(a.len(), 1);
4734        assert_eq!(a.value(0), 42);
4735        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
4736        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
4737        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
4738        let arr64 = d_i64.flush(None).unwrap();
4739        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
4740        assert_eq!(a64.len(), 2);
4741        assert_eq!(a64.value(0), 5);
4742        assert_eq!(a64.value(1), 7);
4743    }
4744
4745    #[test]
4746    fn test_default_append_floats_and_doubles() {
4747        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
4748        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
4749        let arr32 = d_f32.flush(None).unwrap();
4750        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
4751        assert_eq!(a.value(0), 1.5);
4752        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4753        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
4754        let arr64 = d_f64.flush(None).unwrap();
4755        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
4756        assert_eq!(b.value(0), 2.25);
4757    }
4758
4759    #[test]
4760    fn test_default_append_string_and_bytes() {
4761        let mut d_str = Decoder::String(
4762            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4763            Vec::with_capacity(DEFAULT_CAPACITY),
4764        );
4765        d_str
4766            .append_default(&AvroLiteral::String("hi".into()))
4767            .unwrap();
4768        let s_arr = d_str.flush(None).unwrap();
4769        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
4770        assert_eq!(arr.value(0), "hi");
4771        let mut d_bytes = Decoder::Binary(
4772            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4773            Vec::with_capacity(DEFAULT_CAPACITY),
4774        );
4775        d_bytes
4776            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4777            .unwrap();
4778        let b_arr = d_bytes.flush(None).unwrap();
4779        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
4780        assert_eq!(barr.value(0), &[1, 2, 3]);
4781        let mut d_str_err = Decoder::String(
4782            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4783            Vec::with_capacity(DEFAULT_CAPACITY),
4784        );
4785        let err = d_str_err
4786            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
4787            .unwrap_err();
4788        assert!(
4789            err.to_string()
4790                .contains("Default for string must be string"),
4791            "unexpected error: {err:?}"
4792        );
4793    }
4794
4795    #[test]
4796    fn test_default_append_nullable_int32_null_and_value() {
4797        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4798        let mut dec = Decoder::Nullable(
4799            NullablePlan::ReadTag {
4800                nullability: Nullability::NullFirst,
4801                resolution: ResolutionPlan::Promotion(Promotion::Direct),
4802            },
4803            NullBufferBuilder::new(DEFAULT_CAPACITY),
4804            Box::new(inner),
4805        );
4806        dec.append_default(&AvroLiteral::Null).unwrap();
4807        dec.append_default(&AvroLiteral::Int(11)).unwrap();
4808        let arr = dec.flush(None).unwrap();
4809        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4810        assert_eq!(a.len(), 2);
4811        assert!(a.is_null(0));
4812        assert_eq!(a.value(1), 11);
4813    }
4814
4815    #[test]
4816    fn test_default_append_array_of_ints() {
4817        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
4818        let mut d = Decoder::try_new(&list_dt).unwrap();
4819        let items = vec![
4820            AvroLiteral::Int(1),
4821            AvroLiteral::Int(2),
4822            AvroLiteral::Int(3),
4823        ];
4824        d.append_default(&AvroLiteral::Array(items)).unwrap();
4825        let arr = d.flush(None).unwrap();
4826        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
4827        assert_eq!(list.len(), 1);
4828        assert_eq!(list.value_length(0), 3);
4829        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
4830        assert_eq!(vals.values(), &[1, 2, 3]);
4831    }
4832
4833    #[test]
4834    fn test_default_append_map_string_to_int() {
4835        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
4836        let mut d = Decoder::try_new(&map_dt).unwrap();
4837        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
4838        m.insert("k1".to_string(), AvroLiteral::Int(10));
4839        m.insert("k2".to_string(), AvroLiteral::Int(20));
4840        d.append_default(&AvroLiteral::Map(m)).unwrap();
4841        let arr = d.flush(None).unwrap();
4842        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
4843        assert_eq!(map.len(), 1);
4844        assert_eq!(map.value_length(0), 2);
4845        let binding = map.value(0);
4846        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
4847        let k = entries
4848            .column_by_name("key")
4849            .unwrap()
4850            .as_any()
4851            .downcast_ref::<StringArray>()
4852            .unwrap();
4853        let v = entries
4854            .column_by_name("value")
4855            .unwrap()
4856            .as_any()
4857            .downcast_ref::<Int32Array>()
4858            .unwrap();
4859        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4860        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4861        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4862        assert_eq!(vals, [10, 20].into_iter().collect());
4863    }
4864
4865    #[test]
4866    fn test_default_append_enum_by_symbol() {
4867        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4868        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4869        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4870        let arr = d.flush(None).unwrap();
4871        let dict = arr
4872            .as_any()
4873            .downcast_ref::<DictionaryArray<Int32Type>>()
4874            .unwrap();
4875        assert_eq!(dict.len(), 1);
4876        let expected = Int32Array::from(vec![1]);
4877        assert_eq!(dict.keys(), &expected);
4878        let values = dict
4879            .values()
4880            .as_any()
4881            .downcast_ref::<StringArray>()
4882            .unwrap();
4883        assert_eq!(values.value(1), "B");
4884    }
4885
4886    #[test]
4887    fn test_default_append_uuid_and_type_error() {
4888        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4889        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4890        d.append_default(&AvroLiteral::String(uuid_str.into()))
4891            .unwrap();
4892        let arr_ref = d.flush(None).unwrap();
4893        let arr = arr_ref
4894            .as_any()
4895            .downcast_ref::<FixedSizeBinaryArray>()
4896            .unwrap();
4897        assert_eq!(arr.value_length(), 16);
4898        assert_eq!(arr.len(), 1);
4899        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4900        let err = d2
4901            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4902            .unwrap_err();
4903        assert!(
4904            err.to_string().contains("Default for uuid must be string"),
4905            "unexpected error: {err:?}"
4906        );
4907    }
4908
4909    #[test]
4910    fn test_default_append_fixed_and_length_mismatch() {
4911        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4912        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4913            .unwrap();
4914        let arr_ref = d.flush(None).unwrap();
4915        let arr = arr_ref
4916            .as_any()
4917            .downcast_ref::<FixedSizeBinaryArray>()
4918            .unwrap();
4919        assert_eq!(arr.value_length(), 4);
4920        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4921        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4922        let err = d_err
4923            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4924            .unwrap_err();
4925        assert!(
4926            err.to_string().contains("Fixed default length"),
4927            "unexpected error: {err:?}"
4928        );
4929    }
4930
4931    #[test]
4932    fn test_default_append_duration_and_length_validation() {
4933        let dt = avro_from_codec(Codec::Interval);
4934        let mut d = Decoder::try_new(&dt).unwrap();
4935        let mut bytes = Vec::with_capacity(12);
4936        bytes.extend_from_slice(&1u32.to_le_bytes());
4937        bytes.extend_from_slice(&2u32.to_le_bytes());
4938        bytes.extend_from_slice(&3u32.to_le_bytes());
4939        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4940        let arr_ref = d.flush(None).unwrap();
4941        let arr = arr_ref
4942            .as_any()
4943            .downcast_ref::<IntervalMonthDayNanoArray>()
4944            .unwrap();
4945        assert_eq!(arr.len(), 1);
4946        let v = arr.value(0);
4947        assert_eq!(v.months, 1);
4948        assert_eq!(v.days, 2);
4949        assert_eq!(v.nanoseconds, 3_000_000);
4950        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4951        let err = d_err
4952            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4953            .unwrap_err();
4954        assert!(
4955            err.to_string()
4956                .contains("Duration default must be exactly 12 bytes"),
4957            "unexpected error: {err:?}"
4958        );
4959    }
4960
4961    #[test]
4962    fn test_default_append_decimal256_from_bytes() {
4963        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4964        let mut d = Decoder::try_new(&dt).unwrap();
4965        let pos: [u8; 32] = [
4966            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4967            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4968            0x00, 0x00, 0x30, 0x39,
4969        ];
4970        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4971        let neg: [u8; 32] = [
4972            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4973            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4974            0xFF, 0xFF, 0xFF, 0x85,
4975        ];
4976        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4977        let arr = d.flush(None).unwrap();
4978        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4979        assert_eq!(dec.len(), 2);
4980        assert_eq!(dec.value_as_string(0), "123.45");
4981        assert_eq!(dec.value_as_string(1), "-1.23");
4982    }
4983
4984    #[test]
4985    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4986        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4987        let mut rec = make_record_decoder_with_projector_defaults(
4988            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4989            field_defaults,
4990            vec![],
4991        );
4992        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4993        map.insert("a".to_string(), AvroLiteral::Int(7));
4994        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4995        let arr = rec.flush(None).unwrap();
4996        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4997        let a = s
4998            .column_by_name("a")
4999            .unwrap()
5000            .as_any()
5001            .downcast_ref::<Int32Array>()
5002            .unwrap();
5003        let b = s
5004            .column_by_name("b")
5005            .unwrap()
5006            .as_any()
5007            .downcast_ref::<StringArray>()
5008            .unwrap();
5009        assert_eq!(a.value(0), 7);
5010        assert_eq!(b.value(0), "hi");
5011    }
5012
5013    #[test]
5014    fn test_record_append_default_null_uses_projector_field_defaults() {
5015        let field_defaults = vec![
5016            Some(AvroLiteral::Int(5)),
5017            Some(AvroLiteral::String("x".into())),
5018        ];
5019        let mut rec = make_record_decoder_with_projector_defaults(
5020            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
5021            field_defaults,
5022            vec![],
5023        );
5024        rec.append_default(&AvroLiteral::Null).unwrap();
5025        let arr = rec.flush(None).unwrap();
5026        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5027        let a = s
5028            .column_by_name("a")
5029            .unwrap()
5030            .as_any()
5031            .downcast_ref::<Int32Array>()
5032            .unwrap();
5033        let b = s
5034            .column_by_name("b")
5035            .unwrap()
5036            .as_any()
5037            .downcast_ref::<StringArray>()
5038            .unwrap();
5039        assert_eq!(a.value(0), 5);
5040        assert_eq!(b.value(0), "x");
5041    }
5042
5043    #[test]
5044    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
5045     {
5046        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
5047        let mut field_refs: Vec<FieldRef> = Vec::new();
5048        let mut encoders: Vec<Decoder> = Vec::new();
5049        for (name, dt, nullable) in &fields {
5050            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
5051        }
5052        let enc_a = Decoder::Nullable(
5053            NullablePlan::ReadTag {
5054                nullability: Nullability::NullSecond,
5055                resolution: ResolutionPlan::Promotion(Promotion::Direct),
5056            },
5057            NullBufferBuilder::new(DEFAULT_CAPACITY),
5058            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
5059        );
5060        let enc_b = Decoder::Nullable(
5061            NullablePlan::ReadTag {
5062                nullability: Nullability::NullSecond,
5063                resolution: ResolutionPlan::Promotion(Promotion::Direct),
5064            },
5065            NullBufferBuilder::new(DEFAULT_CAPACITY),
5066            Box::new(Decoder::String(
5067                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
5068                Vec::with_capacity(DEFAULT_CAPACITY),
5069            )),
5070        );
5071        encoders.push(enc_a);
5072        encoders.push(enc_b);
5073        let field_defaults = vec![None, None]; // no defaults -> append_null
5074        let projector = Projector {
5075            writer_projections: vec![],
5076            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
5077        };
5078        let mut rec = Decoder::Record(field_refs.into(), encoders, field_defaults, Some(projector));
5079        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
5080        map.insert("a".to_string(), AvroLiteral::Int(9));
5081        rec.append_default(&AvroLiteral::Map(map)).unwrap();
5082        let arr = rec.flush(None).unwrap();
5083        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5084        let a = s
5085            .column_by_name("a")
5086            .unwrap()
5087            .as_any()
5088            .downcast_ref::<Int32Array>()
5089            .unwrap();
5090        let b = s
5091            .column_by_name("b")
5092            .unwrap()
5093            .as_any()
5094            .downcast_ref::<StringArray>()
5095            .unwrap();
5096        assert!(a.is_valid(0));
5097        assert_eq!(a.value(0), 9);
5098        assert!(b.is_null(0));
5099    }
5100
5101    #[test]
5102    fn test_projector_default_injection_when_writer_lacks_fields() {
5103        let defaults = vec![None, None];
5104        let injections = vec![
5105            (0, AvroLiteral::Int(99)),
5106            (1, AvroLiteral::String("alice".into())),
5107        ];
5108        let mut rec = make_record_decoder_with_projector_defaults(
5109            &[
5110                ("id", DataType::Int32, false),
5111                ("name", DataType::Utf8, false),
5112            ],
5113            defaults,
5114            injections,
5115        );
5116        rec.decode(&mut AvroCursor::new(&[])).unwrap();
5117        let arr = rec.flush(None).unwrap();
5118        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5119        let id = s
5120            .column_by_name("id")
5121            .unwrap()
5122            .as_any()
5123            .downcast_ref::<Int32Array>()
5124            .unwrap();
5125        let name = s
5126            .column_by_name("name")
5127            .unwrap()
5128            .as_any()
5129            .downcast_ref::<StringArray>()
5130            .unwrap();
5131        assert_eq!(id.value(0), 99);
5132        assert_eq!(name.value(0), "alice");
5133    }
5134
5135    #[test]
5136    fn union_type_ids_are_not_child_indexes() {
5137        let encodings: Vec<AvroDataType> =
5138            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
5139        let fields: UnionFields = [
5140            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
5141            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
5142        ]
5143        .into_iter()
5144        .collect();
5145        let dt = avro_from_codec(Codec::Union(
5146            encodings.into(),
5147            fields.clone(),
5148            UnionMode::Dense,
5149        ));
5150        let mut dec = Decoder::try_new(&dt).expect("decoder");
5151        let mut b1 = encode_avro_long(1);
5152        b1.extend(encode_avro_bytes("hi".as_bytes()));
5153        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
5154        let mut b0 = encode_avro_long(0);
5155        b0.extend(encode_avro_int(5));
5156        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
5157        let arr = dec.flush(None).expect("flush");
5158        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
5159        assert_eq!(ua.len(), 2);
5160        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
5161        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
5162        assert_eq!(ua.value_offset(0), 0);
5163        assert_eq!(ua.value_offset(1), 0);
5164        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
5165        assert_eq!(utf8_child.len(), 1);
5166        assert_eq!(utf8_child.value(0), "hi");
5167        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
5168        assert_eq!(int_child.len(), 1);
5169        assert_eq!(int_child.value(0), 5);
5170        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
5171        assert_eq!(type_ids, vec![42_i8, 7_i8]);
5172    }
5173
5174    #[cfg(feature = "avro_custom_types")]
5175    #[test]
5176    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
5177        for codec in [
5178            Codec::DurationNanos,
5179            Codec::DurationMicros,
5180            Codec::DurationMillis,
5181            Codec::DurationSeconds,
5182        ] {
5183            let dt = make_avro_dt(codec.clone(), None);
5184            let s = Skipper::from_avro(&dt)?;
5185            match s {
5186                Skipper::Int64 => {}
5187                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
5188            }
5189        }
5190        Ok(())
5191    }
5192
5193    #[cfg(feature = "avro_custom_types")]
5194    #[test]
5195    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
5196        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
5197        for codec in [
5198            Codec::DurationNanos,
5199            Codec::DurationMicros,
5200            Codec::DurationMillis,
5201            Codec::DurationSeconds,
5202        ] {
5203            let dt = make_avro_dt(codec.clone(), None);
5204            let s = Skipper::from_avro(&dt)?;
5205            for &v in &values {
5206                let bytes = encode_avro_long(v);
5207                let mut cursor = AvroCursor::new(&bytes);
5208                s.skip(&mut cursor)?;
5209                assert_eq!(
5210                    cursor.position(),
5211                    bytes.len(),
5212                    "did not consume all bytes for {:?} value {}",
5213                    codec,
5214                    v
5215                );
5216            }
5217        }
5218        Ok(())
5219    }
5220
5221    #[cfg(feature = "avro_custom_types")]
5222    #[test]
5223    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
5224        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
5225        let s = Skipper::from_avro(&dt)?;
5226        match &s {
5227            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
5228                Skipper::Int64 => {}
5229                ref other => panic!("expected inner Int64, got {:?}", other),
5230            },
5231            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
5232        }
5233        {
5234            let buf = encode_vlq_u64(0);
5235            let mut cursor = AvroCursor::new(&buf);
5236            s.skip(&mut cursor)?;
5237            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
5238        }
5239        {
5240            let mut buf = encode_vlq_u64(1);
5241            buf.extend(encode_avro_long(0));
5242            let mut cursor = AvroCursor::new(&buf);
5243            s.skip(&mut cursor)?;
5244            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
5245        }
5246
5247        Ok(())
5248    }
5249
5250    #[cfg(feature = "avro_custom_types")]
5251    #[test]
5252    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
5253        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
5254        let s = Skipper::from_avro(&dt)?;
5255        match &s {
5256            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
5257                Skipper::Int64 => {}
5258                ref other => panic!("expected inner Int64, got {:?}", other),
5259            },
5260            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
5261        }
5262        {
5263            let buf = encode_vlq_u64(1);
5264            let mut cursor = AvroCursor::new(&buf);
5265            s.skip(&mut cursor)?;
5266            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
5267        }
5268        {
5269            let mut buf = encode_vlq_u64(0);
5270            buf.extend(encode_avro_long(-1));
5271            let mut cursor = AvroCursor::new(&buf);
5272            s.skip(&mut cursor)?;
5273            assert_eq!(
5274                cursor.position(),
5275                1 + encode_avro_long(-1).len(),
5276                "expected to consume tag=0 + long(-1)"
5277            );
5278        }
5279        Ok(())
5280    }
5281
5282    #[test]
5283    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
5284        let dt = make_avro_dt(Codec::Interval, None);
5285        let s = Skipper::from_avro(&dt)?;
5286        match s {
5287            Skipper::DurationFixed12 => {}
5288            other => panic!("expected DurationFixed12, got {:?}", other),
5289        }
5290        let payload = vec![0u8; 12];
5291        let mut cursor = AvroCursor::new(&payload);
5292        s.skip(&mut cursor)?;
5293        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
5294        Ok(())
5295    }
5296
5297    #[cfg(feature = "avro_custom_types")]
5298    #[test]
5299    fn test_run_end_encoded_width16_int32_basic_grouping() {
5300        use arrow_array::RunArray;
5301        use std::sync::Arc;
5302        let inner = avro_from_codec(Codec::Int32);
5303        let ree = AvroDataType::new(
5304            Codec::RunEndEncoded(Arc::new(inner), 16),
5305            Default::default(),
5306            None,
5307        );
5308        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5309        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
5310            let bytes = encode_avro_int(v);
5311            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5312        }
5313        let arr = dec.flush(None).expect("flush");
5314        let ra = arr
5315            .as_any()
5316            .downcast_ref::<RunArray<Int16Type>>()
5317            .expect("RunArray<Int16Type>");
5318        assert_eq!(ra.len(), 9);
5319        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
5320        let vals = ra
5321            .values()
5322            .as_ref()
5323            .as_any()
5324            .downcast_ref::<Int32Array>()
5325            .expect("values Int32");
5326        assert_eq!(vals.values(), &[1, 2, 3]);
5327    }
5328
5329    #[cfg(feature = "avro_custom_types")]
5330    #[test]
5331    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
5332        use arrow_array::RunArray;
5333        use std::sync::Arc;
5334        let inner = AvroDataType::new(
5335            Codec::Int32,
5336            Default::default(),
5337            Some(Nullability::NullSecond),
5338        );
5339        let ree = AvroDataType::new(
5340            Codec::RunEndEncoded(Arc::new(inner), 32),
5341            Default::default(),
5342            None,
5343        );
5344        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5345        let seq: [Option<i32>; 8] = [
5346            None,
5347            None,
5348            Some(7),
5349            Some(7),
5350            Some(7),
5351            None,
5352            Some(5),
5353            Some(5),
5354        ];
5355        for item in seq {
5356            let mut bytes = Vec::new();
5357            match item {
5358                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
5359                Some(v) => {
5360                    bytes.extend_from_slice(&encode_vlq_u64(0));
5361                    bytes.extend_from_slice(&encode_avro_int(v));
5362                }
5363            }
5364            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5365        }
5366        let arr = dec.flush(None).expect("flush");
5367        let ra = arr
5368            .as_any()
5369            .downcast_ref::<RunArray<Int32Type>>()
5370            .expect("RunArray<Int32Type>");
5371        assert_eq!(ra.len(), 8);
5372        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
5373        let vals = ra
5374            .values()
5375            .as_ref()
5376            .as_any()
5377            .downcast_ref::<Int32Array>()
5378            .expect("values Int32 (nullable)");
5379        assert_eq!(vals.len(), 4);
5380        assert!(vals.is_null(0));
5381        assert_eq!(vals.value(1), 7);
5382        assert!(vals.is_null(2));
5383        assert_eq!(vals.value(3), 5);
5384    }
5385
5386    #[cfg(feature = "avro_custom_types")]
5387    #[test]
5388    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
5389        use arrow_array::RunArray;
5390        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
5391        let ree = Decoder::RunEndEncoded(
5392            8, /* bytes => Int64 run-ends */
5393            0,
5394            Box::new(inner_values),
5395        );
5396        let mut dec = Decoder::Nullable(
5397            NullablePlan::FromSingle {
5398                resolution: ResolutionPlan::Promotion(Promotion::IntToDouble),
5399            },
5400            NullBufferBuilder::new(DEFAULT_CAPACITY),
5401            Box::new(ree),
5402        );
5403        for v in [1, 1, 2, 2, 2] {
5404            let bytes = encode_avro_int(v);
5405            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5406        }
5407        let arr = dec.flush(None).expect("flush");
5408        let ra = arr
5409            .as_any()
5410            .downcast_ref::<RunArray<Int64Type>>()
5411            .expect("RunArray<Int64Type>");
5412        assert_eq!(ra.len(), 5);
5413        assert_eq!(ra.run_ends().values(), &[2, 5]);
5414        let vals = ra
5415            .values()
5416            .as_ref()
5417            .as_any()
5418            .downcast_ref::<Float64Array>()
5419            .expect("values Float64");
5420        assert_eq!(vals.values(), &[1.0, 2.0]);
5421    }
5422
5423    #[cfg(feature = "avro_custom_types")]
5424    #[test]
5425    fn test_run_end_encoded_unsupported_run_end_width_errors() {
5426        use std::sync::Arc;
5427        let inner = avro_from_codec(Codec::Int32);
5428        let dt = AvroDataType::new(
5429            Codec::RunEndEncoded(Arc::new(inner), 3),
5430            Default::default(),
5431            None,
5432        );
5433        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
5434        let msg = err.to_string();
5435        assert!(
5436            msg.contains("Unsupported run-end width")
5437                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
5438            "unexpected error message: {msg}"
5439        );
5440    }
5441
5442    #[cfg(feature = "avro_custom_types")]
5443    #[test]
5444    fn test_run_end_encoded_empty_input_is_empty_runarray() {
5445        use arrow_array::RunArray;
5446        use std::sync::Arc;
5447        let inner = avro_from_codec(Codec::Utf8);
5448        let dt = AvroDataType::new(
5449            Codec::RunEndEncoded(Arc::new(inner), 4),
5450            Default::default(),
5451            None,
5452        );
5453        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5454        let arr = dec.flush(None).expect("flush");
5455        let ra = arr
5456            .as_any()
5457            .downcast_ref::<RunArray<Int32Type>>()
5458            .expect("RunArray<Int32Type>");
5459        assert_eq!(ra.len(), 0);
5460        assert_eq!(ra.run_ends().len(), 0);
5461        assert_eq!(ra.values().len(), 0);
5462    }
5463
5464    #[cfg(feature = "avro_custom_types")]
5465    #[test]
5466    fn test_run_end_encoded_strings_grouping_width32_bits() {
5467        use arrow_array::RunArray;
5468        use std::sync::Arc;
5469        let inner = avro_from_codec(Codec::Utf8);
5470        let dt = AvroDataType::new(
5471            Codec::RunEndEncoded(Arc::new(inner), 32),
5472            Default::default(),
5473            None,
5474        );
5475        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5476        for s in ["a", "a", "bb", "bb", "bb", "a"] {
5477            let bytes = encode_avro_bytes(s.as_bytes());
5478            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5479        }
5480        let arr = dec.flush(None).expect("flush");
5481        let ra = arr
5482            .as_any()
5483            .downcast_ref::<RunArray<Int32Type>>()
5484            .expect("RunArray<Int32Type>");
5485        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
5486        let vals = ra
5487            .values()
5488            .as_ref()
5489            .as_any()
5490            .downcast_ref::<StringArray>()
5491            .expect("values String");
5492        assert_eq!(vals.len(), 3);
5493        assert_eq!(vals.value(0), "a");
5494        assert_eq!(vals.value(1), "bb");
5495        assert_eq!(vals.value(2), "a");
5496    }
5497
5498    #[cfg(not(feature = "avro_custom_types"))]
5499    #[test]
5500    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
5501        let dt = avro_from_codec(Codec::Int32);
5502        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
5503        for v in [1, 2, 3] {
5504            let bytes = encode_avro_int(v);
5505            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5506        }
5507        let arr = dec.flush(None).expect("flush");
5508        let a = arr
5509            .as_any()
5510            .downcast_ref::<Int32Array>()
5511            .expect("Int32Array");
5512        assert_eq!(a.values(), &[1, 2, 3]);
5513    }
5514
5515    #[test]
5516    fn test_timestamp_nanos_decoding_offset_zero() {
5517        let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::OffsetZero)));
5518        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5519        let mut data = Vec::new();
5520        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
5521            data.extend_from_slice(&encode_avro_long(v));
5522        }
5523        let mut cur = AvroCursor::new(&data);
5524        for _ in 0..4 {
5525            decoder.decode(&mut cur).expect("decode nanos ts");
5526        }
5527        let array = decoder.flush(None).expect("flush nanos ts");
5528        let ts = array
5529            .as_any()
5530            .downcast_ref::<TimestampNanosecondArray>()
5531            .expect("TimestampNanosecondArray");
5532        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
5533        match ts.data_type() {
5534            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5535                assert_eq!(tz.as_deref(), Some("+00:00"));
5536            }
5537            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
5538        }
5539    }
5540
5541    #[test]
5542    fn test_timestamp_nanos_decoding_utc() {
5543        let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::Utc)));
5544        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5545        let mut data = Vec::new();
5546        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
5547            data.extend_from_slice(&encode_avro_long(v));
5548        }
5549        let mut cur = AvroCursor::new(&data);
5550        for _ in 0..4 {
5551            decoder.decode(&mut cur).expect("decode nanos ts");
5552        }
5553        let array = decoder.flush(None).expect("flush nanos ts");
5554        let ts = array
5555            .as_any()
5556            .downcast_ref::<TimestampNanosecondArray>()
5557            .expect("TimestampNanosecondArray");
5558        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
5559        match ts.data_type() {
5560            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5561                assert_eq!(tz.as_deref(), Some("UTC"));
5562            }
5563            other => panic!("expected Timestamp(Nanosecond, Some(\"UTC\")), got {other:?}"),
5564        }
5565    }
5566
5567    #[test]
5568    fn test_timestamp_nanos_decoding_local() {
5569        let avro_type = avro_from_codec(Codec::TimestampNanos(None));
5570        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5571        let mut data = Vec::new();
5572        for v in [10_i64, 20_i64, -30_i64] {
5573            data.extend_from_slice(&encode_avro_long(v));
5574        }
5575        let mut cur = AvroCursor::new(&data);
5576        for _ in 0..3 {
5577            decoder.decode(&mut cur).expect("decode nanos ts");
5578        }
5579        let array = decoder.flush(None).expect("flush nanos ts");
5580        let ts = array
5581            .as_any()
5582            .downcast_ref::<TimestampNanosecondArray>()
5583            .expect("TimestampNanosecondArray");
5584        assert_eq!(ts.values(), &[10, 20, -30]);
5585        match ts.data_type() {
5586            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5587                assert_eq!(tz.as_deref(), None);
5588            }
5589            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5590        }
5591    }
5592
5593    #[test]
5594    fn test_timestamp_nanos_decoding_with_nulls() {
5595        let avro_type = AvroDataType::new(
5596            Codec::TimestampNanos(None),
5597            Default::default(),
5598            Some(Nullability::NullFirst),
5599        );
5600        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
5601        let mut data = Vec::new();
5602        data.extend_from_slice(&encode_avro_long(1));
5603        data.extend_from_slice(&encode_avro_long(42));
5604        data.extend_from_slice(&encode_avro_long(0));
5605        data.extend_from_slice(&encode_avro_long(1));
5606        data.extend_from_slice(&encode_avro_long(-7));
5607        let mut cur = AvroCursor::new(&data);
5608        for _ in 0..3 {
5609            decoder.decode(&mut cur).expect("decode nullable nanos ts");
5610        }
5611        let array = decoder.flush(None).expect("flush nullable nanos ts");
5612        let ts = array
5613            .as_any()
5614            .downcast_ref::<TimestampNanosecondArray>()
5615            .expect("TimestampNanosecondArray");
5616        assert_eq!(ts.len(), 3);
5617        assert!(ts.is_valid(0));
5618        assert!(ts.is_null(1));
5619        assert!(ts.is_valid(2));
5620        assert_eq!(ts.value(0), 42);
5621        assert_eq!(ts.value(2), -7);
5622        match ts.data_type() {
5623            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5624                assert_eq!(tz.as_deref(), None);
5625            }
5626            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5627        }
5628    }
5629}