Skip to main content

arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, ResolvedField,
22    ResolvedRecord, ResolvedUnion, Tz,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37    Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use strum_macros::AsRefStr;
42use uuid::Uuid;
43
44use std::cmp::Ordering;
45use std::mem;
46use std::sync::Arc;
47
/// Initial capacity handed to the `Vec`s and Arrow builders that back each
/// freshly constructed [`Decoder`] in this file.
const DEFAULT_CAPACITY: usize = 1024;
49
/// Reads one decimal payload of width `$N` from `$buf` and appends it to `$builder`.
///
/// The raw bytes are obtained through `read_decimal_bytes_be` (given the optional
/// fixed `$size`) and interpreted as a big-endian `$Int`. The `?` operator is used,
/// so the macro must be expanded inside a function returning a compatible `Result`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let value = <$Int>::from_be_bytes(read_decimal_bytes_be::<{ $N }>($buf, $size)?);
        $builder.append_value(value);
    }};
}
57
/// Finishes a decimal builder and yields the result as an [`ArrayRef`].
///
/// Only the value buffer produced by `$builder.finish()` is kept; it is re-wrapped
/// in `$ArrayTy` together with the caller-supplied `$nulls`, and the requested
/// precision/scale (scale defaults to 0 when absent) is then applied. Uses `?`,
/// so it must be expanded inside a function returning a compatible `Result`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        let (_, values, _) = $builder.finish().into_parts();
        let array = <$ArrayTy>::try_new(values, $nulls)?;
        let array =
            array.with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)?;
        Arc::new(array) as ArrayRef
    }};
}
67
/// Appends a decimal default value to `$builder`.
///
/// The literal must be `AvroLiteral::Bytes`; its bytes are widened to `$N` bytes
/// via `sign_cast_to` and interpreted as a big-endian `$Int`. Any other literal
/// kind produces an `AvroError::InvalidArgument` whose message is assembled at
/// compile time from `$name`. The macro evaluates to a `Result<(), AvroError>`.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(AvroError::InvalidArgument(
                concat!(
                    "Default for ",
                    $name,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
90
/// Decodes avro encoded data into [`RecordBatch`]
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema assembled from the reader fields; attached to every flushed batch.
    schema: SchemaRef,
    /// One decoder per top-level field, in schema order.
    fields: Vec<Decoder>,
    /// Present only when the record carries `ResolutionInfo::Record`; when set,
    /// records are decoded through it instead of field by field.
    projector: Option<Projector>,
    /// Number of rows decoded since the last `flush`.
    row_count: usize,
}
99
100impl RecordDecoder {
101    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
102    ///
103    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
104    ///
105    /// # Arguments
106    /// * `data_type` - The Avro data type to decode.
107    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
108    ///
109    /// # Errors
110    /// This function will return an error if the provided `data_type` is not a `Record`.
111    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
112        match data_type.codec() {
113            Codec::Struct(reader_fields) => {
114                // Build Arrow schema fields and per-child decoders
115                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
116                let mut encodings = Vec::with_capacity(reader_fields.len());
117                let mut field_defaults = Vec::with_capacity(reader_fields.len());
118                for avro_field in reader_fields.iter() {
119                    arrow_fields.push(avro_field.field());
120                    encodings.push(Decoder::try_new(avro_field.data_type())?);
121
122                    if let Some(ResolutionInfo::DefaultValue(lit)) =
123                        avro_field.data_type().resolution.as_ref()
124                    {
125                        field_defaults.push(Some(lit.clone()));
126                    } else {
127                        field_defaults.push(None);
128                    }
129                }
130                let projector = match data_type.resolution.as_ref() {
131                    Some(ResolutionInfo::Record(rec)) => {
132                        Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
133                    }
134                    _ => None,
135                };
136                Ok(Self {
137                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
138                    fields: encodings,
139                    projector,
140                    row_count: 0,
141                })
142            }
143            other => Err(AvroError::ParseError(format!(
144                "Expected record got {other:?}"
145            ))),
146        }
147    }
148
149    /// Returns the decoder's `SchemaRef`
150    pub(crate) fn schema(&self) -> &SchemaRef {
151        &self.schema
152    }
153
154    /// Decode `count` records from `buf`
155    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
156        let mut cursor = AvroCursor::new(buf);
157        match self.projector.as_mut() {
158            Some(proj) => {
159                for _ in 0..count {
160                    proj.project_record(&mut cursor, &mut self.fields)?;
161                }
162            }
163            None => {
164                for _ in 0..count {
165                    for field in &mut self.fields {
166                        field.decode(&mut cursor)?;
167                    }
168                }
169            }
170        }
171        self.row_count += count;
172        Ok(cursor.position())
173    }
174
175    /// Flush the decoded records into a [`RecordBatch`]
176    pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
177        let arrays = self
178            .fields
179            .iter_mut()
180            .map(|x| x.flush(None))
181            .collect::<Result<Vec<_>, _>>()?;
182        let batch_options = RecordBatchOptions::new().with_row_count(Some(self.row_count));
183        self.row_count = 0;
184        RecordBatch::try_new_with_options(self.schema.clone(), arrays, &batch_options)
185            .map_err(Into::into)
186    }
187}
188
/// Per-column decoding state: each variant holds the buffers or builder that
/// accumulate decoded values for one Arrow column until it is flushed.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Number of values seen so far (no data is stored).
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Int8(Vec<i8>),
    #[cfg(feature = "avro_custom_types")]
    Int16(Vec<i16>),
    #[cfg(feature = "avro_custom_types")]
    UInt8(Vec<u8>),
    #[cfg(feature = "avro_custom_types")]
    UInt16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    UInt32(Vec<u32>),
    #[cfg(feature = "avro_custom_types")]
    UInt64(Vec<u64>),
    #[cfg(feature = "avro_custom_types")]
    Float16(Vec<u16>), // Stored as raw IEEE-754 f16 bits
    #[cfg(feature = "avro_custom_types")]
    Date64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    TimeNanos(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Time32Secs(Vec<i32>),
    /// The flag is the `is_utc` value taken from `Codec::TimestampSecs`.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool, Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth(Vec<i32>),
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime(Vec<IntervalDayTime>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// Timestamp variants carry the optional `Tz` taken from the codec.
    TimestampMillis(Option<Tz>, Vec<i64>),
    TimestampMicros(Option<Tz>, Vec<i64>),
    TimestampNanos(Option<Tz>, Vec<i64>),
    /// Selected for an `Int64` codec with `Promotion::IntToLong`.
    Int32ToInt64(Vec<i64>),
    /// Selected for a `Float32` codec with `Promotion::IntToFloat`.
    Int32ToFloat32(Vec<f32>),
    /// Selected for a `Float64` codec with `Promotion::IntToDouble`.
    Int32ToFloat64(Vec<f64>),
    /// Selected for a `Float32` codec with `Promotion::LongToFloat`.
    Int64ToFloat32(Vec<f32>),
    /// Selected for a `Float64` codec with `Promotion::LongToDouble`.
    Int64ToFloat64(Vec<f64>),
    /// Selected for a `Float64` codec with `Promotion::FloatToDouble`.
    Float32ToFloat64(Vec<f64>),
    /// Selected for a `Utf8`/`Utf8View` codec with `Promotion::BytesToString`.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Selected for a `Binary` codec with `Promotion::StringToBytes`.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Value offsets plus the concatenated value bytes.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Item field, list offsets, and the decoder for the list items.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Arrow fields, one child decoder per field, the per-field default
    /// literals (if any), and an optional projector built from
    /// `ResolutionInfo::Record`.
    Record(
        Fields,
        Vec<Decoder>,
        Vec<Option<AvroLiteral>>,
        Option<Projector>,
    ),
    /// The "entries" struct field, two offset builders, a byte buffer, and the
    /// value decoder.
    // NOTE(review): presumably the first offsets/bytes pair holds the string
    // keys and the second offsets builder the per-map entry ranges — confirm
    // against the decode/flush code.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Size of each value in bytes, and the concatenated value bytes.
    Fixed(i32, Vec<u8>),
    /// Decoded indices, the enum symbols, and an optional resolution built
    /// from `ResolutionInfo::EnumMapping`.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Selected for `Codec::Interval`.
    Duration(IntervalMonthDayNanoBuilder),
    /// Concatenated 16-byte values.
    Uuid(Vec<u8>),
    /// Decimal variants hold precision, optional scale, the optional size
    /// taken from `Codec::Decimal`, and the builder.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// Run-end width in bytes (2, 4 or 8), a counter that starts at 0, and the
    /// decoder for the values.
    // NOTE(review): the counter is presumably the number of decoded rows — confirm.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    Union(UnionDecoder),
    /// Plan describing how nullness is determined, the validity builder, and
    /// the decoder for the non-null values.
    Nullable(NullablePlan, NullBufferBuilder, Box<Decoder>),
}
281
282impl Decoder {
283    fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
284        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
285            if info.writer_is_union && !info.reader_is_union {
286                let mut clone = data_type.clone();
287                clone.resolution = None; // Build target base decoder without Union resolution
288                let target = Self::try_new_internal(&clone)?;
289                let decoder = Self::Union(
290                    UnionDecoderBuilder::new()
291                        .with_resolved_union(info.clone())
292                        .with_target(target)
293                        .build()?,
294                );
295                return Ok(decoder);
296            }
297        }
298        Self::try_new_internal(data_type)
299    }
300
301    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
302        // Extract just the Promotion (if any) to simplify pattern matching
303        let promotion = match data_type.resolution.as_ref() {
304            Some(ResolutionInfo::Promotion(p)) => Some(*p),
305            _ => None,
306        };
307        let decoder = match (data_type.codec(), promotion) {
308            (Codec::Int64, Some(Promotion::IntToLong)) => {
309                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
310            }
311            (Codec::Float32, Some(Promotion::IntToFloat)) => {
312                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
313            }
314            (Codec::Float64, Some(Promotion::IntToDouble)) => {
315                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
316            }
317            (Codec::Float32, Some(Promotion::LongToFloat)) => {
318                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
319            }
320            (Codec::Float64, Some(Promotion::LongToDouble)) => {
321                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
322            }
323            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
324                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
325            }
326            (Codec::Utf8, Some(Promotion::BytesToString))
327            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
328                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
329                Vec::with_capacity(DEFAULT_CAPACITY),
330            ),
331            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
332                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
333                Vec::with_capacity(DEFAULT_CAPACITY),
334            ),
335            (Codec::Null, _) => Self::Null(0),
336            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
337            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
338            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
339            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
340            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
341            (Codec::Binary, _) => Self::Binary(
342                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
343                Vec::with_capacity(DEFAULT_CAPACITY),
344            ),
345            (Codec::Utf8, _) => Self::String(
346                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
347                Vec::with_capacity(DEFAULT_CAPACITY),
348            ),
349            (Codec::Utf8View, _) => Self::StringView(
350                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
351                Vec::with_capacity(DEFAULT_CAPACITY),
352            ),
353            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
354            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
355            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
356            (Codec::TimestampMillis(tz), _) => {
357                Self::TimestampMillis(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
358            }
359            (Codec::TimestampMicros(tz), _) => {
360                Self::TimestampMicros(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
361            }
362            (Codec::TimestampNanos(tz), _) => {
363                Self::TimestampNanos(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
364            }
365            #[cfg(feature = "avro_custom_types")]
366            (Codec::DurationNanos, _) => {
367                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
368            }
369            #[cfg(feature = "avro_custom_types")]
370            (Codec::DurationMicros, _) => {
371                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
372            }
373            #[cfg(feature = "avro_custom_types")]
374            (Codec::DurationMillis, _) => {
375                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
376            }
377            #[cfg(feature = "avro_custom_types")]
378            (Codec::DurationSeconds, _) => {
379                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
380            }
381            #[cfg(feature = "avro_custom_types")]
382            (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
383            #[cfg(feature = "avro_custom_types")]
384            (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
385            #[cfg(feature = "avro_custom_types")]
386            (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
387            #[cfg(feature = "avro_custom_types")]
388            (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
389            #[cfg(feature = "avro_custom_types")]
390            (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
391            #[cfg(feature = "avro_custom_types")]
392            (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
393            #[cfg(feature = "avro_custom_types")]
394            (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
395            #[cfg(feature = "avro_custom_types")]
396            (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
397            #[cfg(feature = "avro_custom_types")]
398            (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
399            #[cfg(feature = "avro_custom_types")]
400            (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
401            #[cfg(feature = "avro_custom_types")]
402            (Codec::TimestampSecs(is_utc), _) => {
403                Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
404            }
405            #[cfg(feature = "avro_custom_types")]
406            (Codec::IntervalYearMonth, _) => {
407                Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
408            }
409            #[cfg(feature = "avro_custom_types")]
410            (Codec::IntervalMonthDayNano, _) => {
411                Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
412            }
413            #[cfg(feature = "avro_custom_types")]
414            (Codec::IntervalDayTime, _) => {
415                Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
416            }
417            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
418            (Codec::Decimal(precision, scale, size), _) => {
419                let p = *precision;
420                let s = *scale;
421                let prec = p as u8;
422                let scl = s.unwrap_or(0) as i8;
423                #[cfg(feature = "small_decimals")]
424                {
425                    if p <= DECIMAL32_MAX_PRECISION as usize {
426                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
427                            .with_precision_and_scale(prec, scl)?;
428                        Self::Decimal32(p, s, *size, builder)
429                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
430                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
431                            .with_precision_and_scale(prec, scl)?;
432                        Self::Decimal64(p, s, *size, builder)
433                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
434                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
435                            .with_precision_and_scale(prec, scl)?;
436                        Self::Decimal128(p, s, *size, builder)
437                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
438                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
439                            .with_precision_and_scale(prec, scl)?;
440                        Self::Decimal256(p, s, *size, builder)
441                    } else {
442                        return Err(AvroError::ParseError(format!(
443                            "Decimal precision {p} exceeds maximum supported"
444                        )));
445                    }
446                }
447                #[cfg(not(feature = "small_decimals"))]
448                {
449                    if p <= DECIMAL128_MAX_PRECISION as usize {
450                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
451                            .with_precision_and_scale(prec, scl)?;
452                        Self::Decimal128(p, s, *size, builder)
453                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
454                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
455                            .with_precision_and_scale(prec, scl)?;
456                        Self::Decimal256(p, s, *size, builder)
457                    } else {
458                        return Err(AvroError::ParseError(format!(
459                            "Decimal precision {p} exceeds maximum supported"
460                        )));
461                    }
462                }
463            }
464            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
465            (Codec::List(item), _) => {
466                let decoder = Self::try_new(item)?;
467                Self::Array(
468                    Arc::new(item.field_with_name("item")),
469                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
470                    Box::new(decoder),
471                )
472            }
473            (Codec::Enum(symbols), _) => {
474                let res = match data_type.resolution.as_ref() {
475                    Some(ResolutionInfo::EnumMapping(mapping)) => {
476                        Some(EnumResolution::new(mapping))
477                    }
478                    _ => None,
479                };
480                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
481            }
482            (Codec::Struct(fields), _) => {
483                let mut arrow_fields = Vec::with_capacity(fields.len());
484                let mut encodings = Vec::with_capacity(fields.len());
485                let mut field_defaults = Vec::with_capacity(fields.len());
486                for avro_field in fields.iter() {
487                    let encoding = Self::try_new(avro_field.data_type())?;
488                    arrow_fields.push(avro_field.field());
489                    encodings.push(encoding);
490
491                    if let Some(ResolutionInfo::DefaultValue(lit)) =
492                        avro_field.data_type().resolution.as_ref()
493                    {
494                        field_defaults.push(Some(lit.clone()));
495                    } else {
496                        field_defaults.push(None);
497                    }
498                }
499                let projector =
500                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
501                        Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
502                    } else {
503                        None
504                    };
505                Self::Record(arrow_fields.into(), encodings, field_defaults, projector)
506            }
507            (Codec::Map(child), _) => {
508                let val_field = child.field_with_name("value");
509                let map_field = Arc::new(ArrowField::new(
510                    "entries",
511                    DataType::Struct(Fields::from(vec![
512                        ArrowField::new("key", DataType::Utf8, false),
513                        val_field,
514                    ])),
515                    false,
516                ));
517                let val_dec = Self::try_new(child)?;
518                Self::Map(
519                    map_field,
520                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
521                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
522                    Vec::with_capacity(DEFAULT_CAPACITY),
523                    Box::new(val_dec),
524                )
525            }
526            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
527            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
528                let decoders = encodings
529                    .iter()
530                    .map(Self::try_new_internal)
531                    .collect::<Result<Vec<_>, _>>()?;
532                if fields.len() != decoders.len() {
533                    return Err(AvroError::SchemaError(format!(
534                        "Union has {} fields but {} decoders",
535                        fields.len(),
536                        decoders.len()
537                    )));
538                }
539                // Proactive guard: if a user provides a union with more branches than
540                // a 32-bit Avro index can address, fail fast with a clear message.
541                let branch_count = decoders.len();
542                let max_addr = (i32::MAX as usize) + 1;
543                if branch_count > max_addr {
544                    return Err(AvroError::SchemaError(format!(
545                        "Union has {branch_count} branches, which exceeds the maximum addressable \
546                         branches by an Avro int tag ({} + 1).",
547                        i32::MAX
548                    )));
549                }
550                let mut builder = UnionDecoderBuilder::new()
551                    .with_fields(fields.clone())
552                    .with_branches(decoders);
553                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
554                    if info.reader_is_union {
555                        builder = builder.with_resolved_union(info.clone());
556                    }
557                }
558                Self::Union(builder.build()?)
559            }
560            (Codec::Union(_, _, _), _) => {
561                return Err(AvroError::NYI(
562                    "Sparse Arrow unions are not yet supported".to_string(),
563                ));
564            }
565            #[cfg(feature = "avro_custom_types")]
566            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
567                let inner = Self::try_new(values_dt)?;
568                let byte_width: u8 = match *width_bits_or_bytes {
569                    2 | 4 | 8 => *width_bits_or_bytes,
570                    16 => 2,
571                    32 => 4,
572                    64 => 8,
573                    other => {
574                        return Err(AvroError::InvalidArgument(format!(
575                            "Unsupported run-end width {other} for RunEndEncoded; \
576                             expected 16/32/64 bits or 2/4/8 bytes"
577                        )));
578                    }
579                };
580                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
581            }
582        };
583        Ok(match data_type.nullability() {
584            Some(nullability) => {
585                // Default to reading a union branch tag unless the resolution directs otherwise.
586                let plan = match &data_type.resolution {
587                    None => NullablePlan::ReadTag {
588                        nullability,
589                        resolution: ResolutionPlan::Promotion(Promotion::Direct),
590                    },
591                    Some(ResolutionInfo::Promotion(_)) => {
592                        // Promotions should have been incorporated
593                        // into the inner decoder.
594                        NullablePlan::FromSingle {
595                            resolution: ResolutionPlan::Promotion(Promotion::Direct),
596                        }
597                    }
598                    Some(ResolutionInfo::Union(info)) if !info.writer_is_union => {
599                        let Some(Some((_, resolution))) = info.writer_to_reader.first() else {
600                            return Err(AvroError::SchemaError(
601                                "unexpected union resolution info for non-union writer and union reader type".into(),
602                            ));
603                        };
604                        let resolution = ResolutionPlan::try_new(&decoder, resolution)?;
605                        NullablePlan::FromSingle { resolution }
606                    }
607                    Some(ResolutionInfo::Union(info)) => {
608                        let Some((_, resolution)) =
609                            info.writer_to_reader[nullability.non_null_index()].as_ref()
610                        else {
611                            return Err(AvroError::SchemaError(
612                                "unexpected union resolution info for nullable writer type".into(),
613                            ));
614                        };
615                        NullablePlan::ReadTag {
616                            nullability,
617                            resolution: ResolutionPlan::try_new(&decoder, resolution)?,
618                        }
619                    }
620                    Some(resolution) => NullablePlan::FromSingle {
621                        resolution: ResolutionPlan::try_new(&decoder, resolution)?,
622                    },
623                };
624                Self::Nullable(
625                    plan,
626                    NullBufferBuilder::new(DEFAULT_CAPACITY),
627                    Box::new(decoder),
628                )
629            }
630            None => decoder,
631        })
632    }
633
634    /// Append a null record
635    fn append_null(&mut self) -> Result<(), AvroError> {
636        match self {
637            Self::Null(count) => *count += 1,
638            Self::Boolean(b) => b.append(false),
639            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
640            Self::Int64(v)
641            | Self::Int32ToInt64(v)
642            | Self::TimeMicros(v)
643            | Self::TimestampMillis(_, v)
644            | Self::TimestampMicros(_, v)
645            | Self::TimestampNanos(_, v) => v.push(0),
646            #[cfg(feature = "avro_custom_types")]
647            Self::DurationSecond(v)
648            | Self::DurationMillisecond(v)
649            | Self::DurationMicrosecond(v)
650            | Self::DurationNanosecond(v) => v.push(0),
651            #[cfg(feature = "avro_custom_types")]
652            Self::Int8(v) => v.push(0),
653            #[cfg(feature = "avro_custom_types")]
654            Self::Int16(v) => v.push(0),
655            #[cfg(feature = "avro_custom_types")]
656            Self::UInt8(v) => v.push(0),
657            #[cfg(feature = "avro_custom_types")]
658            Self::UInt16(v) => v.push(0),
659            #[cfg(feature = "avro_custom_types")]
660            Self::UInt32(v) => v.push(0),
661            #[cfg(feature = "avro_custom_types")]
662            Self::UInt64(v) => v.push(0),
663            #[cfg(feature = "avro_custom_types")]
664            Self::Float16(v) => v.push(0),
665            #[cfg(feature = "avro_custom_types")]
666            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
667            #[cfg(feature = "avro_custom_types")]
668            Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
669            #[cfg(feature = "avro_custom_types")]
670            Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
671            #[cfg(feature = "avro_custom_types")]
672            Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
673            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
674            Self::Float64(v)
675            | Self::Int32ToFloat64(v)
676            | Self::Int64ToFloat64(v)
677            | Self::Float32ToFloat64(v) => v.push(0.),
678            Self::Binary(offsets, _)
679            | Self::String(offsets, _)
680            | Self::StringView(offsets, _)
681            | Self::BytesToString(offsets, _)
682            | Self::StringToBytes(offsets, _) => {
683                offsets.push_length(0);
684            }
685            Self::Uuid(v) => {
686                v.extend([0; 16]);
687            }
688            Self::Array(_, offsets, _) => {
689                offsets.push_length(0);
690            }
691            Self::Record(_, e, _, _) => {
692                for encoding in e.iter_mut() {
693                    encoding.append_null()?;
694                }
695            }
696            Self::Map(_, _koff, moff, _, _) => {
697                moff.push_length(0);
698            }
699            Self::Fixed(sz, accum) => {
700                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
701            }
702            #[cfg(feature = "small_decimals")]
703            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
704            #[cfg(feature = "small_decimals")]
705            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
706            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
707            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
708            Self::Enum(indices, _, _) => indices.push(0),
709            Self::Duration(builder) => builder.append_null(),
710            #[cfg(feature = "avro_custom_types")]
711            Self::RunEndEncoded(_, len, inner) => {
712                *len += 1;
713                inner.append_null()?;
714            }
715            Self::Union(u) => u.append_null()?,
716            Self::Nullable(_, null_buffer, inner) => {
717                null_buffer.append(false);
718                inner.append_null()?;
719            }
720        }
721        Ok(())
722    }
723
724    /// Append a single default literal into the decoder's buffers
725    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
726        match self {
727            Self::Nullable(_, nb, inner) => {
728                if matches!(lit, AvroLiteral::Null) {
729                    nb.append(false);
730                    inner.append_null()
731                } else {
732                    nb.append(true);
733                    inner.append_default(lit)
734                }
735            }
736            Self::Null(count) => match lit {
737                AvroLiteral::Null => {
738                    *count += 1;
739                    Ok(())
740                }
741                _ => Err(AvroError::InvalidArgument(
742                    "Non-null default for null type".to_string(),
743                )),
744            },
745            Self::Boolean(b) => match lit {
746                AvroLiteral::Boolean(v) => {
747                    b.append(*v);
748                    Ok(())
749                }
750                _ => Err(AvroError::InvalidArgument(
751                    "Default for boolean must be boolean".to_string(),
752                )),
753            },
754            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
755                AvroLiteral::Int(i) => {
756                    v.push(*i);
757                    Ok(())
758                }
759                _ => Err(AvroError::InvalidArgument(
760                    "Default for int32/date32/time-millis must be int".to_string(),
761                )),
762            },
763            #[cfg(feature = "avro_custom_types")]
764            Self::DurationSecond(v)
765            | Self::DurationMillisecond(v)
766            | Self::DurationMicrosecond(v)
767            | Self::DurationNanosecond(v) => match lit {
768                AvroLiteral::Long(i) => {
769                    v.push(*i);
770                    Ok(())
771                }
772                _ => Err(AvroError::InvalidArgument(
773                    "Default for duration long must be long".to_string(),
774                )),
775            },
776            #[cfg(feature = "avro_custom_types")]
777            Self::Int8(v) => match lit {
778                AvroLiteral::Int(i) => {
779                    let x = i8::try_from(*i).map_err(|_| {
780                        AvroError::InvalidArgument(format!(
781                            "Default for int8 out of range for i8: {i}"
782                        ))
783                    })?;
784                    v.push(x);
785                    Ok(())
786                }
787                _ => Err(AvroError::InvalidArgument(
788                    "Default for int8 must be int".to_string(),
789                )),
790            },
791            #[cfg(feature = "avro_custom_types")]
792            Self::Int16(v) => match lit {
793                AvroLiteral::Int(i) => {
794                    let x = i16::try_from(*i).map_err(|_| {
795                        AvroError::InvalidArgument(format!(
796                            "Default for int16 out of range for i16: {i}"
797                        ))
798                    })?;
799                    v.push(x);
800                    Ok(())
801                }
802                _ => Err(AvroError::InvalidArgument(
803                    "Default for int16 must be int".to_string(),
804                )),
805            },
806            #[cfg(feature = "avro_custom_types")]
807            Self::UInt8(v) => match lit {
808                AvroLiteral::Int(i) => {
809                    let x = u8::try_from(*i).map_err(|_| {
810                        AvroError::InvalidArgument(format!(
811                            "Default for uint8 out of range for u8: {i}"
812                        ))
813                    })?;
814                    v.push(x);
815                    Ok(())
816                }
817                _ => Err(AvroError::InvalidArgument(
818                    "Default for uint8 must be int".to_string(),
819                )),
820            },
821            #[cfg(feature = "avro_custom_types")]
822            Self::UInt16(v) => match lit {
823                AvroLiteral::Int(i) => {
824                    let x = u16::try_from(*i).map_err(|_| {
825                        AvroError::InvalidArgument(format!(
826                            "Default for uint16 out of range for u16: {i}"
827                        ))
828                    })?;
829                    v.push(x);
830                    Ok(())
831                }
832                _ => Err(AvroError::InvalidArgument(
833                    "Default for uint16 must be int".to_string(),
834                )),
835            },
836            #[cfg(feature = "avro_custom_types")]
837            Self::UInt32(v) => match lit {
838                AvroLiteral::Long(i) => {
839                    let x = u32::try_from(*i).map_err(|_| {
840                        AvroError::InvalidArgument(format!(
841                            "Default for uint32 out of range for u32: {i}"
842                        ))
843                    })?;
844                    v.push(x);
845                    Ok(())
846                }
847                _ => Err(AvroError::InvalidArgument(
848                    "Default for uint32 must be long".to_string(),
849                )),
850            },
851            #[cfg(feature = "avro_custom_types")]
852            Self::UInt64(v) => match lit {
853                AvroLiteral::Bytes(b) => {
854                    if b.len() != 8 {
855                        return Err(AvroError::InvalidArgument(format!(
856                            "uint64 default must be exactly 8 bytes, got {}",
857                            b.len()
858                        )));
859                    }
860                    v.push(u64::from_le_bytes([
861                        b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
862                    ]));
863                    Ok(())
864                }
865                _ => Err(AvroError::InvalidArgument(
866                    "Default for uint64 must be bytes (8-byte LE)".to_string(),
867                )),
868            },
869            #[cfg(feature = "avro_custom_types")]
870            Self::Float16(v) => match lit {
871                AvroLiteral::Bytes(b) => {
872                    if b.len() != 2 {
873                        return Err(AvroError::InvalidArgument(format!(
874                            "float16 default must be exactly 2 bytes, got {}",
875                            b.len()
876                        )));
877                    }
878                    v.push(u16::from_le_bytes([b[0], b[1]]));
879                    Ok(())
880                }
881                _ => Err(AvroError::InvalidArgument(
882                    "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
883                )),
884            },
885            #[cfg(feature = "avro_custom_types")]
886            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
887                AvroLiteral::Long(i) => {
888                    v.push(*i);
889                    Ok(())
890                }
891                _ => Err(AvroError::InvalidArgument(
892                    "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
893                )),
894            },
895            #[cfg(feature = "avro_custom_types")]
896            Self::Time32Secs(v) => match lit {
897                AvroLiteral::Int(i) => {
898                    v.push(*i);
899                    Ok(())
900                }
901                _ => Err(AvroError::InvalidArgument(
902                    "Default for time32-secs must be int".to_string(),
903                )),
904            },
905            #[cfg(feature = "avro_custom_types")]
906            Self::IntervalYearMonth(v) => match lit {
907                AvroLiteral::Bytes(b) => {
908                    if b.len() != 4 {
909                        return Err(AvroError::InvalidArgument(format!(
910                            "interval-year-month default must be exactly 4 bytes, got {}",
911                            b.len()
912                        )));
913                    }
914                    v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
915                    Ok(())
916                }
917                _ => Err(AvroError::InvalidArgument(
918                    "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
919                )),
920            },
921            #[cfg(feature = "avro_custom_types")]
922            Self::IntervalMonthDayNano(v) => match lit {
923                AvroLiteral::Bytes(b) => {
924                    if b.len() != 16 {
925                        return Err(AvroError::InvalidArgument(format!(
926                            "interval-month-day-nano default must be exactly 16 bytes, got {}",
927                            b.len()
928                        )));
929                    }
930                    let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
931                    let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
932                    let nanos =
933                        i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
934                    v.push(IntervalMonthDayNano::new(months, days, nanos));
935                    Ok(())
936                }
937                _ => Err(AvroError::InvalidArgument(
938                    "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
939                )),
940            },
941            #[cfg(feature = "avro_custom_types")]
942            Self::IntervalDayTime(v) => match lit {
943                AvroLiteral::Bytes(b) => {
944                    if b.len() != 8 {
945                        return Err(AvroError::InvalidArgument(format!(
946                            "interval-day-time default must be exactly 8 bytes, got {}",
947                            b.len()
948                        )));
949                    }
950                    let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
951                    let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
952                    v.push(IntervalDayTime::new(days, milliseconds));
953                    Ok(())
954                }
955                _ => Err(AvroError::InvalidArgument(
956                    "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
957                )),
958            },
959            Self::Int64(v)
960            | Self::Int32ToInt64(v)
961            | Self::TimeMicros(v)
962            | Self::TimestampMillis(_, v)
963            | Self::TimestampMicros(_, v)
964            | Self::TimestampNanos(_, v) => match lit {
965                AvroLiteral::Long(i) => {
966                    v.push(*i);
967                    Ok(())
968                }
969                AvroLiteral::Int(i) => {
970                    v.push(*i as i64);
971                    Ok(())
972                }
973                _ => Err(AvroError::InvalidArgument(
974                    "Default for long/time-micros/timestamp must be long or int".to_string(),
975                )),
976            },
977            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
978                AvroLiteral::Float(f) => {
979                    v.push(*f);
980                    Ok(())
981                }
982                _ => Err(AvroError::InvalidArgument(
983                    "Default for float must be float".to_string(),
984                )),
985            },
986            Self::Float64(v)
987            | Self::Int32ToFloat64(v)
988            | Self::Int64ToFloat64(v)
989            | Self::Float32ToFloat64(v) => match lit {
990                AvroLiteral::Double(f) => {
991                    v.push(*f);
992                    Ok(())
993                }
994                _ => Err(AvroError::InvalidArgument(
995                    "Default for double must be double".to_string(),
996                )),
997            },
998            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
999                AvroLiteral::Bytes(b) => {
1000                    offsets.push_length(b.len());
1001                    values.extend_from_slice(b);
1002                    Ok(())
1003                }
1004                _ => Err(AvroError::InvalidArgument(
1005                    "Default for bytes must be bytes".to_string(),
1006                )),
1007            },
1008            Self::BytesToString(offsets, values)
1009            | Self::String(offsets, values)
1010            | Self::StringView(offsets, values) => match lit {
1011                AvroLiteral::String(s) => {
1012                    let b = s.as_bytes();
1013                    offsets.push_length(b.len());
1014                    values.extend_from_slice(b);
1015                    Ok(())
1016                }
1017                _ => Err(AvroError::InvalidArgument(
1018                    "Default for string must be string".to_string(),
1019                )),
1020            },
1021            Self::Uuid(values) => match lit {
1022                AvroLiteral::String(s) => {
1023                    let uuid = Uuid::try_parse(s).map_err(|e| {
1024                        AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
1025                    })?;
1026                    values.extend_from_slice(uuid.as_bytes());
1027                    Ok(())
1028                }
1029                _ => Err(AvroError::InvalidArgument(
1030                    "Default for uuid must be string".to_string(),
1031                )),
1032            },
1033            Self::Fixed(sz, accum) => match lit {
1034                AvroLiteral::Bytes(b) => {
1035                    if b.len() != *sz as usize {
1036                        return Err(AvroError::InvalidArgument(format!(
1037                            "Fixed default length {} does not match size {sz}",
1038                            b.len(),
1039                        )));
1040                    }
1041                    accum.extend_from_slice(b);
1042                    Ok(())
1043                }
1044                _ => Err(AvroError::InvalidArgument(
1045                    "Default for fixed must be bytes".to_string(),
1046                )),
1047            },
1048            #[cfg(feature = "small_decimals")]
1049            Self::Decimal32(_, _, _, builder) => {
1050                append_decimal_default!(lit, builder, 4, i32, "decimal32")
1051            }
1052            #[cfg(feature = "small_decimals")]
1053            Self::Decimal64(_, _, _, builder) => {
1054                append_decimal_default!(lit, builder, 8, i64, "decimal64")
1055            }
1056            Self::Decimal128(_, _, _, builder) => {
1057                append_decimal_default!(lit, builder, 16, i128, "decimal128")
1058            }
1059            Self::Decimal256(_, _, _, builder) => {
1060                append_decimal_default!(lit, builder, 32, i256, "decimal256")
1061            }
1062            Self::Duration(builder) => match lit {
1063                AvroLiteral::Bytes(b) => {
1064                    if b.len() != 12 {
1065                        return Err(AvroError::InvalidArgument(format!(
1066                            "Duration default must be exactly 12 bytes, got {}",
1067                            b.len()
1068                        )));
1069                    }
1070                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1071                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1072                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1073                    let nanos = (millis as i64) * 1_000_000;
1074                    builder.append_value(IntervalMonthDayNano::new(
1075                        months as i32,
1076                        days as i32,
1077                        nanos,
1078                    ));
1079                    Ok(())
1080                }
1081                _ => Err(AvroError::InvalidArgument(
1082                    "Default for duration must be 12-byte little-endian months/days/millis"
1083                        .to_string(),
1084                )),
1085            },
1086            Self::Array(_, offsets, inner) => match lit {
1087                AvroLiteral::Array(items) => {
1088                    offsets.push_length(items.len());
1089                    for item in items {
1090                        inner.append_default(item)?;
1091                    }
1092                    Ok(())
1093                }
1094                _ => Err(AvroError::InvalidArgument(
1095                    "Default for array must be an array literal".to_string(),
1096                )),
1097            },
1098            Self::Map(_, koff, moff, kdata, valdec) => match lit {
1099                AvroLiteral::Map(entries) => {
1100                    moff.push_length(entries.len());
1101                    for (k, v) in entries {
1102                        let kb = k.as_bytes();
1103                        koff.push_length(kb.len());
1104                        kdata.extend_from_slice(kb);
1105                        valdec.append_default(v)?;
1106                    }
1107                    Ok(())
1108                }
1109                _ => Err(AvroError::InvalidArgument(
1110                    "Default for map must be a map/object literal".to_string(),
1111                )),
1112            },
1113            Self::Enum(indices, symbols, _) => match lit {
1114                AvroLiteral::Enum(sym) => {
1115                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1116                        AvroError::InvalidArgument(format!(
1117                            "Enum default symbol {sym:?} not in reader symbols"
1118                        ))
1119                    })?;
1120                    indices.push(pos as i32);
1121                    Ok(())
1122                }
1123                _ => Err(AvroError::InvalidArgument(
1124                    "Default for enum must be a symbol".to_string(),
1125                )),
1126            },
1127            #[cfg(feature = "avro_custom_types")]
1128            Self::RunEndEncoded(_, len, inner) => {
1129                *len += 1;
1130                inner.append_default(lit)
1131            }
1132            Self::Union(u) => u.append_default(lit),
1133            Self::Record(field_meta, decoders, field_defaults, _) => match lit {
1134                AvroLiteral::Map(entries) => {
1135                    for (i, dec) in decoders.iter_mut().enumerate() {
1136                        let name = field_meta[i].name();
1137                        if let Some(sub) = entries.get(name) {
1138                            dec.append_default(sub)?;
1139                        } else if let Some(default_literal) = field_defaults[i].as_ref() {
1140                            dec.append_default(default_literal)?;
1141                        } else {
1142                            dec.append_null()?;
1143                        }
1144                    }
1145                    Ok(())
1146                }
1147                AvroLiteral::Null => {
1148                    for (i, dec) in decoders.iter_mut().enumerate() {
1149                        if let Some(default_literal) = field_defaults[i].as_ref() {
1150                            dec.append_default(default_literal)?;
1151                        } else {
1152                            dec.append_null()?;
1153                        }
1154                    }
1155                    Ok(())
1156                }
1157                _ => Err(AvroError::InvalidArgument(
1158                    "Default for record must be a map/object or null".to_string(),
1159                )),
1160            },
1161        }
1162    }
1163
1164    /// Decode a single record from `buf`
1165    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1166        match self {
1167            Self::Null(x) => *x += 1,
1168            Self::Boolean(values) => values.append(buf.get_bool()?),
1169            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
1170                values.push(buf.get_int()?)
1171            }
1172            Self::Int64(values)
1173            | Self::TimeMicros(values)
1174            | Self::TimestampMillis(_, values)
1175            | Self::TimestampMicros(_, values)
1176            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
1177            #[cfg(feature = "avro_custom_types")]
1178            Self::DurationSecond(values)
1179            | Self::DurationMillisecond(values)
1180            | Self::DurationMicrosecond(values)
1181            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
1182            #[cfg(feature = "avro_custom_types")]
1183            Self::Int8(values) => {
1184                let raw = buf.get_int()?;
1185                let x = i8::try_from(raw).map_err(|_| {
1186                    AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
1187                })?;
1188                values.push(x);
1189            }
1190            #[cfg(feature = "avro_custom_types")]
1191            Self::Int16(values) => {
1192                let raw = buf.get_int()?;
1193                let x = i16::try_from(raw).map_err(|_| {
1194                    AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
1195                })?;
1196                values.push(x);
1197            }
1198            #[cfg(feature = "avro_custom_types")]
1199            Self::UInt8(values) => {
1200                let raw = buf.get_int()?;
1201                let x = u8::try_from(raw).map_err(|_| {
1202                    AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
1203                })?;
1204                values.push(x);
1205            }
1206            #[cfg(feature = "avro_custom_types")]
1207            Self::UInt16(values) => {
1208                let raw = buf.get_int()?;
1209                let x = u16::try_from(raw).map_err(|_| {
1210                    AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
1211                })?;
1212                values.push(x);
1213            }
1214            #[cfg(feature = "avro_custom_types")]
1215            Self::UInt32(values) => {
1216                let raw = buf.get_long()?;
1217                let x = u32::try_from(raw).map_err(|_| {
1218                    AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
1219                })?;
1220                values.push(x);
1221            }
1222            #[cfg(feature = "avro_custom_types")]
1223            Self::UInt64(values) => {
1224                let b = buf.get_fixed(8)?;
1225                values.push(u64::from_le_bytes([
1226                    b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
1227                ]));
1228            }
1229            #[cfg(feature = "avro_custom_types")]
1230            Self::Float16(values) => {
1231                let b = buf.get_fixed(2)?;
1232                values.push(u16::from_le_bytes([b[0], b[1]]));
1233            }
1234            #[cfg(feature = "avro_custom_types")]
1235            Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
1236                values.push(buf.get_long()?)
1237            }
1238            #[cfg(feature = "avro_custom_types")]
1239            Self::Time32Secs(values) => values.push(buf.get_int()?),
1240            #[cfg(feature = "avro_custom_types")]
1241            Self::IntervalYearMonth(values) => {
1242                let b = buf.get_fixed(4)?;
1243                values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
1244            }
1245            #[cfg(feature = "avro_custom_types")]
1246            Self::IntervalMonthDayNano(values) => {
1247                let b = buf.get_fixed(16)?;
1248                let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1249                let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1250                let nanos =
1251                    i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
1252                values.push(IntervalMonthDayNano::new(months, days, nanos));
1253            }
1254            #[cfg(feature = "avro_custom_types")]
1255            Self::IntervalDayTime(values) => {
1256                let b = buf.get_fixed(8)?;
1257                // Read as two i32s: days (4 bytes) and milliseconds (4 bytes)
1258                let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1259                let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1260                values.push(IntervalDayTime::new(days, milliseconds));
1261            }
1262            Self::Float32(values) => values.push(buf.get_float()?),
1263            Self::Float64(values) => values.push(buf.get_double()?),
1264            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
1265            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
1266            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
1267            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
1268            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
1269            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
1270            Self::StringToBytes(offsets, values)
1271            | Self::BytesToString(offsets, values)
1272            | Self::Binary(offsets, values)
1273            | Self::String(offsets, values)
1274            | Self::StringView(offsets, values) => {
1275                let data = buf.get_bytes()?;
1276                offsets.push_length(data.len());
1277                values.extend_from_slice(data);
1278            }
1279            Self::Uuid(values) => {
1280                let s_bytes = buf.get_bytes()?;
1281                let s = std::str::from_utf8(s_bytes).map_err(|e| {
1282                    AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
1283                })?;
1284                let uuid = Uuid::try_parse(s)
1285                    .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
1286                values.extend_from_slice(uuid.as_bytes());
1287            }
1288            Self::Array(_, off, encoding) => {
1289                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
1290                off.push_length(total_items);
1291            }
1292            Self::Record(_, encodings, _, None) => {
1293                for encoding in encodings {
1294                    encoding.decode(buf)?;
1295                }
1296            }
1297            Self::Record(_, encodings, _, Some(proj)) => {
1298                proj.project_record(buf, encodings)?;
1299            }
1300            Self::Map(_, koff, moff, kdata, valdec) => {
1301                let newly_added = read_blocks(buf, |cur| {
1302                    let kb = cur.get_bytes()?;
1303                    koff.push_length(kb.len());
1304                    kdata.extend_from_slice(kb);
1305                    valdec.decode(cur)
1306                })?;
1307                moff.push_length(newly_added);
1308            }
1309            Self::Fixed(sz, accum) => {
1310                let fx = buf.get_fixed(*sz as usize)?;
1311                accum.extend_from_slice(fx);
1312            }
1313            #[cfg(feature = "small_decimals")]
1314            Self::Decimal32(_, _, size, builder) => {
1315                decode_decimal!(size, buf, builder, 4, i32);
1316            }
1317            #[cfg(feature = "small_decimals")]
1318            Self::Decimal64(_, _, size, builder) => {
1319                decode_decimal!(size, buf, builder, 8, i64);
1320            }
1321            Self::Decimal128(_, _, size, builder) => {
1322                decode_decimal!(size, buf, builder, 16, i128);
1323            }
1324            Self::Decimal256(_, _, size, builder) => {
1325                decode_decimal!(size, buf, builder, 32, i256);
1326            }
1327            Self::Enum(indices, _, None) => {
1328                indices.push(buf.get_int()?);
1329            }
1330            Self::Enum(indices, _, Some(res)) => {
1331                let raw = buf.get_int()?;
1332                let resolved = res.resolve(raw)?;
1333                indices.push(resolved);
1334            }
1335            Self::Duration(builder) => {
1336                let b = buf.get_fixed(12)?;
1337                let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1338                let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1339                let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1340                let nanos = (millis as i64) * 1_000_000;
1341                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
1342            }
1343            #[cfg(feature = "avro_custom_types")]
1344            Self::RunEndEncoded(_, len, inner) => {
1345                *len += 1;
1346                inner.decode(buf)?;
1347            }
1348            Self::Union(u) => u.decode(buf)?,
1349            Self::Nullable(plan, nb, encoding) => {
1350                match plan {
1351                    NullablePlan::FromSingle { resolution } => {
1352                        encoding.decode_with_resolution(buf, resolution)?;
1353                        nb.append(true);
1354                    }
1355                    NullablePlan::ReadTag {
1356                        nullability,
1357                        resolution,
1358                    } => {
1359                        let branch = buf.read_vlq()?;
1360                        let is_not_null = match *nullability {
1361                            Nullability::NullFirst => branch != 0,
1362                            Nullability::NullSecond => branch == 0,
1363                        };
1364                        if is_not_null {
1365                            // It is important to decode before appending to null buffer in case of decode error
1366                            encoding.decode_with_resolution(buf, resolution)?;
1367                        } else {
1368                            encoding.append_null()?;
1369                        }
1370                        nb.append(is_not_null);
1371                    }
1372                }
1373            }
1374        }
1375        Ok(())
1376    }
1377
1378    fn decode_with_promotion(
1379        &mut self,
1380        buf: &mut AvroCursor<'_>,
1381        promotion: Promotion,
1382    ) -> Result<(), AvroError> {
1383        #[cfg(feature = "avro_custom_types")]
1384        if let Self::RunEndEncoded(_, len, inner) = self {
1385            *len += 1;
1386            return inner.decode_with_promotion(buf, promotion);
1387        }
1388
1389        macro_rules! promote_numeric_to {
1390            ($variant:ident, $getter:ident, $to:ty) => {{
1391                match self {
1392                    Self::$variant(v) => {
1393                        let x = buf.$getter()?;
1394                        v.push(x as $to);
1395                        Ok(())
1396                    }
1397                    other => Err(AvroError::ParseError(format!(
1398                        "Promotion {promotion} target mismatch: expected {}, got {}",
1399                        stringify!($variant),
1400                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1401                    ))),
1402                }
1403            }};
1404        }
1405        match promotion {
1406            Promotion::Direct => self.decode(buf),
1407            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1408            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1409            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1410            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1411            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1412            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1413            Promotion::StringToBytes => match self {
1414                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1415                    let data = buf.get_bytes()?;
1416                    offsets.push_length(data.len());
1417                    values.extend_from_slice(data);
1418                    Ok(())
1419                }
1420                other => Err(AvroError::ParseError(format!(
1421                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1422                    <Self as AsRef<str>>::as_ref(other)
1423                ))),
1424            },
1425            Promotion::BytesToString => match self {
1426                Self::String(offsets, values)
1427                | Self::StringView(offsets, values)
1428                | Self::BytesToString(offsets, values) => {
1429                    let data = buf.get_bytes()?;
1430                    offsets.push_length(data.len());
1431                    values.extend_from_slice(data);
1432                    Ok(())
1433                }
1434                other => Err(AvroError::ParseError(format!(
1435                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1436                    <Self as AsRef<str>>::as_ref(other)
1437                ))),
1438            },
1439        }
1440    }
1441
1442    fn decode_with_resolution<'d>(
1443        &'d mut self,
1444        buf: &mut AvroCursor<'_>,
1445        resolution: &'d ResolutionPlan,
1446    ) -> Result<(), AvroError> {
1447        #[cfg(feature = "avro_custom_types")]
1448        if let Self::RunEndEncoded(_, len, inner) = self {
1449            *len += 1;
1450            return inner.decode_with_resolution(buf, resolution);
1451        }
1452
1453        match resolution {
1454            ResolutionPlan::Promotion(promotion) => {
1455                let promotion = *promotion;
1456                self.decode_with_promotion(buf, promotion)
1457            }
1458            ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
1459            ResolutionPlan::EnumMapping(res) => {
1460                let Self::Enum(indices, _, _) = self else {
1461                    return Err(AvroError::SchemaError(
1462                        "enum mapping resolution provided for non-enum decoder".into(),
1463                    ));
1464                };
1465                let raw = buf.get_int()?;
1466                let resolved = res.resolve(raw)?;
1467                indices.push(resolved);
1468                Ok(())
1469            }
1470            ResolutionPlan::Record(proj) => {
1471                let Self::Record(_, encodings, _, _) = self else {
1472                    return Err(AvroError::SchemaError(
1473                        "record projection provided for non-record decoder".into(),
1474                    ));
1475                };
1476                proj.project_record(buf, encodings)
1477            }
1478        }
1479    }
1480
1481    /// Flush decoded records to an [`ArrayRef`]
1482    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1483        Ok(match self {
1484            Self::Nullable(_, n, e) => e.flush(n.finish())?,
1485            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1486            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1487            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1488            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1489            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1490            Self::TimeMillis(values) => {
1491                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1492            }
1493            Self::TimeMicros(values) => {
1494                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1495            }
1496            Self::TimestampMillis(tz, values) => Arc::new(
1497                flush_primitive::<TimestampMillisecondType>(values, nulls)
1498                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1499            ),
1500            Self::TimestampMicros(tz, values) => Arc::new(
1501                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1502                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1503            ),
1504            Self::TimestampNanos(tz, values) => Arc::new(
1505                flush_primitive::<TimestampNanosecondType>(values, nulls)
1506                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1507            ),
1508            #[cfg(feature = "avro_custom_types")]
1509            Self::DurationSecond(values) => {
1510                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1511            }
1512            #[cfg(feature = "avro_custom_types")]
1513            Self::DurationMillisecond(values) => {
1514                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1515            }
1516            #[cfg(feature = "avro_custom_types")]
1517            Self::DurationMicrosecond(values) => {
1518                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1519            }
1520            #[cfg(feature = "avro_custom_types")]
1521            Self::DurationNanosecond(values) => {
1522                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1523            }
1524            #[cfg(feature = "avro_custom_types")]
1525            Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1526            #[cfg(feature = "avro_custom_types")]
1527            Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1528            #[cfg(feature = "avro_custom_types")]
1529            Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1530            #[cfg(feature = "avro_custom_types")]
1531            Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1532            #[cfg(feature = "avro_custom_types")]
1533            Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1534            #[cfg(feature = "avro_custom_types")]
1535            Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1536            #[cfg(feature = "avro_custom_types")]
1537            Self::Float16(values) => {
1538                // Convert Vec<u16> to Float16Array by reinterpreting the raw bytes.
1539                // This is safe because f16 and u16 have the same size and alignment.
1540                let len = values.len();
1541                let buf: Buffer = std::mem::take(values).into();
1542                let scalar_buf = ScalarBuffer::new(buf, 0, len);
1543                Arc::new(Float16Array::new(scalar_buf, nulls))
1544            }
1545            #[cfg(feature = "avro_custom_types")]
1546            Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1547            #[cfg(feature = "avro_custom_types")]
1548            Self::TimeNanos(values) => {
1549                Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1550            }
1551            #[cfg(feature = "avro_custom_types")]
1552            Self::Time32Secs(values) => {
1553                Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1554            }
1555            #[cfg(feature = "avro_custom_types")]
1556            Self::TimestampSecs(is_utc, values) => Arc::new(
1557                flush_primitive::<TimestampSecondType>(values, nulls)
1558                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1559            ),
1560            #[cfg(feature = "avro_custom_types")]
1561            Self::IntervalYearMonth(values) => {
1562                Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1563            }
1564            #[cfg(feature = "avro_custom_types")]
1565            Self::IntervalMonthDayNano(values) => {
1566                Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1567            }
1568            #[cfg(feature = "avro_custom_types")]
1569            Self::IntervalDayTime(values) => {
1570                Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1571            }
1572            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1573            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1574            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1575            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1576                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1577            }
1578            Self::Int32ToFloat64(values)
1579            | Self::Int64ToFloat64(values)
1580            | Self::Float32ToFloat64(values) => {
1581                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1582            }
1583            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1584                let offsets = flush_offsets(offsets);
1585                let values = flush_values(values).into();
1586                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1587            }
1588            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1589                let offsets = flush_offsets(offsets);
1590                let values = flush_values(values).into();
1591                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1592            }
1593            Self::StringView(offsets, values) => {
1594                let offsets = flush_offsets(offsets);
1595                let values = flush_values(values);
1596                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1597                let values: Vec<&str> = (0..array.len())
1598                    .map(|i| {
1599                        if array.is_valid(i) {
1600                            array.value(i)
1601                        } else {
1602                            ""
1603                        }
1604                    })
1605                    .collect();
1606                Arc::new(StringViewArray::from(values))
1607            }
1608            Self::Array(field, offsets, values) => {
1609                let values = values.flush(None)?;
1610                let offsets = flush_offsets(offsets);
1611                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1612            }
1613            Self::Record(fields, encodings, _, _) => {
1614                let arrays = encodings
1615                    .iter_mut()
1616                    .map(|x| x.flush(None))
1617                    .collect::<Result<Vec<_>, _>>()?;
1618                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1619            }
1620            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1621                let moff = flush_offsets(m_off);
1622                let koff = flush_offsets(k_off);
1623                let kd = flush_values(kdata).into();
1624                let val_arr = valdec.flush(None)?;
1625                let key_arr = StringArray::try_new(koff, kd, None)?;
1626                if key_arr.len() != val_arr.len() {
1627                    return Err(AvroError::InvalidArgument(format!(
1628                        "Map keys length ({}) != map values length ({})",
1629                        key_arr.len(),
1630                        val_arr.len()
1631                    )));
1632                }
1633                let final_len = moff.len() - 1;
1634                if let Some(n) = &nulls {
1635                    if n.len() != final_len {
1636                        return Err(AvroError::InvalidArgument(format!(
1637                            "Map array null buffer length {} != final map length {final_len}",
1638                            n.len()
1639                        )));
1640                    }
1641                }
1642                let entries_fields = match map_field.data_type() {
1643                    DataType::Struct(fields) => fields.clone(),
1644                    other => {
1645                        return Err(AvroError::InvalidArgument(format!(
1646                            "Map entries field must be a Struct, got {other:?}"
1647                        )));
1648                    }
1649                };
1650                let entries_struct =
1651                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1652                let map_arr =
1653                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1654                Arc::new(map_arr)
1655            }
1656            Self::Fixed(sz, accum) => {
1657                let b: Buffer = flush_values(accum).into();
1658                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1659                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1660                Arc::new(arr)
1661            }
1662            Self::Uuid(values) => {
1663                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1664                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1665                Arc::new(arr)
1666            }
1667            #[cfg(feature = "small_decimals")]
1668            Self::Decimal32(precision, scale, _, builder) => {
1669                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1670            }
1671            #[cfg(feature = "small_decimals")]
1672            Self::Decimal64(precision, scale, _, builder) => {
1673                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1674            }
1675            Self::Decimal128(precision, scale, _, builder) => {
1676                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1677            }
1678            Self::Decimal256(precision, scale, _, builder) => {
1679                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1680            }
1681            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1682            Self::Duration(builder) => {
1683                let (_, vals, _) = builder.finish().into_parts();
1684                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1685                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1686                Arc::new(vals)
1687            }
1688            #[cfg(feature = "avro_custom_types")]
1689            Self::RunEndEncoded(width, len, inner) => {
1690                let values = inner.flush(nulls)?;
1691                let n = *len;
1692                let arr = values.as_ref();
1693                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1694                if n > 0 {
1695                    run_starts.push(0);
1696                    for i in 1..n {
1697                        if !values_equal_at(arr, i - 1, i) {
1698                            run_starts.push(i);
1699                        }
1700                    }
1701                }
1702                if n > (u32::MAX as usize) {
1703                    return Err(AvroError::InvalidArgument(format!(
1704                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1705                    )));
1706                }
1707                let run_count = run_starts.len();
1708                let take_idx: PrimitiveArray<UInt32Type> =
1709                    run_starts.iter().map(|&s| s as u32).collect();
1710                let per_run_values = if run_count == 0 {
1711                    values.slice(0, 0)
1712                } else {
1713                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1714                        AvroError::ParseError(format!("take() for REE values failed: {e}"))
1715                    })?
1716                };
1717
1718                macro_rules! build_run_array {
1719                    ($Native:ty, $ArrowTy:ty) => {{
1720                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1721                        for (idx, &_start) in run_starts.iter().enumerate() {
1722                            let end = if idx + 1 < run_count {
1723                                run_starts[idx + 1]
1724                            } else {
1725                                n
1726                            };
1727                            ends.push(end as $Native);
1728                        }
1729                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1730                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1731                            .map_err(|e| AvroError::ParseError(e.to_string()))?;
1732                        Arc::new(run_arr) as ArrayRef
1733                    }};
1734                }
1735                match *width {
1736                    2 => {
1737                        if n > i16::MAX as usize {
1738                            return Err(AvroError::InvalidArgument(format!(
1739                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1740                            )));
1741                        }
1742                        build_run_array!(i16, Int16Type)
1743                    }
1744                    4 => build_run_array!(i32, Int32Type),
1745                    8 => build_run_array!(i64, Int64Type),
1746                    other => {
1747                        return Err(AvroError::InvalidArgument(format!(
1748                            "Unsupported run-end width {other} for RunEndEncoded"
1749                        )));
1750                    }
1751                }
1752            }
1753            Self::Union(u) => u.flush(nulls)?,
1754        })
1755    }
1756}
1757
/// Runtime plan for decoding reader-side `["null", T]` types.
///
/// The plan decides whether a union branch tag is read from the input before the value
/// itself is decoded.
#[derive(Debug)]
enum NullablePlan {
    /// Writer actually wrote a union (branch tag present).
    ///
    /// `nullability` gives the position of the `null` branch and is compared with the
    /// decoded tag to tell a null from a value; `resolution` is applied when a
    /// non-null value is decoded.
    ReadTag {
        nullability: Nullability,
        resolution: ResolutionPlan,
    },
    /// Writer wrote a single (non-union) value resolved to the non-null branch
    /// of the reader union; do NOT read a branch tag, but apply any resolution.
    FromSingle { resolution: ResolutionPlan },
}
1770
/// Runtime plan for resolving writer-reader type differences.
///
/// Built from a [`ResolutionInfo`] by [`ResolutionPlan::try_new`] and consumed by
/// `Decoder::decode_with_resolution`.
#[derive(Debug)]
enum ResolutionPlan {
    /// Indicates that the writer's type should be promoted to the reader's type.
    Promotion(Promotion),
    /// Provides a default value for the field missing in the writer type.
    /// The literal is appended to the decoder via `append_default`.
    DefaultValue(AvroLiteral),
    /// Provides mapping information for resolving enums.
    /// The writer's symbol index is read and translated to a reader index.
    EnumMapping(EnumResolution),
    /// Provides projection information for record fields.
    /// Only valid for a record decoder.
    Record(Projector),
}
1783
1784impl ResolutionPlan {
1785    fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, AvroError> {
1786        match (decoder, resolution) {
1787            (_, ResolutionInfo::Promotion(p)) => Ok(ResolutionPlan::Promotion(*p)),
1788            (_, ResolutionInfo::DefaultValue(lit)) => Ok(ResolutionPlan::DefaultValue(lit.clone())),
1789            (_, ResolutionInfo::EnumMapping(m)) => {
1790                Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
1791            }
1792            (Decoder::Record(_, _, field_defaults, _), ResolutionInfo::Record(r)) => Ok(
1793                ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?),
1794            ),
1795            (_, ResolutionInfo::Record(_)) => Err(AvroError::SchemaError(
1796                "record resolution on non-record decoder".into(),
1797            )),
1798            (_, ResolutionInfo::Union(_)) => Err(AvroError::SchemaError(
1799                "union variant cannot be resolved to a union type".into(),
1800            )),
1801        }
1802    }
1803}
1804
/// Translates writer enum symbol indices into reader enum symbol indices.
#[derive(Debug)]
struct EnumResolution {
    // Reader index for each writer index; a negative entry means the writer symbol has
    // no direct counterpart and the default is used instead.
    mapping: Arc<[i32]>,
    // Reader index used when a writer index is unmapped or out of range; a negative
    // value means no default is available and resolution fails.
    default_index: i32,
}
1810
1811impl EnumResolution {
1812    fn new(mapping: &EnumMapping) -> Self {
1813        EnumResolution {
1814            mapping: mapping.mapping.clone(),
1815            default_index: mapping.default_index,
1816        }
1817    }
1818
1819    fn resolve(&self, index: i32) -> Result<i32, AvroError> {
1820        let resolved = usize::try_from(index)
1821            .ok()
1822            .and_then(|idx| self.mapping.get(idx).copied())
1823            .filter(|&idx| idx >= 0)
1824            .unwrap_or(self.default_index);
1825        if resolved >= 0 {
1826            Ok(resolved)
1827        } else {
1828            Err(AvroError::ParseError(format!(
1829                "Enum symbol index {index} not resolvable and no default provided",
1830            )))
1831        }
1832    }
1833}
1834
// A lookup table for dispatching writer-side branches to reader-side branches, together
// with the resolution to apply to each dispatched value.
#[derive(Debug)]
struct DispatchLookupTable {
    // Maps each writer branch index `w` to the corresponding reader branch index.
    //
    // Semantics:
    // - `to_reader[w] >= 0`: The value is an index into the reader's branches. The value
    //   written for branch `w` is decoded with `resolution[w]` applied.
    // - `to_reader[w] == NO_SOURCE` (-1): No matching reader branch exists, and `resolve`
    //   returns `None` for `w`.
    //
    // Representation (`i8`):
    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
    // `i8` for union type IDs. This requires that reader branch indices do not exceed `i8::MAX`.
    //
    // Invariants:
    // - `to_reader.len() == resolution.len()`, and both match the length of the
    //   writer-to-reader mapping the table was built from.
    // - If `to_reader[w] == NO_SOURCE`, `resolution[w]` is a placeholder and is ignored.
    to_reader: Box<[i8]>,
    // For each writer branch `w`, specifies the resolution to apply to the writer's value.
    //
    // This covers, for example, a writer type that can be promoted to the reader type
    // (e.g., `Int` to `Long`). It is ignored if `to_reader[w] == NO_SOURCE`.
    resolution: Box<[ResolutionPlan]>,
}
1860
// Sentinel stored in `DispatchLookupTable::to_reader` for a writer branch that has
// no matching reader branch.
const NO_SOURCE: i8 = -1;
1864
1865impl DispatchLookupTable {
1866    fn from_writer_to_reader(
1867        reader_branches: &[Decoder],
1868        resolution_map: &[Option<(usize, ResolutionInfo)>],
1869    ) -> Result<Self, AvroError> {
1870        let mut to_reader = Vec::with_capacity(resolution_map.len());
1871        let mut resolution = Vec::with_capacity(resolution_map.len());
1872        for map in resolution_map {
1873            match map {
1874                Some((idx, res)) => {
1875                    let idx = *idx;
1876                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1877                        AvroError::SchemaError(format!(
1878                            "Reader branch index {idx} exceeds i8 range (max {})",
1879                            i8::MAX
1880                        ))
1881                    })?;
1882                    let plan = ResolutionPlan::try_new(&reader_branches[idx], res)?;
1883                    to_reader.push(idx_i8);
1884                    resolution.push(plan);
1885                }
1886                None => {
1887                    to_reader.push(NO_SOURCE);
1888                    resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
1889                }
1890            }
1891        }
1892        Ok(Self {
1893            to_reader: to_reader.into_boxed_slice(),
1894            resolution: resolution.into_boxed_slice(),
1895        })
1896    }
1897
1898    // Resolve a writer branch index to (reader_idx, resolution)
1899    #[inline]
1900    fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)> {
1901        let reader_index = *self.to_reader.get(writer_index)?;
1902        (reader_index >= 0).then(|| (reader_index as usize, &self.resolution[writer_index]))
1903    }
1904}
1905
/// Decoder state for an Avro union.
#[derive(Debug)]
struct UnionDecoder {
    // Arrow union fields; `try_new` takes the reader type codes from their type ids.
    fields: UnionFields,
    // Per-branch decoders together with the type-id/offset buffers filled while decoding.
    branches: UnionDecoderBranches,
    // Always 0 when built by `try_new` or `default`.
    // NOTE(review): presumably the branch that receives default values — confirm
    // against the decode paths.
    default_emit_idx: usize,
    // Index of the first `Decoder::Null` branch, or `default_emit_idx` when there is none.
    null_emit_idx: usize,
    // How incoming writer data is mapped onto the reader side.
    plan: UnionReadPlan,
}
1914
/// Branch decoders of a union plus the per-row bookkeeping recorded by `emit_to`.
#[derive(Debug, Default)]
struct UnionDecoderBranches {
    // One decoder per reader branch, addressed by reader branch index.
    decoders: Vec<Decoder>,
    // Type code for each reader branch, addressed by reader branch index.
    reader_type_codes: Vec<i8>,
    // Type code recorded for every emitted row, in emission order.
    type_ids: Vec<i8>,
    // For every emitted row, the number of rows its branch had received before it.
    offsets: Vec<i32>,
    // Number of rows emitted to each branch so far, addressed by reader branch index.
    counts: Vec<i32>,
}
1923
1924impl UnionDecoderBranches {
1925    fn new(decoders: Vec<Decoder>, reader_type_codes: Vec<i8>) -> Self {
1926        let branch_len = decoders.len().max(reader_type_codes.len());
1927        Self {
1928            decoders,
1929            reader_type_codes,
1930            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1931            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1932            counts: vec![0; branch_len],
1933        }
1934    }
1935
1936    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1937        let branches_len = self.decoders.len();
1938        let Some(reader_branch) = self.decoders.get_mut(reader_idx) else {
1939            return Err(AvroError::ParseError(format!(
1940                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1941            )));
1942        };
1943        self.type_ids.push(self.reader_type_codes[reader_idx]);
1944        self.offsets.push(self.counts[reader_idx]);
1945        self.counts[reader_idx] += 1;
1946        Ok(reader_branch)
1947    }
1948}
1949
1950impl Default for UnionDecoder {
1951    fn default() -> Self {
1952        Self {
1953            fields: UnionFields::empty(),
1954            branches: Default::default(),
1955            default_emit_idx: 0,
1956            null_emit_idx: 0,
1957            plan: UnionReadPlan::Passthrough,
1958        }
1959    }
1960}
1961
/// Strategy for mapping the writer's encoding onto a union decoder.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions; writer branches are dispatched through
    /// `lookup_table`.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// Writer is not a union but the reader is; values go to reader branch `reader_idx`
    /// with `resolution` applied.
    FromSingle {
        reader_idx: usize,
        resolution: ResolutionPlan,
    },
    /// Writer is a union but the reader is a single type; values are decoded into
    /// `target`, with `lookup_table` built against that single target.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution information was supplied.
    Passthrough,
}
1977
1978impl UnionReadPlan {
1979    fn from_resolved(
1980        reader_branches: &[Decoder],
1981        resolved: Option<ResolvedUnion>,
1982    ) -> Result<Self, AvroError> {
1983        let Some(info) = resolved else {
1984            return Ok(Self::Passthrough);
1985        };
1986        match (info.writer_is_union, info.reader_is_union) {
1987            (true, true) => {
1988                let lookup_table =
1989                    DispatchLookupTable::from_writer_to_reader(reader_branches, &info.writer_to_reader)?;
1990                Ok(Self::ReaderUnion { lookup_table })
1991            }
1992            (false, true) => {
1993                let Some((idx, resolution)) =
1994                    info.writer_to_reader.first().and_then(Option::as_ref)
1995                else {
1996                    return Err(AvroError::SchemaError(
1997                        "Writer type does not match any reader union branch".to_string(),
1998                    ));
1999                };
2000                let reader_idx = *idx;
2001                Ok(Self::FromSingle {
2002                    reader_idx,
2003                    resolution: ResolutionPlan::try_new(&reader_branches[reader_idx], resolution)?,
2004                })
2005            }
2006            (true, false) => Err(AvroError::InvalidArgument(
2007                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
2008                    .to_string(),
2009            )),
2010            // (false, false) is invalid and should never be constructed by the resolver.
2011            _ => Err(AvroError::SchemaError(
2012                "ResolvedUnion constructed for non-union sides; resolver should return None"
2013                    .to_string(),
2014            )),
2015        }
2016    }
2017}
2018
2019impl UnionDecoder {
2020    fn try_new(
2021        fields: UnionFields,
2022        branches: Vec<Decoder>,
2023        resolved: Option<ResolvedUnion>,
2024    ) -> Result<Self, AvroError> {
2025        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
2026        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
2027        let default_emit_idx = 0;
2028        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
2029        // Guard against impractically large unions that cannot be indexed by an Avro int
2030        let max_addr = (i32::MAX as usize) + 1;
2031        if branches.len() > max_addr {
2032            return Err(AvroError::SchemaError(format!(
2033                "Reader union has {} branches, which exceeds the maximum addressable \
2034                 branches by an Avro int tag ({} + 1).",
2035                branches.len(),
2036                i32::MAX
2037            )));
2038        }
2039        let plan = UnionReadPlan::from_resolved(&branches, resolved)?;
2040        Ok(Self {
2041            fields,
2042            branches: UnionDecoderBranches::new(branches, reader_type_codes),
2043            default_emit_idx,
2044            null_emit_idx,
2045            plan,
2046        })
2047    }
2048
2049    fn with_single_target(target: Decoder, info: ResolvedUnion) -> Result<Self, AvroError> {
2050        // This constructor is only for writer-union to single-type resolution
2051        debug_assert!(info.writer_is_union && !info.reader_is_union);
2052        let mut reader_branches = [target];
2053        let lookup_table =
2054            DispatchLookupTable::from_writer_to_reader(&reader_branches, &info.writer_to_reader)?;
2055        let target = Box::new(mem::replace(&mut reader_branches[0], Decoder::Null(0)));
2056        Ok(Self {
2057            plan: UnionReadPlan::ToSingle {
2058                target,
2059                lookup_table,
2060            },
2061            ..Self::default()
2062        })
2063    }
2064
2065    #[inline]
2066    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
2067        // Avro unions are encoded by first writing the zero-based branch index.
2068        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
2069        // but both use zig-zag varint encoding, so decoding as long is compatible
2070        // with either form and widely used in practice.
2071        let raw = buf.get_long()?;
2072        if raw < 0 {
2073            return Err(AvroError::ParseError(format!(
2074                "Negative union branch index {raw}"
2075            )));
2076        }
2077        usize::try_from(raw).map_err(|_| {
2078            AvroError::ParseError(format!(
2079                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2080                (usize::BITS as usize)
2081            ))
2082        })
2083    }
2084
2085    #[inline]
2086    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
2087    where
2088        F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
2089    {
2090        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2091            return action(target);
2092        }
2093        let reader_idx = match &self.plan {
2094            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
2095            _ => fallback_idx,
2096        };
2097        self.branches.emit_to(reader_idx).and_then(action)
2098    }
2099
2100    fn append_null(&mut self) -> Result<(), AvroError> {
2101        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
2102    }
2103
2104    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
2105        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
2106    }
2107
2108    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2109        match &mut self.plan {
2110            UnionReadPlan::Passthrough => {
2111                let reader_idx = Self::read_tag(buf)?;
2112                let decoder = self.branches.emit_to(reader_idx)?;
2113                decoder.decode(buf)
2114            }
2115            UnionReadPlan::ReaderUnion { lookup_table } => {
2116                let idx = Self::read_tag(buf)?;
2117                let Some((reader_idx, resolution)) = lookup_table.resolve(idx) else {
2118                    return Err(AvroError::ParseError(format!(
2119                        "Union branch index {idx} not resolvable by reader schema"
2120                    )));
2121                };
2122                let decoder = self.branches.emit_to(reader_idx)?;
2123                decoder.decode_with_resolution(buf, resolution)
2124            }
2125            UnionReadPlan::FromSingle {
2126                reader_idx,
2127                resolution,
2128            } => {
2129                let decoder = self.branches.emit_to(*reader_idx)?;
2130                decoder.decode_with_resolution(buf, resolution)
2131            }
2132            UnionReadPlan::ToSingle {
2133                target,
2134                lookup_table,
2135            } => {
2136                let idx = Self::read_tag(buf)?;
2137                let Some((_, resolution)) = lookup_table.resolve(idx) else {
2138                    return Err(AvroError::ParseError(format!(
2139                        "Writer union branch index {idx} not resolvable by reader schema"
2140                    )));
2141                };
2142                target.decode_with_resolution(buf, resolution)
2143            }
2144        }
2145    }
2146
2147    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
2148        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2149            return target.flush(nulls);
2150        }
2151        debug_assert!(
2152            nulls.is_none(),
2153            "UnionArray does not accept a validity bitmap; \
2154                     nulls should have been materialized as a Null child during decode"
2155        );
2156        let children = self
2157            .branches
2158            .decoders
2159            .iter_mut()
2160            .map(|d| d.flush(None))
2161            .collect::<Result<Vec<_>, _>>()?;
2162        let arr = UnionArray::try_new(
2163            self.fields.clone(),
2164            flush_values(&mut self.branches.type_ids)
2165                .into_iter()
2166                .collect(),
2167            Some(
2168                flush_values(&mut self.branches.offsets)
2169                    .into_iter()
2170                    .collect(),
2171            ),
2172            children,
2173        )
2174        .map_err(|e| AvroError::ParseError(e.to_string()))?;
2175        Ok(Arc::new(arr))
2176    }
2177}
2178
2179#[derive(Debug, Default)]
2180struct UnionDecoderBuilder {
2181    fields: Option<UnionFields>,
2182    branches: Option<Vec<Decoder>>,
2183    resolved: Option<ResolvedUnion>,
2184    target: Option<Decoder>,
2185}
2186
2187impl UnionDecoderBuilder {
2188    fn new() -> Self {
2189        Self::default()
2190    }
2191
2192    fn with_fields(mut self, fields: UnionFields) -> Self {
2193        self.fields = Some(fields);
2194        self
2195    }
2196
2197    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2198        self.branches = Some(branches);
2199        self
2200    }
2201
2202    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2203        self.resolved = Some(resolved_union);
2204        self
2205    }
2206
2207    fn with_target(mut self, target: Decoder) -> Self {
2208        self.target = Some(target);
2209        self
2210    }
2211
2212    fn build(self) -> Result<UnionDecoder, AvroError> {
2213        match (self.resolved, self.fields, self.branches, self.target) {
2214            (resolved, Some(fields), Some(branches), None) => {
2215                UnionDecoder::try_new(fields, branches, resolved)
2216            }
2217            (Some(info), None, None, Some(target))
2218                if info.writer_is_union && !info.reader_is_union =>
2219            {
2220                UnionDecoder::with_single_target(target, info)
2221            }
2222            _ => Err(AvroError::InvalidArgument(
2223                "Invalid UnionDecoderBuilder configuration: expected either \
2224                 (fields + branches + resolved) with no target for reader-unions, or \
2225                 (resolved + target) with no fields/branches for writer-union to single."
2226                    .to_string(),
2227            )),
2228        }
2229    }
2230}
2231
/// How `process_blockwise` handles a block whose item count is negative,
/// i.e. a block that also carries its payload size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size, then still visit every item individually.
    ProcessItems,
    /// Read the byte size and jump over the whole payload in one step.
    SkipBySize,
}
2237
2238#[inline]
2239fn skip_blocks(
2240    buf: &mut AvroCursor,
2241    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2242) -> Result<usize, AvroError> {
2243    process_blockwise(
2244        buf,
2245        move |c| skip_item(c),
2246        NegativeBlockBehavior::SkipBySize,
2247    )
2248}
2249
2250#[inline]
2251fn flush_dict(
2252    indices: &mut Vec<i32>,
2253    symbols: &[String],
2254    nulls: Option<NullBuffer>,
2255) -> Result<ArrayRef, AvroError> {
2256    let keys = flush_primitive::<Int32Type>(indices, nulls);
2257    let values = Arc::new(StringArray::from_iter_values(
2258        symbols.iter().map(|s| s.as_str()),
2259    ));
2260    DictionaryArray::try_new(keys, values)
2261        .map_err(Into::into)
2262        .map(|arr| Arc::new(arr) as ArrayRef)
2263}
2264
2265#[inline]
2266fn read_blocks(
2267    buf: &mut AvroCursor,
2268    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2269) -> Result<usize, AvroError> {
2270    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2271}
2272
2273#[inline]
2274fn process_blockwise(
2275    buf: &mut AvroCursor,
2276    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2277    negative_behavior: NegativeBlockBehavior,
2278) -> Result<usize, AvroError> {
2279    let mut total = 0usize;
2280    loop {
2281        // Read the block count
2282        //  positive = that many items
2283        //  negative = that many items + read block size
2284        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
2285        let block_count = buf.get_long()?;
2286        match block_count.cmp(&0) {
2287            Ordering::Equal => break,
2288            Ordering::Less => {
2289                let count = (-block_count) as usize;
2290                // A negative count is followed by a long of the size in bytes
2291                let size_in_bytes = buf.get_long()? as usize;
2292                match negative_behavior {
2293                    NegativeBlockBehavior::ProcessItems => {
2294                        // Process items one-by-one after reading size
2295                        for _ in 0..count {
2296                            on_item(buf)?;
2297                        }
2298                    }
2299                    NegativeBlockBehavior::SkipBySize => {
2300                        // Skip the entire block payload at once
2301                        let _ = buf.get_fixed(size_in_bytes)?;
2302                    }
2303                }
2304                total += count;
2305            }
2306            Ordering::Greater => {
2307                let count = block_count as usize;
2308                for _ in 0..count {
2309                    on_item(buf)?;
2310                }
2311                total += count;
2312            }
2313        }
2314    }
2315    Ok(total)
2316}
2317
2318#[inline]
2319fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2320    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2321}
2322
2323#[inline]
2324fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2325    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2326}
2327
2328#[inline]
2329fn flush_primitive<T: ArrowPrimitiveType>(
2330    values: &mut Vec<T::Native>,
2331    nulls: Option<NullBuffer>,
2332) -> PrimitiveArray<T> {
2333    PrimitiveArray::new(flush_values(values).into(), nulls)
2334}
2335
2336#[inline]
2337fn read_decimal_bytes_be<const N: usize>(
2338    buf: &mut AvroCursor<'_>,
2339    size: &Option<usize>,
2340) -> Result<[u8; N], AvroError> {
2341    match size {
2342        Some(n) if *n == N => {
2343            let raw = buf.get_fixed(N)?;
2344            let mut arr = [0u8; N];
2345            arr.copy_from_slice(raw);
2346            Ok(arr)
2347        }
2348        Some(n) => {
2349            let raw = buf.get_fixed(*n)?;
2350            sign_cast_to::<N>(raw)
2351        }
2352        None => {
2353            let raw = buf.get_bytes()?;
2354            sign_cast_to::<N>(raw)
2355        }
2356    }
2357}
2358
2359/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
2360/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
2361/// the payload is a big-endian two's-complement integer, and when narrowing it must
2362/// be representable without changing sign or value.
2363///
2364/// If `raw.len() < N`, the value is sign-extended.
2365/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
2366/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
2367#[inline]
2368fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2369    let len = raw.len();
2370    // Fast path: exact width, just copy
2371    if len == N {
2372        let mut out = [0u8; N];
2373        out.copy_from_slice(raw);
2374        return Ok(out);
2375    }
2376    // Determine sign byte from MSB of first byte (empty => positive)
2377    let first = raw.first().copied().unwrap_or(0u8);
2378    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2379    // Pre-fill with sign byte to support sign extension
2380    let mut out = [sign_byte; N];
2381    if len > N {
2382        // Validate truncation: all dropped leading bytes must equal sign_byte,
2383        // and the MSB of the first kept byte must match the sign.
2384        let extra = len - N;
2385        // Any non-sign byte in the truncated prefix indicates overflow
2386        if raw[..extra].iter().any(|&b| b != sign_byte) {
2387            return Err(AvroError::ParseError(format!(
2388                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2389                len, N
2390            )));
2391        }
2392        if N > 0 {
2393            let first_kept = raw[extra];
2394            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2395            if sign_bit_mismatch {
2396                return Err(AvroError::ParseError(format!(
2397                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2398                    len, N
2399                )));
2400            }
2401        }
2402        out.copy_from_slice(&raw[extra..]);
2403        return Ok(out);
2404    }
2405    out[N - len..].copy_from_slice(raw);
2406    Ok(out)
2407}
2408
/// Compares the elements at positions `i` and `j` of `arr`.
///
/// Two nulls are equal, a null never equals a non-null, and two non-null
/// elements are compared through their one-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        return left_null && right_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
2422
2423#[derive(Debug)]
2424struct Projector {
2425    writer_projections: Vec<FieldProjection>,
2426    default_injections: Arc<[(usize, AvroLiteral)]>,
2427}
2428
2429#[derive(Debug)]
2430enum FieldProjection {
2431    ToReader(usize),
2432    Skip(Skipper),
2433}
2434
2435#[derive(Debug)]
2436struct ProjectorBuilder<'a> {
2437    rec: &'a ResolvedRecord,
2438    field_defaults: &'a [Option<AvroLiteral>],
2439}
2440
2441impl<'a> ProjectorBuilder<'a> {
2442    #[inline]
2443    fn try_new(rec: &'a ResolvedRecord, field_defaults: &'a [Option<AvroLiteral>]) -> Self {
2444        Self {
2445            rec,
2446            field_defaults,
2447        }
2448    }
2449
2450    #[inline]
2451    fn build(self) -> Result<Projector, AvroError> {
2452        let mut default_injections: Vec<(usize, AvroLiteral)> =
2453            Vec::with_capacity(self.rec.default_fields.len());
2454        for &idx in self.rec.default_fields.as_ref() {
2455            let lit = self
2456                .field_defaults
2457                .get(idx)
2458                .and_then(|lit| lit.clone())
2459                .unwrap_or(AvroLiteral::Null);
2460            default_injections.push((idx, lit));
2461        }
2462        let writer_projections = self
2463            .rec
2464            .writer_fields
2465            .iter()
2466            .map(|field| match field {
2467                ResolvedField::ToReader(index, _) => Ok(FieldProjection::ToReader(*index)),
2468                ResolvedField::Skip(datatype) => {
2469                    let skipper = Skipper::from_avro(datatype)?;
2470                    Ok(FieldProjection::Skip(skipper))
2471                }
2472            })
2473            .collect::<Result<_, AvroError>>()?;
2474        Ok(Projector {
2475            writer_projections,
2476            default_injections: default_injections.into(),
2477        })
2478    }
2479}
2480
2481impl Projector {
2482    #[inline]
2483    fn project_record(
2484        &self,
2485        buf: &mut AvroCursor<'_>,
2486        encodings: &mut [Decoder],
2487    ) -> Result<(), AvroError> {
2488        for field_proj in self.writer_projections.iter() {
2489            match field_proj {
2490                FieldProjection::ToReader(index) => encodings[*index].decode(buf)?,
2491                FieldProjection::Skip(skipper) => skipper.skip(buf)?,
2492            }
2493        }
2494        for (reader_index, lit) in self.default_injections.as_ref() {
2495            encodings[*reader_index].append_default(lit)?;
2496        }
2497        Ok(())
2498    }
2499}
2500
2501/// Lightweight skipper for non‑projected writer fields
2502/// (fields present in the writer schema but omitted by the reader/projection);
2503/// per Avro 1.11.1 schema resolution these fields are ignored.
2504///
2505/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
2506#[derive(Debug)]
2507enum Skipper {
2508    Null,
2509    Boolean,
2510    Int32,
2511    Int64,
2512    Float32,
2513    Float64,
2514    Bytes,
2515    String,
2516    TimeMicros,
2517    TimestampMillis,
2518    TimestampMicros,
2519    TimestampNanos,
2520    Fixed(usize),
2521    Decimal(Option<usize>),
2522    UuidString,
2523    Enum,
2524    DurationFixed12,
2525    List(Box<Skipper>),
2526    Map(Box<Skipper>),
2527    Struct(Vec<Skipper>),
2528    Union(Vec<Skipper>),
2529    Nullable(Nullability, Box<Skipper>),
2530    #[cfg(feature = "avro_custom_types")]
2531    RunEndEncoded(Box<Skipper>),
2532}
2533
2534impl Skipper {
2535    fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
2536        let mut base = match dt.codec() {
2537            Codec::Null => Self::Null,
2538            Codec::Boolean => Self::Boolean,
2539            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
2540            Codec::Int64 => Self::Int64,
2541            Codec::TimeMicros => Self::TimeMicros,
2542            Codec::TimestampMillis(_) => Self::TimestampMillis,
2543            Codec::TimestampMicros(_) => Self::TimestampMicros,
2544            Codec::TimestampNanos(_) => Self::TimestampNanos,
2545            #[cfg(feature = "avro_custom_types")]
2546            Codec::DurationNanos
2547            | Codec::DurationMicros
2548            | Codec::DurationMillis
2549            | Codec::DurationSeconds => Self::Int64,
2550            #[cfg(feature = "avro_custom_types")]
2551            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
2552                Self::Int32
2553            }
2554            #[cfg(feature = "avro_custom_types")]
2555            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
2556                Self::Int64
2557            }
2558            #[cfg(feature = "avro_custom_types")]
2559            Codec::UInt64 => Self::Fixed(8),
2560            #[cfg(feature = "avro_custom_types")]
2561            Codec::Float16 => Self::Fixed(2),
2562            #[cfg(feature = "avro_custom_types")]
2563            Codec::IntervalYearMonth => Self::Fixed(4),
2564            #[cfg(feature = "avro_custom_types")]
2565            Codec::IntervalMonthDayNano => Self::Fixed(16),
2566            #[cfg(feature = "avro_custom_types")]
2567            Codec::IntervalDayTime => Self::Fixed(8),
2568            Codec::Float32 => Self::Float32,
2569            Codec::Float64 => Self::Float64,
2570            Codec::Binary => Self::Bytes,
2571            Codec::Utf8 | Codec::Utf8View => Self::String,
2572            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2573            Codec::Decimal(_, _, size) => Self::Decimal(*size),
2574            Codec::Uuid => Self::UuidString, // encoded as string
2575            Codec::Enum(_) => Self::Enum,
2576            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2577            Codec::Struct(fields) => {
2578                if let Some(ResolutionInfo::Record(rec)) = dt.resolution.as_ref() {
2579                    Self::Struct(
2580                        rec.writer_fields
2581                            .iter()
2582                            .map(|wf| match wf {
2583                                ResolvedField::ToReader(_, wdt) | ResolvedField::Skip(wdt) => {
2584                                    Skipper::from_avro(wdt)
2585                                }
2586                            })
2587                            .collect::<Result<_, _>>()?,
2588                    )
2589                } else {
2590                    Self::Struct(
2591                        fields
2592                            .iter()
2593                            .map(|f| Skipper::from_avro(f.data_type()))
2594                            .collect::<Result<_, _>>()?,
2595                    )
2596                }
2597            }
2598            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2599            Codec::Interval => Self::DurationFixed12,
2600            Codec::Union(encodings, _, _) => {
2601                let max_addr = (i32::MAX as usize) + 1;
2602                if encodings.len() > max_addr {
2603                    return Err(AvroError::SchemaError(format!(
2604                        "Writer union has {} branches, which exceeds the maximum addressable \
2605                         branches by an Avro int tag ({} + 1).",
2606                        encodings.len(),
2607                        i32::MAX
2608                    )));
2609                }
2610                Self::Union(
2611                    encodings
2612                        .iter()
2613                        .map(Skipper::from_avro)
2614                        .collect::<Result<_, _>>()?,
2615                )
2616            }
2617            #[cfg(feature = "avro_custom_types")]
2618            Codec::RunEndEncoded(inner, _w) => {
2619                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2620            }
2621        };
2622        if let Some(n) = dt.nullability() {
2623            base = Self::Nullable(n, Box::new(base));
2624        }
2625        Ok(base)
2626    }
2627
2628    fn skip(&self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2629        match self {
2630            Self::Null => Ok(()),
2631            Self::Boolean => {
2632                buf.get_bool()?;
2633                Ok(())
2634            }
2635            Self::Int32 => {
2636                buf.skip_int()?;
2637                Ok(())
2638            }
2639            Self::Int64
2640            | Self::TimeMicros
2641            | Self::TimestampMillis
2642            | Self::TimestampMicros
2643            | Self::TimestampNanos => {
2644                buf.skip_long()?;
2645                Ok(())
2646            }
2647            Self::Float32 => {
2648                buf.get_float()?;
2649                Ok(())
2650            }
2651            Self::Float64 => {
2652                buf.get_double()?;
2653                Ok(())
2654            }
2655            Self::Bytes | Self::String | Self::UuidString => {
2656                buf.get_bytes()?;
2657                Ok(())
2658            }
2659            Self::Fixed(sz) => {
2660                buf.get_fixed(*sz)?;
2661                Ok(())
2662            }
2663            Self::Decimal(size) => {
2664                if let Some(s) = size {
2665                    buf.get_fixed(*s)
2666                } else {
2667                    buf.get_bytes()
2668                }?;
2669                Ok(())
2670            }
2671            Self::Enum => {
2672                buf.skip_int()?;
2673                Ok(())
2674            }
2675            Self::DurationFixed12 => {
2676                buf.get_fixed(12)?;
2677                Ok(())
2678            }
2679            Self::List(item) => {
2680                skip_blocks(buf, |c| item.skip(c))?;
2681                Ok(())
2682            }
2683            Self::Map(value) => {
2684                skip_blocks(buf, |c| {
2685                    c.get_bytes()?; // key
2686                    value.skip(c)
2687                })?;
2688                Ok(())
2689            }
2690            Self::Struct(fields) => {
2691                for f in fields.iter() {
2692                    f.skip(buf)?
2693                }
2694                Ok(())
2695            }
2696            Self::Union(encodings) => {
2697                // Union tag must be ZigZag-decoded
2698                let raw = buf.get_long()?;
2699                if raw < 0 {
2700                    return Err(AvroError::ParseError(format!(
2701                        "Negative union branch index {raw}"
2702                    )));
2703                }
2704                let idx: usize = usize::try_from(raw).map_err(|_| {
2705                    AvroError::ParseError(format!(
2706                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2707                        (usize::BITS as usize)
2708                    ))
2709                })?;
2710                let Some(encoding) = encodings.get(idx) else {
2711                    return Err(AvroError::ParseError(format!(
2712                        "Union branch index {idx} out of range for skipper ({} branches)",
2713                        encodings.len()
2714                    )));
2715                };
2716                encoding.skip(buf)
2717            }
2718            Self::Nullable(order, inner) => {
2719                let branch = buf.read_vlq()?;
2720                let is_not_null = match *order {
2721                    Nullability::NullFirst => branch != 0,
2722                    Nullability::NullSecond => branch == 0,
2723                };
2724                if is_not_null {
2725                    inner.skip(buf)?;
2726                }
2727                Ok(())
2728            }
2729            #[cfg(feature = "avro_custom_types")]
2730            Self::RunEndEncoded(inner) => inner.skip(buf),
2731        }
2732    }
2733}
2734
2735#[cfg(test)]
2736mod tests {
2737    use super::*;
2738    use crate::codec::AvroFieldBuilder;
2739    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2740    use arrow_array::cast::AsArray;
2741    use indexmap::IndexMap;
2742    use std::collections::HashMap;
2743
2744    fn encode_avro_int(value: i32) -> Vec<u8> {
2745        let mut buf = Vec::new();
2746        let mut v = (value << 1) ^ (value >> 31);
2747        while v & !0x7F != 0 {
2748            buf.push(((v & 0x7F) | 0x80) as u8);
2749            v >>= 7;
2750        }
2751        buf.push(v as u8);
2752        buf
2753    }
2754
2755    fn encode_avro_long(value: i64) -> Vec<u8> {
2756        let mut buf = Vec::new();
2757        let mut v = (value << 1) ^ (value >> 63);
2758        while v & !0x7F != 0 {
2759            buf.push(((v & 0x7F) | 0x80) as u8);
2760            v >>= 7;
2761        }
2762        buf.push(v as u8);
2763        buf
2764    }
2765
2766    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2767        let mut buf = encode_avro_long(bytes.len() as i64);
2768        buf.extend_from_slice(bytes);
2769        buf
2770    }
2771
2772    fn avro_from_codec(codec: Codec) -> AvroDataType {
2773        AvroDataType::new(codec, Default::default(), None)
2774    }
2775
2776    fn resolved_root_datatype(
2777        writer: Schema<'static>,
2778        reader: Schema<'static>,
2779        use_utf8view: bool,
2780        strict_mode: bool,
2781    ) -> AvroDataType {
2782        // Wrap writer schema in a single-field record
2783        let writer_record = Schema::Complex(ComplexType::Record(Record {
2784            name: "Root",
2785            namespace: None,
2786            doc: None,
2787            aliases: vec![],
2788            fields: vec![Field {
2789                name: "v",
2790                r#type: writer,
2791                default: None,
2792                doc: None,
2793                aliases: vec![],
2794            }],
2795            attributes: Attributes::default(),
2796        }));
2797
2798        // Wrap reader schema in a single-field record
2799        let reader_record = Schema::Complex(ComplexType::Record(Record {
2800            name: "Root",
2801            namespace: None,
2802            doc: None,
2803            aliases: vec![],
2804            fields: vec![Field {
2805                name: "v",
2806                r#type: reader,
2807                default: None,
2808                doc: None,
2809                aliases: vec![],
2810            }],
2811            attributes: Attributes::default(),
2812        }));
2813
2814        // Build resolved record, then extract the inner field's resolved AvroDataType
2815        let field = AvroFieldBuilder::new(&writer_record)
2816            .with_reader_schema(&reader_record)
2817            .with_utf8view(use_utf8view)
2818            .with_strict_mode(strict_mode)
2819            .build()
2820            .expect("schema resolution should succeed");
2821
2822        match field.data_type().codec() {
2823            Codec::Struct(fields) => fields[0].data_type().clone(),
2824            other => panic!("expected wrapper struct, got {other:?}"),
2825        }
2826    }
2827
2828    fn decoder_for_promotion(
2829        writer: PrimitiveType,
2830        reader: PrimitiveType,
2831        use_utf8view: bool,
2832    ) -> Decoder {
2833        let ws = Schema::TypeName(TypeName::Primitive(writer));
2834        let rs = Schema::TypeName(TypeName::Primitive(reader));
2835        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2836        Decoder::try_new(&dt).unwrap()
2837    }
2838
2839    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2840        AvroDataType::new(codec, HashMap::new(), nullability)
2841    }
2842
2843    #[cfg(feature = "avro_custom_types")]
2844    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2845        let mut out = Vec::with_capacity(10);
2846        while x >= 0x80 {
2847            out.push((x as u8) | 0x80);
2848            x >>= 7;
2849        }
2850        out.push(x as u8);
2851        out
2852    }
2853
2854    #[test]
2855    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2856        let ws = Schema::Union(vec![
2857            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2858            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2859        ]);
2860        let rs = Schema::Union(vec![
2861            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2862            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2863        ]);
2864
2865        let dt = resolved_root_datatype(ws, rs, false, false);
2866        let mut dec = Decoder::try_new(&dt).unwrap();
2867
2868        let mut rec1 = encode_avro_long(0);
2869        rec1.extend(encode_avro_int(7));
2870        let mut cur1 = AvroCursor::new(&rec1);
2871        dec.decode(&mut cur1).unwrap();
2872
2873        let mut rec2 = encode_avro_long(1);
2874        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2875        let mut cur2 = AvroCursor::new(&rec2);
2876        dec.decode(&mut cur2).unwrap();
2877
2878        let arr = dec.flush(None).unwrap();
2879        let ua = arr
2880            .as_any()
2881            .downcast_ref::<UnionArray>()
2882            .expect("dense union output");
2883
2884        assert_eq!(
2885            ua.type_id(0),
2886            1,
2887            "first value must select reader 'long' branch"
2888        );
2889        assert_eq!(ua.value_offset(0), 0);
2890
2891        assert_eq!(
2892            ua.type_id(1),
2893            0,
2894            "second value must select reader 'string' branch"
2895        );
2896        assert_eq!(ua.value_offset(1), 0);
2897
2898        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2899        assert_eq!(long_child.len(), 1);
2900        assert_eq!(long_child.value(0), 7);
2901
2902        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2903        assert_eq!(str_child.len(), 1);
2904        assert_eq!(str_child.value(0), "abc");
2905    }
2906
2907    #[test]
2908    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2909        let ws = Schema::Union(vec![
2910            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2911            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2912        ]);
2913        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2914
2915        let dt = resolved_root_datatype(ws, rs, false, false);
2916        let mut dec = Decoder::try_new(&dt).unwrap();
2917
2918        let mut data = encode_avro_long(0);
2919        data.extend(encode_avro_int(5));
2920        let mut cur = AvroCursor::new(&data);
2921        dec.decode(&mut cur).unwrap();
2922
2923        let arr = dec.flush(None).unwrap();
2924        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2925        assert_eq!(out.len(), 1);
2926        assert_eq!(out.value(0), 5);
2927    }
2928
2929    #[test]
2930    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2931        let ws = Schema::Union(vec![
2932            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2933            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2934        ]);
2935        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2936
2937        let dt = resolved_root_datatype(ws, rs, false, false);
2938        let mut dec = Decoder::try_new(&dt).unwrap();
2939
2940        let mut data = encode_avro_long(1);
2941        data.extend(encode_avro_bytes("z".as_bytes()));
2942        let mut cur = AvroCursor::new(&data);
2943        let res = dec.decode(&mut cur);
2944        assert!(
2945            res.is_err(),
2946            "expected error when writer union branch does not resolve to reader non-union type"
2947        );
2948    }
2949
2950    #[test]
2951    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2952        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2953        let rs = Schema::Union(vec![
2954            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2955            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2956        ]);
2957
2958        let dt = resolved_root_datatype(ws, rs, false, false);
2959        let mut dec = Decoder::try_new(&dt).unwrap();
2960
2961        let data = encode_avro_int(6);
2962        let mut cur = AvroCursor::new(&data);
2963        dec.decode(&mut cur).unwrap();
2964
2965        let arr = dec.flush(None).unwrap();
2966        let ua = arr
2967            .as_any()
2968            .downcast_ref::<UnionArray>()
2969            .expect("dense union output");
2970        assert_eq!(ua.len(), 1);
2971        assert_eq!(
2972            ua.type_id(0),
2973            1,
2974            "must resolve to reader 'long' branch (type_id 1)"
2975        );
2976        assert_eq!(ua.value_offset(0), 0);
2977
2978        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2979        assert_eq!(long_child.len(), 1);
2980        assert_eq!(long_child.value(0), 6);
2981
2982        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2983        assert_eq!(str_child.len(), 0, "string branch must be empty");
2984    }
2985
2986    #[test]
2987    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2988        let ws = Schema::Union(vec![
2989            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2990            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2991        ]);
2992        let rs = Schema::Union(vec![
2993            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2994            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2995        ]);
2996
2997        let dt = resolved_root_datatype(ws, rs, false, false);
2998        let mut dec = Decoder::try_new(&dt).unwrap();
2999
3000        let mut data = encode_avro_long(1);
3001        data.push(1);
3002        let mut cur = AvroCursor::new(&data);
3003        let res = dec.decode(&mut cur);
3004        assert!(
3005            res.is_err(),
3006            "expected error for unmapped writer 'boolean' branch"
3007        );
3008    }
3009
3010    #[test]
3011    fn test_schema_resolution_promotion_int_to_long() {
3012        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
3013        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
3014        for v in [0, 1, -2, 123456] {
3015            let data = encode_avro_int(v);
3016            let mut cur = AvroCursor::new(&data);
3017            dec.decode(&mut cur).unwrap();
3018        }
3019        let arr = dec.flush(None).unwrap();
3020        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
3021        assert_eq!(a.value(0), 0);
3022        assert_eq!(a.value(1), 1);
3023        assert_eq!(a.value(2), -2);
3024        assert_eq!(a.value(3), 123456);
3025    }
3026
3027    #[test]
3028    fn test_schema_resolution_promotion_int_to_float() {
3029        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
3030        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
3031        for v in [0, 42, -7] {
3032            let data = encode_avro_int(v);
3033            let mut cur = AvroCursor::new(&data);
3034            dec.decode(&mut cur).unwrap();
3035        }
3036        let arr = dec.flush(None).unwrap();
3037        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3038        assert_eq!(a.value(0), 0.0);
3039        assert_eq!(a.value(1), 42.0);
3040        assert_eq!(a.value(2), -7.0);
3041    }
3042
3043    #[test]
3044    fn test_schema_resolution_promotion_int_to_double() {
3045        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
3046        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
3047        for v in [1, -1, 10_000] {
3048            let data = encode_avro_int(v);
3049            let mut cur = AvroCursor::new(&data);
3050            dec.decode(&mut cur).unwrap();
3051        }
3052        let arr = dec.flush(None).unwrap();
3053        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3054        assert_eq!(a.value(0), 1.0);
3055        assert_eq!(a.value(1), -1.0);
3056        assert_eq!(a.value(2), 10_000.0);
3057    }
3058
3059    #[test]
3060    fn test_schema_resolution_promotion_long_to_float() {
3061        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
3062        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
3063        for v in [0_i64, 1_000_000_i64, -123_i64] {
3064            let data = encode_avro_long(v);
3065            let mut cur = AvroCursor::new(&data);
3066            dec.decode(&mut cur).unwrap();
3067        }
3068        let arr = dec.flush(None).unwrap();
3069        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3070        assert_eq!(a.value(0), 0.0);
3071        assert_eq!(a.value(1), 1_000_000.0);
3072        assert_eq!(a.value(2), -123.0);
3073    }
3074
3075    #[test]
3076    fn test_schema_resolution_promotion_long_to_double() {
3077        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
3078        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
3079        for v in [2_i64, -2_i64, 9_223_372_i64] {
3080            let data = encode_avro_long(v);
3081            let mut cur = AvroCursor::new(&data);
3082            dec.decode(&mut cur).unwrap();
3083        }
3084        let arr = dec.flush(None).unwrap();
3085        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3086        assert_eq!(a.value(0), 2.0);
3087        assert_eq!(a.value(1), -2.0);
3088        assert_eq!(a.value(2), 9_223_372.0);
3089    }
3090
3091    #[test]
3092    fn test_schema_resolution_promotion_float_to_double() {
3093        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
3094        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
3095        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
3096            let data = v.to_le_bytes().to_vec();
3097            let mut cur = AvroCursor::new(&data);
3098            dec.decode(&mut cur).unwrap();
3099        }
3100        let arr = dec.flush(None).unwrap();
3101        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3102        assert_eq!(a.value(0), 0.5_f64);
3103        assert_eq!(a.value(1), -3.25_f64);
3104        assert_eq!(a.value(2), 1.0e6_f64);
3105    }
3106
3107    #[test]
3108    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
3109        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
3110        assert!(matches!(dec, Decoder::BytesToString(_, _)));
3111        for s in ["hello", "world", "héllo"] {
3112            let data = encode_avro_bytes(s.as_bytes());
3113            let mut cur = AvroCursor::new(&data);
3114            dec.decode(&mut cur).unwrap();
3115        }
3116        let arr = dec.flush(None).unwrap();
3117        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
3118        assert_eq!(a.value(0), "hello");
3119        assert_eq!(a.value(1), "world");
3120        assert_eq!(a.value(2), "héllo");
3121    }
3122
3123    #[test]
3124    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
3125        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
3126        assert!(matches!(dec, Decoder::BytesToString(_, _)));
3127        let data = encode_avro_bytes("abc".as_bytes());
3128        let mut cur = AvroCursor::new(&data);
3129        dec.decode(&mut cur).unwrap();
3130        let arr = dec.flush(None).unwrap();
3131        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
3132        assert_eq!(a.value(0), "abc");
3133    }
3134
3135    #[test]
3136    fn test_schema_resolution_promotion_string_to_bytes() {
3137        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
3138        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
3139        for s in ["", "abc", "data"] {
3140            let data = encode_avro_bytes(s.as_bytes());
3141            let mut cur = AvroCursor::new(&data);
3142            dec.decode(&mut cur).unwrap();
3143        }
3144        let arr = dec.flush(None).unwrap();
3145        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3146        assert_eq!(a.value(0), b"");
3147        assert_eq!(a.value(1), b"abc");
3148        assert_eq!(a.value(2), "data".as_bytes());
3149    }
3150
3151    #[test]
3152    fn test_schema_resolution_no_promotion_passthrough_int() {
3153        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3154        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3155        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
3156        let writer_record = Schema::Complex(ComplexType::Record(Record {
3157            name: "Root",
3158            namespace: None,
3159            doc: None,
3160            aliases: vec![],
3161            fields: vec![Field {
3162                name: "v",
3163                r#type: ws,
3164                default: None,
3165                doc: None,
3166                aliases: vec![],
3167            }],
3168            attributes: Attributes::default(),
3169        }));
3170        let reader_record = Schema::Complex(ComplexType::Record(Record {
3171            name: "Root",
3172            namespace: None,
3173            doc: None,
3174            aliases: vec![],
3175            fields: vec![Field {
3176                name: "v",
3177                r#type: rs,
3178                default: None,
3179                doc: None,
3180                aliases: vec![],
3181            }],
3182            attributes: Attributes::default(),
3183        }));
3184        let field = AvroFieldBuilder::new(&writer_record)
3185            .with_reader_schema(&reader_record)
3186            .with_utf8view(false)
3187            .with_strict_mode(false)
3188            .build()
3189            .unwrap();
3190        // Extract the resolved inner field's AvroDataType
3191        let dt = match field.data_type().codec() {
3192            Codec::Struct(fields) => fields[0].data_type().clone(),
3193            other => panic!("expected wrapper struct, got {other:?}"),
3194        };
3195        let mut dec = Decoder::try_new(&dt).unwrap();
3196        assert!(matches!(dec, Decoder::Int32(_)));
3197        for v in [7, -9] {
3198            let data = encode_avro_int(v);
3199            let mut cur = AvroCursor::new(&data);
3200            dec.decode(&mut cur).unwrap();
3201        }
3202        let arr = dec.flush(None).unwrap();
3203        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3204        assert_eq!(a.value(0), 7);
3205        assert_eq!(a.value(1), -9);
3206    }
3207
3208    #[test]
3209    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
3210        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3211        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
3212        let writer_record = Schema::Complex(ComplexType::Record(Record {
3213            name: "Root",
3214            namespace: None,
3215            doc: None,
3216            aliases: vec![],
3217            fields: vec![Field {
3218                name: "v",
3219                r#type: ws,
3220                default: None,
3221                doc: None,
3222                aliases: vec![],
3223            }],
3224            attributes: Attributes::default(),
3225        }));
3226        let reader_record = Schema::Complex(ComplexType::Record(Record {
3227            name: "Root",
3228            namespace: None,
3229            doc: None,
3230            aliases: vec![],
3231            fields: vec![Field {
3232                name: "v",
3233                r#type: rs,
3234                default: None,
3235                doc: None,
3236                aliases: vec![],
3237            }],
3238            attributes: Attributes::default(),
3239        }));
3240        let res = AvroFieldBuilder::new(&writer_record)
3241            .with_reader_schema(&reader_record)
3242            .with_utf8view(false)
3243            .with_strict_mode(false)
3244            .build();
3245        assert!(res.is_err(), "expected error for illegal promotion");
3246    }
3247
3248    #[test]
3249    fn test_map_decoding_one_entry() {
3250        let value_type = avro_from_codec(Codec::Utf8);
3251        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3252        let mut decoder = Decoder::try_new(&map_type).unwrap();
3253        // Encode a single map with one entry: {"hello": "world"}
3254        let mut data = Vec::new();
3255        data.extend_from_slice(&encode_avro_long(1));
3256        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
3257        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
3258        data.extend_from_slice(&encode_avro_long(0));
3259        let mut cursor = AvroCursor::new(&data);
3260        decoder.decode(&mut cursor).unwrap();
3261        let array = decoder.flush(None).unwrap();
3262        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3263        assert_eq!(map_arr.len(), 1); // one map
3264        assert_eq!(map_arr.value_length(0), 1);
3265        let entries = map_arr.value(0);
3266        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
3267        assert_eq!(struct_entries.len(), 1);
3268        let key_arr = struct_entries
3269            .column_by_name("key")
3270            .unwrap()
3271            .as_any()
3272            .downcast_ref::<StringArray>()
3273            .unwrap();
3274        let val_arr = struct_entries
3275            .column_by_name("value")
3276            .unwrap()
3277            .as_any()
3278            .downcast_ref::<StringArray>()
3279            .unwrap();
3280        assert_eq!(key_arr.value(0), "hello");
3281        assert_eq!(val_arr.value(0), "world");
3282    }
3283
3284    #[test]
3285    fn test_map_decoding_empty() {
3286        let value_type = avro_from_codec(Codec::Utf8);
3287        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3288        let mut decoder = Decoder::try_new(&map_type).unwrap();
3289        let data = encode_avro_long(0);
3290        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3291        let array = decoder.flush(None).unwrap();
3292        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3293        assert_eq!(map_arr.len(), 1);
3294        assert_eq!(map_arr.value_length(0), 0);
3295    }
3296
3297    #[test]
3298    fn test_fixed_decoding() {
3299        let avro_type = avro_from_codec(Codec::Fixed(3));
3300        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3301
3302        let data1 = [1u8, 2, 3];
3303        let mut cursor1 = AvroCursor::new(&data1);
3304        decoder
3305            .decode(&mut cursor1)
3306            .expect("Failed to decode data1");
3307        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
3308        let data2 = [4u8, 5, 6];
3309        let mut cursor2 = AvroCursor::new(&data2);
3310        decoder
3311            .decode(&mut cursor2)
3312            .expect("Failed to decode data2");
3313        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
3314        let array = decoder.flush(None).expect("Failed to flush decoder");
3315        assert_eq!(array.len(), 2, "Array should contain two items");
3316        let fixed_size_binary_array = array
3317            .as_any()
3318            .downcast_ref::<FixedSizeBinaryArray>()
3319            .expect("Failed to downcast to FixedSizeBinaryArray");
3320        assert_eq!(
3321            fixed_size_binary_array.value_length(),
3322            3,
3323            "Fixed size of binary values should be 3"
3324        );
3325        assert_eq!(
3326            fixed_size_binary_array.value(0),
3327            &[1, 2, 3],
3328            "First item mismatch"
3329        );
3330        assert_eq!(
3331            fixed_size_binary_array.value(1),
3332            &[4, 5, 6],
3333            "Second item mismatch"
3334        );
3335    }
3336
3337    #[test]
3338    fn test_fixed_decoding_empty() {
3339        let avro_type = avro_from_codec(Codec::Fixed(5));
3340        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3341
3342        let array = decoder
3343            .flush(None)
3344            .expect("Failed to flush decoder for empty input");
3345
3346        assert_eq!(array.len(), 0, "Array should be empty");
3347        let fixed_size_binary_array = array
3348            .as_any()
3349            .downcast_ref::<FixedSizeBinaryArray>()
3350            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
3351
3352        assert_eq!(
3353            fixed_size_binary_array.value_length(),
3354            5,
3355            "Fixed size of binary values should be 5 as per type"
3356        );
3357    }
3358
3359    #[test]
3360    fn test_uuid_decoding() {
3361        let avro_type = avro_from_codec(Codec::Uuid);
3362        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3363        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
3364        let data = encode_avro_bytes(uuid_str.as_bytes());
3365        let mut cursor = AvroCursor::new(&data);
3366        decoder.decode(&mut cursor).expect("Failed to decode data");
3367        assert_eq!(
3368            cursor.position(),
3369            data.len(),
3370            "Cursor should advance by varint size + data size"
3371        );
3372        let array = decoder.flush(None).expect("Failed to flush decoder");
3373        let fixed_size_binary_array = array
3374            .as_any()
3375            .downcast_ref::<FixedSizeBinaryArray>()
3376            .expect("Array should be a FixedSizeBinaryArray");
3377        assert_eq!(fixed_size_binary_array.len(), 1);
3378        assert_eq!(fixed_size_binary_array.value_length(), 16);
3379        let expected_bytes = [
3380            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
3381            0x6b, 0xf6,
3382        ];
3383        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
3384    }
3385
3386    #[test]
3387    fn test_array_decoding() {
3388        let item_dt = avro_from_codec(Codec::Int32);
3389        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3390        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3391        let mut row1 = Vec::new();
3392        row1.extend_from_slice(&encode_avro_long(2));
3393        row1.extend_from_slice(&encode_avro_int(10));
3394        row1.extend_from_slice(&encode_avro_int(20));
3395        row1.extend_from_slice(&encode_avro_long(0));
3396        let row2 = encode_avro_long(0);
3397        let mut cursor = AvroCursor::new(&row1);
3398        decoder.decode(&mut cursor).unwrap();
3399        let mut cursor2 = AvroCursor::new(&row2);
3400        decoder.decode(&mut cursor2).unwrap();
3401        let array = decoder.flush(None).unwrap();
3402        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3403        assert_eq!(list_arr.len(), 2);
3404        let offsets = list_arr.value_offsets();
3405        assert_eq!(offsets, &[0, 2, 2]);
3406        let values = list_arr.values();
3407        let int_arr = values.as_primitive::<Int32Type>();
3408        assert_eq!(int_arr.len(), 2);
3409        assert_eq!(int_arr.value(0), 10);
3410        assert_eq!(int_arr.value(1), 20);
3411    }
3412
3413    #[test]
3414    fn test_array_decoding_with_negative_block_count() {
3415        let item_dt = avro_from_codec(Codec::Int32);
3416        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3417        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3418        let mut data = encode_avro_long(-3);
3419        data.extend_from_slice(&encode_avro_long(12));
3420        data.extend_from_slice(&encode_avro_int(1));
3421        data.extend_from_slice(&encode_avro_int(2));
3422        data.extend_from_slice(&encode_avro_int(3));
3423        data.extend_from_slice(&encode_avro_long(0));
3424        let mut cursor = AvroCursor::new(&data);
3425        decoder.decode(&mut cursor).unwrap();
3426        let array = decoder.flush(None).unwrap();
3427        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3428        assert_eq!(list_arr.len(), 1);
3429        assert_eq!(list_arr.value_length(0), 3);
3430        let values = list_arr.values().as_primitive::<Int32Type>();
3431        assert_eq!(values.len(), 3);
3432        assert_eq!(values.value(0), 1);
3433        assert_eq!(values.value(1), 2);
3434        assert_eq!(values.value(2), 3);
3435    }
3436
3437    #[test]
3438    fn test_nested_array_decoding() {
3439        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3440        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
3441        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
3442        let mut buf = Vec::new();
3443        buf.extend(encode_avro_long(1));
3444        buf.extend(encode_avro_long(2));
3445        buf.extend(encode_avro_int(5));
3446        buf.extend(encode_avro_int(6));
3447        buf.extend(encode_avro_long(0));
3448        buf.extend(encode_avro_long(0));
3449        let mut cursor = AvroCursor::new(&buf);
3450        decoder.decode(&mut cursor).unwrap();
3451        let arr = decoder.flush(None).unwrap();
3452        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
3453        assert_eq!(outer.len(), 1);
3454        assert_eq!(outer.value_length(0), 1);
3455        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
3456        assert_eq!(inner.len(), 1);
3457        assert_eq!(inner.value_length(0), 2);
3458        let values = inner
3459            .values()
3460            .as_any()
3461            .downcast_ref::<Int32Array>()
3462            .unwrap();
3463        assert_eq!(values.values(), &[5, 6]);
3464    }
3465
3466    #[test]
3467    fn test_array_decoding_empty_array() {
3468        let value_type = avro_from_codec(Codec::Utf8);
3469        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
3470        let mut decoder = Decoder::try_new(&map_type).unwrap();
3471        let data = encode_avro_long(0);
3472        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3473        let array = decoder.flush(None).unwrap();
3474        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3475        assert_eq!(list_arr.len(), 1);
3476        assert_eq!(list_arr.value_length(0), 0);
3477    }
3478
3479    #[test]
3480    fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
3481        use crate::schema::Array;
3482        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3483            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3484            attributes: Attributes::default(),
3485        }));
3486        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3487            items: Box::new(Schema::Union(vec![
3488                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3489                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3490            ])),
3491            attributes: Attributes::default(),
3492        }));
3493        let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
3494        if let Codec::List(inner) = dt.codec() {
3495            assert_eq!(
3496                inner.nullability(),
3497                Some(Nullability::NullFirst),
3498                "items should be nullable"
3499            );
3500        } else {
3501            panic!("expected List codec");
3502        }
3503        let mut decoder = Decoder::try_new(&dt).unwrap();
3504        let mut data = encode_avro_long(2);
3505        data.extend(encode_avro_int(10));
3506        data.extend(encode_avro_int(20));
3507        data.extend(encode_avro_long(0));
3508        let mut cursor = AvroCursor::new(&data);
3509        decoder.decode(&mut cursor).unwrap();
3510        assert_eq!(
3511            cursor.position(),
3512            data.len(),
3513            "all bytes should be consumed"
3514        );
3515        let array = decoder.flush(None).unwrap();
3516        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3517        assert_eq!(list_arr.len(), 1, "one list/row");
3518        assert_eq!(list_arr.value_length(0), 2, "two items in the list");
3519        let values = list_arr.values().as_primitive::<Int32Type>();
3520        assert_eq!(values.len(), 2);
3521        assert_eq!(values.value(0), 10);
3522        assert_eq!(values.value(1), 20);
3523        assert!(!values.is_null(0));
3524        assert!(!values.is_null(1));
3525    }
3526
3527    #[test]
3528    fn test_decimal_decoding_fixed256() {
3529        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
3530        let mut decoder = Decoder::try_new(&dt).unwrap();
3531        let row1 = [
3532            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3533            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3534            0x00, 0x00, 0x30, 0x39,
3535        ];
3536        let row2 = [
3537            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3538            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3539            0xFF, 0xFF, 0xFF, 0x85,
3540        ];
3541        let mut data = Vec::new();
3542        data.extend_from_slice(&row1);
3543        data.extend_from_slice(&row2);
3544        let mut cursor = AvroCursor::new(&data);
3545        decoder.decode(&mut cursor).unwrap();
3546        decoder.decode(&mut cursor).unwrap();
3547        let arr = decoder.flush(None).unwrap();
3548        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
3549        assert_eq!(dec.len(), 2);
3550        assert_eq!(dec.value_as_string(0), "123.45");
3551        assert_eq!(dec.value_as_string(1), "-1.23");
3552    }
3553
3554    #[test]
3555    fn test_decimal_decoding_fixed128() {
3556        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
3557        let mut decoder = Decoder::try_new(&dt).unwrap();
3558        let row1 = [
3559            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3560            0x30, 0x39,
3561        ];
3562        let row2 = [
3563            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3564            0xFF, 0x85,
3565        ];
3566        let mut data = Vec::new();
3567        data.extend_from_slice(&row1);
3568        data.extend_from_slice(&row2);
3569        let mut cursor = AvroCursor::new(&data);
3570        decoder.decode(&mut cursor).unwrap();
3571        decoder.decode(&mut cursor).unwrap();
3572        let arr = decoder.flush(None).unwrap();
3573        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3574        assert_eq!(dec.len(), 2);
3575        assert_eq!(dec.value_as_string(0), "123.45");
3576        assert_eq!(dec.value_as_string(1), "-1.23");
3577    }
3578
3579    #[test]
3580    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
3581        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
3582        let mut decoder = Decoder::try_new(&dt).unwrap();
3583        let row1 = [
3584            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3585            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3586            0x00, 0x00, 0x30, 0x39,
3587        ];
3588        let row2 = [
3589            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3590            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3591            0xFF, 0xFF, 0xFF, 0x85,
3592        ];
3593        let mut data = Vec::new();
3594        data.extend_from_slice(&row1);
3595        data.extend_from_slice(&row2);
3596        let mut cursor = AvroCursor::new(&data);
3597        decoder.decode(&mut cursor).unwrap();
3598        decoder.decode(&mut cursor).unwrap();
3599        let arr = decoder.flush(None).unwrap();
3600        #[cfg(feature = "small_decimals")]
3601        {
3602            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3603            assert_eq!(dec.len(), 2);
3604            assert_eq!(dec.value_as_string(0), "123.45");
3605            assert_eq!(dec.value_as_string(1), "-1.23");
3606        }
3607        #[cfg(not(feature = "small_decimals"))]
3608        {
3609            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3610            assert_eq!(dec.len(), 2);
3611            assert_eq!(dec.value_as_string(0), "123.45");
3612            assert_eq!(dec.value_as_string(1), "-1.23");
3613        }
3614    }
3615
3616    #[test]
3617    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
3618        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
3619        let mut decoder = Decoder::try_new(&dt).unwrap();
3620        let row1 = [
3621            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3622            0x30, 0x39,
3623        ];
3624        let row2 = [
3625            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3626            0xFF, 0x85,
3627        ];
3628        let mut data = Vec::new();
3629        data.extend_from_slice(&row1);
3630        data.extend_from_slice(&row2);
3631        let mut cursor = AvroCursor::new(&data);
3632        decoder.decode(&mut cursor).unwrap();
3633        decoder.decode(&mut cursor).unwrap();
3634
3635        let arr = decoder.flush(None).unwrap();
3636        #[cfg(feature = "small_decimals")]
3637        {
3638            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3639            assert_eq!(dec.len(), 2);
3640            assert_eq!(dec.value_as_string(0), "123.45");
3641            assert_eq!(dec.value_as_string(1), "-1.23");
3642        }
3643        #[cfg(not(feature = "small_decimals"))]
3644        {
3645            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3646            assert_eq!(dec.len(), 2);
3647            assert_eq!(dec.value_as_string(0), "123.45");
3648            assert_eq!(dec.value_as_string(1), "-1.23");
3649        }
3650    }
3651
3652    #[test]
3653    fn test_decimal_decoding_bytes_with_nulls() {
3654        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
3655        let inner = Decoder::try_new(&dt).unwrap();
3656        let mut decoder = Decoder::Nullable(
3657            NullablePlan::ReadTag {
3658                nullability: Nullability::NullSecond,
3659                resolution: ResolutionPlan::Promotion(Promotion::Direct),
3660            },
3661            NullBufferBuilder::new(DEFAULT_CAPACITY),
3662            Box::new(inner),
3663        );
3664        let mut data = Vec::new();
3665        data.extend_from_slice(&encode_avro_int(0));
3666        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
3667        data.extend_from_slice(&encode_avro_int(1));
3668        data.extend_from_slice(&encode_avro_int(0));
3669        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
3670        let mut cursor = AvroCursor::new(&data);
3671        decoder.decode(&mut cursor).unwrap();
3672        decoder.decode(&mut cursor).unwrap();
3673        decoder.decode(&mut cursor).unwrap();
3674        let arr = decoder.flush(None).unwrap();
3675        #[cfg(feature = "small_decimals")]
3676        {
3677            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3678            assert_eq!(dec_arr.len(), 3);
3679            assert!(dec_arr.is_valid(0));
3680            assert!(!dec_arr.is_valid(1));
3681            assert!(dec_arr.is_valid(2));
3682            assert_eq!(dec_arr.value_as_string(0), "123.4");
3683            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3684        }
3685        #[cfg(not(feature = "small_decimals"))]
3686        {
3687            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3688            assert_eq!(dec_arr.len(), 3);
3689            assert!(dec_arr.is_valid(0));
3690            assert!(!dec_arr.is_valid(1));
3691            assert!(dec_arr.is_valid(2));
3692            assert_eq!(dec_arr.value_as_string(0), "123.4");
3693            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3694        }
3695    }
3696
3697    #[test]
3698    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
3699        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
3700        let inner = Decoder::try_new(&dt).unwrap();
3701        let mut decoder = Decoder::Nullable(
3702            NullablePlan::ReadTag {
3703                nullability: Nullability::NullSecond,
3704                resolution: ResolutionPlan::Promotion(Promotion::Direct),
3705            },
3706            NullBufferBuilder::new(DEFAULT_CAPACITY),
3707            Box::new(inner),
3708        );
3709        let row1 = [
3710            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
3711            0xE2, 0x40,
3712        ];
3713        let row3 = [
3714            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
3715            0x1D, 0xC0,
3716        ];
3717        let mut data = Vec::new();
3718        data.extend_from_slice(&encode_avro_int(0));
3719        data.extend_from_slice(&row1);
3720        data.extend_from_slice(&encode_avro_int(1));
3721        data.extend_from_slice(&encode_avro_int(0));
3722        data.extend_from_slice(&row3);
3723        let mut cursor = AvroCursor::new(&data);
3724        decoder.decode(&mut cursor).unwrap();
3725        decoder.decode(&mut cursor).unwrap();
3726        decoder.decode(&mut cursor).unwrap();
3727        let arr = decoder.flush(None).unwrap();
3728        #[cfg(feature = "small_decimals")]
3729        {
3730            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3731            assert_eq!(dec_arr.len(), 3);
3732            assert!(dec_arr.is_valid(0));
3733            assert!(!dec_arr.is_valid(1));
3734            assert!(dec_arr.is_valid(2));
3735            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3736            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3737        }
3738        #[cfg(not(feature = "small_decimals"))]
3739        {
3740            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3741            assert_eq!(dec_arr.len(), 3);
3742            assert!(dec_arr.is_valid(0));
3743            assert!(!dec_arr.is_valid(1));
3744            assert!(dec_arr.is_valid(2));
3745            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3746            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3747        }
3748    }
3749
3750    #[test]
3751    fn test_enum_decoding() {
3752        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
3753        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
3754        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3755        let mut data = Vec::new();
3756        data.extend_from_slice(&encode_avro_int(2));
3757        data.extend_from_slice(&encode_avro_int(0));
3758        data.extend_from_slice(&encode_avro_int(1));
3759        let mut cursor = AvroCursor::new(&data);
3760        decoder.decode(&mut cursor).unwrap();
3761        decoder.decode(&mut cursor).unwrap();
3762        decoder.decode(&mut cursor).unwrap();
3763        let array = decoder.flush(None).unwrap();
3764        let dict_array = array
3765            .as_any()
3766            .downcast_ref::<DictionaryArray<Int32Type>>()
3767            .unwrap();
3768        assert_eq!(dict_array.len(), 3);
3769        let values = dict_array
3770            .values()
3771            .as_any()
3772            .downcast_ref::<StringArray>()
3773            .unwrap();
3774        assert_eq!(values.value(0), "A");
3775        assert_eq!(values.value(1), "B");
3776        assert_eq!(values.value(2), "C");
3777        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
3778    }
3779
3780    #[test]
3781    fn test_enum_decoding_with_nulls() {
3782        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
3783        let enum_codec = Codec::Enum(symbols.clone());
3784        let avro_type =
3785            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
3786        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3787        let mut data = Vec::new();
3788        data.extend_from_slice(&encode_avro_long(1));
3789        data.extend_from_slice(&encode_avro_int(1));
3790        data.extend_from_slice(&encode_avro_long(0));
3791        data.extend_from_slice(&encode_avro_long(1));
3792        data.extend_from_slice(&encode_avro_int(0));
3793        let mut cursor = AvroCursor::new(&data);
3794        decoder.decode(&mut cursor).unwrap();
3795        decoder.decode(&mut cursor).unwrap();
3796        decoder.decode(&mut cursor).unwrap();
3797        let array = decoder.flush(None).unwrap();
3798        let dict_array = array
3799            .as_any()
3800            .downcast_ref::<DictionaryArray<Int32Type>>()
3801            .unwrap();
3802        assert_eq!(dict_array.len(), 3);
3803        assert!(dict_array.is_valid(0));
3804        assert!(dict_array.is_null(1));
3805        assert!(dict_array.is_valid(2));
3806        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
3807        assert_eq!(dict_array.keys(), &expected_keys);
3808        let values = dict_array
3809            .values()
3810            .as_any()
3811            .downcast_ref::<StringArray>()
3812            .unwrap();
3813        assert_eq!(values.value(0), "X");
3814        assert_eq!(values.value(1), "Y");
3815    }
3816
3817    #[test]
3818    fn test_duration_decoding_with_nulls() {
3819        let duration_codec = Codec::Interval;
3820        let avro_type = AvroDataType::new(
3821            duration_codec,
3822            Default::default(),
3823            Some(Nullability::NullFirst),
3824        );
3825        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3826        let mut data = Vec::new();
3827        // First value: 1 month, 2 days, 3 millis
3828        data.extend_from_slice(&encode_avro_long(1)); // not null
3829        let mut duration1 = Vec::new();
3830        duration1.extend_from_slice(&1u32.to_le_bytes());
3831        duration1.extend_from_slice(&2u32.to_le_bytes());
3832        duration1.extend_from_slice(&3u32.to_le_bytes());
3833        data.extend_from_slice(&duration1);
3834        // Second value: null
3835        data.extend_from_slice(&encode_avro_long(0)); // null
3836        data.extend_from_slice(&encode_avro_long(1)); // not null
3837        let mut duration2 = Vec::new();
3838        duration2.extend_from_slice(&4u32.to_le_bytes());
3839        duration2.extend_from_slice(&5u32.to_le_bytes());
3840        duration2.extend_from_slice(&6u32.to_le_bytes());
3841        data.extend_from_slice(&duration2);
3842        let mut cursor = AvroCursor::new(&data);
3843        decoder.decode(&mut cursor).unwrap();
3844        decoder.decode(&mut cursor).unwrap();
3845        decoder.decode(&mut cursor).unwrap();
3846        let array = decoder.flush(None).unwrap();
3847        let interval_array = array
3848            .as_any()
3849            .downcast_ref::<IntervalMonthDayNanoArray>()
3850            .unwrap();
3851        assert_eq!(interval_array.len(), 3);
3852        assert!(interval_array.is_valid(0));
3853        assert!(interval_array.is_null(1));
3854        assert!(interval_array.is_valid(2));
3855        let expected = IntervalMonthDayNanoArray::from(vec![
3856            Some(IntervalMonthDayNano {
3857                months: 1,
3858                days: 2,
3859                nanoseconds: 3_000_000,
3860            }),
3861            None,
3862            Some(IntervalMonthDayNano {
3863                months: 4,
3864                days: 5,
3865                nanoseconds: 6_000_000,
3866            }),
3867        ]);
3868        assert_eq!(interval_array, &expected);
3869    }
3870
3871    #[cfg(feature = "avro_custom_types")]
3872    #[test]
3873    fn test_interval_month_day_nano_custom_decoding_with_nulls() {
3874        let avro_type = AvroDataType::new(
3875            Codec::IntervalMonthDayNano,
3876            Default::default(),
3877            Some(Nullability::NullFirst),
3878        );
3879        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3880        let mut data = Vec::new();
3881        // First value: months=1, days=-2, nanos=3
3882        data.extend_from_slice(&encode_avro_long(1));
3883        data.extend_from_slice(&1i32.to_le_bytes());
3884        data.extend_from_slice(&(-2i32).to_le_bytes());
3885        data.extend_from_slice(&3i64.to_le_bytes());
3886        // Second value: null
3887        data.extend_from_slice(&encode_avro_long(0));
3888        // Third value: months=-4, days=5, nanos=-6
3889        data.extend_from_slice(&encode_avro_long(1));
3890        data.extend_from_slice(&(-4i32).to_le_bytes());
3891        data.extend_from_slice(&5i32.to_le_bytes());
3892        data.extend_from_slice(&(-6i64).to_le_bytes());
3893        let mut cursor = AvroCursor::new(&data);
3894        decoder.decode(&mut cursor).unwrap();
3895        decoder.decode(&mut cursor).unwrap();
3896        decoder.decode(&mut cursor).unwrap();
3897        let array = decoder.flush(None).unwrap();
3898        let interval_array = array
3899            .as_any()
3900            .downcast_ref::<IntervalMonthDayNanoArray>()
3901            .unwrap();
3902        assert_eq!(interval_array.len(), 3);
3903        let expected = IntervalMonthDayNanoArray::from(vec![
3904            Some(IntervalMonthDayNano::new(1, -2, 3)),
3905            None,
3906            Some(IntervalMonthDayNano::new(-4, 5, -6)),
3907        ]);
3908        assert_eq!(interval_array, &expected);
3909    }
3910
3911    #[test]
3912    fn test_duration_decoding_empty() {
3913        let duration_codec = Codec::Interval;
3914        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
3915        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3916        let array = decoder.flush(None).unwrap();
3917        assert_eq!(array.len(), 0);
3918    }
3919
3920    #[test]
3921    #[cfg(feature = "avro_custom_types")]
3922    fn test_duration_seconds_decoding() {
3923        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
3924        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3925        let mut data = Vec::new();
3926        // Three values: 0, -1, 2
3927        data.extend_from_slice(&encode_avro_long(0));
3928        data.extend_from_slice(&encode_avro_long(-1));
3929        data.extend_from_slice(&encode_avro_long(2));
3930        let mut cursor = AvroCursor::new(&data);
3931        decoder.decode(&mut cursor).unwrap();
3932        decoder.decode(&mut cursor).unwrap();
3933        decoder.decode(&mut cursor).unwrap();
3934        let array = decoder.flush(None).unwrap();
3935        let dur = array
3936            .as_any()
3937            .downcast_ref::<DurationSecondArray>()
3938            .unwrap();
3939        assert_eq!(dur.values(), &[0, -1, 2]);
3940    }
3941
3942    #[test]
3943    #[cfg(feature = "avro_custom_types")]
3944    fn test_duration_milliseconds_decoding() {
3945        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3946        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3947        let mut data = Vec::new();
3948        for v in [1i64, 0, -2] {
3949            data.extend_from_slice(&encode_avro_long(v));
3950        }
3951        let mut cursor = AvroCursor::new(&data);
3952        for _ in 0..3 {
3953            decoder.decode(&mut cursor).unwrap();
3954        }
3955        let array = decoder.flush(None).unwrap();
3956        let dur = array
3957            .as_any()
3958            .downcast_ref::<DurationMillisecondArray>()
3959            .unwrap();
3960        assert_eq!(dur.values(), &[1, 0, -2]);
3961    }
3962
3963    #[test]
3964    #[cfg(feature = "avro_custom_types")]
3965    fn test_duration_microseconds_decoding() {
3966        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3967        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3968        let mut data = Vec::new();
3969        for v in [5i64, -6, 7] {
3970            data.extend_from_slice(&encode_avro_long(v));
3971        }
3972        let mut cursor = AvroCursor::new(&data);
3973        for _ in 0..3 {
3974            decoder.decode(&mut cursor).unwrap();
3975        }
3976        let array = decoder.flush(None).unwrap();
3977        let dur = array
3978            .as_any()
3979            .downcast_ref::<DurationMicrosecondArray>()
3980            .unwrap();
3981        assert_eq!(dur.values(), &[5, -6, 7]);
3982    }
3983
3984    #[test]
3985    #[cfg(feature = "avro_custom_types")]
3986    fn test_duration_nanoseconds_decoding() {
3987        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3988        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3989        let mut data = Vec::new();
3990        for v in [8i64, 9, -10] {
3991            data.extend_from_slice(&encode_avro_long(v));
3992        }
3993        let mut cursor = AvroCursor::new(&data);
3994        for _ in 0..3 {
3995            decoder.decode(&mut cursor).unwrap();
3996        }
3997        let array = decoder.flush(None).unwrap();
3998        let dur = array
3999            .as_any()
4000            .downcast_ref::<DurationNanosecondArray>()
4001            .unwrap();
4002        assert_eq!(dur.values(), &[8, 9, -10]);
4003    }
4004
4005    #[test]
4006    fn test_nullable_decode_error_bitmap_corruption() {
4007        // Nullable Int32 with ['T','null'] encoding (NullSecond)
4008        let avro_type = AvroDataType::new(
4009            Codec::Int32,
4010            Default::default(),
4011            Some(Nullability::NullSecond),
4012        );
4013        let mut decoder = Decoder::try_new(&avro_type).unwrap();
4014
4015        // Row 1: union branch 1 (null)
4016        let mut row1 = Vec::new();
4017        row1.extend_from_slice(&encode_avro_int(1));
4018
4019        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
4020        let mut row2 = Vec::new();
4021        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
4022
4023        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
4024        let mut row3 = Vec::new();
4025        row3.extend_from_slice(&encode_avro_int(0)); // branch
4026        row3.extend_from_slice(&encode_avro_int(42)); // actual value
4027
4028        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
4029        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
4030        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
4031
4032        let array = decoder.flush(None).unwrap();
4033
4034        // Should contain 2 elements: row1 (null) and row3 (42)
4035        assert_eq!(array.len(), 2);
4036        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
4037        assert!(int_array.is_null(0)); // row1 is null
4038        assert_eq!(int_array.value(1), 42); // row3 value is 42
4039    }
4040
4041    #[test]
4042    fn test_enum_mapping_reordered_symbols() {
4043        let reader_symbols: Arc<[String]> =
4044            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
4045        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
4046        let default_index: i32 = -1;
4047        let mut dec = Decoder::Enum(
4048            Vec::with_capacity(DEFAULT_CAPACITY),
4049            reader_symbols.clone(),
4050            Some(EnumResolution {
4051                mapping,
4052                default_index,
4053            }),
4054        );
4055        let mut data = Vec::new();
4056        data.extend_from_slice(&encode_avro_int(0));
4057        data.extend_from_slice(&encode_avro_int(1));
4058        data.extend_from_slice(&encode_avro_int(2));
4059        let mut cur = AvroCursor::new(&data);
4060        dec.decode(&mut cur).unwrap();
4061        dec.decode(&mut cur).unwrap();
4062        dec.decode(&mut cur).unwrap();
4063        let arr = dec.flush(None).unwrap();
4064        let dict = arr
4065            .as_any()
4066            .downcast_ref::<DictionaryArray<Int32Type>>()
4067            .unwrap();
4068        let expected_keys = Int32Array::from(vec![2, 0, 1]);
4069        assert_eq!(dict.keys(), &expected_keys);
4070        let values = dict
4071            .values()
4072            .as_any()
4073            .downcast_ref::<StringArray>()
4074            .unwrap();
4075        assert_eq!(values.value(0), "B");
4076        assert_eq!(values.value(1), "C");
4077        assert_eq!(values.value(2), "A");
4078    }
4079
4080    #[test]
4081    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
4082        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
4083        let default_index: i32 = 1;
4084        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
4085        let mut dec = Decoder::Enum(
4086            Vec::with_capacity(DEFAULT_CAPACITY),
4087            reader_symbols.clone(),
4088            Some(EnumResolution {
4089                mapping,
4090                default_index,
4091            }),
4092        );
4093        let mut data = Vec::new();
4094        data.extend_from_slice(&encode_avro_int(0));
4095        data.extend_from_slice(&encode_avro_int(1));
4096        data.extend_from_slice(&encode_avro_int(99));
4097        let mut cur = AvroCursor::new(&data);
4098        dec.decode(&mut cur).unwrap();
4099        dec.decode(&mut cur).unwrap();
4100        dec.decode(&mut cur).unwrap();
4101        let arr = dec.flush(None).unwrap();
4102        let dict = arr
4103            .as_any()
4104            .downcast_ref::<DictionaryArray<Int32Type>>()
4105            .unwrap();
4106        let expected_keys = Int32Array::from(vec![0, 1, 1]);
4107        assert_eq!(dict.keys(), &expected_keys);
4108        let values = dict
4109            .values()
4110            .as_any()
4111            .downcast_ref::<StringArray>()
4112            .unwrap();
4113        assert_eq!(values.value(0), "A");
4114        assert_eq!(values.value(1), "B");
4115    }
4116
4117    #[test]
4118    fn test_enum_mapping_unknown_symbol_without_default_errors() {
4119        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
4120        let default_index: i32 = -1; // indicates no default at type-level
4121        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
4122        let mut dec = Decoder::Enum(
4123            Vec::with_capacity(DEFAULT_CAPACITY),
4124            reader_symbols,
4125            Some(EnumResolution {
4126                mapping,
4127                default_index,
4128            }),
4129        );
4130        let data = encode_avro_int(0);
4131        let mut cur = AvroCursor::new(&data);
4132        let err = dec
4133            .decode(&mut cur)
4134            .expect_err("expected decode error for unresolved enum without default");
4135        let msg = err.to_string();
4136        assert!(
4137            msg.contains("not resolvable") && msg.contains("no default"),
4138            "unexpected error message: {msg}"
4139        );
4140    }
4141
4142    fn make_record_resolved_decoder(
4143        reader_fields: &[(&str, DataType, bool)],
4144        writer_projections: Vec<FieldProjection>,
4145    ) -> Decoder {
4146        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4147        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4148        for (name, dt, nullable) in reader_fields {
4149            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4150            let enc = match dt {
4151                DataType::Int32 => Decoder::Int32(Vec::new()),
4152                DataType::Int64 => Decoder::Int64(Vec::new()),
4153                DataType::Utf8 => {
4154                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
4155                }
4156                other => panic!("Unsupported test reader field type: {other:?}"),
4157            };
4158            encodings.push(enc);
4159        }
4160        let fields: Fields = field_refs.into();
4161        Decoder::Record(
4162            fields,
4163            encodings,
4164            vec![None; reader_fields.len()],
4165            Some(Projector {
4166                writer_projections,
4167                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4168            }),
4169        )
4170    }
4171
4172    #[test]
4173    fn test_skip_writer_trailing_field_int32() {
4174        let mut dec = make_record_resolved_decoder(
4175            &[("id", arrow_schema::DataType::Int32, false)],
4176            vec![
4177                FieldProjection::ToReader(0),
4178                FieldProjection::Skip(super::Skipper::Int32),
4179            ],
4180        );
4181        let mut data = Vec::new();
4182        data.extend_from_slice(&encode_avro_int(7));
4183        data.extend_from_slice(&encode_avro_int(999));
4184        let mut cur = AvroCursor::new(&data);
4185        dec.decode(&mut cur).unwrap();
4186        assert_eq!(cur.position(), data.len());
4187        let arr = dec.flush(None).unwrap();
4188        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
4189        assert_eq!(struct_arr.len(), 1);
4190        let id = struct_arr
4191            .column_by_name("id")
4192            .unwrap()
4193            .as_any()
4194            .downcast_ref::<Int32Array>()
4195            .unwrap();
4196        assert_eq!(id.value(0), 7);
4197    }
4198
4199    #[test]
4200    fn test_skip_writer_middle_field_string() {
4201        let mut dec = make_record_resolved_decoder(
4202            &[
4203                ("id", DataType::Int32, false),
4204                ("score", DataType::Int64, false),
4205            ],
4206            vec![
4207                FieldProjection::ToReader(0),
4208                FieldProjection::Skip(Skipper::String),
4209                FieldProjection::ToReader(1),
4210            ],
4211        );
4212        let mut data = Vec::new();
4213        data.extend_from_slice(&encode_avro_int(42));
4214        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
4215        data.extend_from_slice(&encode_avro_long(1000));
4216        let mut cur = AvroCursor::new(&data);
4217        dec.decode(&mut cur).unwrap();
4218        assert_eq!(cur.position(), data.len());
4219        let arr = dec.flush(None).unwrap();
4220        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4221        let id = s
4222            .column_by_name("id")
4223            .unwrap()
4224            .as_any()
4225            .downcast_ref::<Int32Array>()
4226            .unwrap();
4227        let score = s
4228            .column_by_name("score")
4229            .unwrap()
4230            .as_any()
4231            .downcast_ref::<Int64Array>()
4232            .unwrap();
4233        assert_eq!(id.value(0), 42);
4234        assert_eq!(score.value(0), 1000);
4235    }
4236
4237    #[test]
4238    fn test_skip_writer_array_with_negative_block_count_fast() {
4239        let mut dec = make_record_resolved_decoder(
4240            &[("id", DataType::Int32, false)],
4241            vec![
4242                FieldProjection::Skip(super::Skipper::List(Box::new(Skipper::Int32))),
4243                FieldProjection::ToReader(0),
4244            ],
4245        );
4246        let mut array_payload = Vec::new();
4247        array_payload.extend_from_slice(&encode_avro_int(1));
4248        array_payload.extend_from_slice(&encode_avro_int(2));
4249        array_payload.extend_from_slice(&encode_avro_int(3));
4250        let mut data = Vec::new();
4251        data.extend_from_slice(&encode_avro_long(-3));
4252        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
4253        data.extend_from_slice(&array_payload);
4254        data.extend_from_slice(&encode_avro_long(0));
4255        data.extend_from_slice(&encode_avro_int(5));
4256        let mut cur = AvroCursor::new(&data);
4257        dec.decode(&mut cur).unwrap();
4258        assert_eq!(cur.position(), data.len());
4259        let arr = dec.flush(None).unwrap();
4260        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4261        let id = s
4262            .column_by_name("id")
4263            .unwrap()
4264            .as_any()
4265            .downcast_ref::<Int32Array>()
4266            .unwrap();
4267        assert_eq!(id.len(), 1);
4268        assert_eq!(id.value(0), 5);
4269    }
4270
4271    #[test]
4272    fn test_skip_writer_map_with_negative_block_count_fast() {
4273        let mut dec = make_record_resolved_decoder(
4274            &[("id", DataType::Int32, false)],
4275            vec![
4276                FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
4277                FieldProjection::ToReader(0),
4278            ],
4279        );
4280        let mut entries = Vec::new();
4281        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
4282        entries.extend_from_slice(&encode_avro_int(10));
4283        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
4284        entries.extend_from_slice(&encode_avro_int(20));
4285        let mut data = Vec::new();
4286        data.extend_from_slice(&encode_avro_long(-2));
4287        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
4288        data.extend_from_slice(&entries);
4289        data.extend_from_slice(&encode_avro_long(0));
4290        data.extend_from_slice(&encode_avro_int(123));
4291        let mut cur = AvroCursor::new(&data);
4292        dec.decode(&mut cur).unwrap();
4293        assert_eq!(cur.position(), data.len());
4294        let arr = dec.flush(None).unwrap();
4295        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4296        let id = s
4297            .column_by_name("id")
4298            .unwrap()
4299            .as_any()
4300            .downcast_ref::<Int32Array>()
4301            .unwrap();
4302        assert_eq!(id.len(), 1);
4303        assert_eq!(id.value(0), 123);
4304    }
4305
4306    #[test]
4307    fn test_skip_writer_nullable_field_union_nullfirst() {
4308        let mut dec = make_record_resolved_decoder(
4309            &[("id", DataType::Int32, false)],
4310            vec![
4311                FieldProjection::Skip(super::Skipper::Nullable(
4312                    Nullability::NullFirst,
4313                    Box::new(super::Skipper::Int32),
4314                )),
4315                FieldProjection::ToReader(0),
4316            ],
4317        );
4318        let mut row1 = Vec::new();
4319        row1.extend_from_slice(&encode_avro_long(0));
4320        row1.extend_from_slice(&encode_avro_int(5));
4321        let mut row2 = Vec::new();
4322        row2.extend_from_slice(&encode_avro_long(1));
4323        row2.extend_from_slice(&encode_avro_int(123));
4324        row2.extend_from_slice(&encode_avro_int(7));
4325        let mut cur1 = AvroCursor::new(&row1);
4326        let mut cur2 = AvroCursor::new(&row2);
4327        dec.decode(&mut cur1).unwrap();
4328        dec.decode(&mut cur2).unwrap();
4329        assert_eq!(cur1.position(), row1.len());
4330        assert_eq!(cur2.position(), row2.len());
4331        let arr = dec.flush(None).unwrap();
4332        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4333        let id = s
4334            .column_by_name("id")
4335            .unwrap()
4336            .as_any()
4337            .downcast_ref::<Int32Array>()
4338            .unwrap();
4339        assert_eq!(id.len(), 2);
4340        assert_eq!(id.value(0), 5);
4341        assert_eq!(id.value(1), 7);
4342    }
4343
4344    fn make_dense_union_avro(
4345        children: Vec<(Codec, &'_ str, DataType)>,
4346        type_ids: Vec<i8>,
4347    ) -> AvroDataType {
4348        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4349        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4350        for (codec, name, dt) in children.into_iter() {
4351            avro_children.push(AvroDataType::new(codec, Default::default(), None));
4352            fields.push(arrow_schema::Field::new(name, dt, true));
4353        }
4354        let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4355        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4356        AvroDataType::new(union_codec, Default::default(), None)
4357    }
4358
4359    #[test]
4360    fn test_union_dense_two_children_custom_type_ids() {
4361        let union_dt = make_dense_union_avro(
4362            vec![
4363                (Codec::Int32, "i", DataType::Int32),
4364                (Codec::Utf8, "s", DataType::Utf8),
4365            ],
4366            vec![2, 5],
4367        );
4368        let mut dec = Decoder::try_new(&union_dt).unwrap();
4369        let mut r1 = Vec::new();
4370        r1.extend_from_slice(&encode_avro_long(0));
4371        r1.extend_from_slice(&encode_avro_int(7));
4372        let mut r2 = Vec::new();
4373        r2.extend_from_slice(&encode_avro_long(1));
4374        r2.extend_from_slice(&encode_avro_bytes(b"x"));
4375        let mut r3 = Vec::new();
4376        r3.extend_from_slice(&encode_avro_long(0));
4377        r3.extend_from_slice(&encode_avro_int(-1));
4378        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4379        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4380        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4381        let array = dec.flush(None).unwrap();
4382        let ua = array
4383            .as_any()
4384            .downcast_ref::<UnionArray>()
4385            .expect("expected UnionArray");
4386        assert_eq!(ua.len(), 3);
4387        assert_eq!(ua.type_id(0), 2);
4388        assert_eq!(ua.type_id(1), 5);
4389        assert_eq!(ua.type_id(2), 2);
4390        assert_eq!(ua.value_offset(0), 0);
4391        assert_eq!(ua.value_offset(1), 0);
4392        assert_eq!(ua.value_offset(2), 1);
4393        let int_child = ua
4394            .child(2)
4395            .as_any()
4396            .downcast_ref::<Int32Array>()
4397            .expect("int child");
4398        assert_eq!(int_child.len(), 2);
4399        assert_eq!(int_child.value(0), 7);
4400        assert_eq!(int_child.value(1), -1);
4401        let str_child = ua
4402            .child(5)
4403            .as_any()
4404            .downcast_ref::<StringArray>()
4405            .expect("string child");
4406        assert_eq!(str_child.len(), 1);
4407        assert_eq!(str_child.value(0), "x");
4408    }
4409
4410    #[test]
4411    fn test_union_dense_with_null_and_string_children() {
4412        let union_dt = make_dense_union_avro(
4413            vec![
4414                (Codec::Null, "n", DataType::Null),
4415                (Codec::Utf8, "s", DataType::Utf8),
4416            ],
4417            vec![42, 7],
4418        );
4419        let mut dec = Decoder::try_new(&union_dt).unwrap();
4420        let r1 = encode_avro_long(0);
4421        let mut r2 = Vec::new();
4422        r2.extend_from_slice(&encode_avro_long(1));
4423        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
4424        let r3 = encode_avro_long(0);
4425        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4426        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4427        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4428        let array = dec.flush(None).unwrap();
4429        let ua = array
4430            .as_any()
4431            .downcast_ref::<UnionArray>()
4432            .expect("expected UnionArray");
4433        assert_eq!(ua.len(), 3);
4434        assert_eq!(ua.type_id(0), 42);
4435        assert_eq!(ua.type_id(1), 7);
4436        assert_eq!(ua.type_id(2), 42);
4437        assert_eq!(ua.value_offset(0), 0);
4438        assert_eq!(ua.value_offset(1), 0);
4439        assert_eq!(ua.value_offset(2), 1);
4440        let null_child = ua
4441            .child(42)
4442            .as_any()
4443            .downcast_ref::<NullArray>()
4444            .expect("null child");
4445        assert_eq!(null_child.len(), 2);
4446        let str_child = ua
4447            .child(7)
4448            .as_any()
4449            .downcast_ref::<StringArray>()
4450            .expect("string child");
4451        assert_eq!(str_child.len(), 1);
4452        assert_eq!(str_child.value(0), "abc");
4453    }
4454
4455    #[test]
4456    fn test_union_decode_negative_branch_index_errors() {
4457        let union_dt = make_dense_union_avro(
4458            vec![
4459                (Codec::Int32, "i", DataType::Int32),
4460                (Codec::Utf8, "s", DataType::Utf8),
4461            ],
4462            vec![0, 1],
4463        );
4464        let mut dec = Decoder::try_new(&union_dt).unwrap();
4465        let row = encode_avro_long(-1); // decodes back to -1
4466        let err = dec
4467            .decode(&mut AvroCursor::new(&row))
4468            .expect_err("expected error for negative branch index");
4469        let msg = err.to_string();
4470        assert!(
4471            msg.contains("Negative union branch index"),
4472            "unexpected error message: {msg}"
4473        );
4474    }
4475
4476    #[test]
4477    fn test_union_decode_out_of_range_branch_index_errors() {
4478        let union_dt = make_dense_union_avro(
4479            vec![
4480                (Codec::Int32, "i", DataType::Int32),
4481                (Codec::Utf8, "s", DataType::Utf8),
4482            ],
4483            vec![10, 11],
4484        );
4485        let mut dec = Decoder::try_new(&union_dt).unwrap();
4486        let row = encode_avro_long(2);
4487        let err = dec
4488            .decode(&mut AvroCursor::new(&row))
4489            .expect_err("expected error for out-of-range branch index");
4490        let msg = err.to_string();
4491        assert!(
4492            msg.contains("out of range"),
4493            "unexpected error message: {msg}"
4494        );
4495    }
4496
4497    #[test]
4498    fn test_union_sparse_mode_not_supported() {
4499        let children: Vec<AvroDataType> = vec![
4500            AvroDataType::new(Codec::Int32, Default::default(), None),
4501            AvroDataType::new(Codec::Utf8, Default::default(), None),
4502        ];
4503        let uf = UnionFields::try_new(
4504            vec![1, 3],
4505            vec![
4506                arrow_schema::Field::new("i", DataType::Int32, true),
4507                arrow_schema::Field::new("s", DataType::Utf8, true),
4508            ],
4509        )
4510        .unwrap();
4511        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
4512        let dt = AvroDataType::new(codec, Default::default(), None);
4513        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
4514        let msg = err.to_string();
4515        assert!(
4516            msg.contains("Sparse Arrow unions are not yet supported"),
4517            "unexpected error message: {msg}"
4518        );
4519    }
4520
4521    fn make_record_decoder_with_projector_defaults(
4522        reader_fields: &[(&str, DataType, bool)],
4523        field_defaults: Vec<Option<AvroLiteral>>,
4524        default_injections: Vec<(usize, AvroLiteral)>,
4525    ) -> Decoder {
4526        assert_eq!(
4527            field_defaults.len(),
4528            reader_fields.len(),
4529            "field_defaults must have one entry per reader field"
4530        );
4531        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4532        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4533        for (name, dt, nullable) in reader_fields {
4534            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4535            let enc = match dt {
4536                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4537                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4538                DataType::Utf8 => Decoder::String(
4539                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4540                    Vec::with_capacity(DEFAULT_CAPACITY),
4541                ),
4542                other => panic!("Unsupported test field type in helper: {other:?}"),
4543            };
4544            encodings.push(enc);
4545        }
4546        let fields: Fields = field_refs.into();
4547        let projector = Projector {
4548            writer_projections: vec![],
4549            default_injections: Arc::from(default_injections),
4550        };
4551        Decoder::Record(fields, encodings, field_defaults, Some(projector))
4552    }
4553
4554    #[cfg(feature = "avro_custom_types")]
4555    #[test]
4556    fn test_default_append_custom_integer_range_validation() {
4557        let mut d_i8 = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
4558        d_i8.append_default(&AvroLiteral::Int(i8::MIN as i32))
4559            .unwrap();
4560        d_i8.append_default(&AvroLiteral::Int(i8::MAX as i32))
4561            .unwrap();
4562        let err_i8_high = d_i8
4563            .append_default(&AvroLiteral::Int(i8::MAX as i32 + 1))
4564            .unwrap_err();
4565        assert!(err_i8_high.to_string().contains("out of range for i8"));
4566        let err_i8_low = d_i8
4567            .append_default(&AvroLiteral::Int(i8::MIN as i32 - 1))
4568            .unwrap_err();
4569        assert!(err_i8_low.to_string().contains("out of range for i8"));
4570        let arr_i8 = d_i8.flush(None).unwrap();
4571        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4572        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4573
4574        let mut d_i16 = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
4575        d_i16
4576            .append_default(&AvroLiteral::Int(i16::MIN as i32))
4577            .unwrap();
4578        d_i16
4579            .append_default(&AvroLiteral::Int(i16::MAX as i32))
4580            .unwrap();
4581        let err_i16_high = d_i16
4582            .append_default(&AvroLiteral::Int(i16::MAX as i32 + 1))
4583            .unwrap_err();
4584        assert!(err_i16_high.to_string().contains("out of range for i16"));
4585        let err_i16_low = d_i16
4586            .append_default(&AvroLiteral::Int(i16::MIN as i32 - 1))
4587            .unwrap_err();
4588        assert!(err_i16_low.to_string().contains("out of range for i16"));
4589        let arr_i16 = d_i16.flush(None).unwrap();
4590        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4591        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4592
4593        let mut d_u8 = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
4594        d_u8.append_default(&AvroLiteral::Int(0)).unwrap();
4595        d_u8.append_default(&AvroLiteral::Int(u8::MAX as i32))
4596            .unwrap();
4597        let err_u8_neg = d_u8.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4598        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4599        let err_u8_high = d_u8
4600            .append_default(&AvroLiteral::Int(u8::MAX as i32 + 1))
4601            .unwrap_err();
4602        assert!(err_u8_high.to_string().contains("out of range for u8"));
4603        let arr_u8 = d_u8.flush(None).unwrap();
4604        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4605        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4606
4607        let mut d_u16 = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
4608        d_u16.append_default(&AvroLiteral::Int(0)).unwrap();
4609        d_u16
4610            .append_default(&AvroLiteral::Int(u16::MAX as i32))
4611            .unwrap();
4612        let err_u16_neg = d_u16.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4613        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4614        let err_u16_high = d_u16
4615            .append_default(&AvroLiteral::Int(u16::MAX as i32 + 1))
4616            .unwrap_err();
4617        assert!(err_u16_high.to_string().contains("out of range for u16"));
4618        let arr_u16 = d_u16.flush(None).unwrap();
4619        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4620        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4621
4622        let mut d_u32 = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
4623        d_u32.append_default(&AvroLiteral::Long(0)).unwrap();
4624        d_u32
4625            .append_default(&AvroLiteral::Long(u32::MAX as i64))
4626            .unwrap();
4627        let err_u32_neg = d_u32.append_default(&AvroLiteral::Long(-1)).unwrap_err();
4628        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4629        let err_u32_high = d_u32
4630            .append_default(&AvroLiteral::Long(u32::MAX as i64 + 1))
4631            .unwrap_err();
4632        assert!(err_u32_high.to_string().contains("out of range for u32"));
4633        let arr_u32 = d_u32.flush(None).unwrap();
4634        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4635        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4636    }
4637
4638    #[cfg(feature = "avro_custom_types")]
4639    #[test]
4640    fn test_decode_custom_integer_range_validation() {
4641        let mut d_i8 = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
4642        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32)))
4643            .unwrap();
4644        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32)))
4645            .unwrap();
4646        let err_i8_high = d_i8
4647            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32 + 1)))
4648            .unwrap_err();
4649        assert!(err_i8_high.to_string().contains("out of range for i8"));
4650        let err_i8_low = d_i8
4651            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32 - 1)))
4652            .unwrap_err();
4653        assert!(err_i8_low.to_string().contains("out of range for i8"));
4654        let arr_i8 = d_i8.flush(None).unwrap();
4655        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4656        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4657
4658        let mut d_i16 = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
4659        d_i16
4660            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32)))
4661            .unwrap();
4662        d_i16
4663            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32)))
4664            .unwrap();
4665        let err_i16_high = d_i16
4666            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32 + 1)))
4667            .unwrap_err();
4668        assert!(err_i16_high.to_string().contains("out of range for i16"));
4669        let err_i16_low = d_i16
4670            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32 - 1)))
4671            .unwrap_err();
4672        assert!(err_i16_low.to_string().contains("out of range for i16"));
4673        let arr_i16 = d_i16.flush(None).unwrap();
4674        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4675        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4676
4677        let mut d_u8 = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
4678        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(0)))
4679            .unwrap();
4680        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32)))
4681            .unwrap();
4682        let err_u8_neg = d_u8
4683            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4684            .unwrap_err();
4685        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4686        let err_u8_high = d_u8
4687            .decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32 + 1)))
4688            .unwrap_err();
4689        assert!(err_u8_high.to_string().contains("out of range for u8"));
4690        let arr_u8 = d_u8.flush(None).unwrap();
4691        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4692        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4693
4694        let mut d_u16 = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
4695        d_u16
4696            .decode(&mut AvroCursor::new(&encode_avro_int(0)))
4697            .unwrap();
4698        d_u16
4699            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32)))
4700            .unwrap();
4701        let err_u16_neg = d_u16
4702            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4703            .unwrap_err();
4704        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4705        let err_u16_high = d_u16
4706            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32 + 1)))
4707            .unwrap_err();
4708        assert!(err_u16_high.to_string().contains("out of range for u16"));
4709        let arr_u16 = d_u16.flush(None).unwrap();
4710        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4711        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4712
4713        let mut d_u32 = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
4714        d_u32
4715            .decode(&mut AvroCursor::new(&encode_avro_long(0)))
4716            .unwrap();
4717        d_u32
4718            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64)))
4719            .unwrap();
4720        let err_u32_neg = d_u32
4721            .decode(&mut AvroCursor::new(&encode_avro_long(-1)))
4722            .unwrap_err();
4723        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4724        let err_u32_high = d_u32
4725            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64 + 1)))
4726            .unwrap_err();
4727        assert!(err_u32_high.to_string().contains("out of range for u32"));
4728        let arr_u32 = d_u32.flush(None).unwrap();
4729        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4730        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4731    }
4732
4733    #[test]
4734    fn test_default_append_int32_and_int64_from_int_and_long() {
4735        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4736        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
4737        let arr = d_i32.flush(None).unwrap();
4738        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4739        assert_eq!(a.len(), 1);
4740        assert_eq!(a.value(0), 42);
4741        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
4742        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
4743        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
4744        let arr64 = d_i64.flush(None).unwrap();
4745        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
4746        assert_eq!(a64.len(), 2);
4747        assert_eq!(a64.value(0), 5);
4748        assert_eq!(a64.value(1), 7);
4749    }
4750
4751    #[test]
4752    fn test_default_append_floats_and_doubles() {
4753        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
4754        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
4755        let arr32 = d_f32.flush(None).unwrap();
4756        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
4757        assert_eq!(a.value(0), 1.5);
4758        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4759        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
4760        let arr64 = d_f64.flush(None).unwrap();
4761        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
4762        assert_eq!(b.value(0), 2.25);
4763    }
4764
4765    #[test]
4766    fn test_default_append_string_and_bytes() {
4767        let mut d_str = Decoder::String(
4768            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4769            Vec::with_capacity(DEFAULT_CAPACITY),
4770        );
4771        d_str
4772            .append_default(&AvroLiteral::String("hi".into()))
4773            .unwrap();
4774        let s_arr = d_str.flush(None).unwrap();
4775        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
4776        assert_eq!(arr.value(0), "hi");
4777        let mut d_bytes = Decoder::Binary(
4778            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4779            Vec::with_capacity(DEFAULT_CAPACITY),
4780        );
4781        d_bytes
4782            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4783            .unwrap();
4784        let b_arr = d_bytes.flush(None).unwrap();
4785        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
4786        assert_eq!(barr.value(0), &[1, 2, 3]);
4787        let mut d_str_err = Decoder::String(
4788            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4789            Vec::with_capacity(DEFAULT_CAPACITY),
4790        );
4791        let err = d_str_err
4792            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
4793            .unwrap_err();
4794        assert!(
4795            err.to_string()
4796                .contains("Default for string must be string"),
4797            "unexpected error: {err:?}"
4798        );
4799    }
4800
4801    #[test]
4802    fn test_default_append_nullable_int32_null_and_value() {
4803        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4804        let mut dec = Decoder::Nullable(
4805            NullablePlan::ReadTag {
4806                nullability: Nullability::NullFirst,
4807                resolution: ResolutionPlan::Promotion(Promotion::Direct),
4808            },
4809            NullBufferBuilder::new(DEFAULT_CAPACITY),
4810            Box::new(inner),
4811        );
4812        dec.append_default(&AvroLiteral::Null).unwrap();
4813        dec.append_default(&AvroLiteral::Int(11)).unwrap();
4814        let arr = dec.flush(None).unwrap();
4815        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4816        assert_eq!(a.len(), 2);
4817        assert!(a.is_null(0));
4818        assert_eq!(a.value(1), 11);
4819    }
4820
4821    #[test]
4822    fn test_default_append_array_of_ints() {
4823        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
4824        let mut d = Decoder::try_new(&list_dt).unwrap();
4825        let items = vec![
4826            AvroLiteral::Int(1),
4827            AvroLiteral::Int(2),
4828            AvroLiteral::Int(3),
4829        ];
4830        d.append_default(&AvroLiteral::Array(items)).unwrap();
4831        let arr = d.flush(None).unwrap();
4832        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
4833        assert_eq!(list.len(), 1);
4834        assert_eq!(list.value_length(0), 3);
4835        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
4836        assert_eq!(vals.values(), &[1, 2, 3]);
4837    }
4838
4839    #[test]
4840    fn test_default_append_map_string_to_int() {
4841        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
4842        let mut d = Decoder::try_new(&map_dt).unwrap();
4843        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
4844        m.insert("k1".to_string(), AvroLiteral::Int(10));
4845        m.insert("k2".to_string(), AvroLiteral::Int(20));
4846        d.append_default(&AvroLiteral::Map(m)).unwrap();
4847        let arr = d.flush(None).unwrap();
4848        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
4849        assert_eq!(map.len(), 1);
4850        assert_eq!(map.value_length(0), 2);
4851        let binding = map.value(0);
4852        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
4853        let k = entries
4854            .column_by_name("key")
4855            .unwrap()
4856            .as_any()
4857            .downcast_ref::<StringArray>()
4858            .unwrap();
4859        let v = entries
4860            .column_by_name("value")
4861            .unwrap()
4862            .as_any()
4863            .downcast_ref::<Int32Array>()
4864            .unwrap();
4865        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4866        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4867        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4868        assert_eq!(vals, [10, 20].into_iter().collect());
4869    }
4870
4871    #[test]
4872    fn test_default_append_enum_by_symbol() {
4873        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4874        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4875        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4876        let arr = d.flush(None).unwrap();
4877        let dict = arr
4878            .as_any()
4879            .downcast_ref::<DictionaryArray<Int32Type>>()
4880            .unwrap();
4881        assert_eq!(dict.len(), 1);
4882        let expected = Int32Array::from(vec![1]);
4883        assert_eq!(dict.keys(), &expected);
4884        let values = dict
4885            .values()
4886            .as_any()
4887            .downcast_ref::<StringArray>()
4888            .unwrap();
4889        assert_eq!(values.value(1), "B");
4890    }
4891
4892    #[test]
4893    fn test_default_append_uuid_and_type_error() {
4894        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4895        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4896        d.append_default(&AvroLiteral::String(uuid_str.into()))
4897            .unwrap();
4898        let arr_ref = d.flush(None).unwrap();
4899        let arr = arr_ref
4900            .as_any()
4901            .downcast_ref::<FixedSizeBinaryArray>()
4902            .unwrap();
4903        assert_eq!(arr.value_length(), 16);
4904        assert_eq!(arr.len(), 1);
4905        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4906        let err = d2
4907            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4908            .unwrap_err();
4909        assert!(
4910            err.to_string().contains("Default for uuid must be string"),
4911            "unexpected error: {err:?}"
4912        );
4913    }
4914
4915    #[test]
4916    fn test_default_append_fixed_and_length_mismatch() {
4917        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4918        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4919            .unwrap();
4920        let arr_ref = d.flush(None).unwrap();
4921        let arr = arr_ref
4922            .as_any()
4923            .downcast_ref::<FixedSizeBinaryArray>()
4924            .unwrap();
4925        assert_eq!(arr.value_length(), 4);
4926        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4927        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4928        let err = d_err
4929            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4930            .unwrap_err();
4931        assert!(
4932            err.to_string().contains("Fixed default length"),
4933            "unexpected error: {err:?}"
4934        );
4935    }
4936
4937    #[test]
4938    fn test_default_append_duration_and_length_validation() {
4939        let dt = avro_from_codec(Codec::Interval);
4940        let mut d = Decoder::try_new(&dt).unwrap();
4941        let mut bytes = Vec::with_capacity(12);
4942        bytes.extend_from_slice(&1u32.to_le_bytes());
4943        bytes.extend_from_slice(&2u32.to_le_bytes());
4944        bytes.extend_from_slice(&3u32.to_le_bytes());
4945        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4946        let arr_ref = d.flush(None).unwrap();
4947        let arr = arr_ref
4948            .as_any()
4949            .downcast_ref::<IntervalMonthDayNanoArray>()
4950            .unwrap();
4951        assert_eq!(arr.len(), 1);
4952        let v = arr.value(0);
4953        assert_eq!(v.months, 1);
4954        assert_eq!(v.days, 2);
4955        assert_eq!(v.nanoseconds, 3_000_000);
4956        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4957        let err = d_err
4958            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4959            .unwrap_err();
4960        assert!(
4961            err.to_string()
4962                .contains("Duration default must be exactly 12 bytes"),
4963            "unexpected error: {err:?}"
4964        );
4965    }
4966
4967    #[test]
4968    fn test_default_append_decimal256_from_bytes() {
4969        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4970        let mut d = Decoder::try_new(&dt).unwrap();
4971        let pos: [u8; 32] = [
4972            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4973            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4974            0x00, 0x00, 0x30, 0x39,
4975        ];
4976        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4977        let neg: [u8; 32] = [
4978            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4979            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4980            0xFF, 0xFF, 0xFF, 0x85,
4981        ];
4982        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4983        let arr = d.flush(None).unwrap();
4984        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4985        assert_eq!(dec.len(), 2);
4986        assert_eq!(dec.value_as_string(0), "123.45");
4987        assert_eq!(dec.value_as_string(1), "-1.23");
4988    }
4989
4990    #[test]
4991    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4992        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4993        let mut rec = make_record_decoder_with_projector_defaults(
4994            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4995            field_defaults,
4996            vec![],
4997        );
4998        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4999        map.insert("a".to_string(), AvroLiteral::Int(7));
5000        rec.append_default(&AvroLiteral::Map(map)).unwrap();
5001        let arr = rec.flush(None).unwrap();
5002        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5003        let a = s
5004            .column_by_name("a")
5005            .unwrap()
5006            .as_any()
5007            .downcast_ref::<Int32Array>()
5008            .unwrap();
5009        let b = s
5010            .column_by_name("b")
5011            .unwrap()
5012            .as_any()
5013            .downcast_ref::<StringArray>()
5014            .unwrap();
5015        assert_eq!(a.value(0), 7);
5016        assert_eq!(b.value(0), "hi");
5017    }
5018
5019    #[test]
5020    fn test_record_append_default_null_uses_projector_field_defaults() {
5021        let field_defaults = vec![
5022            Some(AvroLiteral::Int(5)),
5023            Some(AvroLiteral::String("x".into())),
5024        ];
5025        let mut rec = make_record_decoder_with_projector_defaults(
5026            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
5027            field_defaults,
5028            vec![],
5029        );
5030        rec.append_default(&AvroLiteral::Null).unwrap();
5031        let arr = rec.flush(None).unwrap();
5032        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5033        let a = s
5034            .column_by_name("a")
5035            .unwrap()
5036            .as_any()
5037            .downcast_ref::<Int32Array>()
5038            .unwrap();
5039        let b = s
5040            .column_by_name("b")
5041            .unwrap()
5042            .as_any()
5043            .downcast_ref::<StringArray>()
5044            .unwrap();
5045        assert_eq!(a.value(0), 5);
5046        assert_eq!(b.value(0), "x");
5047    }
5048
5049    #[test]
5050    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
5051     {
5052        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
5053        let mut field_refs: Vec<FieldRef> = Vec::new();
5054        let mut encoders: Vec<Decoder> = Vec::new();
5055        for (name, dt, nullable) in &fields {
5056            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
5057        }
5058        let enc_a = Decoder::Nullable(
5059            NullablePlan::ReadTag {
5060                nullability: Nullability::NullSecond,
5061                resolution: ResolutionPlan::Promotion(Promotion::Direct),
5062            },
5063            NullBufferBuilder::new(DEFAULT_CAPACITY),
5064            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
5065        );
5066        let enc_b = Decoder::Nullable(
5067            NullablePlan::ReadTag {
5068                nullability: Nullability::NullSecond,
5069                resolution: ResolutionPlan::Promotion(Promotion::Direct),
5070            },
5071            NullBufferBuilder::new(DEFAULT_CAPACITY),
5072            Box::new(Decoder::String(
5073                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
5074                Vec::with_capacity(DEFAULT_CAPACITY),
5075            )),
5076        );
5077        encoders.push(enc_a);
5078        encoders.push(enc_b);
5079        let field_defaults = vec![None, None]; // no defaults -> append_null
5080        let projector = Projector {
5081            writer_projections: vec![],
5082            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
5083        };
5084        let mut rec = Decoder::Record(field_refs.into(), encoders, field_defaults, Some(projector));
5085        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
5086        map.insert("a".to_string(), AvroLiteral::Int(9));
5087        rec.append_default(&AvroLiteral::Map(map)).unwrap();
5088        let arr = rec.flush(None).unwrap();
5089        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5090        let a = s
5091            .column_by_name("a")
5092            .unwrap()
5093            .as_any()
5094            .downcast_ref::<Int32Array>()
5095            .unwrap();
5096        let b = s
5097            .column_by_name("b")
5098            .unwrap()
5099            .as_any()
5100            .downcast_ref::<StringArray>()
5101            .unwrap();
5102        assert!(a.is_valid(0));
5103        assert_eq!(a.value(0), 9);
5104        assert!(b.is_null(0));
5105    }
5106
5107    #[test]
5108    fn test_projector_default_injection_when_writer_lacks_fields() {
5109        let defaults = vec![None, None];
5110        let injections = vec![
5111            (0, AvroLiteral::Int(99)),
5112            (1, AvroLiteral::String("alice".into())),
5113        ];
5114        let mut rec = make_record_decoder_with_projector_defaults(
5115            &[
5116                ("id", DataType::Int32, false),
5117                ("name", DataType::Utf8, false),
5118            ],
5119            defaults,
5120            injections,
5121        );
5122        rec.decode(&mut AvroCursor::new(&[])).unwrap();
5123        let arr = rec.flush(None).unwrap();
5124        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5125        let id = s
5126            .column_by_name("id")
5127            .unwrap()
5128            .as_any()
5129            .downcast_ref::<Int32Array>()
5130            .unwrap();
5131        let name = s
5132            .column_by_name("name")
5133            .unwrap()
5134            .as_any()
5135            .downcast_ref::<StringArray>()
5136            .unwrap();
5137        assert_eq!(id.value(0), 99);
5138        assert_eq!(name.value(0), "alice");
5139    }
5140
5141    #[test]
5142    fn union_type_ids_are_not_child_indexes() {
5143        let encodings: Vec<AvroDataType> =
5144            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
5145        let fields: UnionFields = [
5146            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
5147            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
5148        ]
5149        .into_iter()
5150        .collect();
5151        let dt = avro_from_codec(Codec::Union(
5152            encodings.into(),
5153            fields.clone(),
5154            UnionMode::Dense,
5155        ));
5156        let mut dec = Decoder::try_new(&dt).expect("decoder");
5157        let mut b1 = encode_avro_long(1);
5158        b1.extend(encode_avro_bytes("hi".as_bytes()));
5159        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
5160        let mut b0 = encode_avro_long(0);
5161        b0.extend(encode_avro_int(5));
5162        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
5163        let arr = dec.flush(None).expect("flush");
5164        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
5165        assert_eq!(ua.len(), 2);
5166        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
5167        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
5168        assert_eq!(ua.value_offset(0), 0);
5169        assert_eq!(ua.value_offset(1), 0);
5170        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
5171        assert_eq!(utf8_child.len(), 1);
5172        assert_eq!(utf8_child.value(0), "hi");
5173        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
5174        assert_eq!(int_child.len(), 1);
5175        assert_eq!(int_child.value(0), 5);
5176        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
5177        assert_eq!(type_ids, vec![42_i8, 7_i8]);
5178    }
5179
5180    #[cfg(feature = "avro_custom_types")]
5181    #[test]
5182    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
5183        for codec in [
5184            Codec::DurationNanos,
5185            Codec::DurationMicros,
5186            Codec::DurationMillis,
5187            Codec::DurationSeconds,
5188        ] {
5189            let dt = make_avro_dt(codec.clone(), None);
5190            let s = Skipper::from_avro(&dt)?;
5191            match s {
5192                Skipper::Int64 => {}
5193                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
5194            }
5195        }
5196        Ok(())
5197    }
5198
5199    #[cfg(feature = "avro_custom_types")]
5200    #[test]
5201    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
5202        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
5203        for codec in [
5204            Codec::DurationNanos,
5205            Codec::DurationMicros,
5206            Codec::DurationMillis,
5207            Codec::DurationSeconds,
5208        ] {
5209            let dt = make_avro_dt(codec.clone(), None);
5210            let s = Skipper::from_avro(&dt)?;
5211            for &v in &values {
5212                let bytes = encode_avro_long(v);
5213                let mut cursor = AvroCursor::new(&bytes);
5214                s.skip(&mut cursor)?;
5215                assert_eq!(
5216                    cursor.position(),
5217                    bytes.len(),
5218                    "did not consume all bytes for {:?} value {}",
5219                    codec,
5220                    v
5221                );
5222            }
5223        }
5224        Ok(())
5225    }
5226
5227    #[cfg(feature = "avro_custom_types")]
5228    #[test]
5229    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
5230        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
5231        let s = Skipper::from_avro(&dt)?;
5232        match &s {
5233            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
5234                Skipper::Int64 => {}
5235                ref other => panic!("expected inner Int64, got {:?}", other),
5236            },
5237            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
5238        }
5239        {
5240            let buf = encode_vlq_u64(0);
5241            let mut cursor = AvroCursor::new(&buf);
5242            s.skip(&mut cursor)?;
5243            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
5244        }
5245        {
5246            let mut buf = encode_vlq_u64(1);
5247            buf.extend(encode_avro_long(0));
5248            let mut cursor = AvroCursor::new(&buf);
5249            s.skip(&mut cursor)?;
5250            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
5251        }
5252
5253        Ok(())
5254    }
5255
5256    #[cfg(feature = "avro_custom_types")]
5257    #[test]
5258    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
5259        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
5260        let s = Skipper::from_avro(&dt)?;
5261        match &s {
5262            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
5263                Skipper::Int64 => {}
5264                ref other => panic!("expected inner Int64, got {:?}", other),
5265            },
5266            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
5267        }
5268        {
5269            let buf = encode_vlq_u64(1);
5270            let mut cursor = AvroCursor::new(&buf);
5271            s.skip(&mut cursor)?;
5272            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
5273        }
5274        {
5275            let mut buf = encode_vlq_u64(0);
5276            buf.extend(encode_avro_long(-1));
5277            let mut cursor = AvroCursor::new(&buf);
5278            s.skip(&mut cursor)?;
5279            assert_eq!(
5280                cursor.position(),
5281                1 + encode_avro_long(-1).len(),
5282                "expected to consume tag=0 + long(-1)"
5283            );
5284        }
5285        Ok(())
5286    }
5287
5288    #[test]
5289    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
5290        let dt = make_avro_dt(Codec::Interval, None);
5291        let s = Skipper::from_avro(&dt)?;
5292        match s {
5293            Skipper::DurationFixed12 => {}
5294            other => panic!("expected DurationFixed12, got {:?}", other),
5295        }
5296        let payload = vec![0u8; 12];
5297        let mut cursor = AvroCursor::new(&payload);
5298        s.skip(&mut cursor)?;
5299        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
5300        Ok(())
5301    }
5302
5303    #[cfg(feature = "avro_custom_types")]
5304    #[test]
5305    fn test_run_end_encoded_width16_int32_basic_grouping() {
5306        use arrow_array::RunArray;
5307        use std::sync::Arc;
5308        let inner = avro_from_codec(Codec::Int32);
5309        let ree = AvroDataType::new(
5310            Codec::RunEndEncoded(Arc::new(inner), 16),
5311            Default::default(),
5312            None,
5313        );
5314        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5315        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
5316            let bytes = encode_avro_int(v);
5317            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5318        }
5319        let arr = dec.flush(None).expect("flush");
5320        let ra = arr
5321            .as_any()
5322            .downcast_ref::<RunArray<Int16Type>>()
5323            .expect("RunArray<Int16Type>");
5324        assert_eq!(ra.len(), 9);
5325        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
5326        let vals = ra
5327            .values()
5328            .as_ref()
5329            .as_any()
5330            .downcast_ref::<Int32Array>()
5331            .expect("values Int32");
5332        assert_eq!(vals.values(), &[1, 2, 3]);
5333    }
5334
5335    #[cfg(feature = "avro_custom_types")]
5336    #[test]
5337    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
5338        use arrow_array::RunArray;
5339        use std::sync::Arc;
5340        let inner = AvroDataType::new(
5341            Codec::Int32,
5342            Default::default(),
5343            Some(Nullability::NullSecond),
5344        );
5345        let ree = AvroDataType::new(
5346            Codec::RunEndEncoded(Arc::new(inner), 32),
5347            Default::default(),
5348            None,
5349        );
5350        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5351        let seq: [Option<i32>; 8] = [
5352            None,
5353            None,
5354            Some(7),
5355            Some(7),
5356            Some(7),
5357            None,
5358            Some(5),
5359            Some(5),
5360        ];
5361        for item in seq {
5362            let mut bytes = Vec::new();
5363            match item {
5364                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
5365                Some(v) => {
5366                    bytes.extend_from_slice(&encode_vlq_u64(0));
5367                    bytes.extend_from_slice(&encode_avro_int(v));
5368                }
5369            }
5370            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5371        }
5372        let arr = dec.flush(None).expect("flush");
5373        let ra = arr
5374            .as_any()
5375            .downcast_ref::<RunArray<Int32Type>>()
5376            .expect("RunArray<Int32Type>");
5377        assert_eq!(ra.len(), 8);
5378        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
5379        let vals = ra
5380            .values()
5381            .as_ref()
5382            .as_any()
5383            .downcast_ref::<Int32Array>()
5384            .expect("values Int32 (nullable)");
5385        assert_eq!(vals.len(), 4);
5386        assert!(vals.is_null(0));
5387        assert_eq!(vals.value(1), 7);
5388        assert!(vals.is_null(2));
5389        assert_eq!(vals.value(3), 5);
5390    }
5391
5392    #[cfg(feature = "avro_custom_types")]
5393    #[test]
5394    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
5395        use arrow_array::RunArray;
5396        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
5397        let ree = Decoder::RunEndEncoded(
5398            8, /* bytes => Int64 run-ends */
5399            0,
5400            Box::new(inner_values),
5401        );
5402        let mut dec = Decoder::Nullable(
5403            NullablePlan::FromSingle {
5404                resolution: ResolutionPlan::Promotion(Promotion::IntToDouble),
5405            },
5406            NullBufferBuilder::new(DEFAULT_CAPACITY),
5407            Box::new(ree),
5408        );
5409        for v in [1, 1, 2, 2, 2] {
5410            let bytes = encode_avro_int(v);
5411            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5412        }
5413        let arr = dec.flush(None).expect("flush");
5414        let ra = arr
5415            .as_any()
5416            .downcast_ref::<RunArray<Int64Type>>()
5417            .expect("RunArray<Int64Type>");
5418        assert_eq!(ra.len(), 5);
5419        assert_eq!(ra.run_ends().values(), &[2, 5]);
5420        let vals = ra
5421            .values()
5422            .as_ref()
5423            .as_any()
5424            .downcast_ref::<Float64Array>()
5425            .expect("values Float64");
5426        assert_eq!(vals.values(), &[1.0, 2.0]);
5427    }
5428
5429    #[cfg(feature = "avro_custom_types")]
5430    #[test]
5431    fn test_run_end_encoded_unsupported_run_end_width_errors() {
5432        use std::sync::Arc;
5433        let inner = avro_from_codec(Codec::Int32);
5434        let dt = AvroDataType::new(
5435            Codec::RunEndEncoded(Arc::new(inner), 3),
5436            Default::default(),
5437            None,
5438        );
5439        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
5440        let msg = err.to_string();
5441        assert!(
5442            msg.contains("Unsupported run-end width")
5443                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
5444            "unexpected error message: {msg}"
5445        );
5446    }
5447
5448    #[cfg(feature = "avro_custom_types")]
5449    #[test]
5450    fn test_run_end_encoded_empty_input_is_empty_runarray() {
5451        use arrow_array::RunArray;
5452        use std::sync::Arc;
5453        let inner = avro_from_codec(Codec::Utf8);
5454        let dt = AvroDataType::new(
5455            Codec::RunEndEncoded(Arc::new(inner), 4),
5456            Default::default(),
5457            None,
5458        );
5459        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5460        let arr = dec.flush(None).expect("flush");
5461        let ra = arr
5462            .as_any()
5463            .downcast_ref::<RunArray<Int32Type>>()
5464            .expect("RunArray<Int32Type>");
5465        assert_eq!(ra.len(), 0);
5466        assert_eq!(ra.run_ends().len(), 0);
5467        assert_eq!(ra.values().len(), 0);
5468    }
5469
5470    #[cfg(feature = "avro_custom_types")]
5471    #[test]
5472    fn test_run_end_encoded_strings_grouping_width32_bits() {
5473        use arrow_array::RunArray;
5474        use std::sync::Arc;
5475        let inner = avro_from_codec(Codec::Utf8);
5476        let dt = AvroDataType::new(
5477            Codec::RunEndEncoded(Arc::new(inner), 32),
5478            Default::default(),
5479            None,
5480        );
5481        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5482        for s in ["a", "a", "bb", "bb", "bb", "a"] {
5483            let bytes = encode_avro_bytes(s.as_bytes());
5484            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5485        }
5486        let arr = dec.flush(None).expect("flush");
5487        let ra = arr
5488            .as_any()
5489            .downcast_ref::<RunArray<Int32Type>>()
5490            .expect("RunArray<Int32Type>");
5491        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
5492        let vals = ra
5493            .values()
5494            .as_ref()
5495            .as_any()
5496            .downcast_ref::<StringArray>()
5497            .expect("values String");
5498        assert_eq!(vals.len(), 3);
5499        assert_eq!(vals.value(0), "a");
5500        assert_eq!(vals.value(1), "bb");
5501        assert_eq!(vals.value(2), "a");
5502    }
5503
5504    #[cfg(not(feature = "avro_custom_types"))]
5505    #[test]
5506    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
5507        let dt = avro_from_codec(Codec::Int32);
5508        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
5509        for v in [1, 2, 3] {
5510            let bytes = encode_avro_int(v);
5511            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5512        }
5513        let arr = dec.flush(None).expect("flush");
5514        let a = arr
5515            .as_any()
5516            .downcast_ref::<Int32Array>()
5517            .expect("Int32Array");
5518        assert_eq!(a.values(), &[1, 2, 3]);
5519    }
5520
5521    #[test]
5522    fn test_timestamp_nanos_decoding_offset_zero() {
5523        let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::OffsetZero)));
5524        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5525        let mut data = Vec::new();
5526        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
5527            data.extend_from_slice(&encode_avro_long(v));
5528        }
5529        let mut cur = AvroCursor::new(&data);
5530        for _ in 0..4 {
5531            decoder.decode(&mut cur).expect("decode nanos ts");
5532        }
5533        let array = decoder.flush(None).expect("flush nanos ts");
5534        let ts = array
5535            .as_any()
5536            .downcast_ref::<TimestampNanosecondArray>()
5537            .expect("TimestampNanosecondArray");
5538        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
5539        match ts.data_type() {
5540            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5541                assert_eq!(tz.as_deref(), Some("+00:00"));
5542            }
5543            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
5544        }
5545    }
5546
5547    #[test]
5548    fn test_timestamp_nanos_decoding_utc() {
5549        let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::Utc)));
5550        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5551        let mut data = Vec::new();
5552        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
5553            data.extend_from_slice(&encode_avro_long(v));
5554        }
5555        let mut cur = AvroCursor::new(&data);
5556        for _ in 0..4 {
5557            decoder.decode(&mut cur).expect("decode nanos ts");
5558        }
5559        let array = decoder.flush(None).expect("flush nanos ts");
5560        let ts = array
5561            .as_any()
5562            .downcast_ref::<TimestampNanosecondArray>()
5563            .expect("TimestampNanosecondArray");
5564        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
5565        match ts.data_type() {
5566            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5567                assert_eq!(tz.as_deref(), Some("UTC"));
5568            }
5569            other => panic!("expected Timestamp(Nanosecond, Some(\"UTC\")), got {other:?}"),
5570        }
5571    }
5572
5573    #[test]
5574    fn test_timestamp_nanos_decoding_local() {
5575        let avro_type = avro_from_codec(Codec::TimestampNanos(None));
5576        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5577        let mut data = Vec::new();
5578        for v in [10_i64, 20_i64, -30_i64] {
5579            data.extend_from_slice(&encode_avro_long(v));
5580        }
5581        let mut cur = AvroCursor::new(&data);
5582        for _ in 0..3 {
5583            decoder.decode(&mut cur).expect("decode nanos ts");
5584        }
5585        let array = decoder.flush(None).expect("flush nanos ts");
5586        let ts = array
5587            .as_any()
5588            .downcast_ref::<TimestampNanosecondArray>()
5589            .expect("TimestampNanosecondArray");
5590        assert_eq!(ts.values(), &[10, 20, -30]);
5591        match ts.data_type() {
5592            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5593                assert_eq!(tz.as_deref(), None);
5594            }
5595            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5596        }
5597    }
5598
5599    #[test]
5600    fn test_timestamp_nanos_decoding_with_nulls() {
5601        let avro_type = AvroDataType::new(
5602            Codec::TimestampNanos(None),
5603            Default::default(),
5604            Some(Nullability::NullFirst),
5605        );
5606        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
5607        let mut data = Vec::new();
5608        data.extend_from_slice(&encode_avro_long(1));
5609        data.extend_from_slice(&encode_avro_long(42));
5610        data.extend_from_slice(&encode_avro_long(0));
5611        data.extend_from_slice(&encode_avro_long(1));
5612        data.extend_from_slice(&encode_avro_long(-7));
5613        let mut cur = AvroCursor::new(&data);
5614        for _ in 0..3 {
5615            decoder.decode(&mut cur).expect("decode nullable nanos ts");
5616        }
5617        let array = decoder.flush(None).expect("flush nullable nanos ts");
5618        let ts = array
5619            .as_any()
5620            .downcast_ref::<TimestampNanosecondArray>()
5621            .expect("TimestampNanosecondArray");
5622        assert_eq!(ts.len(), 3);
5623        assert!(ts.is_valid(0));
5624        assert!(ts.is_null(1));
5625        assert!(ts.is_valid(2));
5626        assert_eq!(ts.value(0), 42);
5627        assert_eq!(ts.value(2), -7);
5628        match ts.data_type() {
5629            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5630                assert_eq!(tz.as_deref(), None);
5631            }
5632            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5633        }
5634    }
5635}