Skip to main content

arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22    ResolvedUnion,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36    DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37    Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use std::cmp::Ordering;
42use std::sync::Arc;
43use strum_macros::AsRefStr;
44use uuid::Uuid;
45
/// Initial capacity handed to the value buffers, offset builders, and array builders
/// that the decoders in this module allocate up front.
const DEFAULT_CAPACITY: usize = 1024;
47
/// Runtime plan for decoding reader-side `["null", T]` types.
///
/// The plan is chosen once, when the nullable decoder is constructed, from the union
/// resolution info attached to the data type; `ReadTag` is the default.
#[derive(Clone, Copy, Debug)]
enum NullablePlan {
    /// Writer actually wrote a union (branch tag present).
    ReadTag,
    /// Writer wrote a single (non-union) value resolved to the non-null branch
    /// of the reader union; do NOT read a branch tag, but apply any promotion.
    FromSingle { promotion: Promotion },
}
57
/// Decodes a single decimal payload and appends it to a decimal builder.
///
/// Arguments, in order: the size argument forwarded to `read_decimal_bytes_be`, the input
/// buffer/cursor expression, the target builder, the byte width `W` of the native integer,
/// and the native integer type whose `from_be_bytes` turns the bytes into a value.
macro_rules! decode_decimal {
    ($len:expr, $input:expr, $sink:expr, $W:expr, $Native:ty) => {{
        // Fetch the big-endian bytes first so a read error leaves the builder untouched.
        let be_bytes = read_decimal_bytes_be::<{ $W }>($input, $len)?;
        let value = <$Native>::from_be_bytes(be_bytes);
        $sink.append_value(value);
    }};
}
65
/// Finishes a decimal builder and produces an `ArrayRef` of the requested decimal array type.
///
/// Only the value buffer of the finished builder is kept; the validity passed as `$validity`
/// is attached instead, and the precision/scale are re-applied (a missing scale means `0`).
/// Errors from array construction or precision/scale validation are propagated with `?`.
macro_rules! flush_decimal {
    ($src:expr, $prec:expr, $scl:expr, $validity:expr, $Array:ty) => {{
        let finished = $src.finish();
        let (_, values, _) = finished.into_parts();
        let array = <$Array>::try_new(values, $validity)?;
        let array = array.with_precision_and_scale(*$prec as u8, $scl.unwrap_or(0) as i8)?;
        Arc::new(array) as ArrayRef
    }};
}
75
/// Appends a decimal default value to the matching decimal builder.
///
/// The default must be an `AvroLiteral::Bytes` holding two's-complement big-endian bytes;
/// they are passed through `sign_cast_to::<W>` and converted with `from_be_bytes`.
/// Any other literal yields `AvroError::InvalidArgument`, whose message is assembled at
/// compile time from the `$label` literal.
macro_rules! append_decimal_default {
    ($default:expr, $sink:expr, $W:literal, $Native:ty, $label:literal) => {{
        if let AvroLiteral::Bytes(raw) = $default {
            let widened = sign_cast_to::<$W>(raw)?;
            $sink.append_value(<$Native>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(AvroError::InvalidArgument(
                concat!(
                    "Default for ",
                    $label,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
98
/// Decodes avro encoded data into [`RecordBatch`]
///
/// Rows are accumulated by `decode` and emitted as a batch by `flush`.
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema assembled from the reader record's fields.
    schema: SchemaRef,
    /// One decoder per reader field, in the same order as `schema`.
    fields: Vec<Decoder>,
    /// Set when the data type carries record-level resolution info; when present,
    /// `decode` routes every record through it instead of decoding fields directly.
    projector: Option<Projector>,
}
106
107impl RecordDecoder {
108    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
109    ///
110    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
111    ///
112    /// # Arguments
113    /// * `data_type` - The Avro data type to decode.
114    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
115    ///
116    /// # Errors
117    /// This function will return an error if the provided `data_type` is not a `Record`.
118    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
119        match data_type.codec() {
120            Codec::Struct(reader_fields) => {
121                // Build Arrow schema fields and per-child decoders
122                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123                let mut encodings = Vec::with_capacity(reader_fields.len());
124                for avro_field in reader_fields.iter() {
125                    arrow_fields.push(avro_field.field());
126                    encodings.push(Decoder::try_new(avro_field.data_type())?);
127                }
128                let projector = match data_type.resolution.as_ref() {
129                    Some(ResolutionInfo::Record(rec)) => {
130                        Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131                    }
132                    _ => None,
133                };
134                Ok(Self {
135                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
136                    fields: encodings,
137                    projector,
138                })
139            }
140            other => Err(AvroError::ParseError(format!(
141                "Expected record got {other:?}"
142            ))),
143        }
144    }
145
146    /// Returns the decoder's `SchemaRef`
147    pub(crate) fn schema(&self) -> &SchemaRef {
148        &self.schema
149    }
150
151    /// Decode `count` records from `buf`
152    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
153        let mut cursor = AvroCursor::new(buf);
154        match self.projector.as_mut() {
155            Some(proj) => {
156                for _ in 0..count {
157                    proj.project_record(&mut cursor, &mut self.fields)?;
158                }
159            }
160            None => {
161                for _ in 0..count {
162                    for field in &mut self.fields {
163                        field.decode(&mut cursor)?;
164                    }
165                }
166            }
167        }
168        Ok(cursor.position())
169    }
170
171    /// Flush the decoded records into a [`RecordBatch`]
172    pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
173        let arrays = self
174            .fields
175            .iter_mut()
176            .map(|x| x.flush(None))
177            .collect::<Result<Vec<_>, _>>()?;
178        RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
179    }
180}
181
/// Enum resolution data copied out of `ResolutionInfo::EnumMapping` when an enum decoder
/// is constructed.
#[derive(Debug)]
struct EnumResolution {
    /// Index table taken from the resolved enum mapping
    /// (presumably writer symbol index -> reader symbol index; TODO confirm against the codec).
    mapping: Arc<[i32]>,
    /// Default index taken from the resolved enum mapping
    /// (presumably the fallback for unmapped writer symbols; TODO confirm).
    default_index: i32,
}
187
/// Per-column decoding state.
///
/// Each variant owns the buffers or builders that accumulate decoded values for one
/// Arrow column. The `*To*` variants are selected when the data type carries a matching
/// `Promotion` in its resolution info.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Number of null slots appended so far.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// The `bool` is the codec's `is_utc` flag.
    TimestampMillis(bool, Vec<i64>),
    /// The `bool` is the codec's `is_utc` flag.
    TimestampMicros(bool, Vec<i64>),
    /// The `bool` is the codec's `is_utc` flag.
    TimestampNanos(bool, Vec<i64>),
    Int32ToInt64(Vec<i64>),
    Int32ToFloat32(Vec<f32>),
    Int32ToFloat64(Vec<f64>),
    Int64ToFloat32(Vec<f32>),
    Int64ToFloat64(Vec<f64>),
    Float32ToFloat64(Vec<f64>),
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Item field, list offsets, and the decoder for the list items.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Arrow fields, one child decoder per field, and an optional projector built from
    /// record-level resolution info.
    Record(Fields, Vec<Decoder>, Option<Projector>),
    /// Entries field, two offset builders (the second one tracks the map offsets and is the
    /// one advanced for a null slot), a byte buffer (presumably the concatenated key bytes
    /// indexed by the first offset builder; TODO confirm), and the value decoder.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Fixed size in bytes and the accumulated bytes; a null slot contributes `size` zero bytes.
    Fixed(i32, Vec<u8>),
    /// Decoded indices, the enum symbols, and optional enum resolution data.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    Duration(IntervalMonthDayNanoBuilder),
    /// Accumulated UUID bytes; a null slot contributes 16 zero bytes.
    Uuid(Vec<u8>),
    /// Precision, optional scale, optional size, and the builder.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// Precision, optional scale, optional size, and the builder.
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// Precision, optional scale, optional size, and the builder.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// Precision, optional scale, optional size, and the builder.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// Run-end width in bytes (2, 4, or 8), the number of slots appended so far, and the
    /// decoder for the values.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    Union(UnionDecoder),
    /// Nullability of the reader type, the validity builder, the decoder for the non-null
    /// value, and the plan that says whether a union branch tag must be read.
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
247
248impl Decoder {
249    fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
250        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
251            if info.writer_is_union && !info.reader_is_union {
252                let mut clone = data_type.clone();
253                clone.resolution = None; // Build target base decoder without Union resolution
254                let target = Box::new(Self::try_new_internal(&clone)?);
255                let decoder = Self::Union(
256                    UnionDecoderBuilder::new()
257                        .with_resolved_union(info.clone())
258                        .with_target(target)
259                        .build()?,
260                );
261                return Ok(decoder);
262            }
263        }
264        Self::try_new_internal(data_type)
265    }
266
267    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
268        // Extract just the Promotion (if any) to simplify pattern matching
269        let promotion = match data_type.resolution.as_ref() {
270            Some(ResolutionInfo::Promotion(p)) => Some(p),
271            _ => None,
272        };
273        let decoder = match (data_type.codec(), promotion) {
274            (Codec::Int64, Some(Promotion::IntToLong)) => {
275                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
276            }
277            (Codec::Float32, Some(Promotion::IntToFloat)) => {
278                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
279            }
280            (Codec::Float64, Some(Promotion::IntToDouble)) => {
281                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
282            }
283            (Codec::Float32, Some(Promotion::LongToFloat)) => {
284                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
285            }
286            (Codec::Float64, Some(Promotion::LongToDouble)) => {
287                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
288            }
289            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
290                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
291            }
292            (Codec::Utf8, Some(Promotion::BytesToString))
293            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
294                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
295                Vec::with_capacity(DEFAULT_CAPACITY),
296            ),
297            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
298                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
299                Vec::with_capacity(DEFAULT_CAPACITY),
300            ),
301            (Codec::Null, _) => Self::Null(0),
302            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
303            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
304            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
305            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
306            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
307            (Codec::Binary, _) => Self::Binary(
308                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
309                Vec::with_capacity(DEFAULT_CAPACITY),
310            ),
311            (Codec::Utf8, _) => Self::String(
312                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
313                Vec::with_capacity(DEFAULT_CAPACITY),
314            ),
315            (Codec::Utf8View, _) => Self::StringView(
316                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
317                Vec::with_capacity(DEFAULT_CAPACITY),
318            ),
319            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
320            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
321            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
322            (Codec::TimestampMillis(is_utc), _) => {
323                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
324            }
325            (Codec::TimestampMicros(is_utc), _) => {
326                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
327            }
328            (Codec::TimestampNanos(is_utc), _) => {
329                Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
330            }
331            #[cfg(feature = "avro_custom_types")]
332            (Codec::DurationNanos, _) => {
333                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
334            }
335            #[cfg(feature = "avro_custom_types")]
336            (Codec::DurationMicros, _) => {
337                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
338            }
339            #[cfg(feature = "avro_custom_types")]
340            (Codec::DurationMillis, _) => {
341                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
342            }
343            #[cfg(feature = "avro_custom_types")]
344            (Codec::DurationSeconds, _) => {
345                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
346            }
347            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
348            (Codec::Decimal(precision, scale, size), _) => {
349                let p = *precision;
350                let s = *scale;
351                let prec = p as u8;
352                let scl = s.unwrap_or(0) as i8;
353                #[cfg(feature = "small_decimals")]
354                {
355                    if p <= DECIMAL32_MAX_PRECISION as usize {
356                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
357                            .with_precision_and_scale(prec, scl)?;
358                        Self::Decimal32(p, s, *size, builder)
359                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
360                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
361                            .with_precision_and_scale(prec, scl)?;
362                        Self::Decimal64(p, s, *size, builder)
363                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
364                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
365                            .with_precision_and_scale(prec, scl)?;
366                        Self::Decimal128(p, s, *size, builder)
367                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
368                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
369                            .with_precision_and_scale(prec, scl)?;
370                        Self::Decimal256(p, s, *size, builder)
371                    } else {
372                        return Err(AvroError::ParseError(format!(
373                            "Decimal precision {p} exceeds maximum supported"
374                        )));
375                    }
376                }
377                #[cfg(not(feature = "small_decimals"))]
378                {
379                    if p <= DECIMAL128_MAX_PRECISION as usize {
380                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
381                            .with_precision_and_scale(prec, scl)?;
382                        Self::Decimal128(p, s, *size, builder)
383                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
384                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
385                            .with_precision_and_scale(prec, scl)?;
386                        Self::Decimal256(p, s, *size, builder)
387                    } else {
388                        return Err(AvroError::ParseError(format!(
389                            "Decimal precision {p} exceeds maximum supported"
390                        )));
391                    }
392                }
393            }
394            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
395            (Codec::List(item), _) => {
396                let decoder = Self::try_new(item)?;
397                Self::Array(
398                    Arc::new(item.field_with_name("item")),
399                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
400                    Box::new(decoder),
401                )
402            }
403            (Codec::Enum(symbols), _) => {
404                let res = match data_type.resolution.as_ref() {
405                    Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
406                        mapping: mapping.mapping.clone(),
407                        default_index: mapping.default_index,
408                    }),
409                    _ => None,
410                };
411                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
412            }
413            (Codec::Struct(fields), _) => {
414                let mut arrow_fields = Vec::with_capacity(fields.len());
415                let mut encodings = Vec::with_capacity(fields.len());
416                for avro_field in fields.iter() {
417                    let encoding = Self::try_new(avro_field.data_type())?;
418                    arrow_fields.push(avro_field.field());
419                    encodings.push(encoding);
420                }
421                let projector =
422                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
423                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
424                    } else {
425                        None
426                    };
427                Self::Record(arrow_fields.into(), encodings, projector)
428            }
429            (Codec::Map(child), _) => {
430                let val_field = child.field_with_name("value");
431                let map_field = Arc::new(ArrowField::new(
432                    "entries",
433                    DataType::Struct(Fields::from(vec![
434                        ArrowField::new("key", DataType::Utf8, false),
435                        val_field,
436                    ])),
437                    false,
438                ));
439                let val_dec = Self::try_new(child)?;
440                Self::Map(
441                    map_field,
442                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
443                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
444                    Vec::with_capacity(DEFAULT_CAPACITY),
445                    Box::new(val_dec),
446                )
447            }
448            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
449            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
450                let decoders = encodings
451                    .iter()
452                    .map(Self::try_new_internal)
453                    .collect::<Result<Vec<_>, _>>()?;
454                if fields.len() != decoders.len() {
455                    return Err(AvroError::SchemaError(format!(
456                        "Union has {} fields but {} decoders",
457                        fields.len(),
458                        decoders.len()
459                    )));
460                }
461                // Proactive guard: if a user provides a union with more branches than
462                // a 32-bit Avro index can address, fail fast with a clear message.
463                let branch_count = decoders.len();
464                let max_addr = (i32::MAX as usize) + 1;
465                if branch_count > max_addr {
466                    return Err(AvroError::SchemaError(format!(
467                        "Union has {branch_count} branches, which exceeds the maximum addressable \
468                         branches by an Avro int tag ({} + 1).",
469                        i32::MAX
470                    )));
471                }
472                let mut builder = UnionDecoderBuilder::new()
473                    .with_fields(fields.clone())
474                    .with_branches(decoders);
475                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
476                    if info.reader_is_union {
477                        builder = builder.with_resolved_union(info.clone());
478                    }
479                }
480                Self::Union(builder.build()?)
481            }
482            (Codec::Union(_, _, _), _) => {
483                return Err(AvroError::NYI(
484                    "Sparse Arrow unions are not yet supported".to_string(),
485                ));
486            }
487            #[cfg(feature = "avro_custom_types")]
488            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
489                let inner = Self::try_new(values_dt)?;
490                let byte_width: u8 = match *width_bits_or_bytes {
491                    2 | 4 | 8 => *width_bits_or_bytes,
492                    16 => 2,
493                    32 => 4,
494                    64 => 8,
495                    other => {
496                        return Err(AvroError::InvalidArgument(format!(
497                            "Unsupported run-end width {other} for RunEndEncoded; \
498                             expected 16/32/64 bits or 2/4/8 bytes"
499                        )));
500                    }
501                };
502                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
503            }
504        };
505        Ok(match data_type.nullability() {
506            Some(nullability) => {
507                // Default to reading a union branch tag unless the resolution proves otherwise.
508                let mut plan = NullablePlan::ReadTag;
509                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
510                    if !info.writer_is_union && info.reader_is_union {
511                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
512                            plan = NullablePlan::FromSingle { promotion: *promo };
513                        }
514                    }
515                }
516                Self::Nullable(
517                    nullability,
518                    NullBufferBuilder::new(DEFAULT_CAPACITY),
519                    Box::new(decoder),
520                    plan,
521                )
522            }
523            None => decoder,
524        })
525    }
526
527    /// Append a null record
528    fn append_null(&mut self) -> Result<(), AvroError> {
529        match self {
530            Self::Null(count) => *count += 1,
531            Self::Boolean(b) => b.append(false),
532            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
533            Self::Int64(v)
534            | Self::Int32ToInt64(v)
535            | Self::TimeMicros(v)
536            | Self::TimestampMillis(_, v)
537            | Self::TimestampMicros(_, v)
538            | Self::TimestampNanos(_, v) => v.push(0),
539            #[cfg(feature = "avro_custom_types")]
540            Self::DurationSecond(v)
541            | Self::DurationMillisecond(v)
542            | Self::DurationMicrosecond(v)
543            | Self::DurationNanosecond(v) => v.push(0),
544            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
545            Self::Float64(v)
546            | Self::Int32ToFloat64(v)
547            | Self::Int64ToFloat64(v)
548            | Self::Float32ToFloat64(v) => v.push(0.),
549            Self::Binary(offsets, _)
550            | Self::String(offsets, _)
551            | Self::StringView(offsets, _)
552            | Self::BytesToString(offsets, _)
553            | Self::StringToBytes(offsets, _) => {
554                offsets.push_length(0);
555            }
556            Self::Uuid(v) => {
557                v.extend([0; 16]);
558            }
559            Self::Array(_, offsets, _) => {
560                offsets.push_length(0);
561            }
562            Self::Record(_, e, _) => {
563                for encoding in e.iter_mut() {
564                    encoding.append_null()?;
565                }
566            }
567            Self::Map(_, _koff, moff, _, _) => {
568                moff.push_length(0);
569            }
570            Self::Fixed(sz, accum) => {
571                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
572            }
573            #[cfg(feature = "small_decimals")]
574            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
575            #[cfg(feature = "small_decimals")]
576            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
577            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
578            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
579            Self::Enum(indices, _, _) => indices.push(0),
580            Self::Duration(builder) => builder.append_null(),
581            #[cfg(feature = "avro_custom_types")]
582            Self::RunEndEncoded(_, len, inner) => {
583                *len += 1;
584                inner.append_null()?;
585            }
586            Self::Union(u) => u.append_null()?,
587            Self::Nullable(_, null_buffer, inner, _) => {
588                null_buffer.append(false);
589                inner.append_null()?;
590            }
591        }
592        Ok(())
593    }
594
595    /// Append a single default literal into the decoder's buffers
596    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
597        match self {
598            Self::Nullable(_, nb, inner, _) => {
599                if matches!(lit, AvroLiteral::Null) {
600                    nb.append(false);
601                    inner.append_null()
602                } else {
603                    nb.append(true);
604                    inner.append_default(lit)
605                }
606            }
607            Self::Null(count) => match lit {
608                AvroLiteral::Null => {
609                    *count += 1;
610                    Ok(())
611                }
612                _ => Err(AvroError::InvalidArgument(
613                    "Non-null default for null type".to_string(),
614                )),
615            },
616            Self::Boolean(b) => match lit {
617                AvroLiteral::Boolean(v) => {
618                    b.append(*v);
619                    Ok(())
620                }
621                _ => Err(AvroError::InvalidArgument(
622                    "Default for boolean must be boolean".to_string(),
623                )),
624            },
625            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
626                AvroLiteral::Int(i) => {
627                    v.push(*i);
628                    Ok(())
629                }
630                _ => Err(AvroError::InvalidArgument(
631                    "Default for int32/date32/time-millis must be int".to_string(),
632                )),
633            },
634            #[cfg(feature = "avro_custom_types")]
635            Self::DurationSecond(v)
636            | Self::DurationMillisecond(v)
637            | Self::DurationMicrosecond(v)
638            | Self::DurationNanosecond(v) => match lit {
639                AvroLiteral::Long(i) => {
640                    v.push(*i);
641                    Ok(())
642                }
643                _ => Err(AvroError::InvalidArgument(
644                    "Default for duration long must be long".to_string(),
645                )),
646            },
647            Self::Int64(v)
648            | Self::Int32ToInt64(v)
649            | Self::TimeMicros(v)
650            | Self::TimestampMillis(_, v)
651            | Self::TimestampMicros(_, v)
652            | Self::TimestampNanos(_, v) => match lit {
653                AvroLiteral::Long(i) => {
654                    v.push(*i);
655                    Ok(())
656                }
657                AvroLiteral::Int(i) => {
658                    v.push(*i as i64);
659                    Ok(())
660                }
661                _ => Err(AvroError::InvalidArgument(
662                    "Default for long/time-micros/timestamp must be long or int".to_string(),
663                )),
664            },
665            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
666                AvroLiteral::Float(f) => {
667                    v.push(*f);
668                    Ok(())
669                }
670                _ => Err(AvroError::InvalidArgument(
671                    "Default for float must be float".to_string(),
672                )),
673            },
674            Self::Float64(v)
675            | Self::Int32ToFloat64(v)
676            | Self::Int64ToFloat64(v)
677            | Self::Float32ToFloat64(v) => match lit {
678                AvroLiteral::Double(f) => {
679                    v.push(*f);
680                    Ok(())
681                }
682                _ => Err(AvroError::InvalidArgument(
683                    "Default for double must be double".to_string(),
684                )),
685            },
686            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
687                AvroLiteral::Bytes(b) => {
688                    offsets.push_length(b.len());
689                    values.extend_from_slice(b);
690                    Ok(())
691                }
692                _ => Err(AvroError::InvalidArgument(
693                    "Default for bytes must be bytes".to_string(),
694                )),
695            },
696            Self::BytesToString(offsets, values)
697            | Self::String(offsets, values)
698            | Self::StringView(offsets, values) => match lit {
699                AvroLiteral::String(s) => {
700                    let b = s.as_bytes();
701                    offsets.push_length(b.len());
702                    values.extend_from_slice(b);
703                    Ok(())
704                }
705                _ => Err(AvroError::InvalidArgument(
706                    "Default for string must be string".to_string(),
707                )),
708            },
709            Self::Uuid(values) => match lit {
710                AvroLiteral::String(s) => {
711                    let uuid = Uuid::try_parse(s).map_err(|e| {
712                        AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
713                    })?;
714                    values.extend_from_slice(uuid.as_bytes());
715                    Ok(())
716                }
717                _ => Err(AvroError::InvalidArgument(
718                    "Default for uuid must be string".to_string(),
719                )),
720            },
721            Self::Fixed(sz, accum) => match lit {
722                AvroLiteral::Bytes(b) => {
723                    if b.len() != *sz as usize {
724                        return Err(AvroError::InvalidArgument(format!(
725                            "Fixed default length {} does not match size {sz}",
726                            b.len(),
727                        )));
728                    }
729                    accum.extend_from_slice(b);
730                    Ok(())
731                }
732                _ => Err(AvroError::InvalidArgument(
733                    "Default for fixed must be bytes".to_string(),
734                )),
735            },
736            #[cfg(feature = "small_decimals")]
737            Self::Decimal32(_, _, _, builder) => {
738                append_decimal_default!(lit, builder, 4, i32, "decimal32")
739            }
740            #[cfg(feature = "small_decimals")]
741            Self::Decimal64(_, _, _, builder) => {
742                append_decimal_default!(lit, builder, 8, i64, "decimal64")
743            }
744            Self::Decimal128(_, _, _, builder) => {
745                append_decimal_default!(lit, builder, 16, i128, "decimal128")
746            }
747            Self::Decimal256(_, _, _, builder) => {
748                append_decimal_default!(lit, builder, 32, i256, "decimal256")
749            }
750            Self::Duration(builder) => match lit {
751                AvroLiteral::Bytes(b) => {
752                    if b.len() != 12 {
753                        return Err(AvroError::InvalidArgument(format!(
754                            "Duration default must be exactly 12 bytes, got {}",
755                            b.len()
756                        )));
757                    }
758                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
759                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
760                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
761                    let nanos = (millis as i64) * 1_000_000;
762                    builder.append_value(IntervalMonthDayNano::new(
763                        months as i32,
764                        days as i32,
765                        nanos,
766                    ));
767                    Ok(())
768                }
769                _ => Err(AvroError::InvalidArgument(
770                    "Default for duration must be 12-byte little-endian months/days/millis"
771                        .to_string(),
772                )),
773            },
774            Self::Array(_, offsets, inner) => match lit {
775                AvroLiteral::Array(items) => {
776                    offsets.push_length(items.len());
777                    for item in items {
778                        inner.append_default(item)?;
779                    }
780                    Ok(())
781                }
782                _ => Err(AvroError::InvalidArgument(
783                    "Default for array must be an array literal".to_string(),
784                )),
785            },
786            Self::Map(_, koff, moff, kdata, valdec) => match lit {
787                AvroLiteral::Map(entries) => {
788                    moff.push_length(entries.len());
789                    for (k, v) in entries {
790                        let kb = k.as_bytes();
791                        koff.push_length(kb.len());
792                        kdata.extend_from_slice(kb);
793                        valdec.append_default(v)?;
794                    }
795                    Ok(())
796                }
797                _ => Err(AvroError::InvalidArgument(
798                    "Default for map must be a map/object literal".to_string(),
799                )),
800            },
801            Self::Enum(indices, symbols, _) => match lit {
802                AvroLiteral::Enum(sym) => {
803                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
804                        AvroError::InvalidArgument(format!(
805                            "Enum default symbol {sym:?} not in reader symbols"
806                        ))
807                    })?;
808                    indices.push(pos as i32);
809                    Ok(())
810                }
811                _ => Err(AvroError::InvalidArgument(
812                    "Default for enum must be a symbol".to_string(),
813                )),
814            },
815            #[cfg(feature = "avro_custom_types")]
816            Self::RunEndEncoded(_, len, inner) => {
817                *len += 1;
818                inner.append_default(lit)
819            }
820            Self::Union(u) => u.append_default(lit),
821            Self::Record(field_meta, decoders, projector) => match lit {
822                AvroLiteral::Map(entries) => {
823                    for (i, dec) in decoders.iter_mut().enumerate() {
824                        let name = field_meta[i].name();
825                        if let Some(sub) = entries.get(name) {
826                            dec.append_default(sub)?;
827                        } else if let Some(proj) = projector.as_ref() {
828                            proj.project_default(dec, i)?;
829                        } else {
830                            dec.append_null()?;
831                        }
832                    }
833                    Ok(())
834                }
835                AvroLiteral::Null => {
836                    for (i, dec) in decoders.iter_mut().enumerate() {
837                        if let Some(proj) = projector.as_ref() {
838                            proj.project_default(dec, i)?;
839                        } else {
840                            dec.append_null()?;
841                        }
842                    }
843                    Ok(())
844                }
845                _ => Err(AvroError::InvalidArgument(
846                    "Default for record must be a map/object or null".to_string(),
847                )),
848            },
849        }
850    }
851
852    /// Decode a single record from `buf`
853    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
854        match self {
855            Self::Null(x) => *x += 1,
856            Self::Boolean(values) => values.append(buf.get_bool()?),
857            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
858                values.push(buf.get_int()?)
859            }
860            Self::Int64(values)
861            | Self::TimeMicros(values)
862            | Self::TimestampMillis(_, values)
863            | Self::TimestampMicros(_, values)
864            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
865            #[cfg(feature = "avro_custom_types")]
866            Self::DurationSecond(values)
867            | Self::DurationMillisecond(values)
868            | Self::DurationMicrosecond(values)
869            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
870            Self::Float32(values) => values.push(buf.get_float()?),
871            Self::Float64(values) => values.push(buf.get_double()?),
872            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
873            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
874            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
875            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
876            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
877            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
878            Self::StringToBytes(offsets, values)
879            | Self::BytesToString(offsets, values)
880            | Self::Binary(offsets, values)
881            | Self::String(offsets, values)
882            | Self::StringView(offsets, values) => {
883                let data = buf.get_bytes()?;
884                offsets.push_length(data.len());
885                values.extend_from_slice(data);
886            }
887            Self::Uuid(values) => {
888                let s_bytes = buf.get_bytes()?;
889                let s = std::str::from_utf8(s_bytes).map_err(|e| {
890                    AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
891                })?;
892                let uuid = Uuid::try_parse(s)
893                    .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
894                values.extend_from_slice(uuid.as_bytes());
895            }
896            Self::Array(_, off, encoding) => {
897                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
898                off.push_length(total_items);
899            }
900            Self::Record(_, encodings, None) => {
901                for encoding in encodings {
902                    encoding.decode(buf)?;
903                }
904            }
905            Self::Record(_, encodings, Some(proj)) => {
906                proj.project_record(buf, encodings)?;
907            }
908            Self::Map(_, koff, moff, kdata, valdec) => {
909                let newly_added = read_blocks(buf, |cur| {
910                    let kb = cur.get_bytes()?;
911                    koff.push_length(kb.len());
912                    kdata.extend_from_slice(kb);
913                    valdec.decode(cur)
914                })?;
915                moff.push_length(newly_added);
916            }
917            Self::Fixed(sz, accum) => {
918                let fx = buf.get_fixed(*sz as usize)?;
919                accum.extend_from_slice(fx);
920            }
921            #[cfg(feature = "small_decimals")]
922            Self::Decimal32(_, _, size, builder) => {
923                decode_decimal!(size, buf, builder, 4, i32);
924            }
925            #[cfg(feature = "small_decimals")]
926            Self::Decimal64(_, _, size, builder) => {
927                decode_decimal!(size, buf, builder, 8, i64);
928            }
929            Self::Decimal128(_, _, size, builder) => {
930                decode_decimal!(size, buf, builder, 16, i128);
931            }
932            Self::Decimal256(_, _, size, builder) => {
933                decode_decimal!(size, buf, builder, 32, i256);
934            }
935            Self::Enum(indices, _, None) => {
936                indices.push(buf.get_int()?);
937            }
938            Self::Enum(indices, _, Some(res)) => {
939                let raw = buf.get_int()?;
940                let resolved = usize::try_from(raw)
941                    .ok()
942                    .and_then(|idx| res.mapping.get(idx).copied())
943                    .filter(|&idx| idx >= 0)
944                    .unwrap_or(res.default_index);
945                if resolved >= 0 {
946                    indices.push(resolved);
947                } else {
948                    return Err(AvroError::ParseError(format!(
949                        "Enum symbol index {raw} not resolvable and no default provided",
950                    )));
951                }
952            }
953            Self::Duration(builder) => {
954                let b = buf.get_fixed(12)?;
955                let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
956                let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
957                let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
958                let nanos = (millis as i64) * 1_000_000;
959                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
960            }
961            #[cfg(feature = "avro_custom_types")]
962            Self::RunEndEncoded(_, len, inner) => {
963                *len += 1;
964                inner.decode(buf)?;
965            }
966            Self::Union(u) => u.decode(buf)?,
967            Self::Nullable(order, nb, encoding, plan) => {
968                match *plan {
969                    NullablePlan::FromSingle { promotion } => {
970                        encoding.decode_with_promotion(buf, promotion)?;
971                        nb.append(true);
972                    }
973                    NullablePlan::ReadTag => {
974                        let branch = buf.read_vlq()?;
975                        let is_not_null = match *order {
976                            Nullability::NullFirst => branch != 0,
977                            Nullability::NullSecond => branch == 0,
978                        };
979                        if is_not_null {
980                            // It is important to decode before appending to null buffer in case of decode error
981                            encoding.decode(buf)?;
982                        } else {
983                            encoding.append_null()?;
984                        }
985                        nb.append(is_not_null);
986                    }
987                }
988            }
989        }
990        Ok(())
991    }
992
993    fn decode_with_promotion(
994        &mut self,
995        buf: &mut AvroCursor<'_>,
996        promotion: Promotion,
997    ) -> Result<(), AvroError> {
998        #[cfg(feature = "avro_custom_types")]
999        if let Self::RunEndEncoded(_, len, inner) = self {
1000            *len += 1;
1001            return inner.decode_with_promotion(buf, promotion);
1002        }
1003
1004        macro_rules! promote_numeric_to {
1005            ($variant:ident, $getter:ident, $to:ty) => {{
1006                match self {
1007                    Self::$variant(v) => {
1008                        let x = buf.$getter()?;
1009                        v.push(x as $to);
1010                        Ok(())
1011                    }
1012                    other => Err(AvroError::ParseError(format!(
1013                        "Promotion {promotion} target mismatch: expected {}, got {}",
1014                        stringify!($variant),
1015                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1016                    ))),
1017                }
1018            }};
1019        }
1020        match promotion {
1021            Promotion::Direct => self.decode(buf),
1022            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1023            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1024            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1025            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1026            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1027            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1028            Promotion::StringToBytes => match self {
1029                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1030                    let data = buf.get_bytes()?;
1031                    offsets.push_length(data.len());
1032                    values.extend_from_slice(data);
1033                    Ok(())
1034                }
1035                other => Err(AvroError::ParseError(format!(
1036                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1037                    <Self as AsRef<str>>::as_ref(other)
1038                ))),
1039            },
1040            Promotion::BytesToString => match self {
1041                Self::String(offsets, values)
1042                | Self::StringView(offsets, values)
1043                | Self::BytesToString(offsets, values) => {
1044                    let data = buf.get_bytes()?;
1045                    offsets.push_length(data.len());
1046                    values.extend_from_slice(data);
1047                    Ok(())
1048                }
1049                other => Err(AvroError::ParseError(format!(
1050                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1051                    <Self as AsRef<str>>::as_ref(other)
1052                ))),
1053            },
1054        }
1055    }
1056
1057    /// Flush decoded records to an [`ArrayRef`]
1058    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1059        Ok(match self {
1060            Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1061            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1062            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1063            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1064            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1065            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1066            Self::TimeMillis(values) => {
1067                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1068            }
1069            Self::TimeMicros(values) => {
1070                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1071            }
1072            Self::TimestampMillis(is_utc, values) => Arc::new(
1073                flush_primitive::<TimestampMillisecondType>(values, nulls)
1074                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1075            ),
1076            Self::TimestampMicros(is_utc, values) => Arc::new(
1077                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1078                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1079            ),
1080            Self::TimestampNanos(is_utc, values) => Arc::new(
1081                flush_primitive::<TimestampNanosecondType>(values, nulls)
1082                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1083            ),
1084            #[cfg(feature = "avro_custom_types")]
1085            Self::DurationSecond(values) => {
1086                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1087            }
1088            #[cfg(feature = "avro_custom_types")]
1089            Self::DurationMillisecond(values) => {
1090                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1091            }
1092            #[cfg(feature = "avro_custom_types")]
1093            Self::DurationMicrosecond(values) => {
1094                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1095            }
1096            #[cfg(feature = "avro_custom_types")]
1097            Self::DurationNanosecond(values) => {
1098                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1099            }
1100            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1101            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1102            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1103            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1104                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1105            }
1106            Self::Int32ToFloat64(values)
1107            | Self::Int64ToFloat64(values)
1108            | Self::Float32ToFloat64(values) => {
1109                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1110            }
1111            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1112                let offsets = flush_offsets(offsets);
1113                let values = flush_values(values).into();
1114                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1115            }
1116            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1117                let offsets = flush_offsets(offsets);
1118                let values = flush_values(values).into();
1119                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1120            }
1121            Self::StringView(offsets, values) => {
1122                let offsets = flush_offsets(offsets);
1123                let values = flush_values(values);
1124                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1125                let values: Vec<&str> = (0..array.len())
1126                    .map(|i| {
1127                        if array.is_valid(i) {
1128                            array.value(i)
1129                        } else {
1130                            ""
1131                        }
1132                    })
1133                    .collect();
1134                Arc::new(StringViewArray::from(values))
1135            }
1136            Self::Array(field, offsets, values) => {
1137                let values = values.flush(None)?;
1138                let offsets = flush_offsets(offsets);
1139                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1140            }
1141            Self::Record(fields, encodings, _) => {
1142                let arrays = encodings
1143                    .iter_mut()
1144                    .map(|x| x.flush(None))
1145                    .collect::<Result<Vec<_>, _>>()?;
1146                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1147            }
1148            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1149                let moff = flush_offsets(m_off);
1150                let koff = flush_offsets(k_off);
1151                let kd = flush_values(kdata).into();
1152                let val_arr = valdec.flush(None)?;
1153                let key_arr = StringArray::try_new(koff, kd, None)?;
1154                if key_arr.len() != val_arr.len() {
1155                    return Err(AvroError::InvalidArgument(format!(
1156                        "Map keys length ({}) != map values length ({})",
1157                        key_arr.len(),
1158                        val_arr.len()
1159                    )));
1160                }
1161                let final_len = moff.len() - 1;
1162                if let Some(n) = &nulls {
1163                    if n.len() != final_len {
1164                        return Err(AvroError::InvalidArgument(format!(
1165                            "Map array null buffer length {} != final map length {final_len}",
1166                            n.len()
1167                        )));
1168                    }
1169                }
1170                let entries_fields = match map_field.data_type() {
1171                    DataType::Struct(fields) => fields.clone(),
1172                    other => {
1173                        return Err(AvroError::InvalidArgument(format!(
1174                            "Map entries field must be a Struct, got {other:?}"
1175                        )));
1176                    }
1177                };
1178                let entries_struct =
1179                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1180                let map_arr =
1181                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1182                Arc::new(map_arr)
1183            }
1184            Self::Fixed(sz, accum) => {
1185                let b: Buffer = flush_values(accum).into();
1186                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1187                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1188                Arc::new(arr)
1189            }
1190            Self::Uuid(values) => {
1191                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1192                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1193                Arc::new(arr)
1194            }
1195            #[cfg(feature = "small_decimals")]
1196            Self::Decimal32(precision, scale, _, builder) => {
1197                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1198            }
1199            #[cfg(feature = "small_decimals")]
1200            Self::Decimal64(precision, scale, _, builder) => {
1201                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1202            }
1203            Self::Decimal128(precision, scale, _, builder) => {
1204                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1205            }
1206            Self::Decimal256(precision, scale, _, builder) => {
1207                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1208            }
1209            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1210            Self::Duration(builder) => {
1211                let (_, vals, _) = builder.finish().into_parts();
1212                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1213                    .map_err(|e| AvroError::ParseError(e.to_string()))?;
1214                Arc::new(vals)
1215            }
1216            #[cfg(feature = "avro_custom_types")]
1217            Self::RunEndEncoded(width, len, inner) => {
1218                let values = inner.flush(nulls)?;
1219                let n = *len;
1220                let arr = values.as_ref();
1221                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1222                if n > 0 {
1223                    run_starts.push(0);
1224                    for i in 1..n {
1225                        if !values_equal_at(arr, i - 1, i) {
1226                            run_starts.push(i);
1227                        }
1228                    }
1229                }
1230                if n > (u32::MAX as usize) {
1231                    return Err(AvroError::InvalidArgument(format!(
1232                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1233                    )));
1234                }
1235                let run_count = run_starts.len();
1236                let take_idx: PrimitiveArray<UInt32Type> =
1237                    run_starts.iter().map(|&s| s as u32).collect();
1238                let per_run_values = if run_count == 0 {
1239                    values.slice(0, 0)
1240                } else {
1241                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1242                        AvroError::ParseError(format!("take() for REE values failed: {e}"))
1243                    })?
1244                };
1245
1246                macro_rules! build_run_array {
1247                    ($Native:ty, $ArrowTy:ty) => {{
1248                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1249                        for (idx, &_start) in run_starts.iter().enumerate() {
1250                            let end = if idx + 1 < run_count {
1251                                run_starts[idx + 1]
1252                            } else {
1253                                n
1254                            };
1255                            ends.push(end as $Native);
1256                        }
1257                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1258                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1259                            .map_err(|e| AvroError::ParseError(e.to_string()))?;
1260                        Arc::new(run_arr) as ArrayRef
1261                    }};
1262                }
1263                match *width {
1264                    2 => {
1265                        if n > i16::MAX as usize {
1266                            return Err(AvroError::InvalidArgument(format!(
1267                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1268                            )));
1269                        }
1270                        build_run_array!(i16, Int16Type)
1271                    }
1272                    4 => build_run_array!(i32, Int32Type),
1273                    8 => build_run_array!(i64, Int64Type),
1274                    other => {
1275                        return Err(AvroError::InvalidArgument(format!(
1276                            "Unsupported run-end width {other} for RunEndEncoded"
1277                        )));
1278                    }
1279                }
1280            }
1281            Self::Union(u) => u.flush(nulls)?,
1282        })
1283    }
1284}
1285
1286// A lookup table for resolving fields between writer and reader schemas during record projection.
1287#[derive(Debug)]
1288struct DispatchLookupTable {
1289    // Maps each reader field index `r` to the corresponding writer field index.
1290    //
1291    // Semantics:
1292    // - `to_reader[r] >= 0`: The value is an index into the writer's fields. The value from
1293    //   the writer field is decoded, and `promotion[r]` is applied.
1294    // - `to_reader[r] == NO_SOURCE` (-1): No matching writer field exists. The reader field's
1295    //   default value is used.
1296    //
1297    // Representation (`i8`):
1298    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
1299    // `i8` for union type IDs. This requires that writer field indices do not exceed `i8::MAX`.
1300    //
1301    // Invariants:
1302    // - `to_reader.len() == promotion.len()` and matches the reader field count.
1303    // - If `to_reader[r] == NO_SOURCE`, `promotion[r]` is ignored.
1304    to_reader: Box<[i8]>,
1305    // For each reader field `r`, specifies the `Promotion` to apply to the writer's value.
1306    //
1307    // This is used when a writer field's type can be promoted to a reader field's type
1308    // (e.g., `Int` to `Long`). It is ignored if `to_reader[r] == NO_SOURCE`.
1309    promotion: Box<[Promotion]>,
1310}
1311
/// Sentinel stored in `DispatchLookupTable::to_reader` for a slot that has no mapping.
/// Real indices are never negative, so `-1` cannot collide with one.
const NO_SOURCE: i8 = -1;
1315
1316impl DispatchLookupTable {
1317    fn from_writer_to_reader(
1318        promotion_map: &[Option<(usize, Promotion)>],
1319    ) -> Result<Self, AvroError> {
1320        let mut to_reader = Vec::with_capacity(promotion_map.len());
1321        let mut promotion = Vec::with_capacity(promotion_map.len());
1322        for map in promotion_map {
1323            match *map {
1324                Some((idx, promo)) => {
1325                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1326                        AvroError::SchemaError(format!(
1327                            "Reader branch index {idx} exceeds i8 range (max {})",
1328                            i8::MAX
1329                        ))
1330                    })?;
1331                    to_reader.push(idx_i8);
1332                    promotion.push(promo);
1333                }
1334                None => {
1335                    to_reader.push(NO_SOURCE);
1336                    promotion.push(Promotion::Direct);
1337                }
1338            }
1339        }
1340        Ok(Self {
1341            to_reader: to_reader.into_boxed_slice(),
1342            promotion: promotion.into_boxed_slice(),
1343        })
1344    }
1345
1346    // Resolve a writer branch index to (reader_idx, promotion)
1347    #[inline]
1348    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1349        let reader_index = *self.to_reader.get(writer_index)?;
1350        (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1351    }
1352}
1353
/// Decoder state for a union value.
///
/// Rows are routed to one of `branches` (or to the single target held by
/// `UnionReadPlan::ToSingle`) according to `plan`; `flush` assembles the buffered
/// type ids, offsets and flushed branches into a `UnionArray`.
#[derive(Debug)]
struct UnionDecoder {
    /// Union fields handed to `UnionArray::try_new` when flushing.
    fields: UnionFields,
    /// One type id per emitted row (pushed by `emit_to`).
    type_ids: Vec<i8>,
    /// One offset per emitted row: the value of the branch's counter at emit time.
    offsets: Vec<i32>,
    /// Child decoders, indexed by reader branch position.
    branches: Vec<Decoder>,
    /// Per-branch counter of rows emitted; supplies the next offset for that branch.
    counts: Vec<i32>,
    /// Type ids collected from `fields`, indexed by reader branch position.
    reader_type_codes: Vec<i8>,
    /// Branch index that `append_default` falls back to.
    default_emit_idx: usize,
    /// Branch index that `append_null` falls back to.
    null_emit_idx: usize,
    /// Strategy used by `decode` to pick the destination of each value.
    plan: UnionReadPlan,
}
1366
1367impl Default for UnionDecoder {
1368    fn default() -> Self {
1369        Self {
1370            fields: UnionFields::empty(),
1371            type_ids: Vec::new(),
1372            offsets: Vec::new(),
1373            branches: Vec::new(),
1374            counts: Vec::new(),
1375            reader_type_codes: Vec::new(),
1376            default_emit_idx: 0,
1377            null_emit_idx: 0,
1378            plan: UnionReadPlan::Passthrough,
1379        }
1380    }
1381}
1382
/// Strategy `UnionDecoder::decode` follows to route an encoded value.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions: a branch tag is read from the input and
    /// translated through `lookup_table` into a reader branch and promotion.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// Writer is not a union but the reader is: no tag is read and every value is
    /// decoded into branch `reader_idx` using `promotion`.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    /// Writer is a union but the reader is a single type: a branch tag is read, the
    /// promotion is looked up in `lookup_table`, and the value is decoded into `target`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution info: the tag read from the input is used directly as the
    /// reader branch index with `Promotion::Direct`.
    Passthrough,
}
1398
1399impl UnionDecoder {
1400    fn try_new(
1401        fields: UnionFields,
1402        branches: Vec<Decoder>,
1403        resolved: Option<ResolvedUnion>,
1404    ) -> Result<Self, AvroError> {
1405        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1406        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1407        let default_emit_idx = 0;
1408        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1409        let branch_len = branches.len().max(reader_type_codes.len());
1410        // Guard against impractically large unions that cannot be indexed by an Avro int
1411        let max_addr = (i32::MAX as usize) + 1;
1412        if branches.len() > max_addr {
1413            return Err(AvroError::SchemaError(format!(
1414                "Reader union has {} branches, which exceeds the maximum addressable \
1415                 branches by an Avro int tag ({} + 1).",
1416                branches.len(),
1417                i32::MAX
1418            )));
1419        }
1420        Ok(Self {
1421            fields,
1422            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1423            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1424            branches,
1425            counts: vec![0; branch_len],
1426            reader_type_codes,
1427            default_emit_idx,
1428            null_emit_idx,
1429            plan: Self::plan_from_resolved(resolved)?,
1430        })
1431    }
1432
1433    fn try_new_from_writer_union(
1434        info: ResolvedUnion,
1435        target: Box<Decoder>,
1436    ) -> Result<Self, AvroError> {
1437        // This constructor is only for writer-union to single-type resolution
1438        debug_assert!(info.writer_is_union && !info.reader_is_union);
1439        let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1440        Ok(Self {
1441            plan: UnionReadPlan::ToSingle {
1442                target,
1443                lookup_table,
1444            },
1445            ..Self::default()
1446        })
1447    }
1448
1449    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, AvroError> {
1450        let Some(info) = resolved else {
1451            return Ok(UnionReadPlan::Passthrough);
1452        };
1453        match (info.writer_is_union, info.reader_is_union) {
1454            (true, true) => {
1455                let lookup_table =
1456                    DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1457                Ok(UnionReadPlan::ReaderUnion { lookup_table })
1458            }
1459            (false, true) => {
1460                let Some(&(reader_idx, promotion)) =
1461                    info.writer_to_reader.first().and_then(Option::as_ref)
1462                else {
1463                    return Err(AvroError::SchemaError(
1464                        "Writer type does not match any reader union branch".to_string(),
1465                    ));
1466                };
1467                Ok(UnionReadPlan::FromSingle {
1468                    reader_idx,
1469                    promotion,
1470                })
1471            }
1472            (true, false) => Err(AvroError::InvalidArgument(
1473                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1474                    .to_string(),
1475            )),
1476            // (false, false) is invalid and should never be constructed by the resolver.
1477            _ => Err(AvroError::SchemaError(
1478                "ResolvedUnion constructed for non-union sides; resolver should return None"
1479                    .to_string(),
1480            )),
1481        }
1482    }
1483
1484    #[inline]
1485    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
1486        // Avro unions are encoded by first writing the zero-based branch index.
1487        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
1488        // but both use zig-zag varint encoding, so decoding as long is compatible
1489        // with either form and widely used in practice.
1490        let raw = buf.get_long()?;
1491        if raw < 0 {
1492            return Err(AvroError::ParseError(format!(
1493                "Negative union branch index {raw}"
1494            )));
1495        }
1496        usize::try_from(raw).map_err(|_| {
1497            AvroError::ParseError(format!(
1498                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1499                (usize::BITS as usize)
1500            ))
1501        })
1502    }
1503
1504    #[inline]
1505    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1506        let branches_len = self.branches.len();
1507        let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1508            return Err(AvroError::ParseError(format!(
1509                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1510            )));
1511        };
1512        self.type_ids.push(self.reader_type_codes[reader_idx]);
1513        self.offsets.push(self.counts[reader_idx]);
1514        self.counts[reader_idx] += 1;
1515        Ok(reader_branch)
1516    }
1517
1518    #[inline]
1519    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
1520    where
1521        F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
1522    {
1523        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1524            return action(target);
1525        }
1526        let reader_idx = match &self.plan {
1527            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1528            _ => fallback_idx,
1529        };
1530        self.emit_to(reader_idx).and_then(action)
1531    }
1532
1533    fn append_null(&mut self) -> Result<(), AvroError> {
1534        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1535    }
1536
1537    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
1538        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1539    }
1540
1541    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1542        let (reader_idx, promotion) = match &mut self.plan {
1543            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1544            UnionReadPlan::ReaderUnion { lookup_table } => {
1545                let idx = Self::read_tag(buf)?;
1546                lookup_table.resolve(idx).ok_or_else(|| {
1547                    AvroError::ParseError(format!(
1548                        "Union branch index {idx} not resolvable by reader schema"
1549                    ))
1550                })?
1551            }
1552            UnionReadPlan::FromSingle {
1553                reader_idx,
1554                promotion,
1555            } => (*reader_idx, *promotion),
1556            UnionReadPlan::ToSingle {
1557                target,
1558                lookup_table,
1559            } => {
1560                let idx = Self::read_tag(buf)?;
1561                return match lookup_table.resolve(idx) {
1562                    Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1563                    None => Err(AvroError::ParseError(format!(
1564                        "Writer union branch {idx} does not resolve to reader type"
1565                    ))),
1566                };
1567            }
1568        };
1569        let decoder = self.emit_to(reader_idx)?;
1570        decoder.decode_with_promotion(buf, promotion)
1571    }
1572
1573    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1574        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1575            return target.flush(nulls);
1576        }
1577        debug_assert!(
1578            nulls.is_none(),
1579            "UnionArray does not accept a validity bitmap; \
1580                     nulls should have been materialized as a Null child during decode"
1581        );
1582        let children = self
1583            .branches
1584            .iter_mut()
1585            .map(|d| d.flush(None))
1586            .collect::<Result<Vec<_>, _>>()?;
1587        let arr = UnionArray::try_new(
1588            self.fields.clone(),
1589            flush_values(&mut self.type_ids).into_iter().collect(),
1590            Some(flush_values(&mut self.offsets).into_iter().collect()),
1591            children,
1592        )
1593        .map_err(|e| AvroError::ParseError(e.to_string()))?;
1594        Ok(Arc::new(arr))
1595    }
1596}
1597
/// Collects the optional pieces needed to construct a `UnionDecoder`.
///
/// `build` accepts exactly two combinations: `fields` + `branches` without a
/// `target`, or `resolved` + `target` without `fields`/`branches`.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    /// Reader union fields (reader-union construction).
    fields: Option<UnionFields>,
    /// One decoder per reader branch (reader-union construction).
    branches: Option<Vec<Decoder>>,
    /// Writer/reader resolution info, when available.
    resolved: Option<ResolvedUnion>,
    /// Single destination decoder (writer-union to single-type construction).
    target: Option<Box<Decoder>>,
}
1605
1606impl UnionDecoderBuilder {
1607    fn new() -> Self {
1608        Self::default()
1609    }
1610
1611    fn with_fields(mut self, fields: UnionFields) -> Self {
1612        self.fields = Some(fields);
1613        self
1614    }
1615
1616    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1617        self.branches = Some(branches);
1618        self
1619    }
1620
1621    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1622        self.resolved = Some(resolved_union);
1623        self
1624    }
1625
1626    fn with_target(mut self, target: Box<Decoder>) -> Self {
1627        self.target = Some(target);
1628        self
1629    }
1630
1631    fn build(self) -> Result<UnionDecoder, AvroError> {
1632        match (self.resolved, self.fields, self.branches, self.target) {
1633            (resolved, Some(fields), Some(branches), None) => {
1634                UnionDecoder::try_new(fields, branches, resolved)
1635            }
1636            (Some(info), None, None, Some(target))
1637                if info.writer_is_union && !info.reader_is_union =>
1638            {
1639                UnionDecoder::try_new_from_writer_union(info, target)
1640            }
1641            _ => Err(AvroError::InvalidArgument(
1642                "Invalid UnionDecoderBuilder configuration: expected either \
1643                 (fields + branches + resolved) with no target for reader-unions, or \
1644                 (resolved + target) with no fields/branches for writer-union to single."
1645                    .to_string(),
1646            )),
1647        }
1648    }
1649}
1650
/// How `process_blockwise` handles a block whose item count is negative, i.e. a
/// block that is followed by its payload size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the size, then still visit the items one by one.
    ProcessItems,
    /// Read the size and jump over that many bytes without visiting items.
    SkipBySize,
}
1656
1657#[inline]
1658fn skip_blocks(
1659    buf: &mut AvroCursor,
1660    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
1661) -> Result<usize, AvroError> {
1662    process_blockwise(
1663        buf,
1664        move |c| skip_item(c),
1665        NegativeBlockBehavior::SkipBySize,
1666    )
1667}
1668
1669#[inline]
1670fn flush_dict(
1671    indices: &mut Vec<i32>,
1672    symbols: &[String],
1673    nulls: Option<NullBuffer>,
1674) -> Result<ArrayRef, AvroError> {
1675    let keys = flush_primitive::<Int32Type>(indices, nulls);
1676    let values = Arc::new(StringArray::from_iter_values(
1677        symbols.iter().map(|s| s.as_str()),
1678    ));
1679    DictionaryArray::try_new(keys, values)
1680        .map_err(Into::into)
1681        .map(|arr| Arc::new(arr) as ArrayRef)
1682}
1683
/// Decodes a block-encoded sequence by calling `decode_entry` for every item.
///
/// Thin wrapper over `process_blockwise` with `NegativeBlockBehavior::ProcessItems`,
/// so blocks that carry a byte size are still decoded item by item. Returns the
/// total item count reported by `process_blockwise`.
#[inline]
fn read_blocks(
    buf: &mut AvroCursor,
    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
) -> Result<usize, AvroError> {
    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
}
1691
1692#[inline]
1693fn process_blockwise(
1694    buf: &mut AvroCursor,
1695    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
1696    negative_behavior: NegativeBlockBehavior,
1697) -> Result<usize, AvroError> {
1698    let mut total = 0usize;
1699    loop {
1700        // Read the block count
1701        //  positive = that many items
1702        //  negative = that many items + read block size
1703        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
1704        let block_count = buf.get_long()?;
1705        match block_count.cmp(&0) {
1706            Ordering::Equal => break,
1707            Ordering::Less => {
1708                let count = (-block_count) as usize;
1709                // A negative count is followed by a long of the size in bytes
1710                let size_in_bytes = buf.get_long()? as usize;
1711                match negative_behavior {
1712                    NegativeBlockBehavior::ProcessItems => {
1713                        // Process items one-by-one after reading size
1714                        for _ in 0..count {
1715                            on_item(buf)?;
1716                        }
1717                    }
1718                    NegativeBlockBehavior::SkipBySize => {
1719                        // Skip the entire block payload at once
1720                        let _ = buf.get_fixed(size_in_bytes)?;
1721                    }
1722                }
1723                total += count;
1724            }
1725            Ordering::Greater => {
1726                let count = block_count as usize;
1727                for _ in 0..count {
1728                    on_item(buf)?;
1729                }
1730                total += count;
1731            }
1732        }
1733    }
1734    Ok(total)
1735}
1736
1737#[inline]
1738fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1739    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1740}
1741
1742#[inline]
1743fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1744    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1745}
1746
1747#[inline]
1748fn flush_primitive<T: ArrowPrimitiveType>(
1749    values: &mut Vec<T::Native>,
1750    nulls: Option<NullBuffer>,
1751) -> PrimitiveArray<T> {
1752    PrimitiveArray::new(flush_values(values).into(), nulls)
1753}
1754
1755#[inline]
1756fn read_decimal_bytes_be<const N: usize>(
1757    buf: &mut AvroCursor<'_>,
1758    size: &Option<usize>,
1759) -> Result<[u8; N], AvroError> {
1760    match size {
1761        Some(n) if *n == N => {
1762            let raw = buf.get_fixed(N)?;
1763            let mut arr = [0u8; N];
1764            arr.copy_from_slice(raw);
1765            Ok(arr)
1766        }
1767        Some(n) => {
1768            let raw = buf.get_fixed(*n)?;
1769            sign_cast_to::<N>(raw)
1770        }
1771        None => {
1772            let raw = buf.get_bytes()?;
1773            sign_cast_to::<N>(raw)
1774        }
1775    }
1776}
1777
1778/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
1779/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
1780/// the payload is a big-endian two's-complement integer, and when narrowing it must
1781/// be representable without changing sign or value.
1782///
1783/// If `raw.len() < N`, the value is sign-extended.
1784/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
1785/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
1786#[inline]
1787fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
1788    let len = raw.len();
1789    // Fast path: exact width, just copy
1790    if len == N {
1791        let mut out = [0u8; N];
1792        out.copy_from_slice(raw);
1793        return Ok(out);
1794    }
1795    // Determine sign byte from MSB of first byte (empty => positive)
1796    let first = raw.first().copied().unwrap_or(0u8);
1797    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1798    // Pre-fill with sign byte to support sign extension
1799    let mut out = [sign_byte; N];
1800    if len > N {
1801        // Validate truncation: all dropped leading bytes must equal sign_byte,
1802        // and the MSB of the first kept byte must match the sign.
1803        let extra = len - N;
1804        // Any non-sign byte in the truncated prefix indicates overflow
1805        if raw[..extra].iter().any(|&b| b != sign_byte) {
1806            return Err(AvroError::ParseError(format!(
1807                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1808                len, N
1809            )));
1810        }
1811        if N > 0 {
1812            let first_kept = raw[extra];
1813            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1814            if sign_bit_mismatch {
1815                return Err(AvroError::ParseError(format!(
1816                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1817                    len, N
1818                )));
1819            }
1820        }
1821        out.copy_from_slice(&raw[extra..]);
1822        return Ok(out);
1823    }
1824    out[N - len..].copy_from_slice(raw);
1825    Ok(out)
1826}
1827
/// Reports whether the elements at positions `i` and `j` of `arr` are equal.
///
/// Two nulls are equal, a null never equals a non-null, and two non-null elements
/// are compared through their single-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        // Equal only when both sides are null.
        return left_null && right_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
1841
/// Per-record projection state used to decode a writer record into reader fields.
#[derive(Debug)]
struct Projector {
    /// One entry per writer field, in encoding order: `Some(reader_index)` to decode
    /// into that reader field, `None` when the value must be skipped.
    writer_to_reader: Arc<[Option<usize>]>,
    /// Parallel to `writer_to_reader`: the skipper used for entries mapped to `None`.
    skip_decoders: Vec<Option<Skipper>>,
    /// One entry per reader field: its default literal, if resolution supplied one.
    field_defaults: Vec<Option<AvroLiteral>>,
    /// `(reader_index, literal)` pairs appended after the writer fields of every record.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
1849
/// Builds a `Projector` from a resolved record and the reader's fields.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    /// Resolution result supplying the writer-to-reader mapping, the fields to
    /// default and the data types of the fields to skip.
    rec: &'a ResolvedRecord,
    /// Reader fields; their resolution info provides the default literals.
    reader_fields: Arc<[AvroField]>,
}
1855
1856impl<'a> ProjectorBuilder<'a> {
1857    #[inline]
1858    fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1859        Self {
1860            rec,
1861            reader_fields: reader_fields.clone(),
1862        }
1863    }
1864
1865    #[inline]
1866    fn build(self) -> Result<Projector, AvroError> {
1867        let reader_fields = self.reader_fields;
1868        let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1869        for avro_field in reader_fields.as_ref() {
1870            if let Some(ResolutionInfo::DefaultValue(lit)) =
1871                avro_field.data_type().resolution.as_ref()
1872            {
1873                field_defaults.push(Some(lit.clone()));
1874            } else {
1875                field_defaults.push(None);
1876            }
1877        }
1878        let mut default_injections: Vec<(usize, AvroLiteral)> =
1879            Vec::with_capacity(self.rec.default_fields.len());
1880        for &idx in self.rec.default_fields.as_ref() {
1881            let lit = field_defaults
1882                .get(idx)
1883                .and_then(|lit| lit.clone())
1884                .unwrap_or(AvroLiteral::Null);
1885            default_injections.push((idx, lit));
1886        }
1887        let mut skip_decoders: Vec<Option<Skipper>> =
1888            Vec::with_capacity(self.rec.skip_fields.len());
1889        for datatype in self.rec.skip_fields.as_ref() {
1890            let skipper = match datatype {
1891                Some(datatype) => Some(Skipper::from_avro(datatype)?),
1892                None => None,
1893            };
1894            skip_decoders.push(skipper);
1895        }
1896        Ok(Projector {
1897            writer_to_reader: self.rec.writer_to_reader.clone(),
1898            skip_decoders,
1899            field_defaults,
1900            default_injections: default_injections.into(),
1901        })
1902    }
1903}
1904
1905impl Projector {
1906    #[inline]
1907    fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), AvroError> {
1908        // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from
1909        // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in
1910        // `ProjectorBuilder::build` to have exactly one element per reader field.
1911        // Therefore, `index < self.field_defaults.len()` always holds here, so
1912        // `self.field_defaults[index]` cannot panic. We only take an immutable reference
1913        // via `.as_ref()`, and `self` is borrowed immutably.
1914        if let Some(default_literal) = self.field_defaults[index].as_ref() {
1915            decoder.append_default(default_literal)
1916        } else {
1917            decoder.append_null()
1918        }
1919    }
1920
1921    #[inline]
1922    fn project_record(
1923        &mut self,
1924        buf: &mut AvroCursor<'_>,
1925        encodings: &mut [Decoder],
1926    ) -> Result<(), AvroError> {
1927        debug_assert_eq!(
1928            self.writer_to_reader.len(),
1929            self.skip_decoders.len(),
1930            "internal invariant: mapping and skipper lists must have equal length"
1931        );
1932        for (i, (mapping, skipper_opt)) in self
1933            .writer_to_reader
1934            .iter()
1935            .zip(self.skip_decoders.iter_mut())
1936            .enumerate()
1937        {
1938            match (mapping, skipper_opt.as_mut()) {
1939                (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1940                (None, Some(skipper)) => skipper.skip(buf)?,
1941                (None, None) => {
1942                    return Err(AvroError::SchemaError(format!(
1943                        "No skipper available for writer-only field at index {i}",
1944                    )));
1945                }
1946            }
1947        }
1948        for (reader_index, lit) in self.default_injections.as_ref() {
1949            encodings[*reader_index].append_default(lit)?;
1950        }
1951        Ok(())
1952    }
1953}
1954
/// Lightweight skipper for non‑projected writer fields
/// (fields present in the writer schema but omitted by the reader/projection);
/// per Avro 1.11.1 schema resolution these fields are ignored.
///
/// Each variant describes how `Skipper::skip` consumes one encoded value.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
#[derive(Debug)]
enum Skipper {
    /// Consumes nothing.
    Null,
    /// Consumes one boolean.
    Boolean,
    /// Consumes one `int`.
    Int32,
    /// Consumes one `long`.
    Int64,
    /// Consumes one `float`.
    Float32,
    /// Consumes one `double`.
    Float64,
    /// Consumes one length-prefixed byte string.
    Bytes,
    /// Consumes one length-prefixed byte string.
    String,
    /// Consumes one `long`.
    TimeMicros,
    /// Consumes one `long`.
    TimestampMillis,
    /// Consumes one `long`.
    TimestampMicros,
    /// Consumes one `long`.
    TimestampNanos,
    /// Consumes the given number of bytes.
    Fixed(usize),
    /// Consumes the given number of bytes when a size is set, otherwise one
    /// length-prefixed byte string.
    Decimal(Option<usize>),
    /// Consumes one length-prefixed byte string.
    UuidString,
    /// Consumes one `int` (the symbol index).
    Enum,
    /// Consumes 12 bytes.
    DurationFixed12,
    /// Skips a block-encoded sequence whose items use the inner skipper.
    List(Box<Skipper>),
    /// Skips a block-encoded sequence of entries: a length-prefixed key followed by
    /// a value that uses the inner skipper.
    Map(Box<Skipper>),
    /// Skips each field in order.
    Struct(Vec<Skipper>),
    /// Reads the branch tag and skips with the skipper at that position.
    Union(Vec<Skipper>),
    /// Reads the branch indicator and skips the inner value unless the indicator
    /// selects the null branch for the given ordering.
    Nullable(Nullability, Box<Skipper>),
    /// Delegates to the inner skipper.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
1987
1988impl Skipper {
1989    fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
1990        let mut base = match dt.codec() {
1991            Codec::Null => Self::Null,
1992            Codec::Boolean => Self::Boolean,
1993            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1994            Codec::Int64 => Self::Int64,
1995            Codec::TimeMicros => Self::TimeMicros,
1996            Codec::TimestampMillis(_) => Self::TimestampMillis,
1997            Codec::TimestampMicros(_) => Self::TimestampMicros,
1998            Codec::TimestampNanos(_) => Self::TimestampNanos,
1999            #[cfg(feature = "avro_custom_types")]
2000            Codec::DurationNanos
2001            | Codec::DurationMicros
2002            | Codec::DurationMillis
2003            | Codec::DurationSeconds => Self::Int64,
2004            Codec::Float32 => Self::Float32,
2005            Codec::Float64 => Self::Float64,
2006            Codec::Binary => Self::Bytes,
2007            Codec::Utf8 | Codec::Utf8View => Self::String,
2008            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2009            Codec::Decimal(_, _, size) => Self::Decimal(*size),
2010            Codec::Uuid => Self::UuidString, // encoded as string
2011            Codec::Enum(_) => Self::Enum,
2012            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2013            Codec::Struct(fields) => Self::Struct(
2014                fields
2015                    .iter()
2016                    .map(|f| Skipper::from_avro(f.data_type()))
2017                    .collect::<Result<_, _>>()?,
2018            ),
2019            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2020            Codec::Interval => Self::DurationFixed12,
2021            Codec::Union(encodings, _, _) => {
2022                let max_addr = (i32::MAX as usize) + 1;
2023                if encodings.len() > max_addr {
2024                    return Err(AvroError::SchemaError(format!(
2025                        "Writer union has {} branches, which exceeds the maximum addressable \
2026                         branches by an Avro int tag ({} + 1).",
2027                        encodings.len(),
2028                        i32::MAX
2029                    )));
2030                }
2031                Self::Union(
2032                    encodings
2033                        .iter()
2034                        .map(Skipper::from_avro)
2035                        .collect::<Result<_, _>>()?,
2036                )
2037            }
2038            #[cfg(feature = "avro_custom_types")]
2039            Codec::RunEndEncoded(inner, _w) => {
2040                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2041            }
2042        };
2043        if let Some(n) = dt.nullability() {
2044            base = Self::Nullable(n, Box::new(base));
2045        }
2046        Ok(base)
2047    }
2048
2049    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2050        match self {
2051            Self::Null => Ok(()),
2052            Self::Boolean => {
2053                buf.get_bool()?;
2054                Ok(())
2055            }
2056            Self::Int32 => {
2057                buf.get_int()?;
2058                Ok(())
2059            }
2060            Self::Int64
2061            | Self::TimeMicros
2062            | Self::TimestampMillis
2063            | Self::TimestampMicros
2064            | Self::TimestampNanos => {
2065                buf.get_long()?;
2066                Ok(())
2067            }
2068            Self::Float32 => {
2069                buf.get_float()?;
2070                Ok(())
2071            }
2072            Self::Float64 => {
2073                buf.get_double()?;
2074                Ok(())
2075            }
2076            Self::Bytes | Self::String | Self::UuidString => {
2077                buf.get_bytes()?;
2078                Ok(())
2079            }
2080            Self::Fixed(sz) => {
2081                buf.get_fixed(*sz)?;
2082                Ok(())
2083            }
2084            Self::Decimal(size) => {
2085                if let Some(s) = size {
2086                    buf.get_fixed(*s)
2087                } else {
2088                    buf.get_bytes()
2089                }?;
2090                Ok(())
2091            }
2092            Self::Enum => {
2093                buf.get_int()?;
2094                Ok(())
2095            }
2096            Self::DurationFixed12 => {
2097                buf.get_fixed(12)?;
2098                Ok(())
2099            }
2100            Self::List(item) => {
2101                skip_blocks(buf, |c| item.skip(c))?;
2102                Ok(())
2103            }
2104            Self::Map(value) => {
2105                skip_blocks(buf, |c| {
2106                    c.get_bytes()?; // key
2107                    value.skip(c)
2108                })?;
2109                Ok(())
2110            }
2111            Self::Struct(fields) => {
2112                for f in fields.iter_mut() {
2113                    f.skip(buf)?
2114                }
2115                Ok(())
2116            }
2117            Self::Union(encodings) => {
2118                // Union tag must be ZigZag-decoded
2119                let raw = buf.get_long()?;
2120                if raw < 0 {
2121                    return Err(AvroError::ParseError(format!(
2122                        "Negative union branch index {raw}"
2123                    )));
2124                }
2125                let idx: usize = usize::try_from(raw).map_err(|_| {
2126                    AvroError::ParseError(format!(
2127                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2128                        (usize::BITS as usize)
2129                    ))
2130                })?;
2131                let Some(encoding) = encodings.get_mut(idx) else {
2132                    return Err(AvroError::ParseError(format!(
2133                        "Union branch index {idx} out of range for skipper ({} branches)",
2134                        encodings.len()
2135                    )));
2136                };
2137                encoding.skip(buf)
2138            }
2139            Self::Nullable(order, inner) => {
2140                let branch = buf.read_vlq()?;
2141                let is_not_null = match *order {
2142                    Nullability::NullFirst => branch != 0,
2143                    Nullability::NullSecond => branch == 0,
2144                };
2145                if is_not_null {
2146                    inner.skip(buf)?;
2147                }
2148                Ok(())
2149            }
2150            #[cfg(feature = "avro_custom_types")]
2151            Self::RunEndEncoded(inner) => inner.skip(buf),
2152        }
2153    }
2154}
2155
2156#[cfg(test)]
2157mod tests {
2158    use super::*;
2159    use crate::codec::AvroFieldBuilder;
2160    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2161    use arrow_array::cast::AsArray;
2162    use indexmap::IndexMap;
2163    use std::collections::HashMap;
2164
2165    fn encode_avro_int(value: i32) -> Vec<u8> {
2166        let mut buf = Vec::new();
2167        let mut v = (value << 1) ^ (value >> 31);
2168        while v & !0x7F != 0 {
2169            buf.push(((v & 0x7F) | 0x80) as u8);
2170            v >>= 7;
2171        }
2172        buf.push(v as u8);
2173        buf
2174    }
2175
2176    fn encode_avro_long(value: i64) -> Vec<u8> {
2177        let mut buf = Vec::new();
2178        let mut v = (value << 1) ^ (value >> 63);
2179        while v & !0x7F != 0 {
2180            buf.push(((v & 0x7F) | 0x80) as u8);
2181            v >>= 7;
2182        }
2183        buf.push(v as u8);
2184        buf
2185    }
2186
2187    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2188        let mut buf = encode_avro_long(bytes.len() as i64);
2189        buf.extend_from_slice(bytes);
2190        buf
2191    }
2192
2193    fn avro_from_codec(codec: Codec) -> AvroDataType {
2194        AvroDataType::new(codec, Default::default(), None)
2195    }
2196
2197    fn resolved_root_datatype(
2198        writer: Schema<'static>,
2199        reader: Schema<'static>,
2200        use_utf8view: bool,
2201        strict_mode: bool,
2202    ) -> AvroDataType {
2203        // Wrap writer schema in a single-field record
2204        let writer_record = Schema::Complex(ComplexType::Record(Record {
2205            name: "Root",
2206            namespace: None,
2207            doc: None,
2208            aliases: vec![],
2209            fields: vec![Field {
2210                name: "v",
2211                r#type: writer,
2212                default: None,
2213                doc: None,
2214                aliases: vec![],
2215            }],
2216            attributes: Attributes::default(),
2217        }));
2218
2219        // Wrap reader schema in a single-field record
2220        let reader_record = Schema::Complex(ComplexType::Record(Record {
2221            name: "Root",
2222            namespace: None,
2223            doc: None,
2224            aliases: vec![],
2225            fields: vec![Field {
2226                name: "v",
2227                r#type: reader,
2228                default: None,
2229                doc: None,
2230                aliases: vec![],
2231            }],
2232            attributes: Attributes::default(),
2233        }));
2234
2235        // Build resolved record, then extract the inner field's resolved AvroDataType
2236        let field = AvroFieldBuilder::new(&writer_record)
2237            .with_reader_schema(&reader_record)
2238            .with_utf8view(use_utf8view)
2239            .with_strict_mode(strict_mode)
2240            .build()
2241            .expect("schema resolution should succeed");
2242
2243        match field.data_type().codec() {
2244            Codec::Struct(fields) => fields[0].data_type().clone(),
2245            other => panic!("expected wrapper struct, got {other:?}"),
2246        }
2247    }
2248
2249    fn decoder_for_promotion(
2250        writer: PrimitiveType,
2251        reader: PrimitiveType,
2252        use_utf8view: bool,
2253    ) -> Decoder {
2254        let ws = Schema::TypeName(TypeName::Primitive(writer));
2255        let rs = Schema::TypeName(TypeName::Primitive(reader));
2256        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2257        Decoder::try_new(&dt).unwrap()
2258    }
2259
2260    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2261        AvroDataType::new(codec, HashMap::new(), nullability)
2262    }
2263
2264    #[cfg(feature = "avro_custom_types")]
2265    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2266        let mut out = Vec::with_capacity(10);
2267        while x >= 0x80 {
2268            out.push((x as u8) | 0x80);
2269            x >>= 7;
2270        }
2271        out.push(x as u8);
2272        out
2273    }
2274
2275    #[test]
2276    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2277        let ws = Schema::Union(vec![
2278            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2279            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2280        ]);
2281        let rs = Schema::Union(vec![
2282            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2283            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2284        ]);
2285
2286        let dt = resolved_root_datatype(ws, rs, false, false);
2287        let mut dec = Decoder::try_new(&dt).unwrap();
2288
2289        let mut rec1 = encode_avro_long(0);
2290        rec1.extend(encode_avro_int(7));
2291        let mut cur1 = AvroCursor::new(&rec1);
2292        dec.decode(&mut cur1).unwrap();
2293
2294        let mut rec2 = encode_avro_long(1);
2295        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2296        let mut cur2 = AvroCursor::new(&rec2);
2297        dec.decode(&mut cur2).unwrap();
2298
2299        let arr = dec.flush(None).unwrap();
2300        let ua = arr
2301            .as_any()
2302            .downcast_ref::<UnionArray>()
2303            .expect("dense union output");
2304
2305        assert_eq!(
2306            ua.type_id(0),
2307            1,
2308            "first value must select reader 'long' branch"
2309        );
2310        assert_eq!(ua.value_offset(0), 0);
2311
2312        assert_eq!(
2313            ua.type_id(1),
2314            0,
2315            "second value must select reader 'string' branch"
2316        );
2317        assert_eq!(ua.value_offset(1), 0);
2318
2319        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2320        assert_eq!(long_child.len(), 1);
2321        assert_eq!(long_child.value(0), 7);
2322
2323        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2324        assert_eq!(str_child.len(), 1);
2325        assert_eq!(str_child.value(0), "abc");
2326    }
2327
2328    #[test]
2329    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2330        let ws = Schema::Union(vec![
2331            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2332            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2333        ]);
2334        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2335
2336        let dt = resolved_root_datatype(ws, rs, false, false);
2337        let mut dec = Decoder::try_new(&dt).unwrap();
2338
2339        let mut data = encode_avro_long(0);
2340        data.extend(encode_avro_int(5));
2341        let mut cur = AvroCursor::new(&data);
2342        dec.decode(&mut cur).unwrap();
2343
2344        let arr = dec.flush(None).unwrap();
2345        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2346        assert_eq!(out.len(), 1);
2347        assert_eq!(out.value(0), 5);
2348    }
2349
2350    #[test]
2351    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2352        let ws = Schema::Union(vec![
2353            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2354            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2355        ]);
2356        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2357
2358        let dt = resolved_root_datatype(ws, rs, false, false);
2359        let mut dec = Decoder::try_new(&dt).unwrap();
2360
2361        let mut data = encode_avro_long(1);
2362        data.extend(encode_avro_bytes("z".as_bytes()));
2363        let mut cur = AvroCursor::new(&data);
2364        let res = dec.decode(&mut cur);
2365        assert!(
2366            res.is_err(),
2367            "expected error when writer union branch does not resolve to reader non-union type"
2368        );
2369    }
2370
2371    #[test]
2372    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2373        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2374        let rs = Schema::Union(vec![
2375            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2376            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2377        ]);
2378
2379        let dt = resolved_root_datatype(ws, rs, false, false);
2380        let mut dec = Decoder::try_new(&dt).unwrap();
2381
2382        let data = encode_avro_int(6);
2383        let mut cur = AvroCursor::new(&data);
2384        dec.decode(&mut cur).unwrap();
2385
2386        let arr = dec.flush(None).unwrap();
2387        let ua = arr
2388            .as_any()
2389            .downcast_ref::<UnionArray>()
2390            .expect("dense union output");
2391        assert_eq!(ua.len(), 1);
2392        assert_eq!(
2393            ua.type_id(0),
2394            1,
2395            "must resolve to reader 'long' branch (type_id 1)"
2396        );
2397        assert_eq!(ua.value_offset(0), 0);
2398
2399        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2400        assert_eq!(long_child.len(), 1);
2401        assert_eq!(long_child.value(0), 6);
2402
2403        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2404        assert_eq!(str_child.len(), 0, "string branch must be empty");
2405    }
2406
2407    #[test]
2408    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2409        let ws = Schema::Union(vec![
2410            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2411            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2412        ]);
2413        let rs = Schema::Union(vec![
2414            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2415            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2416        ]);
2417
2418        let dt = resolved_root_datatype(ws, rs, false, false);
2419        let mut dec = Decoder::try_new(&dt).unwrap();
2420
2421        let mut data = encode_avro_long(1);
2422        data.push(1);
2423        let mut cur = AvroCursor::new(&data);
2424        let res = dec.decode(&mut cur);
2425        assert!(
2426            res.is_err(),
2427            "expected error for unmapped writer 'boolean' branch"
2428        );
2429    }
2430
2431    #[test]
2432    fn test_schema_resolution_promotion_int_to_long() {
2433        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2434        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2435        for v in [0, 1, -2, 123456] {
2436            let data = encode_avro_int(v);
2437            let mut cur = AvroCursor::new(&data);
2438            dec.decode(&mut cur).unwrap();
2439        }
2440        let arr = dec.flush(None).unwrap();
2441        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2442        assert_eq!(a.value(0), 0);
2443        assert_eq!(a.value(1), 1);
2444        assert_eq!(a.value(2), -2);
2445        assert_eq!(a.value(3), 123456);
2446    }
2447
2448    #[test]
2449    fn test_schema_resolution_promotion_int_to_float() {
2450        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2451        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2452        for v in [0, 42, -7] {
2453            let data = encode_avro_int(v);
2454            let mut cur = AvroCursor::new(&data);
2455            dec.decode(&mut cur).unwrap();
2456        }
2457        let arr = dec.flush(None).unwrap();
2458        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2459        assert_eq!(a.value(0), 0.0);
2460        assert_eq!(a.value(1), 42.0);
2461        assert_eq!(a.value(2), -7.0);
2462    }
2463
2464    #[test]
2465    fn test_schema_resolution_promotion_int_to_double() {
2466        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2467        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2468        for v in [1, -1, 10_000] {
2469            let data = encode_avro_int(v);
2470            let mut cur = AvroCursor::new(&data);
2471            dec.decode(&mut cur).unwrap();
2472        }
2473        let arr = dec.flush(None).unwrap();
2474        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2475        assert_eq!(a.value(0), 1.0);
2476        assert_eq!(a.value(1), -1.0);
2477        assert_eq!(a.value(2), 10_000.0);
2478    }
2479
2480    #[test]
2481    fn test_schema_resolution_promotion_long_to_float() {
2482        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2483        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2484        for v in [0_i64, 1_000_000_i64, -123_i64] {
2485            let data = encode_avro_long(v);
2486            let mut cur = AvroCursor::new(&data);
2487            dec.decode(&mut cur).unwrap();
2488        }
2489        let arr = dec.flush(None).unwrap();
2490        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2491        assert_eq!(a.value(0), 0.0);
2492        assert_eq!(a.value(1), 1_000_000.0);
2493        assert_eq!(a.value(2), -123.0);
2494    }
2495
2496    #[test]
2497    fn test_schema_resolution_promotion_long_to_double() {
2498        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2499        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2500        for v in [2_i64, -2_i64, 9_223_372_i64] {
2501            let data = encode_avro_long(v);
2502            let mut cur = AvroCursor::new(&data);
2503            dec.decode(&mut cur).unwrap();
2504        }
2505        let arr = dec.flush(None).unwrap();
2506        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2507        assert_eq!(a.value(0), 2.0);
2508        assert_eq!(a.value(1), -2.0);
2509        assert_eq!(a.value(2), 9_223_372.0);
2510    }
2511
2512    #[test]
2513    fn test_schema_resolution_promotion_float_to_double() {
2514        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2515        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2516        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2517            let data = v.to_le_bytes().to_vec();
2518            let mut cur = AvroCursor::new(&data);
2519            dec.decode(&mut cur).unwrap();
2520        }
2521        let arr = dec.flush(None).unwrap();
2522        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2523        assert_eq!(a.value(0), 0.5_f64);
2524        assert_eq!(a.value(1), -3.25_f64);
2525        assert_eq!(a.value(2), 1.0e6_f64);
2526    }
2527
2528    #[test]
2529    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2530        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2531        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2532        for s in ["hello", "world", "héllo"] {
2533            let data = encode_avro_bytes(s.as_bytes());
2534            let mut cur = AvroCursor::new(&data);
2535            dec.decode(&mut cur).unwrap();
2536        }
2537        let arr = dec.flush(None).unwrap();
2538        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2539        assert_eq!(a.value(0), "hello");
2540        assert_eq!(a.value(1), "world");
2541        assert_eq!(a.value(2), "héllo");
2542    }
2543
2544    #[test]
2545    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2546        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2547        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2548        let data = encode_avro_bytes("abc".as_bytes());
2549        let mut cur = AvroCursor::new(&data);
2550        dec.decode(&mut cur).unwrap();
2551        let arr = dec.flush(None).unwrap();
2552        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2553        assert_eq!(a.value(0), "abc");
2554    }
2555
2556    #[test]
2557    fn test_schema_resolution_promotion_string_to_bytes() {
2558        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2559        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2560        for s in ["", "abc", "data"] {
2561            let data = encode_avro_bytes(s.as_bytes());
2562            let mut cur = AvroCursor::new(&data);
2563            dec.decode(&mut cur).unwrap();
2564        }
2565        let arr = dec.flush(None).unwrap();
2566        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2567        assert_eq!(a.value(0), b"");
2568        assert_eq!(a.value(1), b"abc");
2569        assert_eq!(a.value(2), "data".as_bytes());
2570    }
2571
2572    #[test]
2573    fn test_schema_resolution_no_promotion_passthrough_int() {
2574        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2575        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2576        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
2577        let writer_record = Schema::Complex(ComplexType::Record(Record {
2578            name: "Root",
2579            namespace: None,
2580            doc: None,
2581            aliases: vec![],
2582            fields: vec![Field {
2583                name: "v",
2584                r#type: ws,
2585                default: None,
2586                doc: None,
2587                aliases: vec![],
2588            }],
2589            attributes: Attributes::default(),
2590        }));
2591        let reader_record = Schema::Complex(ComplexType::Record(Record {
2592            name: "Root",
2593            namespace: None,
2594            doc: None,
2595            aliases: vec![],
2596            fields: vec![Field {
2597                name: "v",
2598                r#type: rs,
2599                default: None,
2600                doc: None,
2601                aliases: vec![],
2602            }],
2603            attributes: Attributes::default(),
2604        }));
2605        let field = AvroFieldBuilder::new(&writer_record)
2606            .with_reader_schema(&reader_record)
2607            .with_utf8view(false)
2608            .with_strict_mode(false)
2609            .build()
2610            .unwrap();
2611        // Extract the resolved inner field's AvroDataType
2612        let dt = match field.data_type().codec() {
2613            Codec::Struct(fields) => fields[0].data_type().clone(),
2614            other => panic!("expected wrapper struct, got {other:?}"),
2615        };
2616        let mut dec = Decoder::try_new(&dt).unwrap();
2617        assert!(matches!(dec, Decoder::Int32(_)));
2618        for v in [7, -9] {
2619            let data = encode_avro_int(v);
2620            let mut cur = AvroCursor::new(&data);
2621            dec.decode(&mut cur).unwrap();
2622        }
2623        let arr = dec.flush(None).unwrap();
2624        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2625        assert_eq!(a.value(0), 7);
2626        assert_eq!(a.value(1), -9);
2627    }
2628
2629    #[test]
2630    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
2631        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2632        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
2633        let writer_record = Schema::Complex(ComplexType::Record(Record {
2634            name: "Root",
2635            namespace: None,
2636            doc: None,
2637            aliases: vec![],
2638            fields: vec![Field {
2639                name: "v",
2640                r#type: ws,
2641                default: None,
2642                doc: None,
2643                aliases: vec![],
2644            }],
2645            attributes: Attributes::default(),
2646        }));
2647        let reader_record = Schema::Complex(ComplexType::Record(Record {
2648            name: "Root",
2649            namespace: None,
2650            doc: None,
2651            aliases: vec![],
2652            fields: vec![Field {
2653                name: "v",
2654                r#type: rs,
2655                default: None,
2656                doc: None,
2657                aliases: vec![],
2658            }],
2659            attributes: Attributes::default(),
2660        }));
2661        let res = AvroFieldBuilder::new(&writer_record)
2662            .with_reader_schema(&reader_record)
2663            .with_utf8view(false)
2664            .with_strict_mode(false)
2665            .build();
2666        assert!(res.is_err(), "expected error for illegal promotion");
2667    }
2668
2669    #[test]
2670    fn test_map_decoding_one_entry() {
2671        let value_type = avro_from_codec(Codec::Utf8);
2672        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2673        let mut decoder = Decoder::try_new(&map_type).unwrap();
2674        // Encode a single map with one entry: {"hello": "world"}
2675        let mut data = Vec::new();
2676        data.extend_from_slice(&encode_avro_long(1));
2677        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
2678        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
2679        data.extend_from_slice(&encode_avro_long(0));
2680        let mut cursor = AvroCursor::new(&data);
2681        decoder.decode(&mut cursor).unwrap();
2682        let array = decoder.flush(None).unwrap();
2683        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2684        assert_eq!(map_arr.len(), 1); // one map
2685        assert_eq!(map_arr.value_length(0), 1);
2686        let entries = map_arr.value(0);
2687        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
2688        assert_eq!(struct_entries.len(), 1);
2689        let key_arr = struct_entries
2690            .column_by_name("key")
2691            .unwrap()
2692            .as_any()
2693            .downcast_ref::<StringArray>()
2694            .unwrap();
2695        let val_arr = struct_entries
2696            .column_by_name("value")
2697            .unwrap()
2698            .as_any()
2699            .downcast_ref::<StringArray>()
2700            .unwrap();
2701        assert_eq!(key_arr.value(0), "hello");
2702        assert_eq!(val_arr.value(0), "world");
2703    }
2704
2705    #[test]
2706    fn test_map_decoding_empty() {
2707        let value_type = avro_from_codec(Codec::Utf8);
2708        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2709        let mut decoder = Decoder::try_new(&map_type).unwrap();
2710        let data = encode_avro_long(0);
2711        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2712        let array = decoder.flush(None).unwrap();
2713        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2714        assert_eq!(map_arr.len(), 1);
2715        assert_eq!(map_arr.value_length(0), 0);
2716    }
2717
2718    #[test]
2719    fn test_fixed_decoding() {
2720        let avro_type = avro_from_codec(Codec::Fixed(3));
2721        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2722
2723        let data1 = [1u8, 2, 3];
2724        let mut cursor1 = AvroCursor::new(&data1);
2725        decoder
2726            .decode(&mut cursor1)
2727            .expect("Failed to decode data1");
2728        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
2729        let data2 = [4u8, 5, 6];
2730        let mut cursor2 = AvroCursor::new(&data2);
2731        decoder
2732            .decode(&mut cursor2)
2733            .expect("Failed to decode data2");
2734        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
2735        let array = decoder.flush(None).expect("Failed to flush decoder");
2736        assert_eq!(array.len(), 2, "Array should contain two items");
2737        let fixed_size_binary_array = array
2738            .as_any()
2739            .downcast_ref::<FixedSizeBinaryArray>()
2740            .expect("Failed to downcast to FixedSizeBinaryArray");
2741        assert_eq!(
2742            fixed_size_binary_array.value_length(),
2743            3,
2744            "Fixed size of binary values should be 3"
2745        );
2746        assert_eq!(
2747            fixed_size_binary_array.value(0),
2748            &[1, 2, 3],
2749            "First item mismatch"
2750        );
2751        assert_eq!(
2752            fixed_size_binary_array.value(1),
2753            &[4, 5, 6],
2754            "Second item mismatch"
2755        );
2756    }
2757
2758    #[test]
2759    fn test_fixed_decoding_empty() {
2760        let avro_type = avro_from_codec(Codec::Fixed(5));
2761        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2762
2763        let array = decoder
2764            .flush(None)
2765            .expect("Failed to flush decoder for empty input");
2766
2767        assert_eq!(array.len(), 0, "Array should be empty");
2768        let fixed_size_binary_array = array
2769            .as_any()
2770            .downcast_ref::<FixedSizeBinaryArray>()
2771            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
2772
2773        assert_eq!(
2774            fixed_size_binary_array.value_length(),
2775            5,
2776            "Fixed size of binary values should be 5 as per type"
2777        );
2778    }
2779
2780    #[test]
2781    fn test_uuid_decoding() {
2782        let avro_type = avro_from_codec(Codec::Uuid);
2783        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2784        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
2785        let data = encode_avro_bytes(uuid_str.as_bytes());
2786        let mut cursor = AvroCursor::new(&data);
2787        decoder.decode(&mut cursor).expect("Failed to decode data");
2788        assert_eq!(
2789            cursor.position(),
2790            data.len(),
2791            "Cursor should advance by varint size + data size"
2792        );
2793        let array = decoder.flush(None).expect("Failed to flush decoder");
2794        let fixed_size_binary_array = array
2795            .as_any()
2796            .downcast_ref::<FixedSizeBinaryArray>()
2797            .expect("Array should be a FixedSizeBinaryArray");
2798        assert_eq!(fixed_size_binary_array.len(), 1);
2799        assert_eq!(fixed_size_binary_array.value_length(), 16);
2800        let expected_bytes = [
2801            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
2802            0x6b, 0xf6,
2803        ];
2804        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
2805    }
2806
2807    #[test]
2808    fn test_array_decoding() {
2809        let item_dt = avro_from_codec(Codec::Int32);
2810        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2811        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2812        let mut row1 = Vec::new();
2813        row1.extend_from_slice(&encode_avro_long(2));
2814        row1.extend_from_slice(&encode_avro_int(10));
2815        row1.extend_from_slice(&encode_avro_int(20));
2816        row1.extend_from_slice(&encode_avro_long(0));
2817        let row2 = encode_avro_long(0);
2818        let mut cursor = AvroCursor::new(&row1);
2819        decoder.decode(&mut cursor).unwrap();
2820        let mut cursor2 = AvroCursor::new(&row2);
2821        decoder.decode(&mut cursor2).unwrap();
2822        let array = decoder.flush(None).unwrap();
2823        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2824        assert_eq!(list_arr.len(), 2);
2825        let offsets = list_arr.value_offsets();
2826        assert_eq!(offsets, &[0, 2, 2]);
2827        let values = list_arr.values();
2828        let int_arr = values.as_primitive::<Int32Type>();
2829        assert_eq!(int_arr.len(), 2);
2830        assert_eq!(int_arr.value(0), 10);
2831        assert_eq!(int_arr.value(1), 20);
2832    }
2833
2834    #[test]
2835    fn test_array_decoding_with_negative_block_count() {
2836        let item_dt = avro_from_codec(Codec::Int32);
2837        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2838        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2839        let mut data = encode_avro_long(-3);
2840        data.extend_from_slice(&encode_avro_long(12));
2841        data.extend_from_slice(&encode_avro_int(1));
2842        data.extend_from_slice(&encode_avro_int(2));
2843        data.extend_from_slice(&encode_avro_int(3));
2844        data.extend_from_slice(&encode_avro_long(0));
2845        let mut cursor = AvroCursor::new(&data);
2846        decoder.decode(&mut cursor).unwrap();
2847        let array = decoder.flush(None).unwrap();
2848        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2849        assert_eq!(list_arr.len(), 1);
2850        assert_eq!(list_arr.value_length(0), 3);
2851        let values = list_arr.values().as_primitive::<Int32Type>();
2852        assert_eq!(values.len(), 3);
2853        assert_eq!(values.value(0), 1);
2854        assert_eq!(values.value(1), 2);
2855        assert_eq!(values.value(2), 3);
2856    }
2857
2858    #[test]
2859    fn test_nested_array_decoding() {
2860        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
2861        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
2862        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
2863        let mut buf = Vec::new();
2864        buf.extend(encode_avro_long(1));
2865        buf.extend(encode_avro_long(2));
2866        buf.extend(encode_avro_int(5));
2867        buf.extend(encode_avro_int(6));
2868        buf.extend(encode_avro_long(0));
2869        buf.extend(encode_avro_long(0));
2870        let mut cursor = AvroCursor::new(&buf);
2871        decoder.decode(&mut cursor).unwrap();
2872        let arr = decoder.flush(None).unwrap();
2873        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
2874        assert_eq!(outer.len(), 1);
2875        assert_eq!(outer.value_length(0), 1);
2876        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
2877        assert_eq!(inner.len(), 1);
2878        assert_eq!(inner.value_length(0), 2);
2879        let values = inner
2880            .values()
2881            .as_any()
2882            .downcast_ref::<Int32Array>()
2883            .unwrap();
2884        assert_eq!(values.values(), &[5, 6]);
2885    }
2886
2887    #[test]
2888    fn test_array_decoding_empty_array() {
2889        let value_type = avro_from_codec(Codec::Utf8);
2890        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
2891        let mut decoder = Decoder::try_new(&map_type).unwrap();
2892        let data = encode_avro_long(0);
2893        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2894        let array = decoder.flush(None).unwrap();
2895        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2896        assert_eq!(list_arr.len(), 1);
2897        assert_eq!(list_arr.value_length(0), 0);
2898    }
2899
/// Resolves a writer array of plain `int` items against a reader array whose
/// items are the union `["null", "int"]`, then checks that the untagged
/// writer ints decode into non-null list items and that every byte is read.
#[test]
fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
    use crate::schema::Array;
    let writer_items = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let reader_items = Schema::Union(vec![
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
    ]);
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(writer_items),
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(reader_items),
        attributes: Attributes::default(),
    }));
    let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let Codec::List(item_type) = dt.codec() else {
        panic!("expected List codec");
    };
    assert_eq!(
        item_type.nullability(),
        Some(Nullability::NullFirst),
        "items should be nullable"
    );
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // One block of two ints (10, 20) followed by the terminating zero count.
    let data = [
        encode_avro_long(2),
        encode_avro_int(10),
        encode_avro_int(20),
        encode_avro_long(0),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    assert_eq!(
        cursor.position(),
        data.len(),
        "all bytes should be consumed"
    );
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1, "one list/row");
    assert_eq!(lists.value_length(0), 2, "two items in the list");
    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 2);
    for (idx, expected) in [10, 20].iter().copied().enumerate() {
        assert_eq!(items.value(idx), expected);
    }
    for idx in 0..2 {
        assert!(!items.is_null(idx));
    }
}
2947
/// A precision-50, scale-2 decimal stored in a 32-byte fixed decodes into a
/// `Decimal256Array`.
#[test]
fn test_decimal_decoding_fixed256() {
    let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // 32-byte rows: 0x00..3039 (12345) and 0xFF..85 (-123).
    let mut positive = [0x00u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal256Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
2974
/// A precision-28, scale-2 decimal stored in a 16-byte fixed decodes into a
/// `Decimal128Array`.
#[test]
fn test_decimal_decoding_fixed128() {
    let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // 16-byte rows: 0x00..3039 (12345) and 0xFF..85 (-123).
    let mut positive = [0x00u8; 16];
    positive[14..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 16];
    negative[15] = 0x85;
    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
2999
/// A precision-5 decimal stored in a 32-byte fixed is expected to come back
/// as `Decimal32Array` when `small_decimals` is enabled and as
/// `Decimal128Array` otherwise.
#[test]
fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
    // Array type the flushed result must downcast to for the active feature set.
    #[cfg(feature = "small_decimals")]
    type Expected = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Expected = Decimal128Array;

    let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // 32-byte rows: 0x00..3039 (12345) and 0xFF..85 (-123).
    let mut positive = [0x00u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Expected>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3036
/// A precision-5 decimal stored in a 16-byte fixed is expected to come back
/// as `Decimal32Array` when `small_decimals` is enabled and as
/// `Decimal128Array` otherwise.
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    // Array type the flushed result must downcast to for the active feature set.
    #[cfg(feature = "small_decimals")]
    type Expected = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Expected = Decimal128Array;

    let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // 16-byte rows: 0x00..3039 (12345) and 0xFF..85 (-123).
    let mut positive = [0x00u8; 16];
    positive[14..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 16];
    negative[15] = 0x85;
    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Expected>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3072
/// Bytes-backed decimal (precision 4, scale 1) wrapped in a hand-built
/// `Decoder::Nullable` with `NullSecond` ordering: rows are value, null, value.
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    // Array type the flushed result must downcast to for the active feature set.
    #[cfg(feature = "small_decimals")]
    type Expected = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Expected = Decimal128Array;

    let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let value_decoder = Decoder::try_new(&dt).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );
    // Each row starts with the union tag: 0 carries a value, 1 is null.
    let data = [
        encode_avro_int(0),
        encode_avro_bytes(&[0x04, 0xD2]),
        encode_avro_int(1),
        encode_avro_int(0),
        encode_avro_bytes(&[0xFB, 0x2E]),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Expected>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "123.4");
    assert_eq!(decimals.value_as_string(2), "-123.4");
}
3115
/// Precision-6, scale-2 decimal stored in a 16-byte fixed, wrapped in a
/// hand-built `Decoder::Nullable` with `NullSecond` ordering: rows are
/// value, null, value.
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    // Array type the flushed result must downcast to for the active feature set.
    #[cfg(feature = "small_decimals")]
    type Expected = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Expected = Decimal128Array;

    let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let value_decoder = Decoder::try_new(&dt).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );
    // 16-byte rows: 0x00..01E240 (123456) and 0xFF..FE1DC0 (-123456).
    let mut positive = [0x00u8; 16];
    positive[13..].copy_from_slice(&[0x01, 0xE2, 0x40]);
    let mut negative = [0xFFu8; 16];
    negative[13..].copy_from_slice(&[0xFE, 0x1D, 0xC0]);
    // Union tags: 0 carries a value, 1 is null.
    let value_tag = encode_avro_int(0);
    let null_tag = encode_avro_int(1);
    let data = [
        &value_tag[..],
        &positive[..],
        &null_tag[..],
        &value_tag[..],
        &negative[..],
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Expected>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "1234.56");
    assert_eq!(decimals.value_as_string(2), "-1234.56");
}
3166
/// Enum indices 2, 0, 1 decode into a dictionary array whose values are the
/// symbols in declaration order and whose keys are the raw indices.
#[test]
fn test_enum_decoding() {
    let symbols: Arc<[String]> = ["A", "B", "C"]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    let avro_type = avro_from_codec(Codec::Enum(symbols));
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let data = [encode_avro_int(2), encode_avro_int(0), encode_avro_int(1)].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 3);
    let dictionary_values = dictionary.values();
    let labels = dictionary_values
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, expected) in ["A", "B", "C"].iter().copied().enumerate() {
        assert_eq!(labels.value(idx), expected);
    }
    assert_eq!(dictionary.keys().values(), &[2, 0, 1]);
}
3196
/// Nullable enum with `NullFirst` ordering: rows are "Y", null, "X".
#[test]
fn test_enum_decoding_with_nulls() {
    let symbols: Arc<[String]> = ["X", "Y"].iter().copied().map(String::from).collect();
    let avro_type = AvroDataType::new(
        Codec::Enum(symbols),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Union tag first (1 carries a value, 0 is null), then the enum index when present.
    let data = [
        encode_avro_long(1),
        encode_avro_int(1),
        encode_avro_long(0),
        encode_avro_long(1),
        encode_avro_int(0),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 3);
    assert!(dictionary.is_valid(0));
    assert!(dictionary.is_null(1));
    assert!(dictionary.is_valid(2));
    let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dictionary.keys(), &expected_keys);
    let dictionary_values = dictionary.values();
    let labels = dictionary_values
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(labels.value(0), "X");
    assert_eq!(labels.value(1), "Y");
}
3233
/// Nullable `Codec::Interval` with `NullFirst` ordering: rows are a value,
/// a null, and a second value. The third component of each encoded value is
/// expected back multiplied by 1_000_000 in the `nanoseconds` field.
#[test]
fn test_duration_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Payload of one value: three little-endian u32s (months, days, millis).
    let duration_bytes = |months: u32, days: u32, millis: u32| -> Vec<u8> {
        [
            months.to_le_bytes(),
            days.to_le_bytes(),
            millis.to_le_bytes(),
        ]
        .concat()
    };
    let data = [
        // First value: 1 month, 2 days, 3 millis (tag 1 = not null)
        encode_avro_long(1),
        duration_bytes(1, 2, 3),
        // Second value: null (tag 0)
        encode_avro_long(0),
        // Third value: 4 months, 5 days, 6 millis (tag 1 = not null)
        encode_avro_long(1),
        duration_bytes(4, 5, 6),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    assert!(intervals.is_valid(0));
    assert!(intervals.is_null(1));
    assert!(intervals.is_valid(2));
    let interval = |months: i32, days: i32, nanoseconds: i64| IntervalMonthDayNano {
        months,
        days,
        nanoseconds,
    };
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(interval(1, 2, 3_000_000)),
        None,
        Some(interval(4, 5, 6_000_000)),
    ]);
    assert_eq!(intervals, &expected);
}
3287
/// Flushing a `Codec::Interval` decoder that never decoded a row yields an
/// empty array.
#[test]
fn test_duration_decoding_empty() {
    let avro_type = AvroDataType::new(Codec::Interval, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 0);
}
3296
/// `Codec::DurationSeconds` decodes Avro longs into a `DurationSecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Three values: 0, -1, 2
    let inputs = [0i64, -1, 2];
    let data: Vec<u8> = inputs
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..inputs.len() {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[0, -1, 2]);
}
3318
/// `Codec::DurationMillis` decodes Avro longs into a `DurationMillisecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let inputs = [1i64, 0, -2];
    let data: Vec<u8> = inputs
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..inputs.len() {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[1, 0, -2]);
}
3339
/// `Codec::DurationMicros` decodes Avro longs into a `DurationMicrosecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let inputs = [5i64, -6, 7];
    let data: Vec<u8> = inputs
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..inputs.len() {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[5, -6, 7]);
}
3360
/// `Codec::DurationNanos` decodes Avro longs into a `DurationNanosecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let inputs = [8i64, 9, -10];
    let data: Vec<u8> = inputs
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..inputs.len() {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[8, 9, -10]);
}
3381
/// A failed decode of a nullable row must not leave a stray entry behind:
/// after null / error / value, the flushed array holds exactly two rows.
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    // Nullable Int32 with ['T','null'] encoding (NullSecond)
    let avro_type = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Union branch 1 => null.
    let null_row = encode_avro_int(1);
    // Union branch 0 => non-null, but the int payload is missing -> decode error.
    let truncated_row = encode_avro_int(0);
    // Union branch 0 => non-null, followed by the actual value 42.
    let valid_row = [encode_avro_int(0), encode_avro_int(42)].concat();

    decoder.decode(&mut AvroCursor::new(&null_row)).unwrap();
    assert!(decoder.decode(&mut AvroCursor::new(&truncated_row)).is_err()); // decode error
    decoder.decode(&mut AvroCursor::new(&valid_row)).unwrap();

    let flushed = decoder.flush(None).unwrap();

    // Only the null row and the valid row may be present.
    assert_eq!(flushed.len(), 2);
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(ints.is_null(0)); // the null row
    assert_eq!(ints.value(1), 42); // the valid row
}
3417
/// With an `EnumResolution` mapping of `[2, 0, 1]`, writer indices 0, 1, 2
/// are expected to land on reader keys 2, 0, 1.
#[test]
fn test_enum_mapping_reordered_symbols() {
    let reader_symbols: Arc<[String]> = ["B", "C", "A"]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![2, 0, 1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let data = [encode_avro_int(0), encode_avro_int(1), encode_avro_int(2)].concat();
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let expected_keys = Int32Array::from(vec![2, 0, 1]);
    assert_eq!(dictionary.keys(), &expected_keys);
    let dictionary_values = dictionary.values();
    let labels = dictionary_values
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, expected) in ["B", "C", "A"].iter().copied().enumerate() {
        assert_eq!(labels.value(idx), expected);
    }
}
3456
/// With `default_index = 1`, a writer index outside the two-entry mapping
/// (99) is expected to resolve to the default key, while in-range indices
/// 0 and 1 map to themselves.
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    let reader_symbols: Arc<[String]> = ["A", "B"].iter().copied().map(String::from).collect();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![0, 1]),
        default_index: 1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let data = [
        encode_avro_int(0),
        encode_avro_int(1),
        encode_avro_int(99),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let expected_keys = Int32Array::from(vec![0, 1, 1]);
    assert_eq!(dictionary.keys(), &expected_keys);
    let dictionary_values = dictionary.values();
    let labels = dictionary_values
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(labels.value(0), "A");
    assert_eq!(labels.value(1), "B");
}
3493
/// A writer symbol mapped to `-1` with `default_index = -1` must make
/// `decode` fail with a message mentioning both "not resolvable" and
/// "no default".
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    let reader_symbols: Arc<[String]> = vec![String::from("A")].into();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![-1]),
        default_index: -1, // indicates no default at type-level
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let data = encode_avro_int(0);
    let mut cur = AvroCursor::new(&data);
    let outcome = dec.decode(&mut cur);
    let msg = outcome
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    let mentions_both = ["not resolvable", "no default"]
        .iter()
        .all(|needle| msg.contains(*needle));
    assert!(mentions_both, "unexpected error message: {msg}");
}
3518
3519    fn make_record_resolved_decoder(
3520        reader_fields: &[(&str, DataType, bool)],
3521        writer_to_reader: Vec<Option<usize>>,
3522        skip_decoders: Vec<Option<Skipper>>,
3523    ) -> Decoder {
3524        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3525        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3526        for (name, dt, nullable) in reader_fields {
3527            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3528            let enc = match dt {
3529                DataType::Int32 => Decoder::Int32(Vec::new()),
3530                DataType::Int64 => Decoder::Int64(Vec::new()),
3531                DataType::Utf8 => {
3532                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3533                }
3534                other => panic!("Unsupported test reader field type: {other:?}"),
3535            };
3536            encodings.push(enc);
3537        }
3538        let fields: Fields = field_refs.into();
3539        Decoder::Record(
3540            fields,
3541            encodings,
3542            Some(Projector {
3543                writer_to_reader: Arc::from(writer_to_reader),
3544                skip_decoders,
3545                field_defaults: vec![None; reader_fields.len()],
3546                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3547            }),
3548        )
3549    }
3550
/// The writer has a trailing int field that the reader lacks; it must be
/// skipped so the cursor ends at the end of the row.
#[test]
fn test_skip_writer_trailing_field_int32() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![Some(0), None],
        vec![None, Some(Skipper::Int32)],
    );
    // Writer row: id = 7, then the skipped int 999.
    let data = [encode_avro_int(7), encode_avro_int(999)].concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(record.len(), 1);
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.value(0), 7);
}
3575
/// The writer has a string field between `id` and `score` that the reader
/// lacks; it must be skipped without disturbing the neighbouring fields.
#[test]
fn test_skip_writer_middle_field_string() {
    let reader_fields = [
        ("id", DataType::Int32, false),
        ("score", DataType::Int64, false),
    ];
    let mut dec = make_record_resolved_decoder(
        &reader_fields,
        vec![Some(0), None, Some(1)],
        vec![None, Some(Skipper::String), None],
    );
    // Writer row: id = 42, skipped string "abcdef", score = 1000.
    let data = [
        encode_avro_int(42),
        encode_avro_bytes(b"abcdef"),
        encode_avro_long(1000),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_column = record.column_by_name("score").unwrap();
    let score = score_column
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(id.value(0), 42);
    assert_eq!(score.value(0), 1000);
}
3610
/// Skips a leading writer array encoded as one block with a negative count
/// (-3) followed by the block's byte length, then reads the `id` after it.
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::List(Box::new(Skipper::Int32))), None],
    );
    let items = [encode_avro_int(1), encode_avro_int(2), encode_avro_int(3)].concat();
    let block_byte_len = encode_avro_long(items.len() as i64);
    let data = [
        encode_avro_long(-3), // negative block count
        block_byte_len,
        items,
        encode_avro_long(0), // end of array
        encode_avro_int(5),  // id
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 5);
}
3642
/// Skips a leading writer map encoded as one block with a negative count
/// (-2) followed by the block's byte length, then reads the `id` after it.
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
    );
    // Two key/value entries: "k1" -> 10 and "k2" -> 20.
    let entries = [
        encode_avro_bytes(b"k1"),
        encode_avro_int(10),
        encode_avro_bytes(b"k2"),
        encode_avro_int(20),
    ]
    .concat();
    let block_byte_len = encode_avro_long(entries.len() as i64);
    let data = [
        encode_avro_long(-2), // negative block count
        block_byte_len,
        entries,
        encode_avro_long(0),  // end of map
        encode_avro_int(123), // id
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 123);
}
3675
3676    #[test]
3677    fn test_skip_writer_nullable_field_union_nullfirst() {
3678        let mut dec = make_record_resolved_decoder(
3679            &[("id", DataType::Int32, false)],
3680            vec![None, Some(0)],
3681            vec![
3682                Some(super::Skipper::Nullable(
3683                    Nullability::NullFirst,
3684                    Box::new(super::Skipper::Int32),
3685                )),
3686                None,
3687            ],
3688        );
3689        let mut row1 = Vec::new();
3690        row1.extend_from_slice(&encode_avro_long(0));
3691        row1.extend_from_slice(&encode_avro_int(5));
3692        let mut row2 = Vec::new();
3693        row2.extend_from_slice(&encode_avro_long(1));
3694        row2.extend_from_slice(&encode_avro_int(123));
3695        row2.extend_from_slice(&encode_avro_int(7));
3696        let mut cur1 = AvroCursor::new(&row1);
3697        let mut cur2 = AvroCursor::new(&row2);
3698        dec.decode(&mut cur1).unwrap();
3699        dec.decode(&mut cur2).unwrap();
3700        assert_eq!(cur1.position(), row1.len());
3701        assert_eq!(cur2.position(), row2.len());
3702        let arr = dec.flush(None).unwrap();
3703        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3704        let id = s
3705            .column_by_name("id")
3706            .unwrap()
3707            .as_any()
3708            .downcast_ref::<Int32Array>()
3709            .unwrap();
3710        assert_eq!(id.len(), 2);
3711        assert_eq!(id.value(0), 5);
3712        assert_eq!(id.value(1), 7);
3713    }
3714
3715    fn make_dense_union_avro(
3716        children: Vec<(Codec, &'_ str, DataType)>,
3717        type_ids: Vec<i8>,
3718    ) -> AvroDataType {
3719        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3720        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3721        for (codec, name, dt) in children.into_iter() {
3722            avro_children.push(AvroDataType::new(codec, Default::default(), None));
3723            fields.push(arrow_schema::Field::new(name, dt, true));
3724        }
3725        let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
3726        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3727        AvroDataType::new(union_codec, Default::default(), None)
3728    }
3729
3730    #[test]
3731    fn test_union_dense_two_children_custom_type_ids() {
3732        let union_dt = make_dense_union_avro(
3733            vec![
3734                (Codec::Int32, "i", DataType::Int32),
3735                (Codec::Utf8, "s", DataType::Utf8),
3736            ],
3737            vec![2, 5],
3738        );
3739        let mut dec = Decoder::try_new(&union_dt).unwrap();
3740        let mut r1 = Vec::new();
3741        r1.extend_from_slice(&encode_avro_long(0));
3742        r1.extend_from_slice(&encode_avro_int(7));
3743        let mut r2 = Vec::new();
3744        r2.extend_from_slice(&encode_avro_long(1));
3745        r2.extend_from_slice(&encode_avro_bytes(b"x"));
3746        let mut r3 = Vec::new();
3747        r3.extend_from_slice(&encode_avro_long(0));
3748        r3.extend_from_slice(&encode_avro_int(-1));
3749        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3750        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3751        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3752        let array = dec.flush(None).unwrap();
3753        let ua = array
3754            .as_any()
3755            .downcast_ref::<UnionArray>()
3756            .expect("expected UnionArray");
3757        assert_eq!(ua.len(), 3);
3758        assert_eq!(ua.type_id(0), 2);
3759        assert_eq!(ua.type_id(1), 5);
3760        assert_eq!(ua.type_id(2), 2);
3761        assert_eq!(ua.value_offset(0), 0);
3762        assert_eq!(ua.value_offset(1), 0);
3763        assert_eq!(ua.value_offset(2), 1);
3764        let int_child = ua
3765            .child(2)
3766            .as_any()
3767            .downcast_ref::<Int32Array>()
3768            .expect("int child");
3769        assert_eq!(int_child.len(), 2);
3770        assert_eq!(int_child.value(0), 7);
3771        assert_eq!(int_child.value(1), -1);
3772        let str_child = ua
3773            .child(5)
3774            .as_any()
3775            .downcast_ref::<StringArray>()
3776            .expect("string child");
3777        assert_eq!(str_child.len(), 1);
3778        assert_eq!(str_child.value(0), "x");
3779    }
3780
3781    #[test]
3782    fn test_union_dense_with_null_and_string_children() {
3783        let union_dt = make_dense_union_avro(
3784            vec![
3785                (Codec::Null, "n", DataType::Null),
3786                (Codec::Utf8, "s", DataType::Utf8),
3787            ],
3788            vec![42, 7],
3789        );
3790        let mut dec = Decoder::try_new(&union_dt).unwrap();
3791        let r1 = encode_avro_long(0);
3792        let mut r2 = Vec::new();
3793        r2.extend_from_slice(&encode_avro_long(1));
3794        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
3795        let r3 = encode_avro_long(0);
3796        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3797        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3798        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3799        let array = dec.flush(None).unwrap();
3800        let ua = array
3801            .as_any()
3802            .downcast_ref::<UnionArray>()
3803            .expect("expected UnionArray");
3804        assert_eq!(ua.len(), 3);
3805        assert_eq!(ua.type_id(0), 42);
3806        assert_eq!(ua.type_id(1), 7);
3807        assert_eq!(ua.type_id(2), 42);
3808        assert_eq!(ua.value_offset(0), 0);
3809        assert_eq!(ua.value_offset(1), 0);
3810        assert_eq!(ua.value_offset(2), 1);
3811        let null_child = ua
3812            .child(42)
3813            .as_any()
3814            .downcast_ref::<NullArray>()
3815            .expect("null child");
3816        assert_eq!(null_child.len(), 2);
3817        let str_child = ua
3818            .child(7)
3819            .as_any()
3820            .downcast_ref::<StringArray>()
3821            .expect("string child");
3822        assert_eq!(str_child.len(), 1);
3823        assert_eq!(str_child.value(0), "abc");
3824    }
3825
3826    #[test]
3827    fn test_union_decode_negative_branch_index_errors() {
3828        let union_dt = make_dense_union_avro(
3829            vec![
3830                (Codec::Int32, "i", DataType::Int32),
3831                (Codec::Utf8, "s", DataType::Utf8),
3832            ],
3833            vec![0, 1],
3834        );
3835        let mut dec = Decoder::try_new(&union_dt).unwrap();
3836        let row = encode_avro_long(-1); // decodes back to -1
3837        let err = dec
3838            .decode(&mut AvroCursor::new(&row))
3839            .expect_err("expected error for negative branch index");
3840        let msg = err.to_string();
3841        assert!(
3842            msg.contains("Negative union branch index"),
3843            "unexpected error message: {msg}"
3844        );
3845    }
3846
3847    #[test]
3848    fn test_union_decode_out_of_range_branch_index_errors() {
3849        let union_dt = make_dense_union_avro(
3850            vec![
3851                (Codec::Int32, "i", DataType::Int32),
3852                (Codec::Utf8, "s", DataType::Utf8),
3853            ],
3854            vec![10, 11],
3855        );
3856        let mut dec = Decoder::try_new(&union_dt).unwrap();
3857        let row = encode_avro_long(2);
3858        let err = dec
3859            .decode(&mut AvroCursor::new(&row))
3860            .expect_err("expected error for out-of-range branch index");
3861        let msg = err.to_string();
3862        assert!(
3863            msg.contains("out of range"),
3864            "unexpected error message: {msg}"
3865        );
3866    }
3867
3868    #[test]
3869    fn test_union_sparse_mode_not_supported() {
3870        let children: Vec<AvroDataType> = vec![
3871            AvroDataType::new(Codec::Int32, Default::default(), None),
3872            AvroDataType::new(Codec::Utf8, Default::default(), None),
3873        ];
3874        let uf = UnionFields::try_new(
3875            vec![1, 3],
3876            vec![
3877                arrow_schema::Field::new("i", DataType::Int32, true),
3878                arrow_schema::Field::new("s", DataType::Utf8, true),
3879            ],
3880        )
3881        .unwrap();
3882        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
3883        let dt = AvroDataType::new(codec, Default::default(), None);
3884        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
3885        let msg = err.to_string();
3886        assert!(
3887            msg.contains("Sparse Arrow unions are not yet supported"),
3888            "unexpected error message: {msg}"
3889        );
3890    }
3891
3892    fn make_record_decoder_with_projector_defaults(
3893        reader_fields: &[(&str, DataType, bool)],
3894        field_defaults: Vec<Option<AvroLiteral>>,
3895        default_injections: Vec<(usize, AvroLiteral)>,
3896        writer_to_reader_len: usize,
3897    ) -> Decoder {
3898        assert_eq!(
3899            field_defaults.len(),
3900            reader_fields.len(),
3901            "field_defaults must have one entry per reader field"
3902        );
3903        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3904        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3905        for (name, dt, nullable) in reader_fields {
3906            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3907            let enc = match dt {
3908                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3909                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3910                DataType::Utf8 => Decoder::String(
3911                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3912                    Vec::with_capacity(DEFAULT_CAPACITY),
3913                ),
3914                other => panic!("Unsupported test field type in helper: {other:?}"),
3915            };
3916            encodings.push(enc);
3917        }
3918        let fields: Fields = field_refs.into();
3919        let skip_decoders: Vec<Option<Skipper>> =
3920            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3921        let projector = Projector {
3922            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3923            skip_decoders,
3924            field_defaults,
3925            default_injections: Arc::from(default_injections),
3926        };
3927        Decoder::Record(fields, encodings, Some(projector))
3928    }
3929
3930    #[test]
3931    fn test_default_append_int32_and_int64_from_int_and_long() {
3932        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3933        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
3934        let arr = d_i32.flush(None).unwrap();
3935        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3936        assert_eq!(a.len(), 1);
3937        assert_eq!(a.value(0), 42);
3938        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
3939        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
3940        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
3941        let arr64 = d_i64.flush(None).unwrap();
3942        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
3943        assert_eq!(a64.len(), 2);
3944        assert_eq!(a64.value(0), 5);
3945        assert_eq!(a64.value(1), 7);
3946    }
3947
3948    #[test]
3949    fn test_default_append_floats_and_doubles() {
3950        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
3951        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
3952        let arr32 = d_f32.flush(None).unwrap();
3953        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
3954        assert_eq!(a.value(0), 1.5);
3955        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
3956        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
3957        let arr64 = d_f64.flush(None).unwrap();
3958        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
3959        assert_eq!(b.value(0), 2.25);
3960    }
3961
3962    #[test]
3963    fn test_default_append_string_and_bytes() {
3964        let mut d_str = Decoder::String(
3965            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3966            Vec::with_capacity(DEFAULT_CAPACITY),
3967        );
3968        d_str
3969            .append_default(&AvroLiteral::String("hi".into()))
3970            .unwrap();
3971        let s_arr = d_str.flush(None).unwrap();
3972        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
3973        assert_eq!(arr.value(0), "hi");
3974        let mut d_bytes = Decoder::Binary(
3975            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3976            Vec::with_capacity(DEFAULT_CAPACITY),
3977        );
3978        d_bytes
3979            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
3980            .unwrap();
3981        let b_arr = d_bytes.flush(None).unwrap();
3982        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3983        assert_eq!(barr.value(0), &[1, 2, 3]);
3984        let mut d_str_err = Decoder::String(
3985            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3986            Vec::with_capacity(DEFAULT_CAPACITY),
3987        );
3988        let err = d_str_err
3989            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
3990            .unwrap_err();
3991        assert!(
3992            err.to_string()
3993                .contains("Default for string must be string"),
3994            "unexpected error: {err:?}"
3995        );
3996    }
3997
3998    #[test]
3999    fn test_default_append_nullable_int32_null_and_value() {
4000        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4001        let mut dec = Decoder::Nullable(
4002            Nullability::NullFirst,
4003            NullBufferBuilder::new(DEFAULT_CAPACITY),
4004            Box::new(inner),
4005            NullablePlan::ReadTag,
4006        );
4007        dec.append_default(&AvroLiteral::Null).unwrap();
4008        dec.append_default(&AvroLiteral::Int(11)).unwrap();
4009        let arr = dec.flush(None).unwrap();
4010        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4011        assert_eq!(a.len(), 2);
4012        assert!(a.is_null(0));
4013        assert_eq!(a.value(1), 11);
4014    }
4015
4016    #[test]
4017    fn test_default_append_array_of_ints() {
4018        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
4019        let mut d = Decoder::try_new(&list_dt).unwrap();
4020        let items = vec![
4021            AvroLiteral::Int(1),
4022            AvroLiteral::Int(2),
4023            AvroLiteral::Int(3),
4024        ];
4025        d.append_default(&AvroLiteral::Array(items)).unwrap();
4026        let arr = d.flush(None).unwrap();
4027        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
4028        assert_eq!(list.len(), 1);
4029        assert_eq!(list.value_length(0), 3);
4030        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
4031        assert_eq!(vals.values(), &[1, 2, 3]);
4032    }
4033
4034    #[test]
4035    fn test_default_append_map_string_to_int() {
4036        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
4037        let mut d = Decoder::try_new(&map_dt).unwrap();
4038        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
4039        m.insert("k1".to_string(), AvroLiteral::Int(10));
4040        m.insert("k2".to_string(), AvroLiteral::Int(20));
4041        d.append_default(&AvroLiteral::Map(m)).unwrap();
4042        let arr = d.flush(None).unwrap();
4043        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
4044        assert_eq!(map.len(), 1);
4045        assert_eq!(map.value_length(0), 2);
4046        let binding = map.value(0);
4047        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
4048        let k = entries
4049            .column_by_name("key")
4050            .unwrap()
4051            .as_any()
4052            .downcast_ref::<StringArray>()
4053            .unwrap();
4054        let v = entries
4055            .column_by_name("value")
4056            .unwrap()
4057            .as_any()
4058            .downcast_ref::<Int32Array>()
4059            .unwrap();
4060        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4061        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4062        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4063        assert_eq!(vals, [10, 20].into_iter().collect());
4064    }
4065
4066    #[test]
4067    fn test_default_append_enum_by_symbol() {
4068        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4069        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4070        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4071        let arr = d.flush(None).unwrap();
4072        let dict = arr
4073            .as_any()
4074            .downcast_ref::<DictionaryArray<Int32Type>>()
4075            .unwrap();
4076        assert_eq!(dict.len(), 1);
4077        let expected = Int32Array::from(vec![1]);
4078        assert_eq!(dict.keys(), &expected);
4079        let values = dict
4080            .values()
4081            .as_any()
4082            .downcast_ref::<StringArray>()
4083            .unwrap();
4084        assert_eq!(values.value(1), "B");
4085    }
4086
4087    #[test]
4088    fn test_default_append_uuid_and_type_error() {
4089        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4090        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4091        d.append_default(&AvroLiteral::String(uuid_str.into()))
4092            .unwrap();
4093        let arr_ref = d.flush(None).unwrap();
4094        let arr = arr_ref
4095            .as_any()
4096            .downcast_ref::<FixedSizeBinaryArray>()
4097            .unwrap();
4098        assert_eq!(arr.value_length(), 16);
4099        assert_eq!(arr.len(), 1);
4100        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4101        let err = d2
4102            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4103            .unwrap_err();
4104        assert!(
4105            err.to_string().contains("Default for uuid must be string"),
4106            "unexpected error: {err:?}"
4107        );
4108    }
4109
4110    #[test]
4111    fn test_default_append_fixed_and_length_mismatch() {
4112        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4113        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4114            .unwrap();
4115        let arr_ref = d.flush(None).unwrap();
4116        let arr = arr_ref
4117            .as_any()
4118            .downcast_ref::<FixedSizeBinaryArray>()
4119            .unwrap();
4120        assert_eq!(arr.value_length(), 4);
4121        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4122        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4123        let err = d_err
4124            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4125            .unwrap_err();
4126        assert!(
4127            err.to_string().contains("Fixed default length"),
4128            "unexpected error: {err:?}"
4129        );
4130    }
4131
4132    #[test]
4133    fn test_default_append_duration_and_length_validation() {
4134        let dt = avro_from_codec(Codec::Interval);
4135        let mut d = Decoder::try_new(&dt).unwrap();
4136        let mut bytes = Vec::with_capacity(12);
4137        bytes.extend_from_slice(&1u32.to_le_bytes());
4138        bytes.extend_from_slice(&2u32.to_le_bytes());
4139        bytes.extend_from_slice(&3u32.to_le_bytes());
4140        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4141        let arr_ref = d.flush(None).unwrap();
4142        let arr = arr_ref
4143            .as_any()
4144            .downcast_ref::<IntervalMonthDayNanoArray>()
4145            .unwrap();
4146        assert_eq!(arr.len(), 1);
4147        let v = arr.value(0);
4148        assert_eq!(v.months, 1);
4149        assert_eq!(v.days, 2);
4150        assert_eq!(v.nanoseconds, 3_000_000);
4151        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4152        let err = d_err
4153            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4154            .unwrap_err();
4155        assert!(
4156            err.to_string()
4157                .contains("Duration default must be exactly 12 bytes"),
4158            "unexpected error: {err:?}"
4159        );
4160    }
4161
4162    #[test]
4163    fn test_default_append_decimal256_from_bytes() {
4164        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4165        let mut d = Decoder::try_new(&dt).unwrap();
4166        let pos: [u8; 32] = [
4167            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4168            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4169            0x00, 0x00, 0x30, 0x39,
4170        ];
4171        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4172        let neg: [u8; 32] = [
4173            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4174            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4175            0xFF, 0xFF, 0xFF, 0x85,
4176        ];
4177        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4178        let arr = d.flush(None).unwrap();
4179        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4180        assert_eq!(dec.len(), 2);
4181        assert_eq!(dec.value_as_string(0), "123.45");
4182        assert_eq!(dec.value_as_string(1), "-1.23");
4183    }
4184
4185    #[test]
4186    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4187        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4188        let mut rec = make_record_decoder_with_projector_defaults(
4189            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4190            field_defaults,
4191            vec![],
4192            0,
4193        );
4194        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4195        map.insert("a".to_string(), AvroLiteral::Int(7));
4196        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4197        let arr = rec.flush(None).unwrap();
4198        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4199        let a = s
4200            .column_by_name("a")
4201            .unwrap()
4202            .as_any()
4203            .downcast_ref::<Int32Array>()
4204            .unwrap();
4205        let b = s
4206            .column_by_name("b")
4207            .unwrap()
4208            .as_any()
4209            .downcast_ref::<StringArray>()
4210            .unwrap();
4211        assert_eq!(a.value(0), 7);
4212        assert_eq!(b.value(0), "hi");
4213    }
4214
4215    #[test]
4216    fn test_record_append_default_null_uses_projector_field_defaults() {
4217        let field_defaults = vec![
4218            Some(AvroLiteral::Int(5)),
4219            Some(AvroLiteral::String("x".into())),
4220        ];
4221        let mut rec = make_record_decoder_with_projector_defaults(
4222            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4223            field_defaults,
4224            vec![],
4225            0,
4226        );
4227        rec.append_default(&AvroLiteral::Null).unwrap();
4228        let arr = rec.flush(None).unwrap();
4229        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4230        let a = s
4231            .column_by_name("a")
4232            .unwrap()
4233            .as_any()
4234            .downcast_ref::<Int32Array>()
4235            .unwrap();
4236        let b = s
4237            .column_by_name("b")
4238            .unwrap()
4239            .as_any()
4240            .downcast_ref::<StringArray>()
4241            .unwrap();
4242        assert_eq!(a.value(0), 5);
4243        assert_eq!(b.value(0), "x");
4244    }
4245
4246    #[test]
4247    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
4248     {
4249        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
4250        let mut field_refs: Vec<FieldRef> = Vec::new();
4251        let mut encoders: Vec<Decoder> = Vec::new();
4252        for (name, dt, nullable) in &fields {
4253            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4254        }
4255        let enc_a = Decoder::Nullable(
4256            Nullability::NullSecond,
4257            NullBufferBuilder::new(DEFAULT_CAPACITY),
4258            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
4259            NullablePlan::ReadTag,
4260        );
4261        let enc_b = Decoder::Nullable(
4262            Nullability::NullSecond,
4263            NullBufferBuilder::new(DEFAULT_CAPACITY),
4264            Box::new(Decoder::String(
4265                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4266                Vec::with_capacity(DEFAULT_CAPACITY),
4267            )),
4268            NullablePlan::ReadTag,
4269        );
4270        encoders.push(enc_a);
4271        encoders.push(enc_b);
4272        let projector = Projector {
4273            writer_to_reader: Arc::from(vec![]),
4274            skip_decoders: vec![],
4275            field_defaults: vec![None, None], // no defaults -> append_null
4276            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4277        };
4278        let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector));
4279        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4280        map.insert("a".to_string(), AvroLiteral::Int(9));
4281        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4282        let arr = rec.flush(None).unwrap();
4283        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4284        let a = s
4285            .column_by_name("a")
4286            .unwrap()
4287            .as_any()
4288            .downcast_ref::<Int32Array>()
4289            .unwrap();
4290        let b = s
4291            .column_by_name("b")
4292            .unwrap()
4293            .as_any()
4294            .downcast_ref::<StringArray>()
4295            .unwrap();
4296        assert!(a.is_valid(0));
4297        assert_eq!(a.value(0), 9);
4298        assert!(b.is_null(0));
4299    }
4300
4301    #[test]
4302    fn test_projector_default_injection_when_writer_lacks_fields() {
4303        let defaults = vec![None, None];
4304        let injections = vec![
4305            (0, AvroLiteral::Int(99)),
4306            (1, AvroLiteral::String("alice".into())),
4307        ];
4308        let mut rec = make_record_decoder_with_projector_defaults(
4309            &[
4310                ("id", DataType::Int32, false),
4311                ("name", DataType::Utf8, false),
4312            ],
4313            defaults,
4314            injections,
4315            0,
4316        );
4317        rec.decode(&mut AvroCursor::new(&[])).unwrap();
4318        let arr = rec.flush(None).unwrap();
4319        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4320        let id = s
4321            .column_by_name("id")
4322            .unwrap()
4323            .as_any()
4324            .downcast_ref::<Int32Array>()
4325            .unwrap();
4326        let name = s
4327            .column_by_name("name")
4328            .unwrap()
4329            .as_any()
4330            .downcast_ref::<StringArray>()
4331            .unwrap();
4332        assert_eq!(id.value(0), 99);
4333        assert_eq!(name.value(0), "alice");
4334    }
4335
4336    #[test]
4337    fn union_type_ids_are_not_child_indexes() {
4338        let encodings: Vec<AvroDataType> =
4339            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
4340        let fields: UnionFields = [
4341            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
4342            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
4343        ]
4344        .into_iter()
4345        .collect();
4346        let dt = avro_from_codec(Codec::Union(
4347            encodings.into(),
4348            fields.clone(),
4349            UnionMode::Dense,
4350        ));
4351        let mut dec = Decoder::try_new(&dt).expect("decoder");
4352        let mut b1 = encode_avro_long(1);
4353        b1.extend(encode_avro_bytes("hi".as_bytes()));
4354        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
4355        let mut b0 = encode_avro_long(0);
4356        b0.extend(encode_avro_int(5));
4357        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
4358        let arr = dec.flush(None).expect("flush");
4359        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
4360        assert_eq!(ua.len(), 2);
4361        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
4362        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
4363        assert_eq!(ua.value_offset(0), 0);
4364        assert_eq!(ua.value_offset(1), 0);
4365        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
4366        assert_eq!(utf8_child.len(), 1);
4367        assert_eq!(utf8_child.value(0), "hi");
4368        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
4369        assert_eq!(int_child.len(), 1);
4370        assert_eq!(int_child.value(0), 5);
4371        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
4372        assert_eq!(type_ids, vec![42_i8, 7_i8]);
4373    }
4374
4375    #[cfg(feature = "avro_custom_types")]
4376    #[test]
4377    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
4378        for codec in [
4379            Codec::DurationNanos,
4380            Codec::DurationMicros,
4381            Codec::DurationMillis,
4382            Codec::DurationSeconds,
4383        ] {
4384            let dt = make_avro_dt(codec.clone(), None);
4385            let s = Skipper::from_avro(&dt)?;
4386            match s {
4387                Skipper::Int64 => {}
4388                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
4389            }
4390        }
4391        Ok(())
4392    }
4393
4394    #[cfg(feature = "avro_custom_types")]
4395    #[test]
4396    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
4397        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
4398        for codec in [
4399            Codec::DurationNanos,
4400            Codec::DurationMicros,
4401            Codec::DurationMillis,
4402            Codec::DurationSeconds,
4403        ] {
4404            let dt = make_avro_dt(codec.clone(), None);
4405            let mut s = Skipper::from_avro(&dt)?;
4406            for &v in &values {
4407                let bytes = encode_avro_long(v);
4408                let mut cursor = AvroCursor::new(&bytes);
4409                s.skip(&mut cursor)?;
4410                assert_eq!(
4411                    cursor.position(),
4412                    bytes.len(),
4413                    "did not consume all bytes for {:?} value {}",
4414                    codec,
4415                    v
4416                );
4417            }
4418        }
4419        Ok(())
4420    }
4421
4422    #[cfg(feature = "avro_custom_types")]
4423    #[test]
4424    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
4425        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
4426        let mut s = Skipper::from_avro(&dt)?;
4427        match &s {
4428            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
4429                Skipper::Int64 => {}
4430                ref other => panic!("expected inner Int64, got {:?}", other),
4431            },
4432            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
4433        }
4434        {
4435            let buf = encode_vlq_u64(0);
4436            let mut cursor = AvroCursor::new(&buf);
4437            s.skip(&mut cursor)?;
4438            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
4439        }
4440        {
4441            let mut buf = encode_vlq_u64(1);
4442            buf.extend(encode_avro_long(0));
4443            let mut cursor = AvroCursor::new(&buf);
4444            s.skip(&mut cursor)?;
4445            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
4446        }
4447
4448        Ok(())
4449    }
4450
4451    #[cfg(feature = "avro_custom_types")]
4452    #[test]
4453    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
4454        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
4455        let mut s = Skipper::from_avro(&dt)?;
4456        match &s {
4457            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
4458                Skipper::Int64 => {}
4459                ref other => panic!("expected inner Int64, got {:?}", other),
4460            },
4461            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
4462        }
4463        {
4464            let buf = encode_vlq_u64(1);
4465            let mut cursor = AvroCursor::new(&buf);
4466            s.skip(&mut cursor)?;
4467            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
4468        }
4469        {
4470            let mut buf = encode_vlq_u64(0);
4471            buf.extend(encode_avro_long(-1));
4472            let mut cursor = AvroCursor::new(&buf);
4473            s.skip(&mut cursor)?;
4474            assert_eq!(
4475                cursor.position(),
4476                1 + encode_avro_long(-1).len(),
4477                "expected to consume tag=0 + long(-1)"
4478            );
4479        }
4480        Ok(())
4481    }
4482
4483    #[test]
4484    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
4485        let dt = make_avro_dt(Codec::Interval, None);
4486        let mut s = Skipper::from_avro(&dt)?;
4487        match s {
4488            Skipper::DurationFixed12 => {}
4489            other => panic!("expected DurationFixed12, got {:?}", other),
4490        }
4491        let payload = vec![0u8; 12];
4492        let mut cursor = AvroCursor::new(&payload);
4493        s.skip(&mut cursor)?;
4494        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
4495        Ok(())
4496    }
4497
4498    #[cfg(feature = "avro_custom_types")]
4499    #[test]
4500    fn test_run_end_encoded_width16_int32_basic_grouping() {
4501        use arrow_array::RunArray;
4502        use std::sync::Arc;
4503        let inner = avro_from_codec(Codec::Int32);
4504        let ree = AvroDataType::new(
4505            Codec::RunEndEncoded(Arc::new(inner), 16),
4506            Default::default(),
4507            None,
4508        );
4509        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4510        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
4511            let bytes = encode_avro_int(v);
4512            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4513        }
4514        let arr = dec.flush(None).expect("flush");
4515        let ra = arr
4516            .as_any()
4517            .downcast_ref::<RunArray<Int16Type>>()
4518            .expect("RunArray<Int16Type>");
4519        assert_eq!(ra.len(), 9);
4520        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
4521        let vals = ra
4522            .values()
4523            .as_ref()
4524            .as_any()
4525            .downcast_ref::<Int32Array>()
4526            .expect("values Int32");
4527        assert_eq!(vals.values(), &[1, 2, 3]);
4528    }
4529
4530    #[cfg(feature = "avro_custom_types")]
4531    #[test]
4532    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
4533        use arrow_array::RunArray;
4534        use std::sync::Arc;
4535        let inner = AvroDataType::new(
4536            Codec::Int32,
4537            Default::default(),
4538            Some(Nullability::NullSecond),
4539        );
4540        let ree = AvroDataType::new(
4541            Codec::RunEndEncoded(Arc::new(inner), 32),
4542            Default::default(),
4543            None,
4544        );
4545        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4546        let seq: [Option<i32>; 8] = [
4547            None,
4548            None,
4549            Some(7),
4550            Some(7),
4551            Some(7),
4552            None,
4553            Some(5),
4554            Some(5),
4555        ];
4556        for item in seq {
4557            let mut bytes = Vec::new();
4558            match item {
4559                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
4560                Some(v) => {
4561                    bytes.extend_from_slice(&encode_vlq_u64(0));
4562                    bytes.extend_from_slice(&encode_avro_int(v));
4563                }
4564            }
4565            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4566        }
4567        let arr = dec.flush(None).expect("flush");
4568        let ra = arr
4569            .as_any()
4570            .downcast_ref::<RunArray<Int32Type>>()
4571            .expect("RunArray<Int32Type>");
4572        assert_eq!(ra.len(), 8);
4573        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
4574        let vals = ra
4575            .values()
4576            .as_ref()
4577            .as_any()
4578            .downcast_ref::<Int32Array>()
4579            .expect("values Int32 (nullable)");
4580        assert_eq!(vals.len(), 4);
4581        assert!(vals.is_null(0));
4582        assert_eq!(vals.value(1), 7);
4583        assert!(vals.is_null(2));
4584        assert_eq!(vals.value(3), 5);
4585    }
4586
4587    #[cfg(feature = "avro_custom_types")]
4588    #[test]
4589    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
4590        use arrow_array::RunArray;
4591        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4592        let ree = Decoder::RunEndEncoded(
4593            8, /* bytes => Int64 run-ends */
4594            0,
4595            Box::new(inner_values),
4596        );
4597        let mut dec = Decoder::Nullable(
4598            Nullability::NullSecond,
4599            NullBufferBuilder::new(DEFAULT_CAPACITY),
4600            Box::new(ree),
4601            NullablePlan::FromSingle {
4602                promotion: Promotion::IntToDouble,
4603            },
4604        );
4605        for v in [1, 1, 2, 2, 2] {
4606            let bytes = encode_avro_int(v);
4607            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4608        }
4609        let arr = dec.flush(None).expect("flush");
4610        let ra = arr
4611            .as_any()
4612            .downcast_ref::<RunArray<Int64Type>>()
4613            .expect("RunArray<Int64Type>");
4614        assert_eq!(ra.len(), 5);
4615        assert_eq!(ra.run_ends().values(), &[2, 5]);
4616        let vals = ra
4617            .values()
4618            .as_ref()
4619            .as_any()
4620            .downcast_ref::<Float64Array>()
4621            .expect("values Float64");
4622        assert_eq!(vals.values(), &[1.0, 2.0]);
4623    }
4624
4625    #[cfg(feature = "avro_custom_types")]
4626    #[test]
4627    fn test_run_end_encoded_unsupported_run_end_width_errors() {
4628        use std::sync::Arc;
4629        let inner = avro_from_codec(Codec::Int32);
4630        let dt = AvroDataType::new(
4631            Codec::RunEndEncoded(Arc::new(inner), 3),
4632            Default::default(),
4633            None,
4634        );
4635        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
4636        let msg = err.to_string();
4637        assert!(
4638            msg.contains("Unsupported run-end width")
4639                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
4640            "unexpected error message: {msg}"
4641        );
4642    }
4643
4644    #[cfg(feature = "avro_custom_types")]
4645    #[test]
4646    fn test_run_end_encoded_empty_input_is_empty_runarray() {
4647        use arrow_array::RunArray;
4648        use std::sync::Arc;
4649        let inner = avro_from_codec(Codec::Utf8);
4650        let dt = AvroDataType::new(
4651            Codec::RunEndEncoded(Arc::new(inner), 4),
4652            Default::default(),
4653            None,
4654        );
4655        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4656        let arr = dec.flush(None).expect("flush");
4657        let ra = arr
4658            .as_any()
4659            .downcast_ref::<RunArray<Int32Type>>()
4660            .expect("RunArray<Int32Type>");
4661        assert_eq!(ra.len(), 0);
4662        assert_eq!(ra.run_ends().len(), 0);
4663        assert_eq!(ra.values().len(), 0);
4664    }
4665
4666    #[cfg(feature = "avro_custom_types")]
4667    #[test]
4668    fn test_run_end_encoded_strings_grouping_width32_bits() {
4669        use arrow_array::RunArray;
4670        use std::sync::Arc;
4671        let inner = avro_from_codec(Codec::Utf8);
4672        let dt = AvroDataType::new(
4673            Codec::RunEndEncoded(Arc::new(inner), 32),
4674            Default::default(),
4675            None,
4676        );
4677        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4678        for s in ["a", "a", "bb", "bb", "bb", "a"] {
4679            let bytes = encode_avro_bytes(s.as_bytes());
4680            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4681        }
4682        let arr = dec.flush(None).expect("flush");
4683        let ra = arr
4684            .as_any()
4685            .downcast_ref::<RunArray<Int32Type>>()
4686            .expect("RunArray<Int32Type>");
4687        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
4688        let vals = ra
4689            .values()
4690            .as_ref()
4691            .as_any()
4692            .downcast_ref::<StringArray>()
4693            .expect("values String");
4694        assert_eq!(vals.len(), 3);
4695        assert_eq!(vals.value(0), "a");
4696        assert_eq!(vals.value(1), "bb");
4697        assert_eq!(vals.value(2), "a");
4698    }
4699
4700    #[cfg(not(feature = "avro_custom_types"))]
4701    #[test]
4702    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
4703        let dt = avro_from_codec(Codec::Int32);
4704        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
4705        for v in [1, 2, 3] {
4706            let bytes = encode_avro_int(v);
4707            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4708        }
4709        let arr = dec.flush(None).expect("flush");
4710        let a = arr
4711            .as_any()
4712            .downcast_ref::<Int32Array>()
4713            .expect("Int32Array");
4714        assert_eq!(a.values(), &[1, 2, 3]);
4715    }
4716
4717    #[test]
4718    fn test_timestamp_nanos_decoding_utc() {
4719        let avro_type = avro_from_codec(Codec::TimestampNanos(true));
4720        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4721        let mut data = Vec::new();
4722        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
4723            data.extend_from_slice(&encode_avro_long(v));
4724        }
4725        let mut cur = AvroCursor::new(&data);
4726        for _ in 0..4 {
4727            decoder.decode(&mut cur).expect("decode nanos ts");
4728        }
4729        let array = decoder.flush(None).expect("flush nanos ts");
4730        let ts = array
4731            .as_any()
4732            .downcast_ref::<TimestampNanosecondArray>()
4733            .expect("TimestampNanosecondArray");
4734        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
4735        match ts.data_type() {
4736            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4737                assert_eq!(tz.as_deref(), Some("+00:00"));
4738            }
4739            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
4740        }
4741    }
4742
4743    #[test]
4744    fn test_timestamp_nanos_decoding_local() {
4745        let avro_type = avro_from_codec(Codec::TimestampNanos(false));
4746        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4747        let mut data = Vec::new();
4748        for v in [10_i64, 20_i64, -30_i64] {
4749            data.extend_from_slice(&encode_avro_long(v));
4750        }
4751        let mut cur = AvroCursor::new(&data);
4752        for _ in 0..3 {
4753            decoder.decode(&mut cur).expect("decode nanos ts");
4754        }
4755        let array = decoder.flush(None).expect("flush nanos ts");
4756        let ts = array
4757            .as_any()
4758            .downcast_ref::<TimestampNanosecondArray>()
4759            .expect("TimestampNanosecondArray");
4760        assert_eq!(ts.values(), &[10, 20, -30]);
4761        match ts.data_type() {
4762            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4763                assert_eq!(tz.as_deref(), None);
4764            }
4765            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4766        }
4767    }
4768
4769    #[test]
4770    fn test_timestamp_nanos_decoding_with_nulls() {
4771        let avro_type = AvroDataType::new(
4772            Codec::TimestampNanos(false),
4773            Default::default(),
4774            Some(Nullability::NullFirst),
4775        );
4776        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
4777        let mut data = Vec::new();
4778        data.extend_from_slice(&encode_avro_long(1));
4779        data.extend_from_slice(&encode_avro_long(42));
4780        data.extend_from_slice(&encode_avro_long(0));
4781        data.extend_from_slice(&encode_avro_long(1));
4782        data.extend_from_slice(&encode_avro_long(-7));
4783        let mut cur = AvroCursor::new(&data);
4784        for _ in 0..3 {
4785            decoder.decode(&mut cur).expect("decode nullable nanos ts");
4786        }
4787        let array = decoder.flush(None).expect("flush nullable nanos ts");
4788        let ts = array
4789            .as_any()
4790            .downcast_ref::<TimestampNanosecondArray>()
4791            .expect("TimestampNanosecondArray");
4792        assert_eq!(ts.len(), 3);
4793        assert!(ts.is_valid(0));
4794        assert!(ts.is_null(1));
4795        assert!(ts.is_valid(2));
4796        assert_eq!(ts.value(0), 42);
4797        assert_eq!(ts.value(2), -7);
4798        match ts.data_type() {
4799            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4800                assert_eq!(tz.as_deref(), None);
4801            }
4802            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4803        }
4804    }
4805}