Skip to main content

arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, ResolvedField,
22    ResolvedRecord, ResolvedUnion, Tz,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37    Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use strum_macros::AsRefStr;
42use uuid::Uuid;
43
44use std::cmp::Ordering;
45use std::mem;
46use std::sync::Arc;
47
48const DEFAULT_CAPACITY: usize = 1024;
49
50/// Macro to decode a decimal payload for a given width and integer type.
51macro_rules! decode_decimal {
52    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
53        let bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
54        $builder.append_value(<$Int>::from_be_bytes(bytes));
55    }};
56}
57
58/// Macro to finish a decimal builder into an array with precision/scale and nulls.
59macro_rules! flush_decimal {
60    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
61        let (_, vals, _) = $builder.finish().into_parts();
62        let dec = <$ArrayTy>::try_new(vals, $nulls)?
63            .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)?;
64        Arc::new(dec) as ArrayRef
65    }};
66}
67
68/// Macro to append a default decimal value from two's-complement big-endian bytes
69/// into the corresponding decimal builder, with compile-time constructed error text.
70macro_rules! append_decimal_default {
71    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
72        match $lit {
73            AvroLiteral::Bytes(b) => {
74                let ext = sign_cast_to::<$N>(b)?;
75                let val = <$Int>::from_be_bytes(ext);
76                $builder.append_value(val);
77                Ok(())
78            }
79            _ => Err(AvroError::InvalidArgument(
80                concat!(
81                    "Default for ",
82                    $name,
83                    " must be bytes (two's-complement big-endian)"
84                )
85                .to_string(),
86            )),
87        }
88    }};
89}
90
91/// Decodes avro encoded data into [`RecordBatch`]
92#[derive(Debug)]
93pub(crate) struct RecordDecoder {
94    schema: SchemaRef,
95    fields: Vec<Decoder>,
96    projector: Option<Projector>,
97}
98
99impl RecordDecoder {
100    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
101    ///
102    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
103    ///
104    /// # Arguments
105    /// * `data_type` - The Avro data type to decode.
106    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
107    ///
108    /// # Errors
109    /// This function will return an error if the provided `data_type` is not a `Record`.
110    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
111        match data_type.codec() {
112            Codec::Struct(reader_fields) => {
113                // Build Arrow schema fields and per-child decoders
114                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
115                let mut encodings = Vec::with_capacity(reader_fields.len());
116                let mut field_defaults = Vec::with_capacity(reader_fields.len());
117                for avro_field in reader_fields.iter() {
118                    arrow_fields.push(avro_field.field());
119                    encodings.push(Decoder::try_new(avro_field.data_type())?);
120
121                    if let Some(ResolutionInfo::DefaultValue(lit)) =
122                        avro_field.data_type().resolution.as_ref()
123                    {
124                        field_defaults.push(Some(lit.clone()));
125                    } else {
126                        field_defaults.push(None);
127                    }
128                }
129                let projector = match data_type.resolution.as_ref() {
130                    Some(ResolutionInfo::Record(rec)) => {
131                        Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
132                    }
133                    _ => None,
134                };
135                Ok(Self {
136                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
137                    fields: encodings,
138                    projector,
139                })
140            }
141            other => Err(AvroError::ParseError(format!(
142                "Expected record got {other:?}"
143            ))),
144        }
145    }
146
147    /// Returns the decoder's `SchemaRef`
148    pub(crate) fn schema(&self) -> &SchemaRef {
149        &self.schema
150    }
151
152    /// Decode `count` records from `buf`
153    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
154        let mut cursor = AvroCursor::new(buf);
155        match self.projector.as_mut() {
156            Some(proj) => {
157                for _ in 0..count {
158                    proj.project_record(&mut cursor, &mut self.fields)?;
159                }
160            }
161            None => {
162                for _ in 0..count {
163                    for field in &mut self.fields {
164                        field.decode(&mut cursor)?;
165                    }
166                }
167            }
168        }
169        Ok(cursor.position())
170    }
171
172    /// Flush the decoded records into a [`RecordBatch`]
173    pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
174        let arrays = self
175            .fields
176            .iter_mut()
177            .map(|x| x.flush(None))
178            .collect::<Result<Vec<_>, _>>()?;
179        RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
180    }
181}
182
183#[derive(Debug, AsRefStr)]
184enum Decoder {
185    Null(usize),
186    Boolean(BooleanBufferBuilder),
187    Int32(Vec<i32>),
188    Int64(Vec<i64>),
189    #[cfg(feature = "avro_custom_types")]
190    DurationSecond(Vec<i64>),
191    #[cfg(feature = "avro_custom_types")]
192    DurationMillisecond(Vec<i64>),
193    #[cfg(feature = "avro_custom_types")]
194    DurationMicrosecond(Vec<i64>),
195    #[cfg(feature = "avro_custom_types")]
196    DurationNanosecond(Vec<i64>),
197    #[cfg(feature = "avro_custom_types")]
198    Int8(Vec<i8>),
199    #[cfg(feature = "avro_custom_types")]
200    Int16(Vec<i16>),
201    #[cfg(feature = "avro_custom_types")]
202    UInt8(Vec<u8>),
203    #[cfg(feature = "avro_custom_types")]
204    UInt16(Vec<u16>),
205    #[cfg(feature = "avro_custom_types")]
206    UInt32(Vec<u32>),
207    #[cfg(feature = "avro_custom_types")]
208    UInt64(Vec<u64>),
209    #[cfg(feature = "avro_custom_types")]
210    Float16(Vec<u16>), // Stored as raw IEEE-754 f16 bits
211    #[cfg(feature = "avro_custom_types")]
212    Date64(Vec<i64>),
213    #[cfg(feature = "avro_custom_types")]
214    TimeNanos(Vec<i64>),
215    #[cfg(feature = "avro_custom_types")]
216    Time32Secs(Vec<i32>),
217    #[cfg(feature = "avro_custom_types")]
218    TimestampSecs(bool, Vec<i64>),
219    #[cfg(feature = "avro_custom_types")]
220    IntervalYearMonth(Vec<i32>),
221    #[cfg(feature = "avro_custom_types")]
222    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
223    #[cfg(feature = "avro_custom_types")]
224    IntervalDayTime(Vec<IntervalDayTime>),
225    Float32(Vec<f32>),
226    Float64(Vec<f64>),
227    Date32(Vec<i32>),
228    TimeMillis(Vec<i32>),
229    TimeMicros(Vec<i64>),
230    TimestampMillis(Option<Tz>, Vec<i64>),
231    TimestampMicros(Option<Tz>, Vec<i64>),
232    TimestampNanos(Option<Tz>, Vec<i64>),
233    Int32ToInt64(Vec<i64>),
234    Int32ToFloat32(Vec<f32>),
235    Int32ToFloat64(Vec<f64>),
236    Int64ToFloat32(Vec<f32>),
237    Int64ToFloat64(Vec<f64>),
238    Float32ToFloat64(Vec<f64>),
239    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
240    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
241    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
242    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
243    String(OffsetBufferBuilder<i32>, Vec<u8>),
244    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
245    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
246    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
247    Record(
248        Fields,
249        Vec<Decoder>,
250        Vec<Option<AvroLiteral>>,
251        Option<Projector>,
252    ),
253    Map(
254        FieldRef,
255        OffsetBufferBuilder<i32>,
256        OffsetBufferBuilder<i32>,
257        Vec<u8>,
258        Box<Decoder>,
259    ),
260    Fixed(i32, Vec<u8>),
261    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
262    Duration(IntervalMonthDayNanoBuilder),
263    Uuid(Vec<u8>),
264    #[cfg(feature = "small_decimals")]
265    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
266    #[cfg(feature = "small_decimals")]
267    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
268    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
269    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
270    #[cfg(feature = "avro_custom_types")]
271    RunEndEncoded(u8, usize, Box<Decoder>),
272    Union(UnionDecoder),
273    Nullable(NullablePlan, NullBufferBuilder, Box<Decoder>),
274}
275
276impl Decoder {
277    fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
278        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
279            if info.writer_is_union && !info.reader_is_union {
280                let mut clone = data_type.clone();
281                clone.resolution = None; // Build target base decoder without Union resolution
282                let target = Self::try_new_internal(&clone)?;
283                let decoder = Self::Union(
284                    UnionDecoderBuilder::new()
285                        .with_resolved_union(info.clone())
286                        .with_target(target)
287                        .build()?,
288                );
289                return Ok(decoder);
290            }
291        }
292        Self::try_new_internal(data_type)
293    }
294
295    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
296        // Extract just the Promotion (if any) to simplify pattern matching
297        let promotion = match data_type.resolution.as_ref() {
298            Some(ResolutionInfo::Promotion(p)) => Some(*p),
299            _ => None,
300        };
301        let decoder = match (data_type.codec(), promotion) {
302            (Codec::Int64, Some(Promotion::IntToLong)) => {
303                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
304            }
305            (Codec::Float32, Some(Promotion::IntToFloat)) => {
306                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
307            }
308            (Codec::Float64, Some(Promotion::IntToDouble)) => {
309                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
310            }
311            (Codec::Float32, Some(Promotion::LongToFloat)) => {
312                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
313            }
314            (Codec::Float64, Some(Promotion::LongToDouble)) => {
315                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
316            }
317            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
318                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
319            }
320            (Codec::Utf8, Some(Promotion::BytesToString))
321            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
322                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
323                Vec::with_capacity(DEFAULT_CAPACITY),
324            ),
325            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
326                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
327                Vec::with_capacity(DEFAULT_CAPACITY),
328            ),
329            (Codec::Null, _) => Self::Null(0),
330            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
331            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
332            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
333            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
334            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
335            (Codec::Binary, _) => Self::Binary(
336                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
337                Vec::with_capacity(DEFAULT_CAPACITY),
338            ),
339            (Codec::Utf8, _) => Self::String(
340                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
341                Vec::with_capacity(DEFAULT_CAPACITY),
342            ),
343            (Codec::Utf8View, _) => Self::StringView(
344                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
345                Vec::with_capacity(DEFAULT_CAPACITY),
346            ),
347            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
348            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
349            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
350            (Codec::TimestampMillis(tz), _) => {
351                Self::TimestampMillis(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
352            }
353            (Codec::TimestampMicros(tz), _) => {
354                Self::TimestampMicros(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
355            }
356            (Codec::TimestampNanos(tz), _) => {
357                Self::TimestampNanos(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
358            }
359            #[cfg(feature = "avro_custom_types")]
360            (Codec::DurationNanos, _) => {
361                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
362            }
363            #[cfg(feature = "avro_custom_types")]
364            (Codec::DurationMicros, _) => {
365                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
366            }
367            #[cfg(feature = "avro_custom_types")]
368            (Codec::DurationMillis, _) => {
369                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
370            }
371            #[cfg(feature = "avro_custom_types")]
372            (Codec::DurationSeconds, _) => {
373                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
374            }
375            #[cfg(feature = "avro_custom_types")]
376            (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
377            #[cfg(feature = "avro_custom_types")]
378            (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
379            #[cfg(feature = "avro_custom_types")]
380            (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
381            #[cfg(feature = "avro_custom_types")]
382            (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
383            #[cfg(feature = "avro_custom_types")]
384            (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
385            #[cfg(feature = "avro_custom_types")]
386            (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
387            #[cfg(feature = "avro_custom_types")]
388            (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
389            #[cfg(feature = "avro_custom_types")]
390            (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
391            #[cfg(feature = "avro_custom_types")]
392            (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
393            #[cfg(feature = "avro_custom_types")]
394            (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
395            #[cfg(feature = "avro_custom_types")]
396            (Codec::TimestampSecs(is_utc), _) => {
397                Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
398            }
399            #[cfg(feature = "avro_custom_types")]
400            (Codec::IntervalYearMonth, _) => {
401                Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
402            }
403            #[cfg(feature = "avro_custom_types")]
404            (Codec::IntervalMonthDayNano, _) => {
405                Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
406            }
407            #[cfg(feature = "avro_custom_types")]
408            (Codec::IntervalDayTime, _) => {
409                Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
410            }
411            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
412            (Codec::Decimal(precision, scale, size), _) => {
413                let p = *precision;
414                let s = *scale;
415                let prec = p as u8;
416                let scl = s.unwrap_or(0) as i8;
417                #[cfg(feature = "small_decimals")]
418                {
419                    if p <= DECIMAL32_MAX_PRECISION as usize {
420                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
421                            .with_precision_and_scale(prec, scl)?;
422                        Self::Decimal32(p, s, *size, builder)
423                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
424                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
425                            .with_precision_and_scale(prec, scl)?;
426                        Self::Decimal64(p, s, *size, builder)
427                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
428                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
429                            .with_precision_and_scale(prec, scl)?;
430                        Self::Decimal128(p, s, *size, builder)
431                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
432                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
433                            .with_precision_and_scale(prec, scl)?;
434                        Self::Decimal256(p, s, *size, builder)
435                    } else {
436                        return Err(AvroError::ParseError(format!(
437                            "Decimal precision {p} exceeds maximum supported"
438                        )));
439                    }
440                }
441                #[cfg(not(feature = "small_decimals"))]
442                {
443                    if p <= DECIMAL128_MAX_PRECISION as usize {
444                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
445                            .with_precision_and_scale(prec, scl)?;
446                        Self::Decimal128(p, s, *size, builder)
447                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
448                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
449                            .with_precision_and_scale(prec, scl)?;
450                        Self::Decimal256(p, s, *size, builder)
451                    } else {
452                        return Err(AvroError::ParseError(format!(
453                            "Decimal precision {p} exceeds maximum supported"
454                        )));
455                    }
456                }
457            }
458            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
459            (Codec::List(item), _) => {
460                let decoder = Self::try_new(item)?;
461                Self::Array(
462                    Arc::new(item.field_with_name("item")),
463                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
464                    Box::new(decoder),
465                )
466            }
467            (Codec::Enum(symbols), _) => {
468                let res = match data_type.resolution.as_ref() {
469                    Some(ResolutionInfo::EnumMapping(mapping)) => {
470                        Some(EnumResolution::new(mapping))
471                    }
472                    _ => None,
473                };
474                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
475            }
476            (Codec::Struct(fields), _) => {
477                let mut arrow_fields = Vec::with_capacity(fields.len());
478                let mut encodings = Vec::with_capacity(fields.len());
479                let mut field_defaults = Vec::with_capacity(fields.len());
480                for avro_field in fields.iter() {
481                    let encoding = Self::try_new(avro_field.data_type())?;
482                    arrow_fields.push(avro_field.field());
483                    encodings.push(encoding);
484
485                    if let Some(ResolutionInfo::DefaultValue(lit)) =
486                        avro_field.data_type().resolution.as_ref()
487                    {
488                        field_defaults.push(Some(lit.clone()));
489                    } else {
490                        field_defaults.push(None);
491                    }
492                }
493                let projector =
494                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
495                        Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
496                    } else {
497                        None
498                    };
499                Self::Record(arrow_fields.into(), encodings, field_defaults, projector)
500            }
501            (Codec::Map(child), _) => {
502                let val_field = child.field_with_name("value");
503                let map_field = Arc::new(ArrowField::new(
504                    "entries",
505                    DataType::Struct(Fields::from(vec![
506                        ArrowField::new("key", DataType::Utf8, false),
507                        val_field,
508                    ])),
509                    false,
510                ));
511                let val_dec = Self::try_new(child)?;
512                Self::Map(
513                    map_field,
514                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
515                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
516                    Vec::with_capacity(DEFAULT_CAPACITY),
517                    Box::new(val_dec),
518                )
519            }
520            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
521            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
522                let decoders = encodings
523                    .iter()
524                    .map(Self::try_new_internal)
525                    .collect::<Result<Vec<_>, _>>()?;
526                if fields.len() != decoders.len() {
527                    return Err(AvroError::SchemaError(format!(
528                        "Union has {} fields but {} decoders",
529                        fields.len(),
530                        decoders.len()
531                    )));
532                }
533                // Proactive guard: if a user provides a union with more branches than
534                // a 32-bit Avro index can address, fail fast with a clear message.
535                let branch_count = decoders.len();
536                let max_addr = (i32::MAX as usize) + 1;
537                if branch_count > max_addr {
538                    return Err(AvroError::SchemaError(format!(
539                        "Union has {branch_count} branches, which exceeds the maximum addressable \
540                         branches by an Avro int tag ({} + 1).",
541                        i32::MAX
542                    )));
543                }
544                let mut builder = UnionDecoderBuilder::new()
545                    .with_fields(fields.clone())
546                    .with_branches(decoders);
547                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
548                    if info.reader_is_union {
549                        builder = builder.with_resolved_union(info.clone());
550                    }
551                }
552                Self::Union(builder.build()?)
553            }
554            (Codec::Union(_, _, _), _) => {
555                return Err(AvroError::NYI(
556                    "Sparse Arrow unions are not yet supported".to_string(),
557                ));
558            }
559            #[cfg(feature = "avro_custom_types")]
560            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
561                let inner = Self::try_new(values_dt)?;
562                let byte_width: u8 = match *width_bits_or_bytes {
563                    2 | 4 | 8 => *width_bits_or_bytes,
564                    16 => 2,
565                    32 => 4,
566                    64 => 8,
567                    other => {
568                        return Err(AvroError::InvalidArgument(format!(
569                            "Unsupported run-end width {other} for RunEndEncoded; \
570                             expected 16/32/64 bits or 2/4/8 bytes"
571                        )));
572                    }
573                };
574                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
575            }
576        };
577        Ok(match data_type.nullability() {
578            Some(nullability) => {
579                // Default to reading a union branch tag unless the resolution directs otherwise.
580                let plan = match &data_type.resolution {
581                    None => NullablePlan::ReadTag {
582                        nullability,
583                        resolution: ResolutionPlan::Promotion(Promotion::Direct),
584                    },
585                    Some(ResolutionInfo::Promotion(_)) => {
586                        // Promotions should have been incorporated
587                        // into the inner decoder.
588                        NullablePlan::FromSingle {
589                            resolution: ResolutionPlan::Promotion(Promotion::Direct),
590                        }
591                    }
592                    Some(ResolutionInfo::Union(info)) if !info.writer_is_union => {
593                        let Some(Some((_, resolution))) = info.writer_to_reader.first() else {
594                            return Err(AvroError::SchemaError(
595                                "unexpected union resolution info for non-union writer and union reader type".into(),
596                            ));
597                        };
598                        let resolution = ResolutionPlan::try_new(&decoder, resolution)?;
599                        NullablePlan::FromSingle { resolution }
600                    }
601                    Some(ResolutionInfo::Union(info)) => {
602                        let Some((_, resolution)) =
603                            info.writer_to_reader[nullability.non_null_index()].as_ref()
604                        else {
605                            return Err(AvroError::SchemaError(
606                                "unexpected union resolution info for nullable writer type".into(),
607                            ));
608                        };
609                        NullablePlan::ReadTag {
610                            nullability,
611                            resolution: ResolutionPlan::try_new(&decoder, resolution)?,
612                        }
613                    }
614                    Some(resolution) => NullablePlan::FromSingle {
615                        resolution: ResolutionPlan::try_new(&decoder, resolution)?,
616                    },
617                };
618                Self::Nullable(
619                    plan,
620                    NullBufferBuilder::new(DEFAULT_CAPACITY),
621                    Box::new(decoder),
622                )
623            }
624            None => decoder,
625        })
626    }
627
628    /// Append a null record
629    fn append_null(&mut self) -> Result<(), AvroError> {
630        match self {
631            Self::Null(count) => *count += 1,
632            Self::Boolean(b) => b.append(false),
633            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
634            Self::Int64(v)
635            | Self::Int32ToInt64(v)
636            | Self::TimeMicros(v)
637            | Self::TimestampMillis(_, v)
638            | Self::TimestampMicros(_, v)
639            | Self::TimestampNanos(_, v) => v.push(0),
640            #[cfg(feature = "avro_custom_types")]
641            Self::DurationSecond(v)
642            | Self::DurationMillisecond(v)
643            | Self::DurationMicrosecond(v)
644            | Self::DurationNanosecond(v) => v.push(0),
645            #[cfg(feature = "avro_custom_types")]
646            Self::Int8(v) => v.push(0),
647            #[cfg(feature = "avro_custom_types")]
648            Self::Int16(v) => v.push(0),
649            #[cfg(feature = "avro_custom_types")]
650            Self::UInt8(v) => v.push(0),
651            #[cfg(feature = "avro_custom_types")]
652            Self::UInt16(v) => v.push(0),
653            #[cfg(feature = "avro_custom_types")]
654            Self::UInt32(v) => v.push(0),
655            #[cfg(feature = "avro_custom_types")]
656            Self::UInt64(v) => v.push(0),
657            #[cfg(feature = "avro_custom_types")]
658            Self::Float16(v) => v.push(0),
659            #[cfg(feature = "avro_custom_types")]
660            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
661            #[cfg(feature = "avro_custom_types")]
662            Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
663            #[cfg(feature = "avro_custom_types")]
664            Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
665            #[cfg(feature = "avro_custom_types")]
666            Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
667            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
668            Self::Float64(v)
669            | Self::Int32ToFloat64(v)
670            | Self::Int64ToFloat64(v)
671            | Self::Float32ToFloat64(v) => v.push(0.),
672            Self::Binary(offsets, _)
673            | Self::String(offsets, _)
674            | Self::StringView(offsets, _)
675            | Self::BytesToString(offsets, _)
676            | Self::StringToBytes(offsets, _) => {
677                offsets.push_length(0);
678            }
679            Self::Uuid(v) => {
680                v.extend([0; 16]);
681            }
682            Self::Array(_, offsets, _) => {
683                offsets.push_length(0);
684            }
685            Self::Record(_, e, _, _) => {
686                for encoding in e.iter_mut() {
687                    encoding.append_null()?;
688                }
689            }
690            Self::Map(_, _koff, moff, _, _) => {
691                moff.push_length(0);
692            }
693            Self::Fixed(sz, accum) => {
694                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
695            }
696            #[cfg(feature = "small_decimals")]
697            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
698            #[cfg(feature = "small_decimals")]
699            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
700            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
701            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
702            Self::Enum(indices, _, _) => indices.push(0),
703            Self::Duration(builder) => builder.append_null(),
704            #[cfg(feature = "avro_custom_types")]
705            Self::RunEndEncoded(_, len, inner) => {
706                *len += 1;
707                inner.append_null()?;
708            }
709            Self::Union(u) => u.append_null()?,
710            Self::Nullable(_, null_buffer, inner) => {
711                null_buffer.append(false);
712                inner.append_null()?;
713            }
714        }
715        Ok(())
716    }
717
718    /// Append a single default literal into the decoder's buffers
719    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
720        match self {
721            Self::Nullable(_, nb, inner) => {
722                if matches!(lit, AvroLiteral::Null) {
723                    nb.append(false);
724                    inner.append_null()
725                } else {
726                    nb.append(true);
727                    inner.append_default(lit)
728                }
729            }
730            Self::Null(count) => match lit {
731                AvroLiteral::Null => {
732                    *count += 1;
733                    Ok(())
734                }
735                _ => Err(AvroError::InvalidArgument(
736                    "Non-null default for null type".to_string(),
737                )),
738            },
739            Self::Boolean(b) => match lit {
740                AvroLiteral::Boolean(v) => {
741                    b.append(*v);
742                    Ok(())
743                }
744                _ => Err(AvroError::InvalidArgument(
745                    "Default for boolean must be boolean".to_string(),
746                )),
747            },
748            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
749                AvroLiteral::Int(i) => {
750                    v.push(*i);
751                    Ok(())
752                }
753                _ => Err(AvroError::InvalidArgument(
754                    "Default for int32/date32/time-millis must be int".to_string(),
755                )),
756            },
757            #[cfg(feature = "avro_custom_types")]
758            Self::DurationSecond(v)
759            | Self::DurationMillisecond(v)
760            | Self::DurationMicrosecond(v)
761            | Self::DurationNanosecond(v) => match lit {
762                AvroLiteral::Long(i) => {
763                    v.push(*i);
764                    Ok(())
765                }
766                _ => Err(AvroError::InvalidArgument(
767                    "Default for duration long must be long".to_string(),
768                )),
769            },
770            #[cfg(feature = "avro_custom_types")]
771            Self::Int8(v) => match lit {
772                AvroLiteral::Int(i) => {
773                    let x = i8::try_from(*i).map_err(|_| {
774                        AvroError::InvalidArgument(format!(
775                            "Default for int8 out of range for i8: {i}"
776                        ))
777                    })?;
778                    v.push(x);
779                    Ok(())
780                }
781                _ => Err(AvroError::InvalidArgument(
782                    "Default for int8 must be int".to_string(),
783                )),
784            },
785            #[cfg(feature = "avro_custom_types")]
786            Self::Int16(v) => match lit {
787                AvroLiteral::Int(i) => {
788                    let x = i16::try_from(*i).map_err(|_| {
789                        AvroError::InvalidArgument(format!(
790                            "Default for int16 out of range for i16: {i}"
791                        ))
792                    })?;
793                    v.push(x);
794                    Ok(())
795                }
796                _ => Err(AvroError::InvalidArgument(
797                    "Default for int16 must be int".to_string(),
798                )),
799            },
800            #[cfg(feature = "avro_custom_types")]
801            Self::UInt8(v) => match lit {
802                AvroLiteral::Int(i) => {
803                    let x = u8::try_from(*i).map_err(|_| {
804                        AvroError::InvalidArgument(format!(
805                            "Default for uint8 out of range for u8: {i}"
806                        ))
807                    })?;
808                    v.push(x);
809                    Ok(())
810                }
811                _ => Err(AvroError::InvalidArgument(
812                    "Default for uint8 must be int".to_string(),
813                )),
814            },
815            #[cfg(feature = "avro_custom_types")]
816            Self::UInt16(v) => match lit {
817                AvroLiteral::Int(i) => {
818                    let x = u16::try_from(*i).map_err(|_| {
819                        AvroError::InvalidArgument(format!(
820                            "Default for uint16 out of range for u16: {i}"
821                        ))
822                    })?;
823                    v.push(x);
824                    Ok(())
825                }
826                _ => Err(AvroError::InvalidArgument(
827                    "Default for uint16 must be int".to_string(),
828                )),
829            },
830            #[cfg(feature = "avro_custom_types")]
831            Self::UInt32(v) => match lit {
832                AvroLiteral::Long(i) => {
833                    let x = u32::try_from(*i).map_err(|_| {
834                        AvroError::InvalidArgument(format!(
835                            "Default for uint32 out of range for u32: {i}"
836                        ))
837                    })?;
838                    v.push(x);
839                    Ok(())
840                }
841                _ => Err(AvroError::InvalidArgument(
842                    "Default for uint32 must be long".to_string(),
843                )),
844            },
845            #[cfg(feature = "avro_custom_types")]
846            Self::UInt64(v) => match lit {
847                AvroLiteral::Bytes(b) => {
848                    if b.len() != 8 {
849                        return Err(AvroError::InvalidArgument(format!(
850                            "uint64 default must be exactly 8 bytes, got {}",
851                            b.len()
852                        )));
853                    }
854                    v.push(u64::from_le_bytes([
855                        b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
856                    ]));
857                    Ok(())
858                }
859                _ => Err(AvroError::InvalidArgument(
860                    "Default for uint64 must be bytes (8-byte LE)".to_string(),
861                )),
862            },
863            #[cfg(feature = "avro_custom_types")]
864            Self::Float16(v) => match lit {
865                AvroLiteral::Bytes(b) => {
866                    if b.len() != 2 {
867                        return Err(AvroError::InvalidArgument(format!(
868                            "float16 default must be exactly 2 bytes, got {}",
869                            b.len()
870                        )));
871                    }
872                    v.push(u16::from_le_bytes([b[0], b[1]]));
873                    Ok(())
874                }
875                _ => Err(AvroError::InvalidArgument(
876                    "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
877                )),
878            },
879            #[cfg(feature = "avro_custom_types")]
880            Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
881                AvroLiteral::Long(i) => {
882                    v.push(*i);
883                    Ok(())
884                }
885                _ => Err(AvroError::InvalidArgument(
886                    "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
887                )),
888            },
889            #[cfg(feature = "avro_custom_types")]
890            Self::Time32Secs(v) => match lit {
891                AvroLiteral::Int(i) => {
892                    v.push(*i);
893                    Ok(())
894                }
895                _ => Err(AvroError::InvalidArgument(
896                    "Default for time32-secs must be int".to_string(),
897                )),
898            },
899            #[cfg(feature = "avro_custom_types")]
900            Self::IntervalYearMonth(v) => match lit {
901                AvroLiteral::Bytes(b) => {
902                    if b.len() != 4 {
903                        return Err(AvroError::InvalidArgument(format!(
904                            "interval-year-month default must be exactly 4 bytes, got {}",
905                            b.len()
906                        )));
907                    }
908                    v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
909                    Ok(())
910                }
911                _ => Err(AvroError::InvalidArgument(
912                    "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
913                )),
914            },
915            #[cfg(feature = "avro_custom_types")]
916            Self::IntervalMonthDayNano(v) => match lit {
917                AvroLiteral::Bytes(b) => {
918                    if b.len() != 16 {
919                        return Err(AvroError::InvalidArgument(format!(
920                            "interval-month-day-nano default must be exactly 16 bytes, got {}",
921                            b.len()
922                        )));
923                    }
924                    let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
925                    let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
926                    let nanos =
927                        i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
928                    v.push(IntervalMonthDayNano::new(months, days, nanos));
929                    Ok(())
930                }
931                _ => Err(AvroError::InvalidArgument(
932                    "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
933                )),
934            },
935            #[cfg(feature = "avro_custom_types")]
936            Self::IntervalDayTime(v) => match lit {
937                AvroLiteral::Bytes(b) => {
938                    if b.len() != 8 {
939                        return Err(AvroError::InvalidArgument(format!(
940                            "interval-day-time default must be exactly 8 bytes, got {}",
941                            b.len()
942                        )));
943                    }
944                    let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
945                    let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
946                    v.push(IntervalDayTime::new(days, milliseconds));
947                    Ok(())
948                }
949                _ => Err(AvroError::InvalidArgument(
950                    "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
951                )),
952            },
953            Self::Int64(v)
954            | Self::Int32ToInt64(v)
955            | Self::TimeMicros(v)
956            | Self::TimestampMillis(_, v)
957            | Self::TimestampMicros(_, v)
958            | Self::TimestampNanos(_, v) => match lit {
959                AvroLiteral::Long(i) => {
960                    v.push(*i);
961                    Ok(())
962                }
963                AvroLiteral::Int(i) => {
964                    v.push(*i as i64);
965                    Ok(())
966                }
967                _ => Err(AvroError::InvalidArgument(
968                    "Default for long/time-micros/timestamp must be long or int".to_string(),
969                )),
970            },
971            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
972                AvroLiteral::Float(f) => {
973                    v.push(*f);
974                    Ok(())
975                }
976                _ => Err(AvroError::InvalidArgument(
977                    "Default for float must be float".to_string(),
978                )),
979            },
980            Self::Float64(v)
981            | Self::Int32ToFloat64(v)
982            | Self::Int64ToFloat64(v)
983            | Self::Float32ToFloat64(v) => match lit {
984                AvroLiteral::Double(f) => {
985                    v.push(*f);
986                    Ok(())
987                }
988                _ => Err(AvroError::InvalidArgument(
989                    "Default for double must be double".to_string(),
990                )),
991            },
992            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
993                AvroLiteral::Bytes(b) => {
994                    offsets.push_length(b.len());
995                    values.extend_from_slice(b);
996                    Ok(())
997                }
998                _ => Err(AvroError::InvalidArgument(
999                    "Default for bytes must be bytes".to_string(),
1000                )),
1001            },
1002            Self::BytesToString(offsets, values)
1003            | Self::String(offsets, values)
1004            | Self::StringView(offsets, values) => match lit {
1005                AvroLiteral::String(s) => {
1006                    let b = s.as_bytes();
1007                    offsets.push_length(b.len());
1008                    values.extend_from_slice(b);
1009                    Ok(())
1010                }
1011                _ => Err(AvroError::InvalidArgument(
1012                    "Default for string must be string".to_string(),
1013                )),
1014            },
1015            Self::Uuid(values) => match lit {
1016                AvroLiteral::String(s) => {
1017                    let uuid = Uuid::try_parse(s).map_err(|e| {
1018                        AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
1019                    })?;
1020                    values.extend_from_slice(uuid.as_bytes());
1021                    Ok(())
1022                }
1023                _ => Err(AvroError::InvalidArgument(
1024                    "Default for uuid must be string".to_string(),
1025                )),
1026            },
1027            Self::Fixed(sz, accum) => match lit {
1028                AvroLiteral::Bytes(b) => {
1029                    if b.len() != *sz as usize {
1030                        return Err(AvroError::InvalidArgument(format!(
1031                            "Fixed default length {} does not match size {sz}",
1032                            b.len(),
1033                        )));
1034                    }
1035                    accum.extend_from_slice(b);
1036                    Ok(())
1037                }
1038                _ => Err(AvroError::InvalidArgument(
1039                    "Default for fixed must be bytes".to_string(),
1040                )),
1041            },
1042            #[cfg(feature = "small_decimals")]
1043            Self::Decimal32(_, _, _, builder) => {
1044                append_decimal_default!(lit, builder, 4, i32, "decimal32")
1045            }
1046            #[cfg(feature = "small_decimals")]
1047            Self::Decimal64(_, _, _, builder) => {
1048                append_decimal_default!(lit, builder, 8, i64, "decimal64")
1049            }
1050            Self::Decimal128(_, _, _, builder) => {
1051                append_decimal_default!(lit, builder, 16, i128, "decimal128")
1052            }
1053            Self::Decimal256(_, _, _, builder) => {
1054                append_decimal_default!(lit, builder, 32, i256, "decimal256")
1055            }
1056            Self::Duration(builder) => match lit {
1057                AvroLiteral::Bytes(b) => {
1058                    if b.len() != 12 {
1059                        return Err(AvroError::InvalidArgument(format!(
1060                            "Duration default must be exactly 12 bytes, got {}",
1061                            b.len()
1062                        )));
1063                    }
1064                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1065                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1066                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1067                    let nanos = (millis as i64) * 1_000_000;
1068                    builder.append_value(IntervalMonthDayNano::new(
1069                        months as i32,
1070                        days as i32,
1071                        nanos,
1072                    ));
1073                    Ok(())
1074                }
1075                _ => Err(AvroError::InvalidArgument(
1076                    "Default for duration must be 12-byte little-endian months/days/millis"
1077                        .to_string(),
1078                )),
1079            },
1080            Self::Array(_, offsets, inner) => match lit {
1081                AvroLiteral::Array(items) => {
1082                    offsets.push_length(items.len());
1083                    for item in items {
1084                        inner.append_default(item)?;
1085                    }
1086                    Ok(())
1087                }
1088                _ => Err(AvroError::InvalidArgument(
1089                    "Default for array must be an array literal".to_string(),
1090                )),
1091            },
1092            Self::Map(_, koff, moff, kdata, valdec) => match lit {
1093                AvroLiteral::Map(entries) => {
1094                    moff.push_length(entries.len());
1095                    for (k, v) in entries {
1096                        let kb = k.as_bytes();
1097                        koff.push_length(kb.len());
1098                        kdata.extend_from_slice(kb);
1099                        valdec.append_default(v)?;
1100                    }
1101                    Ok(())
1102                }
1103                _ => Err(AvroError::InvalidArgument(
1104                    "Default for map must be a map/object literal".to_string(),
1105                )),
1106            },
1107            Self::Enum(indices, symbols, _) => match lit {
1108                AvroLiteral::Enum(sym) => {
1109                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1110                        AvroError::InvalidArgument(format!(
1111                            "Enum default symbol {sym:?} not in reader symbols"
1112                        ))
1113                    })?;
1114                    indices.push(pos as i32);
1115                    Ok(())
1116                }
1117                _ => Err(AvroError::InvalidArgument(
1118                    "Default for enum must be a symbol".to_string(),
1119                )),
1120            },
1121            #[cfg(feature = "avro_custom_types")]
1122            Self::RunEndEncoded(_, len, inner) => {
1123                *len += 1;
1124                inner.append_default(lit)
1125            }
1126            Self::Union(u) => u.append_default(lit),
1127            Self::Record(field_meta, decoders, field_defaults, _) => match lit {
1128                AvroLiteral::Map(entries) => {
1129                    for (i, dec) in decoders.iter_mut().enumerate() {
1130                        let name = field_meta[i].name();
1131                        if let Some(sub) = entries.get(name) {
1132                            dec.append_default(sub)?;
1133                        } else if let Some(default_literal) = field_defaults[i].as_ref() {
1134                            dec.append_default(default_literal)?;
1135                        } else {
1136                            dec.append_null()?;
1137                        }
1138                    }
1139                    Ok(())
1140                }
1141                AvroLiteral::Null => {
1142                    for (i, dec) in decoders.iter_mut().enumerate() {
1143                        if let Some(default_literal) = field_defaults[i].as_ref() {
1144                            dec.append_default(default_literal)?;
1145                        } else {
1146                            dec.append_null()?;
1147                        }
1148                    }
1149                    Ok(())
1150                }
1151                _ => Err(AvroError::InvalidArgument(
1152                    "Default for record must be a map/object or null".to_string(),
1153                )),
1154            },
1155        }
1156    }
1157
1158    /// Decode a single record from `buf`
1159    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1160        match self {
1161            Self::Null(x) => *x += 1,
1162            Self::Boolean(values) => values.append(buf.get_bool()?),
1163            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
1164                values.push(buf.get_int()?)
1165            }
1166            Self::Int64(values)
1167            | Self::TimeMicros(values)
1168            | Self::TimestampMillis(_, values)
1169            | Self::TimestampMicros(_, values)
1170            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
1171            #[cfg(feature = "avro_custom_types")]
1172            Self::DurationSecond(values)
1173            | Self::DurationMillisecond(values)
1174            | Self::DurationMicrosecond(values)
1175            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
1176            #[cfg(feature = "avro_custom_types")]
1177            Self::Int8(values) => {
1178                let raw = buf.get_int()?;
1179                let x = i8::try_from(raw).map_err(|_| {
1180                    AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
1181                })?;
1182                values.push(x);
1183            }
1184            #[cfg(feature = "avro_custom_types")]
1185            Self::Int16(values) => {
1186                let raw = buf.get_int()?;
1187                let x = i16::try_from(raw).map_err(|_| {
1188                    AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
1189                })?;
1190                values.push(x);
1191            }
1192            #[cfg(feature = "avro_custom_types")]
1193            Self::UInt8(values) => {
1194                let raw = buf.get_int()?;
1195                let x = u8::try_from(raw).map_err(|_| {
1196                    AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
1197                })?;
1198                values.push(x);
1199            }
1200            #[cfg(feature = "avro_custom_types")]
1201            Self::UInt16(values) => {
1202                let raw = buf.get_int()?;
1203                let x = u16::try_from(raw).map_err(|_| {
1204                    AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
1205                })?;
1206                values.push(x);
1207            }
1208            #[cfg(feature = "avro_custom_types")]
1209            Self::UInt32(values) => {
1210                let raw = buf.get_long()?;
1211                let x = u32::try_from(raw).map_err(|_| {
1212                    AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
1213                })?;
1214                values.push(x);
1215            }
1216            #[cfg(feature = "avro_custom_types")]
1217            Self::UInt64(values) => {
1218                let b = buf.get_fixed(8)?;
1219                values.push(u64::from_le_bytes([
1220                    b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
1221                ]));
1222            }
1223            #[cfg(feature = "avro_custom_types")]
1224            Self::Float16(values) => {
1225                let b = buf.get_fixed(2)?;
1226                values.push(u16::from_le_bytes([b[0], b[1]]));
1227            }
1228            #[cfg(feature = "avro_custom_types")]
1229            Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
1230                values.push(buf.get_long()?)
1231            }
1232            #[cfg(feature = "avro_custom_types")]
1233            Self::Time32Secs(values) => values.push(buf.get_int()?),
1234            #[cfg(feature = "avro_custom_types")]
1235            Self::IntervalYearMonth(values) => {
1236                let b = buf.get_fixed(4)?;
1237                values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
1238            }
1239            #[cfg(feature = "avro_custom_types")]
1240            Self::IntervalMonthDayNano(values) => {
1241                let b = buf.get_fixed(16)?;
1242                let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1243                let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1244                let nanos =
1245                    i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
1246                values.push(IntervalMonthDayNano::new(months, days, nanos));
1247            }
1248            #[cfg(feature = "avro_custom_types")]
1249            Self::IntervalDayTime(values) => {
1250                let b = buf.get_fixed(8)?;
1251                // Read as two i32s: days (4 bytes) and milliseconds (4 bytes)
1252                let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1253                let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1254                values.push(IntervalDayTime::new(days, milliseconds));
1255            }
1256            Self::Float32(values) => values.push(buf.get_float()?),
1257            Self::Float64(values) => values.push(buf.get_double()?),
1258            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
1259            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
1260            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
1261            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
1262            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
1263            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
1264            Self::StringToBytes(offsets, values)
1265            | Self::BytesToString(offsets, values)
1266            | Self::Binary(offsets, values)
1267            | Self::String(offsets, values)
1268            | Self::StringView(offsets, values) => {
1269                let data = buf.get_bytes()?;
1270                offsets.push_length(data.len());
1271                values.extend_from_slice(data);
1272            }
1273            Self::Uuid(values) => {
1274                let s_bytes = buf.get_bytes()?;
1275                let s = std::str::from_utf8(s_bytes).map_err(|e| {
1276                    AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
1277                })?;
1278                let uuid = Uuid::try_parse(s)
1279                    .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
1280                values.extend_from_slice(uuid.as_bytes());
1281            }
1282            Self::Array(_, off, encoding) => {
1283                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
1284                off.push_length(total_items);
1285            }
1286            Self::Record(_, encodings, _, None) => {
1287                for encoding in encodings {
1288                    encoding.decode(buf)?;
1289                }
1290            }
1291            Self::Record(_, encodings, _, Some(proj)) => {
1292                proj.project_record(buf, encodings)?;
1293            }
1294            Self::Map(_, koff, moff, kdata, valdec) => {
1295                let newly_added = read_blocks(buf, |cur| {
1296                    let kb = cur.get_bytes()?;
1297                    koff.push_length(kb.len());
1298                    kdata.extend_from_slice(kb);
1299                    valdec.decode(cur)
1300                })?;
1301                moff.push_length(newly_added);
1302            }
1303            Self::Fixed(sz, accum) => {
1304                let fx = buf.get_fixed(*sz as usize)?;
1305                accum.extend_from_slice(fx);
1306            }
1307            #[cfg(feature = "small_decimals")]
1308            Self::Decimal32(_, _, size, builder) => {
1309                decode_decimal!(size, buf, builder, 4, i32);
1310            }
1311            #[cfg(feature = "small_decimals")]
1312            Self::Decimal64(_, _, size, builder) => {
1313                decode_decimal!(size, buf, builder, 8, i64);
1314            }
1315            Self::Decimal128(_, _, size, builder) => {
1316                decode_decimal!(size, buf, builder, 16, i128);
1317            }
1318            Self::Decimal256(_, _, size, builder) => {
1319                decode_decimal!(size, buf, builder, 32, i256);
1320            }
1321            Self::Enum(indices, _, None) => {
1322                indices.push(buf.get_int()?);
1323            }
1324            Self::Enum(indices, _, Some(res)) => {
1325                let raw = buf.get_int()?;
1326                let resolved = res.resolve(raw)?;
1327                indices.push(resolved);
1328            }
1329            Self::Duration(builder) => {
1330                let b = buf.get_fixed(12)?;
1331                let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1332                let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1333                let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1334                let nanos = (millis as i64) * 1_000_000;
1335                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
1336            }
1337            #[cfg(feature = "avro_custom_types")]
1338            Self::RunEndEncoded(_, len, inner) => {
1339                *len += 1;
1340                inner.decode(buf)?;
1341            }
1342            Self::Union(u) => u.decode(buf)?,
1343            Self::Nullable(plan, nb, encoding) => {
1344                match plan {
1345                    NullablePlan::FromSingle { resolution } => {
1346                        encoding.decode_with_resolution(buf, resolution)?;
1347                        nb.append(true);
1348                    }
1349                    NullablePlan::ReadTag {
1350                        nullability,
1351                        resolution,
1352                    } => {
1353                        let branch = buf.read_vlq()?;
1354                        let is_not_null = match *nullability {
1355                            Nullability::NullFirst => branch != 0,
1356                            Nullability::NullSecond => branch == 0,
1357                        };
1358                        if is_not_null {
1359                            // It is important to decode before appending to null buffer in case of decode error
1360                            encoding.decode_with_resolution(buf, resolution)?;
1361                        } else {
1362                            encoding.append_null()?;
1363                        }
1364                        nb.append(is_not_null);
1365                    }
1366                }
1367            }
1368        }
1369        Ok(())
1370    }
1371
1372    fn decode_with_promotion(
1373        &mut self,
1374        buf: &mut AvroCursor<'_>,
1375        promotion: Promotion,
1376    ) -> Result<(), AvroError> {
1377        #[cfg(feature = "avro_custom_types")]
1378        if let Self::RunEndEncoded(_, len, inner) = self {
1379            *len += 1;
1380            return inner.decode_with_promotion(buf, promotion);
1381        }
1382
1383        macro_rules! promote_numeric_to {
1384            ($variant:ident, $getter:ident, $to:ty) => {{
1385                match self {
1386                    Self::$variant(v) => {
1387                        let x = buf.$getter()?;
1388                        v.push(x as $to);
1389                        Ok(())
1390                    }
1391                    other => Err(AvroError::ParseError(format!(
1392                        "Promotion {promotion} target mismatch: expected {}, got {}",
1393                        stringify!($variant),
1394                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1395                    ))),
1396                }
1397            }};
1398        }
1399        match promotion {
1400            Promotion::Direct => self.decode(buf),
1401            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1402            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1403            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1404            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1405            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1406            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1407            Promotion::StringToBytes => match self {
1408                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1409                    let data = buf.get_bytes()?;
1410                    offsets.push_length(data.len());
1411                    values.extend_from_slice(data);
1412                    Ok(())
1413                }
1414                other => Err(AvroError::ParseError(format!(
1415                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1416                    <Self as AsRef<str>>::as_ref(other)
1417                ))),
1418            },
1419            Promotion::BytesToString => match self {
1420                Self::String(offsets, values)
1421                | Self::StringView(offsets, values)
1422                | Self::BytesToString(offsets, values) => {
1423                    let data = buf.get_bytes()?;
1424                    offsets.push_length(data.len());
1425                    values.extend_from_slice(data);
1426                    Ok(())
1427                }
1428                other => Err(AvroError::ParseError(format!(
1429                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1430                    <Self as AsRef<str>>::as_ref(other)
1431                ))),
1432            },
1433        }
1434    }
1435
1436    fn decode_with_resolution<'d>(
1437        &'d mut self,
1438        buf: &mut AvroCursor<'_>,
1439        resolution: &'d ResolutionPlan,
1440    ) -> Result<(), AvroError> {
1441        #[cfg(feature = "avro_custom_types")]
1442        if let Self::RunEndEncoded(_, len, inner) = self {
1443            *len += 1;
1444            return inner.decode_with_resolution(buf, resolution);
1445        }
1446
1447        match resolution {
1448            ResolutionPlan::Promotion(promotion) => {
1449                let promotion = *promotion;
1450                self.decode_with_promotion(buf, promotion)
1451            }
1452            ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
1453            ResolutionPlan::EnumMapping(res) => {
1454                let Self::Enum(indices, _, _) = self else {
1455                    return Err(AvroError::SchemaError(
1456                        "enum mapping resolution provided for non-enum decoder".into(),
1457                    ));
1458                };
1459                let raw = buf.get_int()?;
1460                let resolved = res.resolve(raw)?;
1461                indices.push(resolved);
1462                Ok(())
1463            }
1464            ResolutionPlan::Record(proj) => {
1465                let Self::Record(_, encodings, _, _) = self else {
1466                    return Err(AvroError::SchemaError(
1467                        "record projection provided for non-record decoder".into(),
1468                    ));
1469                };
1470                proj.project_record(buf, encodings)
1471            }
1472        }
1473    }
1474
1475    /// Flush decoded records to an [`ArrayRef`]
1476    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1477        Ok(match self {
1478            Self::Nullable(_, n, e) => e.flush(n.finish())?,
1479            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1480            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1481            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1482            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1483            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1484            Self::TimeMillis(values) => {
1485                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1486            }
1487            Self::TimeMicros(values) => {
1488                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1489            }
1490            Self::TimestampMillis(tz, values) => Arc::new(
1491                flush_primitive::<TimestampMillisecondType>(values, nulls)
1492                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1493            ),
1494            Self::TimestampMicros(tz, values) => Arc::new(
1495                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1496                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1497            ),
1498            Self::TimestampNanos(tz, values) => Arc::new(
1499                flush_primitive::<TimestampNanosecondType>(values, nulls)
1500                    .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1501            ),
1502            #[cfg(feature = "avro_custom_types")]
1503            Self::DurationSecond(values) => {
1504                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1505            }
1506            #[cfg(feature = "avro_custom_types")]
1507            Self::DurationMillisecond(values) => {
1508                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1509            }
1510            #[cfg(feature = "avro_custom_types")]
1511            Self::DurationMicrosecond(values) => {
1512                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1513            }
1514            #[cfg(feature = "avro_custom_types")]
1515            Self::DurationNanosecond(values) => {
1516                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1517            }
1518            #[cfg(feature = "avro_custom_types")]
1519            Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1520            #[cfg(feature = "avro_custom_types")]
1521            Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1522            #[cfg(feature = "avro_custom_types")]
1523            Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1524            #[cfg(feature = "avro_custom_types")]
1525            Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1526            #[cfg(feature = "avro_custom_types")]
1527            Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1528            #[cfg(feature = "avro_custom_types")]
1529            Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1530            #[cfg(feature = "avro_custom_types")]
1531            Self::Float16(values) => {
1532                // Convert Vec<u16> to Float16Array by reinterpreting the raw bytes.
1533                // This is safe because f16 and u16 have the same size and alignment.
1534                let len = values.len();
1535                let buf: Buffer = std::mem::take(values).into();
1536                let scalar_buf = ScalarBuffer::new(buf, 0, len);
1537                Arc::new(Float16Array::new(scalar_buf, nulls))
1538            }
1539            #[cfg(feature = "avro_custom_types")]
1540            Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1541            #[cfg(feature = "avro_custom_types")]
1542            Self::TimeNanos(values) => {
1543                Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1544            }
1545            #[cfg(feature = "avro_custom_types")]
1546            Self::Time32Secs(values) => {
1547                Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1548            }
1549            #[cfg(feature = "avro_custom_types")]
1550            Self::TimestampSecs(is_utc, values) => Arc::new(
1551                flush_primitive::<TimestampSecondType>(values, nulls)
1552                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1553            ),
1554            #[cfg(feature = "avro_custom_types")]
1555            Self::IntervalYearMonth(values) => {
1556                Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1557            }
1558            #[cfg(feature = "avro_custom_types")]
1559            Self::IntervalMonthDayNano(values) => {
1560                Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1561            }
1562            #[cfg(feature = "avro_custom_types")]
1563            Self::IntervalDayTime(values) => {
1564                Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1565            }
1566            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1567            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1568            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1569            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1570                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1571            }
1572            Self::Int32ToFloat64(values)
1573            | Self::Int64ToFloat64(values)
1574            | Self::Float32ToFloat64(values) => {
1575                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1576            }
1577            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1578                let offsets = flush_offsets(offsets);
1579                let values = flush_values(values).into();
1580                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1581            }
1582            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1583                let offsets = flush_offsets(offsets);
1584                let values = flush_values(values).into();
1585                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1586            }
1587            Self::StringView(offsets, values) => {
1588                let offsets = flush_offsets(offsets);
1589                let values = flush_values(values);
1590                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1591                let values: Vec<&str> = (0..array.len())
1592                    .map(|i| {
1593                        if array.is_valid(i) {
1594                            array.value(i)
1595                        } else {
1596                            ""
1597                        }
1598                    })
1599                    .collect();
1600                Arc::new(StringViewArray::from(values))
1601            }
1602            Self::Array(field, offsets, values) => {
1603                let values = values.flush(None)?;
1604                let offsets = flush_offsets(offsets);
1605                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1606            }
1607            Self::Record(fields, encodings, _, _) => {
1608                let arrays = encodings
1609                    .iter_mut()
1610                    .map(|x| x.flush(None))
1611                    .collect::<Result<Vec<_>, _>>()?;
1612                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1613            }
1614            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1615                let moff = flush_offsets(m_off);
1616                let koff = flush_offsets(k_off);
1617                let kd = flush_values(kdata).into();
1618                let val_arr = valdec.flush(None)?;
1619                let key_arr = StringArray::try_new(koff, kd, None)?;
1620                if key_arr.len() != val_arr.len() {
1621                    return Err(AvroError::InvalidArgument(format!(
1622                        "Map keys length ({}) != map values length ({})",
1623                        key_arr.len(),
1624                        val_arr.len()
1625                    )));
1626                }
1627                let final_len = moff.len() - 1;
1628                if let Some(n) = &nulls {
1629                    if n.len() != final_len {
1630                        return Err(AvroError::InvalidArgument(format!(
1631                            "Map array null buffer length {} != final map length {final_len}",
1632                            n.len()
1633                        )));
1634                    }
1635                }
1636                let entries_fields = match map_field.data_type() {
1637                    DataType::Struct(fields) => fields.clone(),
1638                    other => {
1639                        return Err(AvroError::InvalidArgument(format!(
1640                            "Map entries field must be a Struct, got {other:?}"
1641                        )));
1642                    }
1643                };
1644                let entries_struct =
1645                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1646                let map_arr =
1647                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1648                Arc::new(map_arr)
1649            }
1650            Self::Fixed(sz, accum) => {
1651                let b: Buffer = flush_values(accum).into();
1652                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1653                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1654                Arc::new(arr)
1655            }
1656            Self::Uuid(values) => {
1657                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1658                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1659                Arc::new(arr)
1660            }
1661            #[cfg(feature = "small_decimals")]
1662            Self::Decimal32(precision, scale, _, builder) => {
1663                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1664            }
1665            #[cfg(feature = "small_decimals")]
1666            Self::Decimal64(precision, scale, _, builder) => {
1667                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1668            }
1669            Self::Decimal128(precision, scale, _, builder) => {
1670                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1671            }
1672            Self::Decimal256(precision, scale, _, builder) => {
1673                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1674            }
1675            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1676            Self::Duration(builder) => {
1677                let (_, vals, _) = builder.finish().into_parts();
1678                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1679                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1680                Arc::new(vals)
1681            }
1682            #[cfg(feature = "avro_custom_types")]
1683            Self::RunEndEncoded(width, len, inner) => {
1684                let values = inner.flush(nulls)?;
1685                let n = *len;
1686                let arr = values.as_ref();
1687                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1688                if n > 0 {
1689                    run_starts.push(0);
1690                    for i in 1..n {
1691                        if !values_equal_at(arr, i - 1, i) {
1692                            run_starts.push(i);
1693                        }
1694                    }
1695                }
1696                if n > (u32::MAX as usize) {
1697                    return Err(AvroError::InvalidArgument(format!(
1698                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1699                    )));
1700                }
1701                let run_count = run_starts.len();
1702                let take_idx: PrimitiveArray<UInt32Type> =
1703                    run_starts.iter().map(|&s| s as u32).collect();
1704                let per_run_values = if run_count == 0 {
1705                    values.slice(0, 0)
1706                } else {
1707                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1708                        AvroError::ParseError(format!("take() for REE values failed: {e}"))
1709                    })?
1710                };
1711
1712                macro_rules! build_run_array {
1713                    ($Native:ty, $ArrowTy:ty) => {{
1714                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1715                        for (idx, &_start) in run_starts.iter().enumerate() {
1716                            let end = if idx + 1 < run_count {
1717                                run_starts[idx + 1]
1718                            } else {
1719                                n
1720                            };
1721                            ends.push(end as $Native);
1722                        }
1723                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1724                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1725                            .map_err(|e| AvroError::ParseError(e.to_string()))?;
1726                        Arc::new(run_arr) as ArrayRef
1727                    }};
1728                }
1729                match *width {
1730                    2 => {
1731                        if n > i16::MAX as usize {
1732                            return Err(AvroError::InvalidArgument(format!(
1733                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1734                            )));
1735                        }
1736                        build_run_array!(i16, Int16Type)
1737                    }
1738                    4 => build_run_array!(i32, Int32Type),
1739                    8 => build_run_array!(i64, Int64Type),
1740                    other => {
1741                        return Err(AvroError::InvalidArgument(format!(
1742                            "Unsupported run-end width {other} for RunEndEncoded"
1743                        )));
1744                    }
1745                }
1746            }
1747            Self::Union(u) => u.flush(nulls)?,
1748        })
1749    }
1750}
1751
1752/// Runtime plan for decoding reader-side `["null", T]` types.
1753#[derive(Debug)]
1754enum NullablePlan {
1755    /// Writer actually wrote a union (branch tag present).
1756    ReadTag {
1757        nullability: Nullability,
1758        resolution: ResolutionPlan,
1759    },
1760    /// Writer wrote a single (non-union) value resolved to the non-null branch
1761    /// of the reader union; do NOT read a branch tag, but apply any resolution.
1762    FromSingle { resolution: ResolutionPlan },
1763}
1764
1765/// Runtime plan for resolving writer-reader type differences.
1766#[derive(Debug)]
1767enum ResolutionPlan {
1768    /// Indicates that the writer's type should be promoted to the reader's type.
1769    Promotion(Promotion),
1770    /// Provides a default value for the field missing in the writer type.
1771    DefaultValue(AvroLiteral),
1772    /// Provides mapping information for resolving enums.
1773    EnumMapping(EnumResolution),
1774    /// Provides projection information for record fields.
1775    Record(Projector),
1776}
1777
1778impl ResolutionPlan {
1779    fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, AvroError> {
1780        match (decoder, resolution) {
1781            (_, ResolutionInfo::Promotion(p)) => Ok(ResolutionPlan::Promotion(*p)),
1782            (_, ResolutionInfo::DefaultValue(lit)) => Ok(ResolutionPlan::DefaultValue(lit.clone())),
1783            (_, ResolutionInfo::EnumMapping(m)) => {
1784                Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
1785            }
1786            (Decoder::Record(_, _, field_defaults, _), ResolutionInfo::Record(r)) => Ok(
1787                ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?),
1788            ),
1789            (_, ResolutionInfo::Record(_)) => Err(AvroError::SchemaError(
1790                "record resolution on non-record decoder".into(),
1791            )),
1792            (_, ResolutionInfo::Union(_)) => Err(AvroError::SchemaError(
1793                "union variant cannot be resolved to a union type".into(),
1794            )),
1795        }
1796    }
1797}
1798
1799#[derive(Debug)]
1800struct EnumResolution {
1801    mapping: Arc<[i32]>,
1802    default_index: i32,
1803}
1804
1805impl EnumResolution {
1806    fn new(mapping: &EnumMapping) -> Self {
1807        EnumResolution {
1808            mapping: mapping.mapping.clone(),
1809            default_index: mapping.default_index,
1810        }
1811    }
1812
1813    fn resolve(&self, index: i32) -> Result<i32, AvroError> {
1814        let resolved = usize::try_from(index)
1815            .ok()
1816            .and_then(|idx| self.mapping.get(idx).copied())
1817            .filter(|&idx| idx >= 0)
1818            .unwrap_or(self.default_index);
1819        if resolved >= 0 {
1820            Ok(resolved)
1821        } else {
1822            Err(AvroError::ParseError(format!(
1823                "Enum symbol index {index} not resolvable and no default provided",
1824            )))
1825        }
1826    }
1827}
1828
1829// A lookup table for resolving fields between writer and reader schemas during record projection.
1830#[derive(Debug)]
1831struct DispatchLookupTable {
1832    // Maps each reader field index `r` to the corresponding writer field index.
1833    //
1834    // Semantics:
1835    // - `to_reader[r] >= 0`: The value is an index into the writer's fields. The value from
1836    //   the writer field is decoded, and `promotion[r]` is applied.
1837    // - `to_reader[r] == NO_SOURCE` (-1): No matching writer field exists. The reader field's
1838    //   default value is used.
1839    //
1840    // Representation (`i8`):
1841    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
1842    // `i8` for union type IDs. This requires that writer field indices do not exceed `i8::MAX`.
1843    //
1844    // Invariants:
1845    // - `to_reader.len() == promotion.len()` and matches the reader field count.
1846    // - If `to_reader[r] == NO_SOURCE`, `promotion[r]` is ignored.
1847    to_reader: Box<[i8]>,
1848    // For each reader field `r`, specifies the resolution to apply to the writer's value.
1849    //
1850    // This is used when a writer field's type can be promoted to a reader field's type
1851    // (e.g., `Int` to `Long`). It is ignored if `to_reader[r] == NO_SOURCE`.
1852    resolution: Box<[ResolutionPlan]>,
1853}
1854
1855// Sentinel used in `DispatchLookupTable::to_reader` to mark
1856// "no matching writer field".
1857const NO_SOURCE: i8 = -1;
1858
1859impl DispatchLookupTable {
1860    fn from_writer_to_reader(
1861        reader_branches: &[Decoder],
1862        resolution_map: &[Option<(usize, ResolutionInfo)>],
1863    ) -> Result<Self, AvroError> {
1864        let mut to_reader = Vec::with_capacity(resolution_map.len());
1865        let mut resolution = Vec::with_capacity(resolution_map.len());
1866        for map in resolution_map {
1867            match map {
1868                Some((idx, res)) => {
1869                    let idx = *idx;
1870                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1871                        AvroError::SchemaError(format!(
1872                            "Reader branch index {idx} exceeds i8 range (max {})",
1873                            i8::MAX
1874                        ))
1875                    })?;
1876                    let plan = ResolutionPlan::try_new(&reader_branches[idx], res)?;
1877                    to_reader.push(idx_i8);
1878                    resolution.push(plan);
1879                }
1880                None => {
1881                    to_reader.push(NO_SOURCE);
1882                    resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
1883                }
1884            }
1885        }
1886        Ok(Self {
1887            to_reader: to_reader.into_boxed_slice(),
1888            resolution: resolution.into_boxed_slice(),
1889        })
1890    }
1891
1892    // Resolve a writer branch index to (reader_idx, resolution)
1893    #[inline]
1894    fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)> {
1895        let reader_index = *self.to_reader.get(writer_index)?;
1896        (reader_index >= 0).then(|| (reader_index as usize, &self.resolution[writer_index]))
1897    }
1898}
1899
1900#[derive(Debug)]
1901struct UnionDecoder {
1902    fields: UnionFields,
1903    branches: UnionDecoderBranches,
1904    default_emit_idx: usize,
1905    null_emit_idx: usize,
1906    plan: UnionReadPlan,
1907}
1908
1909#[derive(Debug, Default)]
1910struct UnionDecoderBranches {
1911    decoders: Vec<Decoder>,
1912    reader_type_codes: Vec<i8>,
1913    type_ids: Vec<i8>,
1914    offsets: Vec<i32>,
1915    counts: Vec<i32>,
1916}
1917
1918impl UnionDecoderBranches {
1919    fn new(decoders: Vec<Decoder>, reader_type_codes: Vec<i8>) -> Self {
1920        let branch_len = decoders.len().max(reader_type_codes.len());
1921        Self {
1922            decoders,
1923            reader_type_codes,
1924            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1925            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1926            counts: vec![0; branch_len],
1927        }
1928    }
1929
1930    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1931        let branches_len = self.decoders.len();
1932        let Some(reader_branch) = self.decoders.get_mut(reader_idx) else {
1933            return Err(AvroError::ParseError(format!(
1934                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1935            )));
1936        };
1937        self.type_ids.push(self.reader_type_codes[reader_idx]);
1938        self.offsets.push(self.counts[reader_idx]);
1939        self.counts[reader_idx] += 1;
1940        Ok(reader_branch)
1941    }
1942}
1943
1944impl Default for UnionDecoder {
1945    fn default() -> Self {
1946        Self {
1947            fields: UnionFields::empty(),
1948            branches: Default::default(),
1949            default_emit_idx: 0,
1950            null_emit_idx: 0,
1951            plan: UnionReadPlan::Passthrough,
1952        }
1953    }
1954}
1955
1956#[derive(Debug)]
1957enum UnionReadPlan {
1958    ReaderUnion {
1959        lookup_table: DispatchLookupTable,
1960    },
1961    FromSingle {
1962        reader_idx: usize,
1963        resolution: ResolutionPlan,
1964    },
1965    ToSingle {
1966        target: Box<Decoder>,
1967        lookup_table: DispatchLookupTable,
1968    },
1969    Passthrough,
1970}
1971
1972impl UnionReadPlan {
1973    fn from_resolved(
1974        reader_branches: &[Decoder],
1975        resolved: Option<ResolvedUnion>,
1976    ) -> Result<Self, AvroError> {
1977        let Some(info) = resolved else {
1978            return Ok(Self::Passthrough);
1979        };
1980        match (info.writer_is_union, info.reader_is_union) {
1981            (true, true) => {
1982                let lookup_table =
1983                    DispatchLookupTable::from_writer_to_reader(reader_branches, &info.writer_to_reader)?;
1984                Ok(Self::ReaderUnion { lookup_table })
1985            }
1986            (false, true) => {
1987                let Some((idx, resolution)) =
1988                    info.writer_to_reader.first().and_then(Option::as_ref)
1989                else {
1990                    return Err(AvroError::SchemaError(
1991                        "Writer type does not match any reader union branch".to_string(),
1992                    ));
1993                };
1994                let reader_idx = *idx;
1995                Ok(Self::FromSingle {
1996                    reader_idx,
1997                    resolution: ResolutionPlan::try_new(&reader_branches[reader_idx], resolution)?,
1998                })
1999            }
2000            (true, false) => Err(AvroError::InvalidArgument(
2001                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
2002                    .to_string(),
2003            )),
2004            // (false, false) is invalid and should never be constructed by the resolver.
2005            _ => Err(AvroError::SchemaError(
2006                "ResolvedUnion constructed for non-union sides; resolver should return None"
2007                    .to_string(),
2008            )),
2009        }
2010    }
2011}
2012
2013impl UnionDecoder {
2014    fn try_new(
2015        fields: UnionFields,
2016        branches: Vec<Decoder>,
2017        resolved: Option<ResolvedUnion>,
2018    ) -> Result<Self, AvroError> {
2019        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
2020        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
2021        let default_emit_idx = 0;
2022        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
2023        // Guard against impractically large unions that cannot be indexed by an Avro int
2024        let max_addr = (i32::MAX as usize) + 1;
2025        if branches.len() > max_addr {
2026            return Err(AvroError::SchemaError(format!(
2027                "Reader union has {} branches, which exceeds the maximum addressable \
2028                 branches by an Avro int tag ({} + 1).",
2029                branches.len(),
2030                i32::MAX
2031            )));
2032        }
2033        let plan = UnionReadPlan::from_resolved(&branches, resolved)?;
2034        Ok(Self {
2035            fields,
2036            branches: UnionDecoderBranches::new(branches, reader_type_codes),
2037            default_emit_idx,
2038            null_emit_idx,
2039            plan,
2040        })
2041    }
2042
2043    fn with_single_target(target: Decoder, info: ResolvedUnion) -> Result<Self, AvroError> {
2044        // This constructor is only for writer-union to single-type resolution
2045        debug_assert!(info.writer_is_union && !info.reader_is_union);
2046        let mut reader_branches = [target];
2047        let lookup_table =
2048            DispatchLookupTable::from_writer_to_reader(&reader_branches, &info.writer_to_reader)?;
2049        let target = Box::new(mem::replace(&mut reader_branches[0], Decoder::Null(0)));
2050        Ok(Self {
2051            plan: UnionReadPlan::ToSingle {
2052                target,
2053                lookup_table,
2054            },
2055            ..Self::default()
2056        })
2057    }
2058
2059    #[inline]
2060    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
2061        // Avro unions are encoded by first writing the zero-based branch index.
2062        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
2063        // but both use zig-zag varint encoding, so decoding as long is compatible
2064        // with either form and widely used in practice.
2065        let raw = buf.get_long()?;
2066        if raw < 0 {
2067            return Err(AvroError::ParseError(format!(
2068                "Negative union branch index {raw}"
2069            )));
2070        }
2071        usize::try_from(raw).map_err(|_| {
2072            AvroError::ParseError(format!(
2073                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2074                (usize::BITS as usize)
2075            ))
2076        })
2077    }
2078
2079    #[inline]
2080    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
2081    where
2082        F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
2083    {
2084        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2085            return action(target);
2086        }
2087        let reader_idx = match &self.plan {
2088            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
2089            _ => fallback_idx,
2090        };
2091        self.branches.emit_to(reader_idx).and_then(action)
2092    }
2093
2094    fn append_null(&mut self) -> Result<(), AvroError> {
2095        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
2096    }
2097
2098    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
2099        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
2100    }
2101
2102    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2103        match &mut self.plan {
2104            UnionReadPlan::Passthrough => {
2105                let reader_idx = Self::read_tag(buf)?;
2106                let decoder = self.branches.emit_to(reader_idx)?;
2107                decoder.decode(buf)
2108            }
2109            UnionReadPlan::ReaderUnion { lookup_table } => {
2110                let idx = Self::read_tag(buf)?;
2111                let Some((reader_idx, resolution)) = lookup_table.resolve(idx) else {
2112                    return Err(AvroError::ParseError(format!(
2113                        "Union branch index {idx} not resolvable by reader schema"
2114                    )));
2115                };
2116                let decoder = self.branches.emit_to(reader_idx)?;
2117                decoder.decode_with_resolution(buf, resolution)
2118            }
2119            UnionReadPlan::FromSingle {
2120                reader_idx,
2121                resolution,
2122            } => {
2123                let decoder = self.branches.emit_to(*reader_idx)?;
2124                decoder.decode_with_resolution(buf, resolution)
2125            }
2126            UnionReadPlan::ToSingle {
2127                target,
2128                lookup_table,
2129            } => {
2130                let idx = Self::read_tag(buf)?;
2131                let Some((_, resolution)) = lookup_table.resolve(idx) else {
2132                    return Err(AvroError::ParseError(format!(
2133                        "Writer union branch index {idx} not resolvable by reader schema"
2134                    )));
2135                };
2136                target.decode_with_resolution(buf, resolution)
2137            }
2138        }
2139    }
2140
2141    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
2142        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2143            return target.flush(nulls);
2144        }
2145        debug_assert!(
2146            nulls.is_none(),
2147            "UnionArray does not accept a validity bitmap; \
2148                     nulls should have been materialized as a Null child during decode"
2149        );
2150        let children = self
2151            .branches
2152            .decoders
2153            .iter_mut()
2154            .map(|d| d.flush(None))
2155            .collect::<Result<Vec<_>, _>>()?;
2156        let arr = UnionArray::try_new(
2157            self.fields.clone(),
2158            flush_values(&mut self.branches.type_ids)
2159                .into_iter()
2160                .collect(),
2161            Some(
2162                flush_values(&mut self.branches.offsets)
2163                    .into_iter()
2164                    .collect(),
2165            ),
2166            children,
2167        )
2168        .map_err(|e| AvroError::ParseError(e.to_string()))?;
2169        Ok(Arc::new(arr))
2170    }
2171}
2172
2173#[derive(Debug, Default)]
2174struct UnionDecoderBuilder {
2175    fields: Option<UnionFields>,
2176    branches: Option<Vec<Decoder>>,
2177    resolved: Option<ResolvedUnion>,
2178    target: Option<Decoder>,
2179}
2180
2181impl UnionDecoderBuilder {
2182    fn new() -> Self {
2183        Self::default()
2184    }
2185
2186    fn with_fields(mut self, fields: UnionFields) -> Self {
2187        self.fields = Some(fields);
2188        self
2189    }
2190
2191    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2192        self.branches = Some(branches);
2193        self
2194    }
2195
2196    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2197        self.resolved = Some(resolved_union);
2198        self
2199    }
2200
2201    fn with_target(mut self, target: Decoder) -> Self {
2202        self.target = Some(target);
2203        self
2204    }
2205
2206    fn build(self) -> Result<UnionDecoder, AvroError> {
2207        match (self.resolved, self.fields, self.branches, self.target) {
2208            (resolved, Some(fields), Some(branches), None) => {
2209                UnionDecoder::try_new(fields, branches, resolved)
2210            }
2211            (Some(info), None, None, Some(target))
2212                if info.writer_is_union && !info.reader_is_union =>
2213            {
2214                UnionDecoder::with_single_target(target, info)
2215            }
2216            _ => Err(AvroError::InvalidArgument(
2217                "Invalid UnionDecoderBuilder configuration: expected either \
2218                 (fields + branches + resolved) with no target for reader-unions, or \
2219                 (resolved + target) with no fields/branches for writer-union to single."
2220                    .to_string(),
2221            )),
2222        }
2223    }
2224}
2225
2226#[derive(Debug, Copy, Clone)]
2227enum NegativeBlockBehavior {
2228    ProcessItems,
2229    SkipBySize,
2230}
2231
2232#[inline]
2233fn skip_blocks(
2234    buf: &mut AvroCursor,
2235    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2236) -> Result<usize, AvroError> {
2237    process_blockwise(
2238        buf,
2239        move |c| skip_item(c),
2240        NegativeBlockBehavior::SkipBySize,
2241    )
2242}
2243
2244#[inline]
2245fn flush_dict(
2246    indices: &mut Vec<i32>,
2247    symbols: &[String],
2248    nulls: Option<NullBuffer>,
2249) -> Result<ArrayRef, AvroError> {
2250    let keys = flush_primitive::<Int32Type>(indices, nulls);
2251    let values = Arc::new(StringArray::from_iter_values(
2252        symbols.iter().map(|s| s.as_str()),
2253    ));
2254    DictionaryArray::try_new(keys, values)
2255        .map_err(Into::into)
2256        .map(|arr| Arc::new(arr) as ArrayRef)
2257}
2258
2259#[inline]
2260fn read_blocks(
2261    buf: &mut AvroCursor,
2262    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2263) -> Result<usize, AvroError> {
2264    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2265}
2266
2267#[inline]
2268fn process_blockwise(
2269    buf: &mut AvroCursor,
2270    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2271    negative_behavior: NegativeBlockBehavior,
2272) -> Result<usize, AvroError> {
2273    let mut total = 0usize;
2274    loop {
2275        // Read the block count
2276        //  positive = that many items
2277        //  negative = that many items + read block size
2278        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
2279        let block_count = buf.get_long()?;
2280        match block_count.cmp(&0) {
2281            Ordering::Equal => break,
2282            Ordering::Less => {
2283                let count = (-block_count) as usize;
2284                // A negative count is followed by a long of the size in bytes
2285                let size_in_bytes = buf.get_long()? as usize;
2286                match negative_behavior {
2287                    NegativeBlockBehavior::ProcessItems => {
2288                        // Process items one-by-one after reading size
2289                        for _ in 0..count {
2290                            on_item(buf)?;
2291                        }
2292                    }
2293                    NegativeBlockBehavior::SkipBySize => {
2294                        // Skip the entire block payload at once
2295                        let _ = buf.get_fixed(size_in_bytes)?;
2296                    }
2297                }
2298                total += count;
2299            }
2300            Ordering::Greater => {
2301                let count = block_count as usize;
2302                for _ in 0..count {
2303                    on_item(buf)?;
2304                }
2305                total += count;
2306            }
2307        }
2308    }
2309    Ok(total)
2310}
2311
2312#[inline]
2313fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2314    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2315}
2316
2317#[inline]
2318fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2319    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2320}
2321
2322#[inline]
2323fn flush_primitive<T: ArrowPrimitiveType>(
2324    values: &mut Vec<T::Native>,
2325    nulls: Option<NullBuffer>,
2326) -> PrimitiveArray<T> {
2327    PrimitiveArray::new(flush_values(values).into(), nulls)
2328}
2329
2330#[inline]
2331fn read_decimal_bytes_be<const N: usize>(
2332    buf: &mut AvroCursor<'_>,
2333    size: &Option<usize>,
2334) -> Result<[u8; N], AvroError> {
2335    match size {
2336        Some(n) if *n == N => {
2337            let raw = buf.get_fixed(N)?;
2338            let mut arr = [0u8; N];
2339            arr.copy_from_slice(raw);
2340            Ok(arr)
2341        }
2342        Some(n) => {
2343            let raw = buf.get_fixed(*n)?;
2344            sign_cast_to::<N>(raw)
2345        }
2346        None => {
2347            let raw = buf.get_bytes()?;
2348            sign_cast_to::<N>(raw)
2349        }
2350    }
2351}
2352
2353/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
2354/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
2355/// the payload is a big-endian two's-complement integer, and when narrowing it must
2356/// be representable without changing sign or value.
2357///
2358/// If `raw.len() < N`, the value is sign-extended.
2359/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
2360/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
2361#[inline]
2362fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2363    let len = raw.len();
2364    // Fast path: exact width, just copy
2365    if len == N {
2366        let mut out = [0u8; N];
2367        out.copy_from_slice(raw);
2368        return Ok(out);
2369    }
2370    // Determine sign byte from MSB of first byte (empty => positive)
2371    let first = raw.first().copied().unwrap_or(0u8);
2372    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2373    // Pre-fill with sign byte to support sign extension
2374    let mut out = [sign_byte; N];
2375    if len > N {
2376        // Validate truncation: all dropped leading bytes must equal sign_byte,
2377        // and the MSB of the first kept byte must match the sign.
2378        let extra = len - N;
2379        // Any non-sign byte in the truncated prefix indicates overflow
2380        if raw[..extra].iter().any(|&b| b != sign_byte) {
2381            return Err(AvroError::ParseError(format!(
2382                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2383                len, N
2384            )));
2385        }
2386        if N > 0 {
2387            let first_kept = raw[extra];
2388            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2389            if sign_bit_mismatch {
2390                return Err(AvroError::ParseError(format!(
2391                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2392                    len, N
2393                )));
2394            }
2395        }
2396        out.copy_from_slice(&raw[extra..]);
2397        return Ok(out);
2398    }
2399    out[N - len..].copy_from_slice(raw);
2400    Ok(out)
2401}
2402
2403#[cfg(feature = "avro_custom_types")]
2404#[inline]
2405fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
2406    match (arr.is_null(i), arr.is_null(j)) {
2407        (true, true) => true,
2408        (true, false) | (false, true) => false,
2409        (false, false) => {
2410            let a = arr.slice(i, 1);
2411            let b = arr.slice(j, 1);
2412            a == b
2413        }
2414    }
2415}
2416
2417#[derive(Debug)]
2418struct Projector {
2419    writer_projections: Vec<FieldProjection>,
2420    default_injections: Arc<[(usize, AvroLiteral)]>,
2421}
2422
2423#[derive(Debug)]
2424enum FieldProjection {
2425    ToReader(usize),
2426    Skip(Skipper),
2427}
2428
2429#[derive(Debug)]
2430struct ProjectorBuilder<'a> {
2431    rec: &'a ResolvedRecord,
2432    field_defaults: &'a [Option<AvroLiteral>],
2433}
2434
2435impl<'a> ProjectorBuilder<'a> {
2436    #[inline]
2437    fn try_new(rec: &'a ResolvedRecord, field_defaults: &'a [Option<AvroLiteral>]) -> Self {
2438        Self {
2439            rec,
2440            field_defaults,
2441        }
2442    }
2443
2444    #[inline]
2445    fn build(self) -> Result<Projector, AvroError> {
2446        let mut default_injections: Vec<(usize, AvroLiteral)> =
2447            Vec::with_capacity(self.rec.default_fields.len());
2448        for &idx in self.rec.default_fields.as_ref() {
2449            let lit = self
2450                .field_defaults
2451                .get(idx)
2452                .and_then(|lit| lit.clone())
2453                .unwrap_or(AvroLiteral::Null);
2454            default_injections.push((idx, lit));
2455        }
2456        let writer_projections = self
2457            .rec
2458            .writer_fields
2459            .iter()
2460            .map(|field| match field {
2461                ResolvedField::ToReader(index) => Ok(FieldProjection::ToReader(*index)),
2462                ResolvedField::Skip(datatype) => {
2463                    let skipper = Skipper::from_avro(datatype)?;
2464                    Ok(FieldProjection::Skip(skipper))
2465                }
2466            })
2467            .collect::<Result<_, AvroError>>()?;
2468        Ok(Projector {
2469            writer_projections,
2470            default_injections: default_injections.into(),
2471        })
2472    }
2473}
2474
2475impl Projector {
2476    #[inline]
2477    fn project_record(
2478        &self,
2479        buf: &mut AvroCursor<'_>,
2480        encodings: &mut [Decoder],
2481    ) -> Result<(), AvroError> {
2482        for field_proj in self.writer_projections.iter() {
2483            match field_proj {
2484                FieldProjection::ToReader(index) => encodings[*index].decode(buf)?,
2485                FieldProjection::Skip(skipper) => skipper.skip(buf)?,
2486            }
2487        }
2488        for (reader_index, lit) in self.default_injections.as_ref() {
2489            encodings[*reader_index].append_default(lit)?;
2490        }
2491        Ok(())
2492    }
2493}
2494
2495/// Lightweight skipper for non‑projected writer fields
2496/// (fields present in the writer schema but omitted by the reader/projection);
2497/// per Avro 1.11.1 schema resolution these fields are ignored.
2498///
2499/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
2500#[derive(Debug)]
2501enum Skipper {
2502    Null,
2503    Boolean,
2504    Int32,
2505    Int64,
2506    Float32,
2507    Float64,
2508    Bytes,
2509    String,
2510    TimeMicros,
2511    TimestampMillis,
2512    TimestampMicros,
2513    TimestampNanos,
2514    Fixed(usize),
2515    Decimal(Option<usize>),
2516    UuidString,
2517    Enum,
2518    DurationFixed12,
2519    List(Box<Skipper>),
2520    Map(Box<Skipper>),
2521    Struct(Vec<Skipper>),
2522    Union(Vec<Skipper>),
2523    Nullable(Nullability, Box<Skipper>),
2524    #[cfg(feature = "avro_custom_types")]
2525    RunEndEncoded(Box<Skipper>),
2526}
2527
2528impl Skipper {
2529    fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
2530        let mut base = match dt.codec() {
2531            Codec::Null => Self::Null,
2532            Codec::Boolean => Self::Boolean,
2533            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
2534            Codec::Int64 => Self::Int64,
2535            Codec::TimeMicros => Self::TimeMicros,
2536            Codec::TimestampMillis(_) => Self::TimestampMillis,
2537            Codec::TimestampMicros(_) => Self::TimestampMicros,
2538            Codec::TimestampNanos(_) => Self::TimestampNanos,
2539            #[cfg(feature = "avro_custom_types")]
2540            Codec::DurationNanos
2541            | Codec::DurationMicros
2542            | Codec::DurationMillis
2543            | Codec::DurationSeconds => Self::Int64,
2544            #[cfg(feature = "avro_custom_types")]
2545            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
2546                Self::Int32
2547            }
2548            #[cfg(feature = "avro_custom_types")]
2549            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
2550                Self::Int64
2551            }
2552            #[cfg(feature = "avro_custom_types")]
2553            Codec::UInt64 => Self::Fixed(8),
2554            #[cfg(feature = "avro_custom_types")]
2555            Codec::Float16 => Self::Fixed(2),
2556            #[cfg(feature = "avro_custom_types")]
2557            Codec::IntervalYearMonth => Self::Fixed(4),
2558            #[cfg(feature = "avro_custom_types")]
2559            Codec::IntervalMonthDayNano => Self::Fixed(16),
2560            #[cfg(feature = "avro_custom_types")]
2561            Codec::IntervalDayTime => Self::Fixed(8),
2562            Codec::Float32 => Self::Float32,
2563            Codec::Float64 => Self::Float64,
2564            Codec::Binary => Self::Bytes,
2565            Codec::Utf8 | Codec::Utf8View => Self::String,
2566            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2567            Codec::Decimal(_, _, size) => Self::Decimal(*size),
2568            Codec::Uuid => Self::UuidString, // encoded as string
2569            Codec::Enum(_) => Self::Enum,
2570            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2571            Codec::Struct(fields) => Self::Struct(
2572                fields
2573                    .iter()
2574                    .map(|f| Skipper::from_avro(f.data_type()))
2575                    .collect::<Result<_, _>>()?,
2576            ),
2577            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2578            Codec::Interval => Self::DurationFixed12,
2579            Codec::Union(encodings, _, _) => {
2580                let max_addr = (i32::MAX as usize) + 1;
2581                if encodings.len() > max_addr {
2582                    return Err(AvroError::SchemaError(format!(
2583                        "Writer union has {} branches, which exceeds the maximum addressable \
2584                         branches by an Avro int tag ({} + 1).",
2585                        encodings.len(),
2586                        i32::MAX
2587                    )));
2588                }
2589                Self::Union(
2590                    encodings
2591                        .iter()
2592                        .map(Skipper::from_avro)
2593                        .collect::<Result<_, _>>()?,
2594                )
2595            }
2596            #[cfg(feature = "avro_custom_types")]
2597            Codec::RunEndEncoded(inner, _w) => {
2598                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2599            }
2600        };
2601        if let Some(n) = dt.nullability() {
2602            base = Self::Nullable(n, Box::new(base));
2603        }
2604        Ok(base)
2605    }
2606
2607    fn skip(&self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2608        match self {
2609            Self::Null => Ok(()),
2610            Self::Boolean => {
2611                buf.get_bool()?;
2612                Ok(())
2613            }
2614            Self::Int32 => {
2615                buf.skip_int()?;
2616                Ok(())
2617            }
2618            Self::Int64
2619            | Self::TimeMicros
2620            | Self::TimestampMillis
2621            | Self::TimestampMicros
2622            | Self::TimestampNanos => {
2623                buf.skip_long()?;
2624                Ok(())
2625            }
2626            Self::Float32 => {
2627                buf.get_float()?;
2628                Ok(())
2629            }
2630            Self::Float64 => {
2631                buf.get_double()?;
2632                Ok(())
2633            }
2634            Self::Bytes | Self::String | Self::UuidString => {
2635                buf.get_bytes()?;
2636                Ok(())
2637            }
2638            Self::Fixed(sz) => {
2639                buf.get_fixed(*sz)?;
2640                Ok(())
2641            }
2642            Self::Decimal(size) => {
2643                if let Some(s) = size {
2644                    buf.get_fixed(*s)
2645                } else {
2646                    buf.get_bytes()
2647                }?;
2648                Ok(())
2649            }
2650            Self::Enum => {
2651                buf.skip_int()?;
2652                Ok(())
2653            }
2654            Self::DurationFixed12 => {
2655                buf.get_fixed(12)?;
2656                Ok(())
2657            }
2658            Self::List(item) => {
2659                skip_blocks(buf, |c| item.skip(c))?;
2660                Ok(())
2661            }
2662            Self::Map(value) => {
2663                skip_blocks(buf, |c| {
2664                    c.get_bytes()?; // key
2665                    value.skip(c)
2666                })?;
2667                Ok(())
2668            }
2669            Self::Struct(fields) => {
2670                for f in fields.iter() {
2671                    f.skip(buf)?
2672                }
2673                Ok(())
2674            }
2675            Self::Union(encodings) => {
2676                // Union tag must be ZigZag-decoded
2677                let raw = buf.get_long()?;
2678                if raw < 0 {
2679                    return Err(AvroError::ParseError(format!(
2680                        "Negative union branch index {raw}"
2681                    )));
2682                }
2683                let idx: usize = usize::try_from(raw).map_err(|_| {
2684                    AvroError::ParseError(format!(
2685                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2686                        (usize::BITS as usize)
2687                    ))
2688                })?;
2689                let Some(encoding) = encodings.get(idx) else {
2690                    return Err(AvroError::ParseError(format!(
2691                        "Union branch index {idx} out of range for skipper ({} branches)",
2692                        encodings.len()
2693                    )));
2694                };
2695                encoding.skip(buf)
2696            }
2697            Self::Nullable(order, inner) => {
2698                let branch = buf.read_vlq()?;
2699                let is_not_null = match *order {
2700                    Nullability::NullFirst => branch != 0,
2701                    Nullability::NullSecond => branch == 0,
2702                };
2703                if is_not_null {
2704                    inner.skip(buf)?;
2705                }
2706                Ok(())
2707            }
2708            #[cfg(feature = "avro_custom_types")]
2709            Self::RunEndEncoded(inner) => inner.skip(buf),
2710        }
2711    }
2712}
2713
2714#[cfg(test)]
2715mod tests {
2716    use super::*;
2717    use crate::codec::AvroFieldBuilder;
2718    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2719    use arrow_array::cast::AsArray;
2720    use indexmap::IndexMap;
2721    use std::collections::HashMap;
2722
2723    fn encode_avro_int(value: i32) -> Vec<u8> {
2724        let mut buf = Vec::new();
2725        let mut v = (value << 1) ^ (value >> 31);
2726        while v & !0x7F != 0 {
2727            buf.push(((v & 0x7F) | 0x80) as u8);
2728            v >>= 7;
2729        }
2730        buf.push(v as u8);
2731        buf
2732    }
2733
2734    fn encode_avro_long(value: i64) -> Vec<u8> {
2735        let mut buf = Vec::new();
2736        let mut v = (value << 1) ^ (value >> 63);
2737        while v & !0x7F != 0 {
2738            buf.push(((v & 0x7F) | 0x80) as u8);
2739            v >>= 7;
2740        }
2741        buf.push(v as u8);
2742        buf
2743    }
2744
2745    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2746        let mut buf = encode_avro_long(bytes.len() as i64);
2747        buf.extend_from_slice(bytes);
2748        buf
2749    }
2750
2751    fn avro_from_codec(codec: Codec) -> AvroDataType {
2752        AvroDataType::new(codec, Default::default(), None)
2753    }
2754
2755    fn resolved_root_datatype(
2756        writer: Schema<'static>,
2757        reader: Schema<'static>,
2758        use_utf8view: bool,
2759        strict_mode: bool,
2760    ) -> AvroDataType {
2761        // Wrap writer schema in a single-field record
2762        let writer_record = Schema::Complex(ComplexType::Record(Record {
2763            name: "Root",
2764            namespace: None,
2765            doc: None,
2766            aliases: vec![],
2767            fields: vec![Field {
2768                name: "v",
2769                r#type: writer,
2770                default: None,
2771                doc: None,
2772                aliases: vec![],
2773            }],
2774            attributes: Attributes::default(),
2775        }));
2776
2777        // Wrap reader schema in a single-field record
2778        let reader_record = Schema::Complex(ComplexType::Record(Record {
2779            name: "Root",
2780            namespace: None,
2781            doc: None,
2782            aliases: vec![],
2783            fields: vec![Field {
2784                name: "v",
2785                r#type: reader,
2786                default: None,
2787                doc: None,
2788                aliases: vec![],
2789            }],
2790            attributes: Attributes::default(),
2791        }));
2792
2793        // Build resolved record, then extract the inner field's resolved AvroDataType
2794        let field = AvroFieldBuilder::new(&writer_record)
2795            .with_reader_schema(&reader_record)
2796            .with_utf8view(use_utf8view)
2797            .with_strict_mode(strict_mode)
2798            .build()
2799            .expect("schema resolution should succeed");
2800
2801        match field.data_type().codec() {
2802            Codec::Struct(fields) => fields[0].data_type().clone(),
2803            other => panic!("expected wrapper struct, got {other:?}"),
2804        }
2805    }
2806
2807    fn decoder_for_promotion(
2808        writer: PrimitiveType,
2809        reader: PrimitiveType,
2810        use_utf8view: bool,
2811    ) -> Decoder {
2812        let ws = Schema::TypeName(TypeName::Primitive(writer));
2813        let rs = Schema::TypeName(TypeName::Primitive(reader));
2814        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2815        Decoder::try_new(&dt).unwrap()
2816    }
2817
2818    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2819        AvroDataType::new(codec, HashMap::new(), nullability)
2820    }
2821
2822    #[cfg(feature = "avro_custom_types")]
2823    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2824        let mut out = Vec::with_capacity(10);
2825        while x >= 0x80 {
2826            out.push((x as u8) | 0x80);
2827            x >>= 7;
2828        }
2829        out.push(x as u8);
2830        out
2831    }
2832
2833    #[test]
2834    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2835        let ws = Schema::Union(vec![
2836            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2837            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2838        ]);
2839        let rs = Schema::Union(vec![
2840            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2841            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2842        ]);
2843
2844        let dt = resolved_root_datatype(ws, rs, false, false);
2845        let mut dec = Decoder::try_new(&dt).unwrap();
2846
2847        let mut rec1 = encode_avro_long(0);
2848        rec1.extend(encode_avro_int(7));
2849        let mut cur1 = AvroCursor::new(&rec1);
2850        dec.decode(&mut cur1).unwrap();
2851
2852        let mut rec2 = encode_avro_long(1);
2853        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2854        let mut cur2 = AvroCursor::new(&rec2);
2855        dec.decode(&mut cur2).unwrap();
2856
2857        let arr = dec.flush(None).unwrap();
2858        let ua = arr
2859            .as_any()
2860            .downcast_ref::<UnionArray>()
2861            .expect("dense union output");
2862
2863        assert_eq!(
2864            ua.type_id(0),
2865            1,
2866            "first value must select reader 'long' branch"
2867        );
2868        assert_eq!(ua.value_offset(0), 0);
2869
2870        assert_eq!(
2871            ua.type_id(1),
2872            0,
2873            "second value must select reader 'string' branch"
2874        );
2875        assert_eq!(ua.value_offset(1), 0);
2876
2877        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2878        assert_eq!(long_child.len(), 1);
2879        assert_eq!(long_child.value(0), 7);
2880
2881        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2882        assert_eq!(str_child.len(), 1);
2883        assert_eq!(str_child.value(0), "abc");
2884    }
2885
2886    #[test]
2887    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2888        let ws = Schema::Union(vec![
2889            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2890            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2891        ]);
2892        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2893
2894        let dt = resolved_root_datatype(ws, rs, false, false);
2895        let mut dec = Decoder::try_new(&dt).unwrap();
2896
2897        let mut data = encode_avro_long(0);
2898        data.extend(encode_avro_int(5));
2899        let mut cur = AvroCursor::new(&data);
2900        dec.decode(&mut cur).unwrap();
2901
2902        let arr = dec.flush(None).unwrap();
2903        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2904        assert_eq!(out.len(), 1);
2905        assert_eq!(out.value(0), 5);
2906    }
2907
2908    #[test]
2909    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2910        let ws = Schema::Union(vec![
2911            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2912            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2913        ]);
2914        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2915
2916        let dt = resolved_root_datatype(ws, rs, false, false);
2917        let mut dec = Decoder::try_new(&dt).unwrap();
2918
2919        let mut data = encode_avro_long(1);
2920        data.extend(encode_avro_bytes("z".as_bytes()));
2921        let mut cur = AvroCursor::new(&data);
2922        let res = dec.decode(&mut cur);
2923        assert!(
2924            res.is_err(),
2925            "expected error when writer union branch does not resolve to reader non-union type"
2926        );
2927    }
2928
2929    #[test]
2930    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2931        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2932        let rs = Schema::Union(vec![
2933            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2934            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2935        ]);
2936
2937        let dt = resolved_root_datatype(ws, rs, false, false);
2938        let mut dec = Decoder::try_new(&dt).unwrap();
2939
2940        let data = encode_avro_int(6);
2941        let mut cur = AvroCursor::new(&data);
2942        dec.decode(&mut cur).unwrap();
2943
2944        let arr = dec.flush(None).unwrap();
2945        let ua = arr
2946            .as_any()
2947            .downcast_ref::<UnionArray>()
2948            .expect("dense union output");
2949        assert_eq!(ua.len(), 1);
2950        assert_eq!(
2951            ua.type_id(0),
2952            1,
2953            "must resolve to reader 'long' branch (type_id 1)"
2954        );
2955        assert_eq!(ua.value_offset(0), 0);
2956
2957        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2958        assert_eq!(long_child.len(), 1);
2959        assert_eq!(long_child.value(0), 6);
2960
2961        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2962        assert_eq!(str_child.len(), 0, "string branch must be empty");
2963    }
2964
2965    #[test]
2966    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2967        let ws = Schema::Union(vec![
2968            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2969            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2970        ]);
2971        let rs = Schema::Union(vec![
2972            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2973            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2974        ]);
2975
2976        let dt = resolved_root_datatype(ws, rs, false, false);
2977        let mut dec = Decoder::try_new(&dt).unwrap();
2978
2979        let mut data = encode_avro_long(1);
2980        data.push(1);
2981        let mut cur = AvroCursor::new(&data);
2982        let res = dec.decode(&mut cur);
2983        assert!(
2984            res.is_err(),
2985            "expected error for unmapped writer 'boolean' branch"
2986        );
2987    }
2988
2989    #[test]
2990    fn test_schema_resolution_promotion_int_to_long() {
2991        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2992        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2993        for v in [0, 1, -2, 123456] {
2994            let data = encode_avro_int(v);
2995            let mut cur = AvroCursor::new(&data);
2996            dec.decode(&mut cur).unwrap();
2997        }
2998        let arr = dec.flush(None).unwrap();
2999        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
3000        assert_eq!(a.value(0), 0);
3001        assert_eq!(a.value(1), 1);
3002        assert_eq!(a.value(2), -2);
3003        assert_eq!(a.value(3), 123456);
3004    }
3005
3006    #[test]
3007    fn test_schema_resolution_promotion_int_to_float() {
3008        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
3009        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
3010        for v in [0, 42, -7] {
3011            let data = encode_avro_int(v);
3012            let mut cur = AvroCursor::new(&data);
3013            dec.decode(&mut cur).unwrap();
3014        }
3015        let arr = dec.flush(None).unwrap();
3016        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3017        assert_eq!(a.value(0), 0.0);
3018        assert_eq!(a.value(1), 42.0);
3019        assert_eq!(a.value(2), -7.0);
3020    }
3021
3022    #[test]
3023    fn test_schema_resolution_promotion_int_to_double() {
3024        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
3025        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
3026        for v in [1, -1, 10_000] {
3027            let data = encode_avro_int(v);
3028            let mut cur = AvroCursor::new(&data);
3029            dec.decode(&mut cur).unwrap();
3030        }
3031        let arr = dec.flush(None).unwrap();
3032        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3033        assert_eq!(a.value(0), 1.0);
3034        assert_eq!(a.value(1), -1.0);
3035        assert_eq!(a.value(2), 10_000.0);
3036    }
3037
3038    #[test]
3039    fn test_schema_resolution_promotion_long_to_float() {
3040        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
3041        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
3042        for v in [0_i64, 1_000_000_i64, -123_i64] {
3043            let data = encode_avro_long(v);
3044            let mut cur = AvroCursor::new(&data);
3045            dec.decode(&mut cur).unwrap();
3046        }
3047        let arr = dec.flush(None).unwrap();
3048        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3049        assert_eq!(a.value(0), 0.0);
3050        assert_eq!(a.value(1), 1_000_000.0);
3051        assert_eq!(a.value(2), -123.0);
3052    }
3053
3054    #[test]
3055    fn test_schema_resolution_promotion_long_to_double() {
3056        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
3057        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
3058        for v in [2_i64, -2_i64, 9_223_372_i64] {
3059            let data = encode_avro_long(v);
3060            let mut cur = AvroCursor::new(&data);
3061            dec.decode(&mut cur).unwrap();
3062        }
3063        let arr = dec.flush(None).unwrap();
3064        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3065        assert_eq!(a.value(0), 2.0);
3066        assert_eq!(a.value(1), -2.0);
3067        assert_eq!(a.value(2), 9_223_372.0);
3068    }
3069
3070    #[test]
3071    fn test_schema_resolution_promotion_float_to_double() {
3072        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
3073        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
3074        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
3075            let data = v.to_le_bytes().to_vec();
3076            let mut cur = AvroCursor::new(&data);
3077            dec.decode(&mut cur).unwrap();
3078        }
3079        let arr = dec.flush(None).unwrap();
3080        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3081        assert_eq!(a.value(0), 0.5_f64);
3082        assert_eq!(a.value(1), -3.25_f64);
3083        assert_eq!(a.value(2), 1.0e6_f64);
3084    }
3085
3086    #[test]
3087    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
3088        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
3089        assert!(matches!(dec, Decoder::BytesToString(_, _)));
3090        for s in ["hello", "world", "héllo"] {
3091            let data = encode_avro_bytes(s.as_bytes());
3092            let mut cur = AvroCursor::new(&data);
3093            dec.decode(&mut cur).unwrap();
3094        }
3095        let arr = dec.flush(None).unwrap();
3096        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
3097        assert_eq!(a.value(0), "hello");
3098        assert_eq!(a.value(1), "world");
3099        assert_eq!(a.value(2), "héllo");
3100    }
3101
3102    #[test]
3103    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
3104        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
3105        assert!(matches!(dec, Decoder::BytesToString(_, _)));
3106        let data = encode_avro_bytes("abc".as_bytes());
3107        let mut cur = AvroCursor::new(&data);
3108        dec.decode(&mut cur).unwrap();
3109        let arr = dec.flush(None).unwrap();
3110        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
3111        assert_eq!(a.value(0), "abc");
3112    }
3113
3114    #[test]
3115    fn test_schema_resolution_promotion_string_to_bytes() {
3116        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
3117        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
3118        for s in ["", "abc", "data"] {
3119            let data = encode_avro_bytes(s.as_bytes());
3120            let mut cur = AvroCursor::new(&data);
3121            dec.decode(&mut cur).unwrap();
3122        }
3123        let arr = dec.flush(None).unwrap();
3124        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3125        assert_eq!(a.value(0), b"");
3126        assert_eq!(a.value(1), b"abc");
3127        assert_eq!(a.value(2), "data".as_bytes());
3128    }
3129
3130    #[test]
3131    fn test_schema_resolution_no_promotion_passthrough_int() {
3132        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3133        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3134        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
3135        let writer_record = Schema::Complex(ComplexType::Record(Record {
3136            name: "Root",
3137            namespace: None,
3138            doc: None,
3139            aliases: vec![],
3140            fields: vec![Field {
3141                name: "v",
3142                r#type: ws,
3143                default: None,
3144                doc: None,
3145                aliases: vec![],
3146            }],
3147            attributes: Attributes::default(),
3148        }));
3149        let reader_record = Schema::Complex(ComplexType::Record(Record {
3150            name: "Root",
3151            namespace: None,
3152            doc: None,
3153            aliases: vec![],
3154            fields: vec![Field {
3155                name: "v",
3156                r#type: rs,
3157                default: None,
3158                doc: None,
3159                aliases: vec![],
3160            }],
3161            attributes: Attributes::default(),
3162        }));
3163        let field = AvroFieldBuilder::new(&writer_record)
3164            .with_reader_schema(&reader_record)
3165            .with_utf8view(false)
3166            .with_strict_mode(false)
3167            .build()
3168            .unwrap();
3169        // Extract the resolved inner field's AvroDataType
3170        let dt = match field.data_type().codec() {
3171            Codec::Struct(fields) => fields[0].data_type().clone(),
3172            other => panic!("expected wrapper struct, got {other:?}"),
3173        };
3174        let mut dec = Decoder::try_new(&dt).unwrap();
3175        assert!(matches!(dec, Decoder::Int32(_)));
3176        for v in [7, -9] {
3177            let data = encode_avro_int(v);
3178            let mut cur = AvroCursor::new(&data);
3179            dec.decode(&mut cur).unwrap();
3180        }
3181        let arr = dec.flush(None).unwrap();
3182        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3183        assert_eq!(a.value(0), 7);
3184        assert_eq!(a.value(1), -9);
3185    }
3186
3187    #[test]
3188    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
3189        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3190        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
3191        let writer_record = Schema::Complex(ComplexType::Record(Record {
3192            name: "Root",
3193            namespace: None,
3194            doc: None,
3195            aliases: vec![],
3196            fields: vec![Field {
3197                name: "v",
3198                r#type: ws,
3199                default: None,
3200                doc: None,
3201                aliases: vec![],
3202            }],
3203            attributes: Attributes::default(),
3204        }));
3205        let reader_record = Schema::Complex(ComplexType::Record(Record {
3206            name: "Root",
3207            namespace: None,
3208            doc: None,
3209            aliases: vec![],
3210            fields: vec![Field {
3211                name: "v",
3212                r#type: rs,
3213                default: None,
3214                doc: None,
3215                aliases: vec![],
3216            }],
3217            attributes: Attributes::default(),
3218        }));
3219        let res = AvroFieldBuilder::new(&writer_record)
3220            .with_reader_schema(&reader_record)
3221            .with_utf8view(false)
3222            .with_strict_mode(false)
3223            .build();
3224        assert!(res.is_err(), "expected error for illegal promotion");
3225    }
3226
3227    #[test]
3228    fn test_map_decoding_one_entry() {
3229        let value_type = avro_from_codec(Codec::Utf8);
3230        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3231        let mut decoder = Decoder::try_new(&map_type).unwrap();
3232        // Encode a single map with one entry: {"hello": "world"}
3233        let mut data = Vec::new();
3234        data.extend_from_slice(&encode_avro_long(1));
3235        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
3236        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
3237        data.extend_from_slice(&encode_avro_long(0));
3238        let mut cursor = AvroCursor::new(&data);
3239        decoder.decode(&mut cursor).unwrap();
3240        let array = decoder.flush(None).unwrap();
3241        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3242        assert_eq!(map_arr.len(), 1); // one map
3243        assert_eq!(map_arr.value_length(0), 1);
3244        let entries = map_arr.value(0);
3245        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
3246        assert_eq!(struct_entries.len(), 1);
3247        let key_arr = struct_entries
3248            .column_by_name("key")
3249            .unwrap()
3250            .as_any()
3251            .downcast_ref::<StringArray>()
3252            .unwrap();
3253        let val_arr = struct_entries
3254            .column_by_name("value")
3255            .unwrap()
3256            .as_any()
3257            .downcast_ref::<StringArray>()
3258            .unwrap();
3259        assert_eq!(key_arr.value(0), "hello");
3260        assert_eq!(val_arr.value(0), "world");
3261    }
3262
3263    #[test]
3264    fn test_map_decoding_empty() {
3265        let value_type = avro_from_codec(Codec::Utf8);
3266        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
3267        let mut decoder = Decoder::try_new(&map_type).unwrap();
3268        let data = encode_avro_long(0);
3269        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3270        let array = decoder.flush(None).unwrap();
3271        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
3272        assert_eq!(map_arr.len(), 1);
3273        assert_eq!(map_arr.value_length(0), 0);
3274    }
3275
3276    #[test]
3277    fn test_fixed_decoding() {
3278        let avro_type = avro_from_codec(Codec::Fixed(3));
3279        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3280
3281        let data1 = [1u8, 2, 3];
3282        let mut cursor1 = AvroCursor::new(&data1);
3283        decoder
3284            .decode(&mut cursor1)
3285            .expect("Failed to decode data1");
3286        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
3287        let data2 = [4u8, 5, 6];
3288        let mut cursor2 = AvroCursor::new(&data2);
3289        decoder
3290            .decode(&mut cursor2)
3291            .expect("Failed to decode data2");
3292        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
3293        let array = decoder.flush(None).expect("Failed to flush decoder");
3294        assert_eq!(array.len(), 2, "Array should contain two items");
3295        let fixed_size_binary_array = array
3296            .as_any()
3297            .downcast_ref::<FixedSizeBinaryArray>()
3298            .expect("Failed to downcast to FixedSizeBinaryArray");
3299        assert_eq!(
3300            fixed_size_binary_array.value_length(),
3301            3,
3302            "Fixed size of binary values should be 3"
3303        );
3304        assert_eq!(
3305            fixed_size_binary_array.value(0),
3306            &[1, 2, 3],
3307            "First item mismatch"
3308        );
3309        assert_eq!(
3310            fixed_size_binary_array.value(1),
3311            &[4, 5, 6],
3312            "Second item mismatch"
3313        );
3314    }
3315
3316    #[test]
3317    fn test_fixed_decoding_empty() {
3318        let avro_type = avro_from_codec(Codec::Fixed(5));
3319        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3320
3321        let array = decoder
3322            .flush(None)
3323            .expect("Failed to flush decoder for empty input");
3324
3325        assert_eq!(array.len(), 0, "Array should be empty");
3326        let fixed_size_binary_array = array
3327            .as_any()
3328            .downcast_ref::<FixedSizeBinaryArray>()
3329            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
3330
3331        assert_eq!(
3332            fixed_size_binary_array.value_length(),
3333            5,
3334            "Fixed size of binary values should be 5 as per type"
3335        );
3336    }
3337
3338    #[test]
3339    fn test_uuid_decoding() {
3340        let avro_type = avro_from_codec(Codec::Uuid);
3341        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
3342        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
3343        let data = encode_avro_bytes(uuid_str.as_bytes());
3344        let mut cursor = AvroCursor::new(&data);
3345        decoder.decode(&mut cursor).expect("Failed to decode data");
3346        assert_eq!(
3347            cursor.position(),
3348            data.len(),
3349            "Cursor should advance by varint size + data size"
3350        );
3351        let array = decoder.flush(None).expect("Failed to flush decoder");
3352        let fixed_size_binary_array = array
3353            .as_any()
3354            .downcast_ref::<FixedSizeBinaryArray>()
3355            .expect("Array should be a FixedSizeBinaryArray");
3356        assert_eq!(fixed_size_binary_array.len(), 1);
3357        assert_eq!(fixed_size_binary_array.value_length(), 16);
3358        let expected_bytes = [
3359            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
3360            0x6b, 0xf6,
3361        ];
3362        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
3363    }
3364
3365    #[test]
3366    fn test_array_decoding() {
3367        let item_dt = avro_from_codec(Codec::Int32);
3368        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3369        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3370        let mut row1 = Vec::new();
3371        row1.extend_from_slice(&encode_avro_long(2));
3372        row1.extend_from_slice(&encode_avro_int(10));
3373        row1.extend_from_slice(&encode_avro_int(20));
3374        row1.extend_from_slice(&encode_avro_long(0));
3375        let row2 = encode_avro_long(0);
3376        let mut cursor = AvroCursor::new(&row1);
3377        decoder.decode(&mut cursor).unwrap();
3378        let mut cursor2 = AvroCursor::new(&row2);
3379        decoder.decode(&mut cursor2).unwrap();
3380        let array = decoder.flush(None).unwrap();
3381        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3382        assert_eq!(list_arr.len(), 2);
3383        let offsets = list_arr.value_offsets();
3384        assert_eq!(offsets, &[0, 2, 2]);
3385        let values = list_arr.values();
3386        let int_arr = values.as_primitive::<Int32Type>();
3387        assert_eq!(int_arr.len(), 2);
3388        assert_eq!(int_arr.value(0), 10);
3389        assert_eq!(int_arr.value(1), 20);
3390    }
3391
3392    #[test]
3393    fn test_array_decoding_with_negative_block_count() {
3394        let item_dt = avro_from_codec(Codec::Int32);
3395        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
3396        let mut decoder = Decoder::try_new(&list_dt).unwrap();
3397        let mut data = encode_avro_long(-3);
3398        data.extend_from_slice(&encode_avro_long(12));
3399        data.extend_from_slice(&encode_avro_int(1));
3400        data.extend_from_slice(&encode_avro_int(2));
3401        data.extend_from_slice(&encode_avro_int(3));
3402        data.extend_from_slice(&encode_avro_long(0));
3403        let mut cursor = AvroCursor::new(&data);
3404        decoder.decode(&mut cursor).unwrap();
3405        let array = decoder.flush(None).unwrap();
3406        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3407        assert_eq!(list_arr.len(), 1);
3408        assert_eq!(list_arr.value_length(0), 3);
3409        let values = list_arr.values().as_primitive::<Int32Type>();
3410        assert_eq!(values.len(), 3);
3411        assert_eq!(values.value(0), 1);
3412        assert_eq!(values.value(1), 2);
3413        assert_eq!(values.value(2), 3);
3414    }
3415
3416    #[test]
3417    fn test_nested_array_decoding() {
3418        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3419        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
3420        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
3421        let mut buf = Vec::new();
3422        buf.extend(encode_avro_long(1));
3423        buf.extend(encode_avro_long(2));
3424        buf.extend(encode_avro_int(5));
3425        buf.extend(encode_avro_int(6));
3426        buf.extend(encode_avro_long(0));
3427        buf.extend(encode_avro_long(0));
3428        let mut cursor = AvroCursor::new(&buf);
3429        decoder.decode(&mut cursor).unwrap();
3430        let arr = decoder.flush(None).unwrap();
3431        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
3432        assert_eq!(outer.len(), 1);
3433        assert_eq!(outer.value_length(0), 1);
3434        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
3435        assert_eq!(inner.len(), 1);
3436        assert_eq!(inner.value_length(0), 2);
3437        let values = inner
3438            .values()
3439            .as_any()
3440            .downcast_ref::<Int32Array>()
3441            .unwrap();
3442        assert_eq!(values.values(), &[5, 6]);
3443    }
3444
3445    #[test]
3446    fn test_array_decoding_empty_array() {
3447        let value_type = avro_from_codec(Codec::Utf8);
3448        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
3449        let mut decoder = Decoder::try_new(&map_type).unwrap();
3450        let data = encode_avro_long(0);
3451        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
3452        let array = decoder.flush(None).unwrap();
3453        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3454        assert_eq!(list_arr.len(), 1);
3455        assert_eq!(list_arr.value_length(0), 0);
3456    }
3457
3458    #[test]
3459    fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
3460        use crate::schema::Array;
3461        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3462            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3463            attributes: Attributes::default(),
3464        }));
3465        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3466            items: Box::new(Schema::Union(vec![
3467                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3468                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3469            ])),
3470            attributes: Attributes::default(),
3471        }));
3472        let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
3473        if let Codec::List(inner) = dt.codec() {
3474            assert_eq!(
3475                inner.nullability(),
3476                Some(Nullability::NullFirst),
3477                "items should be nullable"
3478            );
3479        } else {
3480            panic!("expected List codec");
3481        }
3482        let mut decoder = Decoder::try_new(&dt).unwrap();
3483        let mut data = encode_avro_long(2);
3484        data.extend(encode_avro_int(10));
3485        data.extend(encode_avro_int(20));
3486        data.extend(encode_avro_long(0));
3487        let mut cursor = AvroCursor::new(&data);
3488        decoder.decode(&mut cursor).unwrap();
3489        assert_eq!(
3490            cursor.position(),
3491            data.len(),
3492            "all bytes should be consumed"
3493        );
3494        let array = decoder.flush(None).unwrap();
3495        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
3496        assert_eq!(list_arr.len(), 1, "one list/row");
3497        assert_eq!(list_arr.value_length(0), 2, "two items in the list");
3498        let values = list_arr.values().as_primitive::<Int32Type>();
3499        assert_eq!(values.len(), 2);
3500        assert_eq!(values.value(0), 10);
3501        assert_eq!(values.value(1), 20);
3502        assert!(!values.is_null(0));
3503        assert!(!values.is_null(1));
3504    }
3505
3506    #[test]
3507    fn test_decimal_decoding_fixed256() {
3508        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
3509        let mut decoder = Decoder::try_new(&dt).unwrap();
3510        let row1 = [
3511            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3512            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3513            0x00, 0x00, 0x30, 0x39,
3514        ];
3515        let row2 = [
3516            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3517            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3518            0xFF, 0xFF, 0xFF, 0x85,
3519        ];
3520        let mut data = Vec::new();
3521        data.extend_from_slice(&row1);
3522        data.extend_from_slice(&row2);
3523        let mut cursor = AvroCursor::new(&data);
3524        decoder.decode(&mut cursor).unwrap();
3525        decoder.decode(&mut cursor).unwrap();
3526        let arr = decoder.flush(None).unwrap();
3527        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
3528        assert_eq!(dec.len(), 2);
3529        assert_eq!(dec.value_as_string(0), "123.45");
3530        assert_eq!(dec.value_as_string(1), "-1.23");
3531    }
3532
3533    #[test]
3534    fn test_decimal_decoding_fixed128() {
3535        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
3536        let mut decoder = Decoder::try_new(&dt).unwrap();
3537        let row1 = [
3538            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3539            0x30, 0x39,
3540        ];
3541        let row2 = [
3542            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3543            0xFF, 0x85,
3544        ];
3545        let mut data = Vec::new();
3546        data.extend_from_slice(&row1);
3547        data.extend_from_slice(&row2);
3548        let mut cursor = AvroCursor::new(&data);
3549        decoder.decode(&mut cursor).unwrap();
3550        decoder.decode(&mut cursor).unwrap();
3551        let arr = decoder.flush(None).unwrap();
3552        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3553        assert_eq!(dec.len(), 2);
3554        assert_eq!(dec.value_as_string(0), "123.45");
3555        assert_eq!(dec.value_as_string(1), "-1.23");
3556    }
3557
3558    #[test]
3559    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
3560        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
3561        let mut decoder = Decoder::try_new(&dt).unwrap();
3562        let row1 = [
3563            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3564            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3565            0x00, 0x00, 0x30, 0x39,
3566        ];
3567        let row2 = [
3568            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3569            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3570            0xFF, 0xFF, 0xFF, 0x85,
3571        ];
3572        let mut data = Vec::new();
3573        data.extend_from_slice(&row1);
3574        data.extend_from_slice(&row2);
3575        let mut cursor = AvroCursor::new(&data);
3576        decoder.decode(&mut cursor).unwrap();
3577        decoder.decode(&mut cursor).unwrap();
3578        let arr = decoder.flush(None).unwrap();
3579        #[cfg(feature = "small_decimals")]
3580        {
3581            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3582            assert_eq!(dec.len(), 2);
3583            assert_eq!(dec.value_as_string(0), "123.45");
3584            assert_eq!(dec.value_as_string(1), "-1.23");
3585        }
3586        #[cfg(not(feature = "small_decimals"))]
3587        {
3588            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3589            assert_eq!(dec.len(), 2);
3590            assert_eq!(dec.value_as_string(0), "123.45");
3591            assert_eq!(dec.value_as_string(1), "-1.23");
3592        }
3593    }
3594
3595    #[test]
3596    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
3597        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
3598        let mut decoder = Decoder::try_new(&dt).unwrap();
3599        let row1 = [
3600            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3601            0x30, 0x39,
3602        ];
3603        let row2 = [
3604            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3605            0xFF, 0x85,
3606        ];
3607        let mut data = Vec::new();
3608        data.extend_from_slice(&row1);
3609        data.extend_from_slice(&row2);
3610        let mut cursor = AvroCursor::new(&data);
3611        decoder.decode(&mut cursor).unwrap();
3612        decoder.decode(&mut cursor).unwrap();
3613
3614        let arr = decoder.flush(None).unwrap();
3615        #[cfg(feature = "small_decimals")]
3616        {
3617            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3618            assert_eq!(dec.len(), 2);
3619            assert_eq!(dec.value_as_string(0), "123.45");
3620            assert_eq!(dec.value_as_string(1), "-1.23");
3621        }
3622        #[cfg(not(feature = "small_decimals"))]
3623        {
3624            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3625            assert_eq!(dec.len(), 2);
3626            assert_eq!(dec.value_as_string(0), "123.45");
3627            assert_eq!(dec.value_as_string(1), "-1.23");
3628        }
3629    }
3630
3631    #[test]
3632    fn test_decimal_decoding_bytes_with_nulls() {
3633        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
3634        let inner = Decoder::try_new(&dt).unwrap();
3635        let mut decoder = Decoder::Nullable(
3636            NullablePlan::ReadTag {
3637                nullability: Nullability::NullSecond,
3638                resolution: ResolutionPlan::Promotion(Promotion::Direct),
3639            },
3640            NullBufferBuilder::new(DEFAULT_CAPACITY),
3641            Box::new(inner),
3642        );
3643        let mut data = Vec::new();
3644        data.extend_from_slice(&encode_avro_int(0));
3645        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
3646        data.extend_from_slice(&encode_avro_int(1));
3647        data.extend_from_slice(&encode_avro_int(0));
3648        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
3649        let mut cursor = AvroCursor::new(&data);
3650        decoder.decode(&mut cursor).unwrap();
3651        decoder.decode(&mut cursor).unwrap();
3652        decoder.decode(&mut cursor).unwrap();
3653        let arr = decoder.flush(None).unwrap();
3654        #[cfg(feature = "small_decimals")]
3655        {
3656            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3657            assert_eq!(dec_arr.len(), 3);
3658            assert!(dec_arr.is_valid(0));
3659            assert!(!dec_arr.is_valid(1));
3660            assert!(dec_arr.is_valid(2));
3661            assert_eq!(dec_arr.value_as_string(0), "123.4");
3662            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3663        }
3664        #[cfg(not(feature = "small_decimals"))]
3665        {
3666            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3667            assert_eq!(dec_arr.len(), 3);
3668            assert!(dec_arr.is_valid(0));
3669            assert!(!dec_arr.is_valid(1));
3670            assert!(dec_arr.is_valid(2));
3671            assert_eq!(dec_arr.value_as_string(0), "123.4");
3672            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3673        }
3674    }
3675
3676    #[test]
3677    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
3678        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
3679        let inner = Decoder::try_new(&dt).unwrap();
3680        let mut decoder = Decoder::Nullable(
3681            NullablePlan::ReadTag {
3682                nullability: Nullability::NullSecond,
3683                resolution: ResolutionPlan::Promotion(Promotion::Direct),
3684            },
3685            NullBufferBuilder::new(DEFAULT_CAPACITY),
3686            Box::new(inner),
3687        );
3688        let row1 = [
3689            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
3690            0xE2, 0x40,
3691        ];
3692        let row3 = [
3693            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
3694            0x1D, 0xC0,
3695        ];
3696        let mut data = Vec::new();
3697        data.extend_from_slice(&encode_avro_int(0));
3698        data.extend_from_slice(&row1);
3699        data.extend_from_slice(&encode_avro_int(1));
3700        data.extend_from_slice(&encode_avro_int(0));
3701        data.extend_from_slice(&row3);
3702        let mut cursor = AvroCursor::new(&data);
3703        decoder.decode(&mut cursor).unwrap();
3704        decoder.decode(&mut cursor).unwrap();
3705        decoder.decode(&mut cursor).unwrap();
3706        let arr = decoder.flush(None).unwrap();
3707        #[cfg(feature = "small_decimals")]
3708        {
3709            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3710            assert_eq!(dec_arr.len(), 3);
3711            assert!(dec_arr.is_valid(0));
3712            assert!(!dec_arr.is_valid(1));
3713            assert!(dec_arr.is_valid(2));
3714            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3715            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3716        }
3717        #[cfg(not(feature = "small_decimals"))]
3718        {
3719            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3720            assert_eq!(dec_arr.len(), 3);
3721            assert!(dec_arr.is_valid(0));
3722            assert!(!dec_arr.is_valid(1));
3723            assert!(dec_arr.is_valid(2));
3724            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3725            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3726        }
3727    }
3728
3729    #[test]
3730    fn test_enum_decoding() {
3731        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
3732        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
3733        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3734        let mut data = Vec::new();
3735        data.extend_from_slice(&encode_avro_int(2));
3736        data.extend_from_slice(&encode_avro_int(0));
3737        data.extend_from_slice(&encode_avro_int(1));
3738        let mut cursor = AvroCursor::new(&data);
3739        decoder.decode(&mut cursor).unwrap();
3740        decoder.decode(&mut cursor).unwrap();
3741        decoder.decode(&mut cursor).unwrap();
3742        let array = decoder.flush(None).unwrap();
3743        let dict_array = array
3744            .as_any()
3745            .downcast_ref::<DictionaryArray<Int32Type>>()
3746            .unwrap();
3747        assert_eq!(dict_array.len(), 3);
3748        let values = dict_array
3749            .values()
3750            .as_any()
3751            .downcast_ref::<StringArray>()
3752            .unwrap();
3753        assert_eq!(values.value(0), "A");
3754        assert_eq!(values.value(1), "B");
3755        assert_eq!(values.value(2), "C");
3756        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
3757    }
3758
3759    #[test]
3760    fn test_enum_decoding_with_nulls() {
3761        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
3762        let enum_codec = Codec::Enum(symbols.clone());
3763        let avro_type =
3764            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
3765        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3766        let mut data = Vec::new();
3767        data.extend_from_slice(&encode_avro_long(1));
3768        data.extend_from_slice(&encode_avro_int(1));
3769        data.extend_from_slice(&encode_avro_long(0));
3770        data.extend_from_slice(&encode_avro_long(1));
3771        data.extend_from_slice(&encode_avro_int(0));
3772        let mut cursor = AvroCursor::new(&data);
3773        decoder.decode(&mut cursor).unwrap();
3774        decoder.decode(&mut cursor).unwrap();
3775        decoder.decode(&mut cursor).unwrap();
3776        let array = decoder.flush(None).unwrap();
3777        let dict_array = array
3778            .as_any()
3779            .downcast_ref::<DictionaryArray<Int32Type>>()
3780            .unwrap();
3781        assert_eq!(dict_array.len(), 3);
3782        assert!(dict_array.is_valid(0));
3783        assert!(dict_array.is_null(1));
3784        assert!(dict_array.is_valid(2));
3785        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
3786        assert_eq!(dict_array.keys(), &expected_keys);
3787        let values = dict_array
3788            .values()
3789            .as_any()
3790            .downcast_ref::<StringArray>()
3791            .unwrap();
3792        assert_eq!(values.value(0), "X");
3793        assert_eq!(values.value(1), "Y");
3794    }
3795
3796    #[test]
3797    fn test_duration_decoding_with_nulls() {
3798        let duration_codec = Codec::Interval;
3799        let avro_type = AvroDataType::new(
3800            duration_codec,
3801            Default::default(),
3802            Some(Nullability::NullFirst),
3803        );
3804        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3805        let mut data = Vec::new();
3806        // First value: 1 month, 2 days, 3 millis
3807        data.extend_from_slice(&encode_avro_long(1)); // not null
3808        let mut duration1 = Vec::new();
3809        duration1.extend_from_slice(&1u32.to_le_bytes());
3810        duration1.extend_from_slice(&2u32.to_le_bytes());
3811        duration1.extend_from_slice(&3u32.to_le_bytes());
3812        data.extend_from_slice(&duration1);
3813        // Second value: null
3814        data.extend_from_slice(&encode_avro_long(0)); // null
3815        data.extend_from_slice(&encode_avro_long(1)); // not null
3816        let mut duration2 = Vec::new();
3817        duration2.extend_from_slice(&4u32.to_le_bytes());
3818        duration2.extend_from_slice(&5u32.to_le_bytes());
3819        duration2.extend_from_slice(&6u32.to_le_bytes());
3820        data.extend_from_slice(&duration2);
3821        let mut cursor = AvroCursor::new(&data);
3822        decoder.decode(&mut cursor).unwrap();
3823        decoder.decode(&mut cursor).unwrap();
3824        decoder.decode(&mut cursor).unwrap();
3825        let array = decoder.flush(None).unwrap();
3826        let interval_array = array
3827            .as_any()
3828            .downcast_ref::<IntervalMonthDayNanoArray>()
3829            .unwrap();
3830        assert_eq!(interval_array.len(), 3);
3831        assert!(interval_array.is_valid(0));
3832        assert!(interval_array.is_null(1));
3833        assert!(interval_array.is_valid(2));
3834        let expected = IntervalMonthDayNanoArray::from(vec![
3835            Some(IntervalMonthDayNano {
3836                months: 1,
3837                days: 2,
3838                nanoseconds: 3_000_000,
3839            }),
3840            None,
3841            Some(IntervalMonthDayNano {
3842                months: 4,
3843                days: 5,
3844                nanoseconds: 6_000_000,
3845            }),
3846        ]);
3847        assert_eq!(interval_array, &expected);
3848    }
3849
3850    #[cfg(feature = "avro_custom_types")]
3851    #[test]
3852    fn test_interval_month_day_nano_custom_decoding_with_nulls() {
3853        let avro_type = AvroDataType::new(
3854            Codec::IntervalMonthDayNano,
3855            Default::default(),
3856            Some(Nullability::NullFirst),
3857        );
3858        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3859        let mut data = Vec::new();
3860        // First value: months=1, days=-2, nanos=3
3861        data.extend_from_slice(&encode_avro_long(1));
3862        data.extend_from_slice(&1i32.to_le_bytes());
3863        data.extend_from_slice(&(-2i32).to_le_bytes());
3864        data.extend_from_slice(&3i64.to_le_bytes());
3865        // Second value: null
3866        data.extend_from_slice(&encode_avro_long(0));
3867        // Third value: months=-4, days=5, nanos=-6
3868        data.extend_from_slice(&encode_avro_long(1));
3869        data.extend_from_slice(&(-4i32).to_le_bytes());
3870        data.extend_from_slice(&5i32.to_le_bytes());
3871        data.extend_from_slice(&(-6i64).to_le_bytes());
3872        let mut cursor = AvroCursor::new(&data);
3873        decoder.decode(&mut cursor).unwrap();
3874        decoder.decode(&mut cursor).unwrap();
3875        decoder.decode(&mut cursor).unwrap();
3876        let array = decoder.flush(None).unwrap();
3877        let interval_array = array
3878            .as_any()
3879            .downcast_ref::<IntervalMonthDayNanoArray>()
3880            .unwrap();
3881        assert_eq!(interval_array.len(), 3);
3882        let expected = IntervalMonthDayNanoArray::from(vec![
3883            Some(IntervalMonthDayNano::new(1, -2, 3)),
3884            None,
3885            Some(IntervalMonthDayNano::new(-4, 5, -6)),
3886        ]);
3887        assert_eq!(interval_array, &expected);
3888    }
3889
3890    #[test]
3891    fn test_duration_decoding_empty() {
3892        let duration_codec = Codec::Interval;
3893        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
3894        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3895        let array = decoder.flush(None).unwrap();
3896        assert_eq!(array.len(), 0);
3897    }
3898
3899    #[test]
3900    #[cfg(feature = "avro_custom_types")]
3901    fn test_duration_seconds_decoding() {
3902        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
3903        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3904        let mut data = Vec::new();
3905        // Three values: 0, -1, 2
3906        data.extend_from_slice(&encode_avro_long(0));
3907        data.extend_from_slice(&encode_avro_long(-1));
3908        data.extend_from_slice(&encode_avro_long(2));
3909        let mut cursor = AvroCursor::new(&data);
3910        decoder.decode(&mut cursor).unwrap();
3911        decoder.decode(&mut cursor).unwrap();
3912        decoder.decode(&mut cursor).unwrap();
3913        let array = decoder.flush(None).unwrap();
3914        let dur = array
3915            .as_any()
3916            .downcast_ref::<DurationSecondArray>()
3917            .unwrap();
3918        assert_eq!(dur.values(), &[0, -1, 2]);
3919    }
3920
3921    #[test]
3922    #[cfg(feature = "avro_custom_types")]
3923    fn test_duration_milliseconds_decoding() {
3924        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3925        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3926        let mut data = Vec::new();
3927        for v in [1i64, 0, -2] {
3928            data.extend_from_slice(&encode_avro_long(v));
3929        }
3930        let mut cursor = AvroCursor::new(&data);
3931        for _ in 0..3 {
3932            decoder.decode(&mut cursor).unwrap();
3933        }
3934        let array = decoder.flush(None).unwrap();
3935        let dur = array
3936            .as_any()
3937            .downcast_ref::<DurationMillisecondArray>()
3938            .unwrap();
3939        assert_eq!(dur.values(), &[1, 0, -2]);
3940    }
3941
3942    #[test]
3943    #[cfg(feature = "avro_custom_types")]
3944    fn test_duration_microseconds_decoding() {
3945        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3946        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3947        let mut data = Vec::new();
3948        for v in [5i64, -6, 7] {
3949            data.extend_from_slice(&encode_avro_long(v));
3950        }
3951        let mut cursor = AvroCursor::new(&data);
3952        for _ in 0..3 {
3953            decoder.decode(&mut cursor).unwrap();
3954        }
3955        let array = decoder.flush(None).unwrap();
3956        let dur = array
3957            .as_any()
3958            .downcast_ref::<DurationMicrosecondArray>()
3959            .unwrap();
3960        assert_eq!(dur.values(), &[5, -6, 7]);
3961    }
3962
3963    #[test]
3964    #[cfg(feature = "avro_custom_types")]
3965    fn test_duration_nanoseconds_decoding() {
3966        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3967        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3968        let mut data = Vec::new();
3969        for v in [8i64, 9, -10] {
3970            data.extend_from_slice(&encode_avro_long(v));
3971        }
3972        let mut cursor = AvroCursor::new(&data);
3973        for _ in 0..3 {
3974            decoder.decode(&mut cursor).unwrap();
3975        }
3976        let array = decoder.flush(None).unwrap();
3977        let dur = array
3978            .as_any()
3979            .downcast_ref::<DurationNanosecondArray>()
3980            .unwrap();
3981        assert_eq!(dur.values(), &[8, 9, -10]);
3982    }
3983
3984    #[test]
3985    fn test_nullable_decode_error_bitmap_corruption() {
3986        // Nullable Int32 with ['T','null'] encoding (NullSecond)
3987        let avro_type = AvroDataType::new(
3988            Codec::Int32,
3989            Default::default(),
3990            Some(Nullability::NullSecond),
3991        );
3992        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3993
3994        // Row 1: union branch 1 (null)
3995        let mut row1 = Vec::new();
3996        row1.extend_from_slice(&encode_avro_int(1));
3997
3998        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
3999        let mut row2 = Vec::new();
4000        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
4001
4002        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
4003        let mut row3 = Vec::new();
4004        row3.extend_from_slice(&encode_avro_int(0)); // branch
4005        row3.extend_from_slice(&encode_avro_int(42)); // actual value
4006
4007        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
4008        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
4009        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
4010
4011        let array = decoder.flush(None).unwrap();
4012
4013        // Should contain 2 elements: row1 (null) and row3 (42)
4014        assert_eq!(array.len(), 2);
4015        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
4016        assert!(int_array.is_null(0)); // row1 is null
4017        assert_eq!(int_array.value(1), 42); // row3 value is 42
4018    }
4019
4020    #[test]
4021    fn test_enum_mapping_reordered_symbols() {
4022        let reader_symbols: Arc<[String]> =
4023            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
4024        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
4025        let default_index: i32 = -1;
4026        let mut dec = Decoder::Enum(
4027            Vec::with_capacity(DEFAULT_CAPACITY),
4028            reader_symbols.clone(),
4029            Some(EnumResolution {
4030                mapping,
4031                default_index,
4032            }),
4033        );
4034        let mut data = Vec::new();
4035        data.extend_from_slice(&encode_avro_int(0));
4036        data.extend_from_slice(&encode_avro_int(1));
4037        data.extend_from_slice(&encode_avro_int(2));
4038        let mut cur = AvroCursor::new(&data);
4039        dec.decode(&mut cur).unwrap();
4040        dec.decode(&mut cur).unwrap();
4041        dec.decode(&mut cur).unwrap();
4042        let arr = dec.flush(None).unwrap();
4043        let dict = arr
4044            .as_any()
4045            .downcast_ref::<DictionaryArray<Int32Type>>()
4046            .unwrap();
4047        let expected_keys = Int32Array::from(vec![2, 0, 1]);
4048        assert_eq!(dict.keys(), &expected_keys);
4049        let values = dict
4050            .values()
4051            .as_any()
4052            .downcast_ref::<StringArray>()
4053            .unwrap();
4054        assert_eq!(values.value(0), "B");
4055        assert_eq!(values.value(1), "C");
4056        assert_eq!(values.value(2), "A");
4057    }
4058
4059    #[test]
4060    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
4061        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
4062        let default_index: i32 = 1;
4063        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
4064        let mut dec = Decoder::Enum(
4065            Vec::with_capacity(DEFAULT_CAPACITY),
4066            reader_symbols.clone(),
4067            Some(EnumResolution {
4068                mapping,
4069                default_index,
4070            }),
4071        );
4072        let mut data = Vec::new();
4073        data.extend_from_slice(&encode_avro_int(0));
4074        data.extend_from_slice(&encode_avro_int(1));
4075        data.extend_from_slice(&encode_avro_int(99));
4076        let mut cur = AvroCursor::new(&data);
4077        dec.decode(&mut cur).unwrap();
4078        dec.decode(&mut cur).unwrap();
4079        dec.decode(&mut cur).unwrap();
4080        let arr = dec.flush(None).unwrap();
4081        let dict = arr
4082            .as_any()
4083            .downcast_ref::<DictionaryArray<Int32Type>>()
4084            .unwrap();
4085        let expected_keys = Int32Array::from(vec![0, 1, 1]);
4086        assert_eq!(dict.keys(), &expected_keys);
4087        let values = dict
4088            .values()
4089            .as_any()
4090            .downcast_ref::<StringArray>()
4091            .unwrap();
4092        assert_eq!(values.value(0), "A");
4093        assert_eq!(values.value(1), "B");
4094    }
4095
4096    #[test]
4097    fn test_enum_mapping_unknown_symbol_without_default_errors() {
4098        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
4099        let default_index: i32 = -1; // indicates no default at type-level
4100        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
4101        let mut dec = Decoder::Enum(
4102            Vec::with_capacity(DEFAULT_CAPACITY),
4103            reader_symbols,
4104            Some(EnumResolution {
4105                mapping,
4106                default_index,
4107            }),
4108        );
4109        let data = encode_avro_int(0);
4110        let mut cur = AvroCursor::new(&data);
4111        let err = dec
4112            .decode(&mut cur)
4113            .expect_err("expected decode error for unresolved enum without default");
4114        let msg = err.to_string();
4115        assert!(
4116            msg.contains("not resolvable") && msg.contains("no default"),
4117            "unexpected error message: {msg}"
4118        );
4119    }
4120
4121    fn make_record_resolved_decoder(
4122        reader_fields: &[(&str, DataType, bool)],
4123        writer_projections: Vec<FieldProjection>,
4124    ) -> Decoder {
4125        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4126        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4127        for (name, dt, nullable) in reader_fields {
4128            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4129            let enc = match dt {
4130                DataType::Int32 => Decoder::Int32(Vec::new()),
4131                DataType::Int64 => Decoder::Int64(Vec::new()),
4132                DataType::Utf8 => {
4133                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
4134                }
4135                other => panic!("Unsupported test reader field type: {other:?}"),
4136            };
4137            encodings.push(enc);
4138        }
4139        let fields: Fields = field_refs.into();
4140        Decoder::Record(
4141            fields,
4142            encodings,
4143            vec![None; reader_fields.len()],
4144            Some(Projector {
4145                writer_projections,
4146                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4147            }),
4148        )
4149    }
4150
4151    #[test]
4152    fn test_skip_writer_trailing_field_int32() {
4153        let mut dec = make_record_resolved_decoder(
4154            &[("id", arrow_schema::DataType::Int32, false)],
4155            vec![
4156                FieldProjection::ToReader(0),
4157                FieldProjection::Skip(super::Skipper::Int32),
4158            ],
4159        );
4160        let mut data = Vec::new();
4161        data.extend_from_slice(&encode_avro_int(7));
4162        data.extend_from_slice(&encode_avro_int(999));
4163        let mut cur = AvroCursor::new(&data);
4164        dec.decode(&mut cur).unwrap();
4165        assert_eq!(cur.position(), data.len());
4166        let arr = dec.flush(None).unwrap();
4167        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
4168        assert_eq!(struct_arr.len(), 1);
4169        let id = struct_arr
4170            .column_by_name("id")
4171            .unwrap()
4172            .as_any()
4173            .downcast_ref::<Int32Array>()
4174            .unwrap();
4175        assert_eq!(id.value(0), 7);
4176    }
4177
4178    #[test]
4179    fn test_skip_writer_middle_field_string() {
4180        let mut dec = make_record_resolved_decoder(
4181            &[
4182                ("id", DataType::Int32, false),
4183                ("score", DataType::Int64, false),
4184            ],
4185            vec![
4186                FieldProjection::ToReader(0),
4187                FieldProjection::Skip(Skipper::String),
4188                FieldProjection::ToReader(1),
4189            ],
4190        );
4191        let mut data = Vec::new();
4192        data.extend_from_slice(&encode_avro_int(42));
4193        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
4194        data.extend_from_slice(&encode_avro_long(1000));
4195        let mut cur = AvroCursor::new(&data);
4196        dec.decode(&mut cur).unwrap();
4197        assert_eq!(cur.position(), data.len());
4198        let arr = dec.flush(None).unwrap();
4199        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4200        let id = s
4201            .column_by_name("id")
4202            .unwrap()
4203            .as_any()
4204            .downcast_ref::<Int32Array>()
4205            .unwrap();
4206        let score = s
4207            .column_by_name("score")
4208            .unwrap()
4209            .as_any()
4210            .downcast_ref::<Int64Array>()
4211            .unwrap();
4212        assert_eq!(id.value(0), 42);
4213        assert_eq!(score.value(0), 1000);
4214    }
4215
4216    #[test]
4217    fn test_skip_writer_array_with_negative_block_count_fast() {
4218        let mut dec = make_record_resolved_decoder(
4219            &[("id", DataType::Int32, false)],
4220            vec![
4221                FieldProjection::Skip(super::Skipper::List(Box::new(Skipper::Int32))),
4222                FieldProjection::ToReader(0),
4223            ],
4224        );
4225        let mut array_payload = Vec::new();
4226        array_payload.extend_from_slice(&encode_avro_int(1));
4227        array_payload.extend_from_slice(&encode_avro_int(2));
4228        array_payload.extend_from_slice(&encode_avro_int(3));
4229        let mut data = Vec::new();
4230        data.extend_from_slice(&encode_avro_long(-3));
4231        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
4232        data.extend_from_slice(&array_payload);
4233        data.extend_from_slice(&encode_avro_long(0));
4234        data.extend_from_slice(&encode_avro_int(5));
4235        let mut cur = AvroCursor::new(&data);
4236        dec.decode(&mut cur).unwrap();
4237        assert_eq!(cur.position(), data.len());
4238        let arr = dec.flush(None).unwrap();
4239        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4240        let id = s
4241            .column_by_name("id")
4242            .unwrap()
4243            .as_any()
4244            .downcast_ref::<Int32Array>()
4245            .unwrap();
4246        assert_eq!(id.len(), 1);
4247        assert_eq!(id.value(0), 5);
4248    }
4249
4250    #[test]
4251    fn test_skip_writer_map_with_negative_block_count_fast() {
4252        let mut dec = make_record_resolved_decoder(
4253            &[("id", DataType::Int32, false)],
4254            vec![
4255                FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
4256                FieldProjection::ToReader(0),
4257            ],
4258        );
4259        let mut entries = Vec::new();
4260        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
4261        entries.extend_from_slice(&encode_avro_int(10));
4262        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
4263        entries.extend_from_slice(&encode_avro_int(20));
4264        let mut data = Vec::new();
4265        data.extend_from_slice(&encode_avro_long(-2));
4266        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
4267        data.extend_from_slice(&entries);
4268        data.extend_from_slice(&encode_avro_long(0));
4269        data.extend_from_slice(&encode_avro_int(123));
4270        let mut cur = AvroCursor::new(&data);
4271        dec.decode(&mut cur).unwrap();
4272        assert_eq!(cur.position(), data.len());
4273        let arr = dec.flush(None).unwrap();
4274        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4275        let id = s
4276            .column_by_name("id")
4277            .unwrap()
4278            .as_any()
4279            .downcast_ref::<Int32Array>()
4280            .unwrap();
4281        assert_eq!(id.len(), 1);
4282        assert_eq!(id.value(0), 123);
4283    }
4284
4285    #[test]
4286    fn test_skip_writer_nullable_field_union_nullfirst() {
4287        let mut dec = make_record_resolved_decoder(
4288            &[("id", DataType::Int32, false)],
4289            vec![
4290                FieldProjection::Skip(super::Skipper::Nullable(
4291                    Nullability::NullFirst,
4292                    Box::new(super::Skipper::Int32),
4293                )),
4294                FieldProjection::ToReader(0),
4295            ],
4296        );
4297        let mut row1 = Vec::new();
4298        row1.extend_from_slice(&encode_avro_long(0));
4299        row1.extend_from_slice(&encode_avro_int(5));
4300        let mut row2 = Vec::new();
4301        row2.extend_from_slice(&encode_avro_long(1));
4302        row2.extend_from_slice(&encode_avro_int(123));
4303        row2.extend_from_slice(&encode_avro_int(7));
4304        let mut cur1 = AvroCursor::new(&row1);
4305        let mut cur2 = AvroCursor::new(&row2);
4306        dec.decode(&mut cur1).unwrap();
4307        dec.decode(&mut cur2).unwrap();
4308        assert_eq!(cur1.position(), row1.len());
4309        assert_eq!(cur2.position(), row2.len());
4310        let arr = dec.flush(None).unwrap();
4311        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4312        let id = s
4313            .column_by_name("id")
4314            .unwrap()
4315            .as_any()
4316            .downcast_ref::<Int32Array>()
4317            .unwrap();
4318        assert_eq!(id.len(), 2);
4319        assert_eq!(id.value(0), 5);
4320        assert_eq!(id.value(1), 7);
4321    }
4322
4323    fn make_dense_union_avro(
4324        children: Vec<(Codec, &'_ str, DataType)>,
4325        type_ids: Vec<i8>,
4326    ) -> AvroDataType {
4327        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4328        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4329        for (codec, name, dt) in children.into_iter() {
4330            avro_children.push(AvroDataType::new(codec, Default::default(), None));
4331            fields.push(arrow_schema::Field::new(name, dt, true));
4332        }
4333        let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4334        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4335        AvroDataType::new(union_codec, Default::default(), None)
4336    }
4337
4338    #[test]
4339    fn test_union_dense_two_children_custom_type_ids() {
4340        let union_dt = make_dense_union_avro(
4341            vec![
4342                (Codec::Int32, "i", DataType::Int32),
4343                (Codec::Utf8, "s", DataType::Utf8),
4344            ],
4345            vec![2, 5],
4346        );
4347        let mut dec = Decoder::try_new(&union_dt).unwrap();
4348        let mut r1 = Vec::new();
4349        r1.extend_from_slice(&encode_avro_long(0));
4350        r1.extend_from_slice(&encode_avro_int(7));
4351        let mut r2 = Vec::new();
4352        r2.extend_from_slice(&encode_avro_long(1));
4353        r2.extend_from_slice(&encode_avro_bytes(b"x"));
4354        let mut r3 = Vec::new();
4355        r3.extend_from_slice(&encode_avro_long(0));
4356        r3.extend_from_slice(&encode_avro_int(-1));
4357        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4358        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4359        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4360        let array = dec.flush(None).unwrap();
4361        let ua = array
4362            .as_any()
4363            .downcast_ref::<UnionArray>()
4364            .expect("expected UnionArray");
4365        assert_eq!(ua.len(), 3);
4366        assert_eq!(ua.type_id(0), 2);
4367        assert_eq!(ua.type_id(1), 5);
4368        assert_eq!(ua.type_id(2), 2);
4369        assert_eq!(ua.value_offset(0), 0);
4370        assert_eq!(ua.value_offset(1), 0);
4371        assert_eq!(ua.value_offset(2), 1);
4372        let int_child = ua
4373            .child(2)
4374            .as_any()
4375            .downcast_ref::<Int32Array>()
4376            .expect("int child");
4377        assert_eq!(int_child.len(), 2);
4378        assert_eq!(int_child.value(0), 7);
4379        assert_eq!(int_child.value(1), -1);
4380        let str_child = ua
4381            .child(5)
4382            .as_any()
4383            .downcast_ref::<StringArray>()
4384            .expect("string child");
4385        assert_eq!(str_child.len(), 1);
4386        assert_eq!(str_child.value(0), "x");
4387    }
4388
4389    #[test]
4390    fn test_union_dense_with_null_and_string_children() {
4391        let union_dt = make_dense_union_avro(
4392            vec![
4393                (Codec::Null, "n", DataType::Null),
4394                (Codec::Utf8, "s", DataType::Utf8),
4395            ],
4396            vec![42, 7],
4397        );
4398        let mut dec = Decoder::try_new(&union_dt).unwrap();
4399        let r1 = encode_avro_long(0);
4400        let mut r2 = Vec::new();
4401        r2.extend_from_slice(&encode_avro_long(1));
4402        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
4403        let r3 = encode_avro_long(0);
4404        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4405        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4406        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4407        let array = dec.flush(None).unwrap();
4408        let ua = array
4409            .as_any()
4410            .downcast_ref::<UnionArray>()
4411            .expect("expected UnionArray");
4412        assert_eq!(ua.len(), 3);
4413        assert_eq!(ua.type_id(0), 42);
4414        assert_eq!(ua.type_id(1), 7);
4415        assert_eq!(ua.type_id(2), 42);
4416        assert_eq!(ua.value_offset(0), 0);
4417        assert_eq!(ua.value_offset(1), 0);
4418        assert_eq!(ua.value_offset(2), 1);
4419        let null_child = ua
4420            .child(42)
4421            .as_any()
4422            .downcast_ref::<NullArray>()
4423            .expect("null child");
4424        assert_eq!(null_child.len(), 2);
4425        let str_child = ua
4426            .child(7)
4427            .as_any()
4428            .downcast_ref::<StringArray>()
4429            .expect("string child");
4430        assert_eq!(str_child.len(), 1);
4431        assert_eq!(str_child.value(0), "abc");
4432    }
4433
4434    #[test]
4435    fn test_union_decode_negative_branch_index_errors() {
4436        let union_dt = make_dense_union_avro(
4437            vec![
4438                (Codec::Int32, "i", DataType::Int32),
4439                (Codec::Utf8, "s", DataType::Utf8),
4440            ],
4441            vec![0, 1],
4442        );
4443        let mut dec = Decoder::try_new(&union_dt).unwrap();
4444        let row = encode_avro_long(-1); // decodes back to -1
4445        let err = dec
4446            .decode(&mut AvroCursor::new(&row))
4447            .expect_err("expected error for negative branch index");
4448        let msg = err.to_string();
4449        assert!(
4450            msg.contains("Negative union branch index"),
4451            "unexpected error message: {msg}"
4452        );
4453    }
4454
4455    #[test]
4456    fn test_union_decode_out_of_range_branch_index_errors() {
4457        let union_dt = make_dense_union_avro(
4458            vec![
4459                (Codec::Int32, "i", DataType::Int32),
4460                (Codec::Utf8, "s", DataType::Utf8),
4461            ],
4462            vec![10, 11],
4463        );
4464        let mut dec = Decoder::try_new(&union_dt).unwrap();
4465        let row = encode_avro_long(2);
4466        let err = dec
4467            .decode(&mut AvroCursor::new(&row))
4468            .expect_err("expected error for out-of-range branch index");
4469        let msg = err.to_string();
4470        assert!(
4471            msg.contains("out of range"),
4472            "unexpected error message: {msg}"
4473        );
4474    }
4475
4476    #[test]
4477    fn test_union_sparse_mode_not_supported() {
4478        let children: Vec<AvroDataType> = vec![
4479            AvroDataType::new(Codec::Int32, Default::default(), None),
4480            AvroDataType::new(Codec::Utf8, Default::default(), None),
4481        ];
4482        let uf = UnionFields::try_new(
4483            vec![1, 3],
4484            vec![
4485                arrow_schema::Field::new("i", DataType::Int32, true),
4486                arrow_schema::Field::new("s", DataType::Utf8, true),
4487            ],
4488        )
4489        .unwrap();
4490        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
4491        let dt = AvroDataType::new(codec, Default::default(), None);
4492        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
4493        let msg = err.to_string();
4494        assert!(
4495            msg.contains("Sparse Arrow unions are not yet supported"),
4496            "unexpected error message: {msg}"
4497        );
4498    }
4499
4500    fn make_record_decoder_with_projector_defaults(
4501        reader_fields: &[(&str, DataType, bool)],
4502        field_defaults: Vec<Option<AvroLiteral>>,
4503        default_injections: Vec<(usize, AvroLiteral)>,
4504    ) -> Decoder {
4505        assert_eq!(
4506            field_defaults.len(),
4507            reader_fields.len(),
4508            "field_defaults must have one entry per reader field"
4509        );
4510        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4511        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4512        for (name, dt, nullable) in reader_fields {
4513            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4514            let enc = match dt {
4515                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4516                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4517                DataType::Utf8 => Decoder::String(
4518                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4519                    Vec::with_capacity(DEFAULT_CAPACITY),
4520                ),
4521                other => panic!("Unsupported test field type in helper: {other:?}"),
4522            };
4523            encodings.push(enc);
4524        }
4525        let fields: Fields = field_refs.into();
4526        let projector = Projector {
4527            writer_projections: vec![],
4528            default_injections: Arc::from(default_injections),
4529        };
4530        Decoder::Record(fields, encodings, field_defaults, Some(projector))
4531    }
4532
4533    #[cfg(feature = "avro_custom_types")]
4534    #[test]
4535    fn test_default_append_custom_integer_range_validation() {
4536        let mut d_i8 = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
4537        d_i8.append_default(&AvroLiteral::Int(i8::MIN as i32))
4538            .unwrap();
4539        d_i8.append_default(&AvroLiteral::Int(i8::MAX as i32))
4540            .unwrap();
4541        let err_i8_high = d_i8
4542            .append_default(&AvroLiteral::Int(i8::MAX as i32 + 1))
4543            .unwrap_err();
4544        assert!(err_i8_high.to_string().contains("out of range for i8"));
4545        let err_i8_low = d_i8
4546            .append_default(&AvroLiteral::Int(i8::MIN as i32 - 1))
4547            .unwrap_err();
4548        assert!(err_i8_low.to_string().contains("out of range for i8"));
4549        let arr_i8 = d_i8.flush(None).unwrap();
4550        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4551        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4552
4553        let mut d_i16 = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
4554        d_i16
4555            .append_default(&AvroLiteral::Int(i16::MIN as i32))
4556            .unwrap();
4557        d_i16
4558            .append_default(&AvroLiteral::Int(i16::MAX as i32))
4559            .unwrap();
4560        let err_i16_high = d_i16
4561            .append_default(&AvroLiteral::Int(i16::MAX as i32 + 1))
4562            .unwrap_err();
4563        assert!(err_i16_high.to_string().contains("out of range for i16"));
4564        let err_i16_low = d_i16
4565            .append_default(&AvroLiteral::Int(i16::MIN as i32 - 1))
4566            .unwrap_err();
4567        assert!(err_i16_low.to_string().contains("out of range for i16"));
4568        let arr_i16 = d_i16.flush(None).unwrap();
4569        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4570        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4571
4572        let mut d_u8 = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
4573        d_u8.append_default(&AvroLiteral::Int(0)).unwrap();
4574        d_u8.append_default(&AvroLiteral::Int(u8::MAX as i32))
4575            .unwrap();
4576        let err_u8_neg = d_u8.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4577        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4578        let err_u8_high = d_u8
4579            .append_default(&AvroLiteral::Int(u8::MAX as i32 + 1))
4580            .unwrap_err();
4581        assert!(err_u8_high.to_string().contains("out of range for u8"));
4582        let arr_u8 = d_u8.flush(None).unwrap();
4583        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4584        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4585
4586        let mut d_u16 = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
4587        d_u16.append_default(&AvroLiteral::Int(0)).unwrap();
4588        d_u16
4589            .append_default(&AvroLiteral::Int(u16::MAX as i32))
4590            .unwrap();
4591        let err_u16_neg = d_u16.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4592        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4593        let err_u16_high = d_u16
4594            .append_default(&AvroLiteral::Int(u16::MAX as i32 + 1))
4595            .unwrap_err();
4596        assert!(err_u16_high.to_string().contains("out of range for u16"));
4597        let arr_u16 = d_u16.flush(None).unwrap();
4598        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4599        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4600
4601        let mut d_u32 = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
4602        d_u32.append_default(&AvroLiteral::Long(0)).unwrap();
4603        d_u32
4604            .append_default(&AvroLiteral::Long(u32::MAX as i64))
4605            .unwrap();
4606        let err_u32_neg = d_u32.append_default(&AvroLiteral::Long(-1)).unwrap_err();
4607        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4608        let err_u32_high = d_u32
4609            .append_default(&AvroLiteral::Long(u32::MAX as i64 + 1))
4610            .unwrap_err();
4611        assert!(err_u32_high.to_string().contains("out of range for u32"));
4612        let arr_u32 = d_u32.flush(None).unwrap();
4613        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4614        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4615    }
4616
4617    #[cfg(feature = "avro_custom_types")]
4618    #[test]
4619    fn test_decode_custom_integer_range_validation() {
4620        let mut d_i8 = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
4621        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32)))
4622            .unwrap();
4623        d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32)))
4624            .unwrap();
4625        let err_i8_high = d_i8
4626            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32 + 1)))
4627            .unwrap_err();
4628        assert!(err_i8_high.to_string().contains("out of range for i8"));
4629        let err_i8_low = d_i8
4630            .decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32 - 1)))
4631            .unwrap_err();
4632        assert!(err_i8_low.to_string().contains("out of range for i8"));
4633        let arr_i8 = d_i8.flush(None).unwrap();
4634        let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4635        assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4636
4637        let mut d_i16 = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
4638        d_i16
4639            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32)))
4640            .unwrap();
4641        d_i16
4642            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32)))
4643            .unwrap();
4644        let err_i16_high = d_i16
4645            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32 + 1)))
4646            .unwrap_err();
4647        assert!(err_i16_high.to_string().contains("out of range for i16"));
4648        let err_i16_low = d_i16
4649            .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32 - 1)))
4650            .unwrap_err();
4651        assert!(err_i16_low.to_string().contains("out of range for i16"));
4652        let arr_i16 = d_i16.flush(None).unwrap();
4653        let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4654        assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4655
4656        let mut d_u8 = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
4657        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(0)))
4658            .unwrap();
4659        d_u8.decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32)))
4660            .unwrap();
4661        let err_u8_neg = d_u8
4662            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4663            .unwrap_err();
4664        assert!(err_u8_neg.to_string().contains("out of range for u8"));
4665        let err_u8_high = d_u8
4666            .decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32 + 1)))
4667            .unwrap_err();
4668        assert!(err_u8_high.to_string().contains("out of range for u8"));
4669        let arr_u8 = d_u8.flush(None).unwrap();
4670        let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4671        assert_eq!(values_u8.values(), &[0, u8::MAX]);
4672
4673        let mut d_u16 = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
4674        d_u16
4675            .decode(&mut AvroCursor::new(&encode_avro_int(0)))
4676            .unwrap();
4677        d_u16
4678            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32)))
4679            .unwrap();
4680        let err_u16_neg = d_u16
4681            .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4682            .unwrap_err();
4683        assert!(err_u16_neg.to_string().contains("out of range for u16"));
4684        let err_u16_high = d_u16
4685            .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32 + 1)))
4686            .unwrap_err();
4687        assert!(err_u16_high.to_string().contains("out of range for u16"));
4688        let arr_u16 = d_u16.flush(None).unwrap();
4689        let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4690        assert_eq!(values_u16.values(), &[0, u16::MAX]);
4691
4692        let mut d_u32 = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
4693        d_u32
4694            .decode(&mut AvroCursor::new(&encode_avro_long(0)))
4695            .unwrap();
4696        d_u32
4697            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64)))
4698            .unwrap();
4699        let err_u32_neg = d_u32
4700            .decode(&mut AvroCursor::new(&encode_avro_long(-1)))
4701            .unwrap_err();
4702        assert!(err_u32_neg.to_string().contains("out of range for u32"));
4703        let err_u32_high = d_u32
4704            .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64 + 1)))
4705            .unwrap_err();
4706        assert!(err_u32_high.to_string().contains("out of range for u32"));
4707        let arr_u32 = d_u32.flush(None).unwrap();
4708        let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4709        assert_eq!(values_u32.values(), &[0, u32::MAX]);
4710    }
4711
4712    #[test]
4713    fn test_default_append_int32_and_int64_from_int_and_long() {
4714        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4715        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
4716        let arr = d_i32.flush(None).unwrap();
4717        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4718        assert_eq!(a.len(), 1);
4719        assert_eq!(a.value(0), 42);
4720        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
4721        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
4722        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
4723        let arr64 = d_i64.flush(None).unwrap();
4724        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
4725        assert_eq!(a64.len(), 2);
4726        assert_eq!(a64.value(0), 5);
4727        assert_eq!(a64.value(1), 7);
4728    }
4729
4730    #[test]
4731    fn test_default_append_floats_and_doubles() {
4732        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
4733        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
4734        let arr32 = d_f32.flush(None).unwrap();
4735        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
4736        assert_eq!(a.value(0), 1.5);
4737        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4738        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
4739        let arr64 = d_f64.flush(None).unwrap();
4740        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
4741        assert_eq!(b.value(0), 2.25);
4742    }
4743
4744    #[test]
4745    fn test_default_append_string_and_bytes() {
4746        let mut d_str = Decoder::String(
4747            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4748            Vec::with_capacity(DEFAULT_CAPACITY),
4749        );
4750        d_str
4751            .append_default(&AvroLiteral::String("hi".into()))
4752            .unwrap();
4753        let s_arr = d_str.flush(None).unwrap();
4754        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
4755        assert_eq!(arr.value(0), "hi");
4756        let mut d_bytes = Decoder::Binary(
4757            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4758            Vec::with_capacity(DEFAULT_CAPACITY),
4759        );
4760        d_bytes
4761            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4762            .unwrap();
4763        let b_arr = d_bytes.flush(None).unwrap();
4764        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
4765        assert_eq!(barr.value(0), &[1, 2, 3]);
4766        let mut d_str_err = Decoder::String(
4767            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4768            Vec::with_capacity(DEFAULT_CAPACITY),
4769        );
4770        let err = d_str_err
4771            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
4772            .unwrap_err();
4773        assert!(
4774            err.to_string()
4775                .contains("Default for string must be string"),
4776            "unexpected error: {err:?}"
4777        );
4778    }
4779
4780    #[test]
4781    fn test_default_append_nullable_int32_null_and_value() {
4782        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4783        let mut dec = Decoder::Nullable(
4784            NullablePlan::ReadTag {
4785                nullability: Nullability::NullFirst,
4786                resolution: ResolutionPlan::Promotion(Promotion::Direct),
4787            },
4788            NullBufferBuilder::new(DEFAULT_CAPACITY),
4789            Box::new(inner),
4790        );
4791        dec.append_default(&AvroLiteral::Null).unwrap();
4792        dec.append_default(&AvroLiteral::Int(11)).unwrap();
4793        let arr = dec.flush(None).unwrap();
4794        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4795        assert_eq!(a.len(), 2);
4796        assert!(a.is_null(0));
4797        assert_eq!(a.value(1), 11);
4798    }
4799
4800    #[test]
4801    fn test_default_append_array_of_ints() {
4802        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
4803        let mut d = Decoder::try_new(&list_dt).unwrap();
4804        let items = vec![
4805            AvroLiteral::Int(1),
4806            AvroLiteral::Int(2),
4807            AvroLiteral::Int(3),
4808        ];
4809        d.append_default(&AvroLiteral::Array(items)).unwrap();
4810        let arr = d.flush(None).unwrap();
4811        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
4812        assert_eq!(list.len(), 1);
4813        assert_eq!(list.value_length(0), 3);
4814        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
4815        assert_eq!(vals.values(), &[1, 2, 3]);
4816    }
4817
4818    #[test]
4819    fn test_default_append_map_string_to_int() {
4820        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
4821        let mut d = Decoder::try_new(&map_dt).unwrap();
4822        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
4823        m.insert("k1".to_string(), AvroLiteral::Int(10));
4824        m.insert("k2".to_string(), AvroLiteral::Int(20));
4825        d.append_default(&AvroLiteral::Map(m)).unwrap();
4826        let arr = d.flush(None).unwrap();
4827        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
4828        assert_eq!(map.len(), 1);
4829        assert_eq!(map.value_length(0), 2);
4830        let binding = map.value(0);
4831        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
4832        let k = entries
4833            .column_by_name("key")
4834            .unwrap()
4835            .as_any()
4836            .downcast_ref::<StringArray>()
4837            .unwrap();
4838        let v = entries
4839            .column_by_name("value")
4840            .unwrap()
4841            .as_any()
4842            .downcast_ref::<Int32Array>()
4843            .unwrap();
4844        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4845        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4846        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4847        assert_eq!(vals, [10, 20].into_iter().collect());
4848    }
4849
4850    #[test]
4851    fn test_default_append_enum_by_symbol() {
4852        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4853        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4854        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4855        let arr = d.flush(None).unwrap();
4856        let dict = arr
4857            .as_any()
4858            .downcast_ref::<DictionaryArray<Int32Type>>()
4859            .unwrap();
4860        assert_eq!(dict.len(), 1);
4861        let expected = Int32Array::from(vec![1]);
4862        assert_eq!(dict.keys(), &expected);
4863        let values = dict
4864            .values()
4865            .as_any()
4866            .downcast_ref::<StringArray>()
4867            .unwrap();
4868        assert_eq!(values.value(1), "B");
4869    }
4870
4871    #[test]
4872    fn test_default_append_uuid_and_type_error() {
4873        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4874        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4875        d.append_default(&AvroLiteral::String(uuid_str.into()))
4876            .unwrap();
4877        let arr_ref = d.flush(None).unwrap();
4878        let arr = arr_ref
4879            .as_any()
4880            .downcast_ref::<FixedSizeBinaryArray>()
4881            .unwrap();
4882        assert_eq!(arr.value_length(), 16);
4883        assert_eq!(arr.len(), 1);
4884        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4885        let err = d2
4886            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4887            .unwrap_err();
4888        assert!(
4889            err.to_string().contains("Default for uuid must be string"),
4890            "unexpected error: {err:?}"
4891        );
4892    }
4893
4894    #[test]
4895    fn test_default_append_fixed_and_length_mismatch() {
4896        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4897        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4898            .unwrap();
4899        let arr_ref = d.flush(None).unwrap();
4900        let arr = arr_ref
4901            .as_any()
4902            .downcast_ref::<FixedSizeBinaryArray>()
4903            .unwrap();
4904        assert_eq!(arr.value_length(), 4);
4905        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4906        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4907        let err = d_err
4908            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4909            .unwrap_err();
4910        assert!(
4911            err.to_string().contains("Fixed default length"),
4912            "unexpected error: {err:?}"
4913        );
4914    }
4915
4916    #[test]
4917    fn test_default_append_duration_and_length_validation() {
4918        let dt = avro_from_codec(Codec::Interval);
4919        let mut d = Decoder::try_new(&dt).unwrap();
4920        let mut bytes = Vec::with_capacity(12);
4921        bytes.extend_from_slice(&1u32.to_le_bytes());
4922        bytes.extend_from_slice(&2u32.to_le_bytes());
4923        bytes.extend_from_slice(&3u32.to_le_bytes());
4924        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4925        let arr_ref = d.flush(None).unwrap();
4926        let arr = arr_ref
4927            .as_any()
4928            .downcast_ref::<IntervalMonthDayNanoArray>()
4929            .unwrap();
4930        assert_eq!(arr.len(), 1);
4931        let v = arr.value(0);
4932        assert_eq!(v.months, 1);
4933        assert_eq!(v.days, 2);
4934        assert_eq!(v.nanoseconds, 3_000_000);
4935        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4936        let err = d_err
4937            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4938            .unwrap_err();
4939        assert!(
4940            err.to_string()
4941                .contains("Duration default must be exactly 12 bytes"),
4942            "unexpected error: {err:?}"
4943        );
4944    }
4945
4946    #[test]
4947    fn test_default_append_decimal256_from_bytes() {
4948        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4949        let mut d = Decoder::try_new(&dt).unwrap();
4950        let pos: [u8; 32] = [
4951            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4952            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4953            0x00, 0x00, 0x30, 0x39,
4954        ];
4955        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4956        let neg: [u8; 32] = [
4957            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4958            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4959            0xFF, 0xFF, 0xFF, 0x85,
4960        ];
4961        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4962        let arr = d.flush(None).unwrap();
4963        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4964        assert_eq!(dec.len(), 2);
4965        assert_eq!(dec.value_as_string(0), "123.45");
4966        assert_eq!(dec.value_as_string(1), "-1.23");
4967    }
4968
4969    #[test]
4970    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4971        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4972        let mut rec = make_record_decoder_with_projector_defaults(
4973            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4974            field_defaults,
4975            vec![],
4976        );
4977        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4978        map.insert("a".to_string(), AvroLiteral::Int(7));
4979        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4980        let arr = rec.flush(None).unwrap();
4981        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4982        let a = s
4983            .column_by_name("a")
4984            .unwrap()
4985            .as_any()
4986            .downcast_ref::<Int32Array>()
4987            .unwrap();
4988        let b = s
4989            .column_by_name("b")
4990            .unwrap()
4991            .as_any()
4992            .downcast_ref::<StringArray>()
4993            .unwrap();
4994        assert_eq!(a.value(0), 7);
4995        assert_eq!(b.value(0), "hi");
4996    }
4997
4998    #[test]
4999    fn test_record_append_default_null_uses_projector_field_defaults() {
5000        let field_defaults = vec![
5001            Some(AvroLiteral::Int(5)),
5002            Some(AvroLiteral::String("x".into())),
5003        ];
5004        let mut rec = make_record_decoder_with_projector_defaults(
5005            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
5006            field_defaults,
5007            vec![],
5008        );
5009        rec.append_default(&AvroLiteral::Null).unwrap();
5010        let arr = rec.flush(None).unwrap();
5011        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5012        let a = s
5013            .column_by_name("a")
5014            .unwrap()
5015            .as_any()
5016            .downcast_ref::<Int32Array>()
5017            .unwrap();
5018        let b = s
5019            .column_by_name("b")
5020            .unwrap()
5021            .as_any()
5022            .downcast_ref::<StringArray>()
5023            .unwrap();
5024        assert_eq!(a.value(0), 5);
5025        assert_eq!(b.value(0), "x");
5026    }
5027
5028    #[test]
5029    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
5030     {
5031        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
5032        let mut field_refs: Vec<FieldRef> = Vec::new();
5033        let mut encoders: Vec<Decoder> = Vec::new();
5034        for (name, dt, nullable) in &fields {
5035            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
5036        }
5037        let enc_a = Decoder::Nullable(
5038            NullablePlan::ReadTag {
5039                nullability: Nullability::NullSecond,
5040                resolution: ResolutionPlan::Promotion(Promotion::Direct),
5041            },
5042            NullBufferBuilder::new(DEFAULT_CAPACITY),
5043            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
5044        );
5045        let enc_b = Decoder::Nullable(
5046            NullablePlan::ReadTag {
5047                nullability: Nullability::NullSecond,
5048                resolution: ResolutionPlan::Promotion(Promotion::Direct),
5049            },
5050            NullBufferBuilder::new(DEFAULT_CAPACITY),
5051            Box::new(Decoder::String(
5052                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
5053                Vec::with_capacity(DEFAULT_CAPACITY),
5054            )),
5055        );
5056        encoders.push(enc_a);
5057        encoders.push(enc_b);
5058        let field_defaults = vec![None, None]; // no defaults -> append_null
5059        let projector = Projector {
5060            writer_projections: vec![],
5061            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
5062        };
5063        let mut rec = Decoder::Record(field_refs.into(), encoders, field_defaults, Some(projector));
5064        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
5065        map.insert("a".to_string(), AvroLiteral::Int(9));
5066        rec.append_default(&AvroLiteral::Map(map)).unwrap();
5067        let arr = rec.flush(None).unwrap();
5068        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5069        let a = s
5070            .column_by_name("a")
5071            .unwrap()
5072            .as_any()
5073            .downcast_ref::<Int32Array>()
5074            .unwrap();
5075        let b = s
5076            .column_by_name("b")
5077            .unwrap()
5078            .as_any()
5079            .downcast_ref::<StringArray>()
5080            .unwrap();
5081        assert!(a.is_valid(0));
5082        assert_eq!(a.value(0), 9);
5083        assert!(b.is_null(0));
5084    }
5085
5086    #[test]
5087    fn test_projector_default_injection_when_writer_lacks_fields() {
5088        let defaults = vec![None, None];
5089        let injections = vec![
5090            (0, AvroLiteral::Int(99)),
5091            (1, AvroLiteral::String("alice".into())),
5092        ];
5093        let mut rec = make_record_decoder_with_projector_defaults(
5094            &[
5095                ("id", DataType::Int32, false),
5096                ("name", DataType::Utf8, false),
5097            ],
5098            defaults,
5099            injections,
5100        );
5101        rec.decode(&mut AvroCursor::new(&[])).unwrap();
5102        let arr = rec.flush(None).unwrap();
5103        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
5104        let id = s
5105            .column_by_name("id")
5106            .unwrap()
5107            .as_any()
5108            .downcast_ref::<Int32Array>()
5109            .unwrap();
5110        let name = s
5111            .column_by_name("name")
5112            .unwrap()
5113            .as_any()
5114            .downcast_ref::<StringArray>()
5115            .unwrap();
5116        assert_eq!(id.value(0), 99);
5117        assert_eq!(name.value(0), "alice");
5118    }
5119
5120    #[test]
5121    fn union_type_ids_are_not_child_indexes() {
5122        let encodings: Vec<AvroDataType> =
5123            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
5124        let fields: UnionFields = [
5125            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
5126            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
5127        ]
5128        .into_iter()
5129        .collect();
5130        let dt = avro_from_codec(Codec::Union(
5131            encodings.into(),
5132            fields.clone(),
5133            UnionMode::Dense,
5134        ));
5135        let mut dec = Decoder::try_new(&dt).expect("decoder");
5136        let mut b1 = encode_avro_long(1);
5137        b1.extend(encode_avro_bytes("hi".as_bytes()));
5138        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
5139        let mut b0 = encode_avro_long(0);
5140        b0.extend(encode_avro_int(5));
5141        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
5142        let arr = dec.flush(None).expect("flush");
5143        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
5144        assert_eq!(ua.len(), 2);
5145        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
5146        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
5147        assert_eq!(ua.value_offset(0), 0);
5148        assert_eq!(ua.value_offset(1), 0);
5149        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
5150        assert_eq!(utf8_child.len(), 1);
5151        assert_eq!(utf8_child.value(0), "hi");
5152        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
5153        assert_eq!(int_child.len(), 1);
5154        assert_eq!(int_child.value(0), 5);
5155        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
5156        assert_eq!(type_ids, vec![42_i8, 7_i8]);
5157    }
5158
5159    #[cfg(feature = "avro_custom_types")]
5160    #[test]
5161    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
5162        for codec in [
5163            Codec::DurationNanos,
5164            Codec::DurationMicros,
5165            Codec::DurationMillis,
5166            Codec::DurationSeconds,
5167        ] {
5168            let dt = make_avro_dt(codec.clone(), None);
5169            let s = Skipper::from_avro(&dt)?;
5170            match s {
5171                Skipper::Int64 => {}
5172                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
5173            }
5174        }
5175        Ok(())
5176    }
5177
5178    #[cfg(feature = "avro_custom_types")]
5179    #[test]
5180    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
5181        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
5182        for codec in [
5183            Codec::DurationNanos,
5184            Codec::DurationMicros,
5185            Codec::DurationMillis,
5186            Codec::DurationSeconds,
5187        ] {
5188            let dt = make_avro_dt(codec.clone(), None);
5189            let s = Skipper::from_avro(&dt)?;
5190            for &v in &values {
5191                let bytes = encode_avro_long(v);
5192                let mut cursor = AvroCursor::new(&bytes);
5193                s.skip(&mut cursor)?;
5194                assert_eq!(
5195                    cursor.position(),
5196                    bytes.len(),
5197                    "did not consume all bytes for {:?} value {}",
5198                    codec,
5199                    v
5200                );
5201            }
5202        }
5203        Ok(())
5204    }
5205
5206    #[cfg(feature = "avro_custom_types")]
5207    #[test]
5208    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
5209        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
5210        let s = Skipper::from_avro(&dt)?;
5211        match &s {
5212            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
5213                Skipper::Int64 => {}
5214                ref other => panic!("expected inner Int64, got {:?}", other),
5215            },
5216            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
5217        }
5218        {
5219            let buf = encode_vlq_u64(0);
5220            let mut cursor = AvroCursor::new(&buf);
5221            s.skip(&mut cursor)?;
5222            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
5223        }
5224        {
5225            let mut buf = encode_vlq_u64(1);
5226            buf.extend(encode_avro_long(0));
5227            let mut cursor = AvroCursor::new(&buf);
5228            s.skip(&mut cursor)?;
5229            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
5230        }
5231
5232        Ok(())
5233    }
5234
5235    #[cfg(feature = "avro_custom_types")]
5236    #[test]
5237    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
5238        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
5239        let s = Skipper::from_avro(&dt)?;
5240        match &s {
5241            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
5242                Skipper::Int64 => {}
5243                ref other => panic!("expected inner Int64, got {:?}", other),
5244            },
5245            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
5246        }
5247        {
5248            let buf = encode_vlq_u64(1);
5249            let mut cursor = AvroCursor::new(&buf);
5250            s.skip(&mut cursor)?;
5251            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
5252        }
5253        {
5254            let mut buf = encode_vlq_u64(0);
5255            buf.extend(encode_avro_long(-1));
5256            let mut cursor = AvroCursor::new(&buf);
5257            s.skip(&mut cursor)?;
5258            assert_eq!(
5259                cursor.position(),
5260                1 + encode_avro_long(-1).len(),
5261                "expected to consume tag=0 + long(-1)"
5262            );
5263        }
5264        Ok(())
5265    }
5266
5267    #[test]
5268    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
5269        let dt = make_avro_dt(Codec::Interval, None);
5270        let s = Skipper::from_avro(&dt)?;
5271        match s {
5272            Skipper::DurationFixed12 => {}
5273            other => panic!("expected DurationFixed12, got {:?}", other),
5274        }
5275        let payload = vec![0u8; 12];
5276        let mut cursor = AvroCursor::new(&payload);
5277        s.skip(&mut cursor)?;
5278        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
5279        Ok(())
5280    }
5281
5282    #[cfg(feature = "avro_custom_types")]
5283    #[test]
5284    fn test_run_end_encoded_width16_int32_basic_grouping() {
5285        use arrow_array::RunArray;
5286        use std::sync::Arc;
5287        let inner = avro_from_codec(Codec::Int32);
5288        let ree = AvroDataType::new(
5289            Codec::RunEndEncoded(Arc::new(inner), 16),
5290            Default::default(),
5291            None,
5292        );
5293        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5294        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
5295            let bytes = encode_avro_int(v);
5296            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5297        }
5298        let arr = dec.flush(None).expect("flush");
5299        let ra = arr
5300            .as_any()
5301            .downcast_ref::<RunArray<Int16Type>>()
5302            .expect("RunArray<Int16Type>");
5303        assert_eq!(ra.len(), 9);
5304        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
5305        let vals = ra
5306            .values()
5307            .as_ref()
5308            .as_any()
5309            .downcast_ref::<Int32Array>()
5310            .expect("values Int32");
5311        assert_eq!(vals.values(), &[1, 2, 3]);
5312    }
5313
5314    #[cfg(feature = "avro_custom_types")]
5315    #[test]
5316    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
5317        use arrow_array::RunArray;
5318        use std::sync::Arc;
5319        let inner = AvroDataType::new(
5320            Codec::Int32,
5321            Default::default(),
5322            Some(Nullability::NullSecond),
5323        );
5324        let ree = AvroDataType::new(
5325            Codec::RunEndEncoded(Arc::new(inner), 32),
5326            Default::default(),
5327            None,
5328        );
5329        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
5330        let seq: [Option<i32>; 8] = [
5331            None,
5332            None,
5333            Some(7),
5334            Some(7),
5335            Some(7),
5336            None,
5337            Some(5),
5338            Some(5),
5339        ];
5340        for item in seq {
5341            let mut bytes = Vec::new();
5342            match item {
5343                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
5344                Some(v) => {
5345                    bytes.extend_from_slice(&encode_vlq_u64(0));
5346                    bytes.extend_from_slice(&encode_avro_int(v));
5347                }
5348            }
5349            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5350        }
5351        let arr = dec.flush(None).expect("flush");
5352        let ra = arr
5353            .as_any()
5354            .downcast_ref::<RunArray<Int32Type>>()
5355            .expect("RunArray<Int32Type>");
5356        assert_eq!(ra.len(), 8);
5357        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
5358        let vals = ra
5359            .values()
5360            .as_ref()
5361            .as_any()
5362            .downcast_ref::<Int32Array>()
5363            .expect("values Int32 (nullable)");
5364        assert_eq!(vals.len(), 4);
5365        assert!(vals.is_null(0));
5366        assert_eq!(vals.value(1), 7);
5367        assert!(vals.is_null(2));
5368        assert_eq!(vals.value(3), 5);
5369    }
5370
5371    #[cfg(feature = "avro_custom_types")]
5372    #[test]
5373    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
5374        use arrow_array::RunArray;
5375        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
5376        let ree = Decoder::RunEndEncoded(
5377            8, /* bytes => Int64 run-ends */
5378            0,
5379            Box::new(inner_values),
5380        );
5381        let mut dec = Decoder::Nullable(
5382            NullablePlan::FromSingle {
5383                resolution: ResolutionPlan::Promotion(Promotion::IntToDouble),
5384            },
5385            NullBufferBuilder::new(DEFAULT_CAPACITY),
5386            Box::new(ree),
5387        );
5388        for v in [1, 1, 2, 2, 2] {
5389            let bytes = encode_avro_int(v);
5390            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5391        }
5392        let arr = dec.flush(None).expect("flush");
5393        let ra = arr
5394            .as_any()
5395            .downcast_ref::<RunArray<Int64Type>>()
5396            .expect("RunArray<Int64Type>");
5397        assert_eq!(ra.len(), 5);
5398        assert_eq!(ra.run_ends().values(), &[2, 5]);
5399        let vals = ra
5400            .values()
5401            .as_ref()
5402            .as_any()
5403            .downcast_ref::<Float64Array>()
5404            .expect("values Float64");
5405        assert_eq!(vals.values(), &[1.0, 2.0]);
5406    }
5407
5408    #[cfg(feature = "avro_custom_types")]
5409    #[test]
5410    fn test_run_end_encoded_unsupported_run_end_width_errors() {
5411        use std::sync::Arc;
5412        let inner = avro_from_codec(Codec::Int32);
5413        let dt = AvroDataType::new(
5414            Codec::RunEndEncoded(Arc::new(inner), 3),
5415            Default::default(),
5416            None,
5417        );
5418        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
5419        let msg = err.to_string();
5420        assert!(
5421            msg.contains("Unsupported run-end width")
5422                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
5423            "unexpected error message: {msg}"
5424        );
5425    }
5426
5427    #[cfg(feature = "avro_custom_types")]
5428    #[test]
5429    fn test_run_end_encoded_empty_input_is_empty_runarray() {
5430        use arrow_array::RunArray;
5431        use std::sync::Arc;
5432        let inner = avro_from_codec(Codec::Utf8);
5433        let dt = AvroDataType::new(
5434            Codec::RunEndEncoded(Arc::new(inner), 4),
5435            Default::default(),
5436            None,
5437        );
5438        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5439        let arr = dec.flush(None).expect("flush");
5440        let ra = arr
5441            .as_any()
5442            .downcast_ref::<RunArray<Int32Type>>()
5443            .expect("RunArray<Int32Type>");
5444        assert_eq!(ra.len(), 0);
5445        assert_eq!(ra.run_ends().len(), 0);
5446        assert_eq!(ra.values().len(), 0);
5447    }
5448
5449    #[cfg(feature = "avro_custom_types")]
5450    #[test]
5451    fn test_run_end_encoded_strings_grouping_width32_bits() {
5452        use arrow_array::RunArray;
5453        use std::sync::Arc;
5454        let inner = avro_from_codec(Codec::Utf8);
5455        let dt = AvroDataType::new(
5456            Codec::RunEndEncoded(Arc::new(inner), 32),
5457            Default::default(),
5458            None,
5459        );
5460        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
5461        for s in ["a", "a", "bb", "bb", "bb", "a"] {
5462            let bytes = encode_avro_bytes(s.as_bytes());
5463            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5464        }
5465        let arr = dec.flush(None).expect("flush");
5466        let ra = arr
5467            .as_any()
5468            .downcast_ref::<RunArray<Int32Type>>()
5469            .expect("RunArray<Int32Type>");
5470        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
5471        let vals = ra
5472            .values()
5473            .as_ref()
5474            .as_any()
5475            .downcast_ref::<StringArray>()
5476            .expect("values String");
5477        assert_eq!(vals.len(), 3);
5478        assert_eq!(vals.value(0), "a");
5479        assert_eq!(vals.value(1), "bb");
5480        assert_eq!(vals.value(2), "a");
5481    }
5482
5483    #[cfg(not(feature = "avro_custom_types"))]
5484    #[test]
5485    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
5486        let dt = avro_from_codec(Codec::Int32);
5487        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
5488        for v in [1, 2, 3] {
5489            let bytes = encode_avro_int(v);
5490            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
5491        }
5492        let arr = dec.flush(None).expect("flush");
5493        let a = arr
5494            .as_any()
5495            .downcast_ref::<Int32Array>()
5496            .expect("Int32Array");
5497        assert_eq!(a.values(), &[1, 2, 3]);
5498    }
5499
5500    #[test]
5501    fn test_timestamp_nanos_decoding_offset_zero() {
5502        let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::OffsetZero)));
5503        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5504        let mut data = Vec::new();
5505        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
5506            data.extend_from_slice(&encode_avro_long(v));
5507        }
5508        let mut cur = AvroCursor::new(&data);
5509        for _ in 0..4 {
5510            decoder.decode(&mut cur).expect("decode nanos ts");
5511        }
5512        let array = decoder.flush(None).expect("flush nanos ts");
5513        let ts = array
5514            .as_any()
5515            .downcast_ref::<TimestampNanosecondArray>()
5516            .expect("TimestampNanosecondArray");
5517        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
5518        match ts.data_type() {
5519            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5520                assert_eq!(tz.as_deref(), Some("+00:00"));
5521            }
5522            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
5523        }
5524    }
5525
5526    #[test]
5527    fn test_timestamp_nanos_decoding_utc() {
5528        let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::Utc)));
5529        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5530        let mut data = Vec::new();
5531        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
5532            data.extend_from_slice(&encode_avro_long(v));
5533        }
5534        let mut cur = AvroCursor::new(&data);
5535        for _ in 0..4 {
5536            decoder.decode(&mut cur).expect("decode nanos ts");
5537        }
5538        let array = decoder.flush(None).expect("flush nanos ts");
5539        let ts = array
5540            .as_any()
5541            .downcast_ref::<TimestampNanosecondArray>()
5542            .expect("TimestampNanosecondArray");
5543        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
5544        match ts.data_type() {
5545            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5546                assert_eq!(tz.as_deref(), Some("UTC"));
5547            }
5548            other => panic!("expected Timestamp(Nanosecond, Some(\"UTC\")), got {other:?}"),
5549        }
5550    }
5551
5552    #[test]
5553    fn test_timestamp_nanos_decoding_local() {
5554        let avro_type = avro_from_codec(Codec::TimestampNanos(None));
5555        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
5556        let mut data = Vec::new();
5557        for v in [10_i64, 20_i64, -30_i64] {
5558            data.extend_from_slice(&encode_avro_long(v));
5559        }
5560        let mut cur = AvroCursor::new(&data);
5561        for _ in 0..3 {
5562            decoder.decode(&mut cur).expect("decode nanos ts");
5563        }
5564        let array = decoder.flush(None).expect("flush nanos ts");
5565        let ts = array
5566            .as_any()
5567            .downcast_ref::<TimestampNanosecondArray>()
5568            .expect("TimestampNanosecondArray");
5569        assert_eq!(ts.values(), &[10, 20, -30]);
5570        match ts.data_type() {
5571            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5572                assert_eq!(tz.as_deref(), None);
5573            }
5574            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5575        }
5576    }
5577
5578    #[test]
5579    fn test_timestamp_nanos_decoding_with_nulls() {
5580        let avro_type = AvroDataType::new(
5581            Codec::TimestampNanos(None),
5582            Default::default(),
5583            Some(Nullability::NullFirst),
5584        );
5585        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
5586        let mut data = Vec::new();
5587        data.extend_from_slice(&encode_avro_long(1));
5588        data.extend_from_slice(&encode_avro_long(42));
5589        data.extend_from_slice(&encode_avro_long(0));
5590        data.extend_from_slice(&encode_avro_long(1));
5591        data.extend_from_slice(&encode_avro_long(-7));
5592        let mut cur = AvroCursor::new(&data);
5593        for _ in 0..3 {
5594            decoder.decode(&mut cur).expect("decode nullable nanos ts");
5595        }
5596        let array = decoder.flush(None).expect("flush nullable nanos ts");
5597        let ts = array
5598            .as_any()
5599            .downcast_ref::<TimestampNanosecondArray>()
5600            .expect("TimestampNanosecondArray");
5601        assert_eq!(ts.len(), 3);
5602        assert!(ts.is_valid(0));
5603        assert!(ts.is_null(1));
5604        assert!(ts.is_valid(2));
5605        assert_eq!(ts.value(0), 42);
5606        assert_eq!(ts.value(2), -7);
5607        match ts.data_type() {
5608            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
5609                assert_eq!(tz.as_deref(), None);
5610            }
5611            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
5612        }
5613    }
5614}