arrow_avro/reader/
record.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22    ResolvedUnion,
23};
24use crate::reader::cursor::AvroCursor;
25use crate::schema::Nullability;
26#[cfg(feature = "small_decimals")]
27use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
28use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
29use arrow_array::types::*;
30use arrow_array::*;
31use arrow_buffer::*;
32use arrow_schema::{
33    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField,
34    FieldRef, Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
35};
36#[cfg(feature = "small_decimals")]
37use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
38#[cfg(feature = "avro_custom_types")]
39use arrow_select::take::{TakeOptions, take};
40use std::cmp::Ordering;
41use std::sync::Arc;
42use strum_macros::AsRefStr;
43use uuid::Uuid;
44
/// Initial capacity handed to the value vectors, offset builders, and null/boolean
/// buffer builders created when a decoder is constructed.
const DEFAULT_CAPACITY: usize = 1024;
46
47/// Runtime plan for decoding reader-side `["null", T]` types.
48#[derive(Clone, Copy, Debug)]
49enum NullablePlan {
50    /// Writer actually wrote a union (branch tag present).
51    ReadTag,
52    /// Writer wrote a single (non-union) value resolved to the non-null branch
53    /// of the reader union; do NOT read a branch tag, but apply any promotion.
54    FromSingle { promotion: Promotion },
55}
56
/// Decodes one decimal payload and appends it to a decimal builder.
///
/// Expands to a call to `read_decimal_bytes_be::<$N>($buf, $size)`, converts the
/// returned big-endian bytes to `$Int` with `from_be_bytes`, and appends the value
/// to `$builder`. A read failure is propagated with `?`, so the macro can only be
/// used inside a function whose return type accepts that error.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let value = <$Int>::from_be_bytes(read_decimal_bytes_be::<{ $N }>($buf, $size)?);
        $builder.append_value(value);
    }};
}
64
/// Finishes a decimal builder and evaluates to the resulting array as an `ArrayRef`.
///
/// Only the values component of `$builder.finish().into_parts()` is kept; the array
/// is rebuilt as `$ArrayTy` with the caller-supplied `$nulls` and then stamped with
/// `*$precision` / `$scale` (scale defaults to 0 when absent). Errors from array
/// construction are propagated with `?`; a precision/scale rejection is reported as
/// `ArrowError::ParseError`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        let (_, values, _) = $builder.finish().into_parts();
        let unscaled = <$ArrayTy>::try_new(values, $nulls)?;
        // Narrowing casts: the decimal array API takes `u8` precision and `i8` scale.
        let precision_u8 = *$precision as u8;
        let scale_i8 = $scale.unwrap_or(0) as i8;
        let array = unscaled
            .with_precision_and_scale(precision_u8, scale_i8)
            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
        Arc::new(array) as ArrayRef
    }};
}
75
/// Appends a default decimal value, given as an `AvroLiteral`, to a decimal builder.
///
/// For `AvroLiteral::Bytes` the bytes are passed through `sign_cast_to::<$N>`
/// (defined elsewhere; presumably widens two's-complement big-endian bytes to `$N`
/// bytes), converted to `$Int` with `from_be_bytes`, appended to `$builder`, and the
/// macro evaluates to `Ok(())`. Any other literal evaluates to an
/// `ArrowError::InvalidArgumentError` whose message is assembled at compile time
/// from `$name`.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            let value = <$Int>::from_be_bytes(widened);
            $builder.append_value(value);
            Ok(())
        } else {
            Err(ArrowError::InvalidArgumentError(String::from(concat!(
                "Default for ",
                $name,
                " must be bytes (two's-complement big-endian)"
            ))))
        }
    }};
}
98
99/// Decodes avro encoded data into [`RecordBatch`]
100#[derive(Debug)]
101pub(crate) struct RecordDecoder {
102    schema: SchemaRef,
103    fields: Vec<Decoder>,
104    projector: Option<Projector>,
105}
106
107impl RecordDecoder {
108    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
109    ///
110    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
111    ///
112    /// # Arguments
113    /// * `data_type` - The Avro data type to decode.
114    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
115    ///
116    /// # Errors
117    /// This function will return an error if the provided `data_type` is not a `Record`.
118    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, ArrowError> {
119        match data_type.codec() {
120            Codec::Struct(reader_fields) => {
121                // Build Arrow schema fields and per-child decoders
122                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123                let mut encodings = Vec::with_capacity(reader_fields.len());
124                for avro_field in reader_fields.iter() {
125                    arrow_fields.push(avro_field.field());
126                    encodings.push(Decoder::try_new(avro_field.data_type())?);
127                }
128                let projector = match data_type.resolution.as_ref() {
129                    Some(ResolutionInfo::Record(rec)) => {
130                        Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131                    }
132                    _ => None,
133                };
134                Ok(Self {
135                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
136                    fields: encodings,
137                    projector,
138                })
139            }
140            other => Err(ArrowError::ParseError(format!(
141                "Expected record got {other:?}"
142            ))),
143        }
144    }
145
146    /// Returns the decoder's `SchemaRef`
147    pub(crate) fn schema(&self) -> &SchemaRef {
148        &self.schema
149    }
150
151    /// Decode `count` records from `buf`
152    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
153        let mut cursor = AvroCursor::new(buf);
154        match self.projector.as_mut() {
155            Some(proj) => {
156                for _ in 0..count {
157                    proj.project_record(&mut cursor, &mut self.fields)?;
158                }
159            }
160            None => {
161                for _ in 0..count {
162                    for field in &mut self.fields {
163                        field.decode(&mut cursor)?;
164                    }
165                }
166            }
167        }
168        Ok(cursor.position())
169    }
170
171    /// Flush the decoded records into a [`RecordBatch`]
172    pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
173        let arrays = self
174            .fields
175            .iter_mut()
176            .map(|x| x.flush(None))
177            .collect::<Result<Vec<_>, _>>()?;
178        RecordBatch::try_new(self.schema.clone(), arrays)
179    }
180}
181
/// Enum resolution data copied from `ResolutionInfo::EnumMapping` when an enum
/// decoder is built.
#[derive(Debug)]
struct EnumResolution {
    /// Index mapping table. NOTE(review): presumably writer symbol index to reader
    /// symbol index; confirm against the codec module.
    mapping: Arc<[i32]>,
    /// NOTE(review): presumably the reader index used for unmapped writer symbols;
    /// confirm against the codec module.
    default_index: i32,
}
187
188#[derive(Debug, AsRefStr)]
189enum Decoder {
190    Null(usize),
191    Boolean(BooleanBufferBuilder),
192    Int32(Vec<i32>),
193    Int64(Vec<i64>),
194    #[cfg(feature = "avro_custom_types")]
195    DurationSecond(Vec<i64>),
196    #[cfg(feature = "avro_custom_types")]
197    DurationMillisecond(Vec<i64>),
198    #[cfg(feature = "avro_custom_types")]
199    DurationMicrosecond(Vec<i64>),
200    #[cfg(feature = "avro_custom_types")]
201    DurationNanosecond(Vec<i64>),
202    Float32(Vec<f32>),
203    Float64(Vec<f64>),
204    Date32(Vec<i32>),
205    TimeMillis(Vec<i32>),
206    TimeMicros(Vec<i64>),
207    TimestampMillis(bool, Vec<i64>),
208    TimestampMicros(bool, Vec<i64>),
209    TimestampNanos(bool, Vec<i64>),
210    Int32ToInt64(Vec<i64>),
211    Int32ToFloat32(Vec<f32>),
212    Int32ToFloat64(Vec<f64>),
213    Int64ToFloat32(Vec<f32>),
214    Int64ToFloat64(Vec<f64>),
215    Float32ToFloat64(Vec<f64>),
216    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
217    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
218    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
219    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
220    String(OffsetBufferBuilder<i32>, Vec<u8>),
221    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
222    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
223    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
224    Record(Fields, Vec<Decoder>, Option<Projector>),
225    Map(
226        FieldRef,
227        OffsetBufferBuilder<i32>,
228        OffsetBufferBuilder<i32>,
229        Vec<u8>,
230        Box<Decoder>,
231    ),
232    Fixed(i32, Vec<u8>),
233    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
234    Duration(IntervalMonthDayNanoBuilder),
235    Uuid(Vec<u8>),
236    #[cfg(feature = "small_decimals")]
237    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
238    #[cfg(feature = "small_decimals")]
239    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
240    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
241    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
242    #[cfg(feature = "avro_custom_types")]
243    RunEndEncoded(u8, usize, Box<Decoder>),
244    Union(UnionDecoder),
245    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
246}
247
248impl Decoder {
249    fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
250        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
251            if info.writer_is_union && !info.reader_is_union {
252                let mut clone = data_type.clone();
253                clone.resolution = None; // Build target base decoder without Union resolution
254                let target = Box::new(Self::try_new_internal(&clone)?);
255                let decoder = Self::Union(
256                    UnionDecoderBuilder::new()
257                        .with_resolved_union(info.clone())
258                        .with_target(target)
259                        .build()?,
260                );
261                return Ok(decoder);
262            }
263        }
264        Self::try_new_internal(data_type)
265    }
266
267    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
268        // Extract just the Promotion (if any) to simplify pattern matching
269        let promotion = match data_type.resolution.as_ref() {
270            Some(ResolutionInfo::Promotion(p)) => Some(p),
271            _ => None,
272        };
273        let decoder = match (data_type.codec(), promotion) {
274            (Codec::Int64, Some(Promotion::IntToLong)) => {
275                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
276            }
277            (Codec::Float32, Some(Promotion::IntToFloat)) => {
278                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
279            }
280            (Codec::Float64, Some(Promotion::IntToDouble)) => {
281                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
282            }
283            (Codec::Float32, Some(Promotion::LongToFloat)) => {
284                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
285            }
286            (Codec::Float64, Some(Promotion::LongToDouble)) => {
287                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
288            }
289            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
290                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
291            }
292            (Codec::Utf8, Some(Promotion::BytesToString))
293            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
294                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
295                Vec::with_capacity(DEFAULT_CAPACITY),
296            ),
297            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
298                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
299                Vec::with_capacity(DEFAULT_CAPACITY),
300            ),
301            (Codec::Null, _) => Self::Null(0),
302            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
303            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
304            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
305            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
306            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
307            (Codec::Binary, _) => Self::Binary(
308                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
309                Vec::with_capacity(DEFAULT_CAPACITY),
310            ),
311            (Codec::Utf8, _) => Self::String(
312                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
313                Vec::with_capacity(DEFAULT_CAPACITY),
314            ),
315            (Codec::Utf8View, _) => Self::StringView(
316                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
317                Vec::with_capacity(DEFAULT_CAPACITY),
318            ),
319            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
320            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
321            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
322            (Codec::TimestampMillis(is_utc), _) => {
323                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
324            }
325            (Codec::TimestampMicros(is_utc), _) => {
326                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
327            }
328            (Codec::TimestampNanos(is_utc), _) => {
329                Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
330            }
331            #[cfg(feature = "avro_custom_types")]
332            (Codec::DurationNanos, _) => {
333                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
334            }
335            #[cfg(feature = "avro_custom_types")]
336            (Codec::DurationMicros, _) => {
337                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
338            }
339            #[cfg(feature = "avro_custom_types")]
340            (Codec::DurationMillis, _) => {
341                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
342            }
343            #[cfg(feature = "avro_custom_types")]
344            (Codec::DurationSeconds, _) => {
345                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
346            }
347            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
348            (Codec::Decimal(precision, scale, size), _) => {
349                let p = *precision;
350                let s = *scale;
351                let prec = p as u8;
352                let scl = s.unwrap_or(0) as i8;
353                #[cfg(feature = "small_decimals")]
354                {
355                    if p <= DECIMAL32_MAX_PRECISION as usize {
356                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
357                            .with_precision_and_scale(prec, scl)?;
358                        Self::Decimal32(p, s, *size, builder)
359                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
360                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
361                            .with_precision_and_scale(prec, scl)?;
362                        Self::Decimal64(p, s, *size, builder)
363                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
364                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
365                            .with_precision_and_scale(prec, scl)?;
366                        Self::Decimal128(p, s, *size, builder)
367                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
368                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
369                            .with_precision_and_scale(prec, scl)?;
370                        Self::Decimal256(p, s, *size, builder)
371                    } else {
372                        return Err(ArrowError::ParseError(format!(
373                            "Decimal precision {p} exceeds maximum supported"
374                        )));
375                    }
376                }
377                #[cfg(not(feature = "small_decimals"))]
378                {
379                    if p <= DECIMAL128_MAX_PRECISION as usize {
380                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
381                            .with_precision_and_scale(prec, scl)?;
382                        Self::Decimal128(p, s, *size, builder)
383                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
384                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
385                            .with_precision_and_scale(prec, scl)?;
386                        Self::Decimal256(p, s, *size, builder)
387                    } else {
388                        return Err(ArrowError::ParseError(format!(
389                            "Decimal precision {p} exceeds maximum supported"
390                        )));
391                    }
392                }
393            }
394            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
395            (Codec::List(item), _) => {
396                let decoder = Self::try_new(item)?;
397                Self::Array(
398                    Arc::new(item.field_with_name("item")),
399                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
400                    Box::new(decoder),
401                )
402            }
403            (Codec::Enum(symbols), _) => {
404                let res = match data_type.resolution.as_ref() {
405                    Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
406                        mapping: mapping.mapping.clone(),
407                        default_index: mapping.default_index,
408                    }),
409                    _ => None,
410                };
411                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
412            }
413            (Codec::Struct(fields), _) => {
414                let mut arrow_fields = Vec::with_capacity(fields.len());
415                let mut encodings = Vec::with_capacity(fields.len());
416                for avro_field in fields.iter() {
417                    let encoding = Self::try_new(avro_field.data_type())?;
418                    arrow_fields.push(avro_field.field());
419                    encodings.push(encoding);
420                }
421                let projector =
422                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
423                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
424                    } else {
425                        None
426                    };
427                Self::Record(arrow_fields.into(), encodings, projector)
428            }
429            (Codec::Map(child), _) => {
430                let val_field = child.field_with_name("value");
431                let map_field = Arc::new(ArrowField::new(
432                    "entries",
433                    DataType::Struct(Fields::from(vec![
434                        ArrowField::new("key", DataType::Utf8, false),
435                        val_field,
436                    ])),
437                    false,
438                ));
439                let val_dec = Self::try_new(child)?;
440                Self::Map(
441                    map_field,
442                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
443                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
444                    Vec::with_capacity(DEFAULT_CAPACITY),
445                    Box::new(val_dec),
446                )
447            }
448            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
449            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
450                let decoders = encodings
451                    .iter()
452                    .map(Self::try_new_internal)
453                    .collect::<Result<Vec<_>, _>>()?;
454                if fields.len() != decoders.len() {
455                    return Err(ArrowError::SchemaError(format!(
456                        "Union has {} fields but {} decoders",
457                        fields.len(),
458                        decoders.len()
459                    )));
460                }
461                // Proactive guard: if a user provides a union with more branches than
462                // a 32-bit Avro index can address, fail fast with a clear message.
463                let branch_count = decoders.len();
464                let max_addr = (i32::MAX as usize) + 1;
465                if branch_count > max_addr {
466                    return Err(ArrowError::SchemaError(format!(
467                        "Union has {branch_count} branches, which exceeds the maximum addressable \
468                         branches by an Avro int tag ({} + 1).",
469                        i32::MAX
470                    )));
471                }
472                let mut builder = UnionDecoderBuilder::new()
473                    .with_fields(fields.clone())
474                    .with_branches(decoders);
475                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
476                    if info.reader_is_union {
477                        builder = builder.with_resolved_union(info.clone());
478                    }
479                }
480                Self::Union(builder.build()?)
481            }
482            (Codec::Union(_, _, _), _) => {
483                return Err(ArrowError::NotYetImplemented(
484                    "Sparse Arrow unions are not yet supported".to_string(),
485                ));
486            }
487            #[cfg(feature = "avro_custom_types")]
488            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
489                let inner = Self::try_new(values_dt)?;
490                let byte_width: u8 = match *width_bits_or_bytes {
491                    2 | 4 | 8 => *width_bits_or_bytes,
492                    16 => 2,
493                    32 => 4,
494                    64 => 8,
495                    other => {
496                        return Err(ArrowError::InvalidArgumentError(format!(
497                            "Unsupported run-end width {other} for RunEndEncoded; \
498                             expected 16/32/64 bits or 2/4/8 bytes"
499                        )));
500                    }
501                };
502                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
503            }
504        };
505        Ok(match data_type.nullability() {
506            Some(nullability) => {
507                // Default to reading a union branch tag unless the resolution proves otherwise.
508                let mut plan = NullablePlan::ReadTag;
509                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
510                    if !info.writer_is_union && info.reader_is_union {
511                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
512                            plan = NullablePlan::FromSingle { promotion: *promo };
513                        }
514                    }
515                }
516                Self::Nullable(
517                    nullability,
518                    NullBufferBuilder::new(DEFAULT_CAPACITY),
519                    Box::new(decoder),
520                    plan,
521                )
522            }
523            None => decoder,
524        })
525    }
526
527    /// Append a null record
528    fn append_null(&mut self) -> Result<(), ArrowError> {
529        match self {
530            Self::Null(count) => *count += 1,
531            Self::Boolean(b) => b.append(false),
532            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
533            Self::Int64(v)
534            | Self::Int32ToInt64(v)
535            | Self::TimeMicros(v)
536            | Self::TimestampMillis(_, v)
537            | Self::TimestampMicros(_, v)
538            | Self::TimestampNanos(_, v) => v.push(0),
539            #[cfg(feature = "avro_custom_types")]
540            Self::DurationSecond(v)
541            | Self::DurationMillisecond(v)
542            | Self::DurationMicrosecond(v)
543            | Self::DurationNanosecond(v) => v.push(0),
544            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
545            Self::Float64(v)
546            | Self::Int32ToFloat64(v)
547            | Self::Int64ToFloat64(v)
548            | Self::Float32ToFloat64(v) => v.push(0.),
549            Self::Binary(offsets, _)
550            | Self::String(offsets, _)
551            | Self::StringView(offsets, _)
552            | Self::BytesToString(offsets, _)
553            | Self::StringToBytes(offsets, _) => {
554                offsets.push_length(0);
555            }
556            Self::Uuid(v) => {
557                v.extend([0; 16]);
558            }
559            Self::Array(_, offsets, _) => {
560                offsets.push_length(0);
561            }
562            Self::Record(_, e, _) => {
563                for encoding in e.iter_mut() {
564                    encoding.append_null()?;
565                }
566            }
567            Self::Map(_, _koff, moff, _, _) => {
568                moff.push_length(0);
569            }
570            Self::Fixed(sz, accum) => {
571                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
572            }
573            #[cfg(feature = "small_decimals")]
574            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
575            #[cfg(feature = "small_decimals")]
576            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
577            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
578            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
579            Self::Enum(indices, _, _) => indices.push(0),
580            Self::Duration(builder) => builder.append_null(),
581            #[cfg(feature = "avro_custom_types")]
582            Self::RunEndEncoded(_, len, inner) => {
583                *len += 1;
584                inner.append_null()?;
585            }
586            Self::Union(u) => u.append_null()?,
587            Self::Nullable(_, null_buffer, inner, _) => {
588                null_buffer.append(false);
589                inner.append_null()?;
590            }
591        }
592        Ok(())
593    }
594
595    /// Append a single default literal into the decoder's buffers
596    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
597        match self {
598            Self::Nullable(_, nb, inner, _) => {
599                if matches!(lit, AvroLiteral::Null) {
600                    nb.append(false);
601                    inner.append_null()
602                } else {
603                    nb.append(true);
604                    inner.append_default(lit)
605                }
606            }
607            Self::Null(count) => match lit {
608                AvroLiteral::Null => {
609                    *count += 1;
610                    Ok(())
611                }
612                _ => Err(ArrowError::InvalidArgumentError(
613                    "Non-null default for null type".to_string(),
614                )),
615            },
616            Self::Boolean(b) => match lit {
617                AvroLiteral::Boolean(v) => {
618                    b.append(*v);
619                    Ok(())
620                }
621                _ => Err(ArrowError::InvalidArgumentError(
622                    "Default for boolean must be boolean".to_string(),
623                )),
624            },
625            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
626                AvroLiteral::Int(i) => {
627                    v.push(*i);
628                    Ok(())
629                }
630                _ => Err(ArrowError::InvalidArgumentError(
631                    "Default for int32/date32/time-millis must be int".to_string(),
632                )),
633            },
634            #[cfg(feature = "avro_custom_types")]
635            Self::DurationSecond(v)
636            | Self::DurationMillisecond(v)
637            | Self::DurationMicrosecond(v)
638            | Self::DurationNanosecond(v) => match lit {
639                AvroLiteral::Long(i) => {
640                    v.push(*i);
641                    Ok(())
642                }
643                _ => Err(ArrowError::InvalidArgumentError(
644                    "Default for duration long must be long".to_string(),
645                )),
646            },
647            Self::Int64(v)
648            | Self::Int32ToInt64(v)
649            | Self::TimeMicros(v)
650            | Self::TimestampMillis(_, v)
651            | Self::TimestampMicros(_, v)
652            | Self::TimestampNanos(_, v) => match lit {
653                AvroLiteral::Long(i) => {
654                    v.push(*i);
655                    Ok(())
656                }
657                AvroLiteral::Int(i) => {
658                    v.push(*i as i64);
659                    Ok(())
660                }
661                _ => Err(ArrowError::InvalidArgumentError(
662                    "Default for long/time-micros/timestamp must be long or int".to_string(),
663                )),
664            },
665            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
666                AvroLiteral::Float(f) => {
667                    v.push(*f);
668                    Ok(())
669                }
670                _ => Err(ArrowError::InvalidArgumentError(
671                    "Default for float must be float".to_string(),
672                )),
673            },
674            Self::Float64(v)
675            | Self::Int32ToFloat64(v)
676            | Self::Int64ToFloat64(v)
677            | Self::Float32ToFloat64(v) => match lit {
678                AvroLiteral::Double(f) => {
679                    v.push(*f);
680                    Ok(())
681                }
682                _ => Err(ArrowError::InvalidArgumentError(
683                    "Default for double must be double".to_string(),
684                )),
685            },
686            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
687                AvroLiteral::Bytes(b) => {
688                    offsets.push_length(b.len());
689                    values.extend_from_slice(b);
690                    Ok(())
691                }
692                _ => Err(ArrowError::InvalidArgumentError(
693                    "Default for bytes must be bytes".to_string(),
694                )),
695            },
696            Self::BytesToString(offsets, values)
697            | Self::String(offsets, values)
698            | Self::StringView(offsets, values) => match lit {
699                AvroLiteral::String(s) => {
700                    let b = s.as_bytes();
701                    offsets.push_length(b.len());
702                    values.extend_from_slice(b);
703                    Ok(())
704                }
705                _ => Err(ArrowError::InvalidArgumentError(
706                    "Default for string must be string".to_string(),
707                )),
708            },
709            Self::Uuid(values) => match lit {
710                AvroLiteral::String(s) => {
711                    let uuid = Uuid::try_parse(s).map_err(|e| {
712                        ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})"))
713                    })?;
714                    values.extend_from_slice(uuid.as_bytes());
715                    Ok(())
716                }
717                _ => Err(ArrowError::InvalidArgumentError(
718                    "Default for uuid must be string".to_string(),
719                )),
720            },
721            Self::Fixed(sz, accum) => match lit {
722                AvroLiteral::Bytes(b) => {
723                    if b.len() != *sz as usize {
724                        return Err(ArrowError::InvalidArgumentError(format!(
725                            "Fixed default length {} does not match size {sz}",
726                            b.len(),
727                        )));
728                    }
729                    accum.extend_from_slice(b);
730                    Ok(())
731                }
732                _ => Err(ArrowError::InvalidArgumentError(
733                    "Default for fixed must be bytes".to_string(),
734                )),
735            },
736            #[cfg(feature = "small_decimals")]
737            Self::Decimal32(_, _, _, builder) => {
738                append_decimal_default!(lit, builder, 4, i32, "decimal32")
739            }
740            #[cfg(feature = "small_decimals")]
741            Self::Decimal64(_, _, _, builder) => {
742                append_decimal_default!(lit, builder, 8, i64, "decimal64")
743            }
744            Self::Decimal128(_, _, _, builder) => {
745                append_decimal_default!(lit, builder, 16, i128, "decimal128")
746            }
747            Self::Decimal256(_, _, _, builder) => {
748                append_decimal_default!(lit, builder, 32, i256, "decimal256")
749            }
750            Self::Duration(builder) => match lit {
751                AvroLiteral::Bytes(b) => {
752                    if b.len() != 12 {
753                        return Err(ArrowError::InvalidArgumentError(format!(
754                            "Duration default must be exactly 12 bytes, got {}",
755                            b.len()
756                        )));
757                    }
758                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
759                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
760                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
761                    let nanos = (millis as i64) * 1_000_000;
762                    builder.append_value(IntervalMonthDayNano::new(
763                        months as i32,
764                        days as i32,
765                        nanos,
766                    ));
767                    Ok(())
768                }
769                _ => Err(ArrowError::InvalidArgumentError(
770                    "Default for duration must be 12-byte little-endian months/days/millis"
771                        .to_string(),
772                )),
773            },
774            Self::Array(_, offsets, inner) => match lit {
775                AvroLiteral::Array(items) => {
776                    offsets.push_length(items.len());
777                    for item in items {
778                        inner.append_default(item)?;
779                    }
780                    Ok(())
781                }
782                _ => Err(ArrowError::InvalidArgumentError(
783                    "Default for array must be an array literal".to_string(),
784                )),
785            },
786            Self::Map(_, koff, moff, kdata, valdec) => match lit {
787                AvroLiteral::Map(entries) => {
788                    moff.push_length(entries.len());
789                    for (k, v) in entries {
790                        let kb = k.as_bytes();
791                        koff.push_length(kb.len());
792                        kdata.extend_from_slice(kb);
793                        valdec.append_default(v)?;
794                    }
795                    Ok(())
796                }
797                _ => Err(ArrowError::InvalidArgumentError(
798                    "Default for map must be a map/object literal".to_string(),
799                )),
800            },
801            Self::Enum(indices, symbols, _) => match lit {
802                AvroLiteral::Enum(sym) => {
803                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
804                        ArrowError::InvalidArgumentError(format!(
805                            "Enum default symbol {sym:?} not in reader symbols"
806                        ))
807                    })?;
808                    indices.push(pos as i32);
809                    Ok(())
810                }
811                _ => Err(ArrowError::InvalidArgumentError(
812                    "Default for enum must be a symbol".to_string(),
813                )),
814            },
815            #[cfg(feature = "avro_custom_types")]
816            Self::RunEndEncoded(_, len, inner) => {
817                *len += 1;
818                inner.append_default(lit)
819            }
820            Self::Union(u) => u.append_default(lit),
821            Self::Record(field_meta, decoders, projector) => match lit {
822                AvroLiteral::Map(entries) => {
823                    for (i, dec) in decoders.iter_mut().enumerate() {
824                        let name = field_meta[i].name();
825                        if let Some(sub) = entries.get(name) {
826                            dec.append_default(sub)?;
827                        } else if let Some(proj) = projector.as_ref() {
828                            proj.project_default(dec, i)?;
829                        } else {
830                            dec.append_null()?;
831                        }
832                    }
833                    Ok(())
834                }
835                AvroLiteral::Null => {
836                    for (i, dec) in decoders.iter_mut().enumerate() {
837                        if let Some(proj) = projector.as_ref() {
838                            proj.project_default(dec, i)?;
839                        } else {
840                            dec.append_null()?;
841                        }
842                    }
843                    Ok(())
844                }
845                _ => Err(ArrowError::InvalidArgumentError(
846                    "Default for record must be a map/object or null".to_string(),
847                )),
848            },
849        }
850    }
851
852    /// Decode a single record from `buf`
853    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
854        match self {
855            Self::Null(x) => *x += 1,
856            Self::Boolean(values) => values.append(buf.get_bool()?),
857            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
858                values.push(buf.get_int()?)
859            }
860            Self::Int64(values)
861            | Self::TimeMicros(values)
862            | Self::TimestampMillis(_, values)
863            | Self::TimestampMicros(_, values)
864            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
865            #[cfg(feature = "avro_custom_types")]
866            Self::DurationSecond(values)
867            | Self::DurationMillisecond(values)
868            | Self::DurationMicrosecond(values)
869            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
870            Self::Float32(values) => values.push(buf.get_float()?),
871            Self::Float64(values) => values.push(buf.get_double()?),
872            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
873            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
874            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
875            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
876            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
877            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
878            Self::StringToBytes(offsets, values)
879            | Self::BytesToString(offsets, values)
880            | Self::Binary(offsets, values)
881            | Self::String(offsets, values)
882            | Self::StringView(offsets, values) => {
883                let data = buf.get_bytes()?;
884                offsets.push_length(data.len());
885                values.extend_from_slice(data);
886            }
887            Self::Uuid(values) => {
888                let s_bytes = buf.get_bytes()?;
889                let s = std::str::from_utf8(s_bytes).map_err(|e| {
890                    ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
891                })?;
892                let uuid = Uuid::try_parse(s)
893                    .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
894                values.extend_from_slice(uuid.as_bytes());
895            }
896            Self::Array(_, off, encoding) => {
897                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
898                off.push_length(total_items);
899            }
900            Self::Record(_, encodings, None) => {
901                for encoding in encodings {
902                    encoding.decode(buf)?;
903                }
904            }
905            Self::Record(_, encodings, Some(proj)) => {
906                proj.project_record(buf, encodings)?;
907            }
908            Self::Map(_, koff, moff, kdata, valdec) => {
909                let newly_added = read_blocks(buf, |cur| {
910                    let kb = cur.get_bytes()?;
911                    koff.push_length(kb.len());
912                    kdata.extend_from_slice(kb);
913                    valdec.decode(cur)
914                })?;
915                moff.push_length(newly_added);
916            }
917            Self::Fixed(sz, accum) => {
918                let fx = buf.get_fixed(*sz as usize)?;
919                accum.extend_from_slice(fx);
920            }
921            #[cfg(feature = "small_decimals")]
922            Self::Decimal32(_, _, size, builder) => {
923                decode_decimal!(size, buf, builder, 4, i32);
924            }
925            #[cfg(feature = "small_decimals")]
926            Self::Decimal64(_, _, size, builder) => {
927                decode_decimal!(size, buf, builder, 8, i64);
928            }
929            Self::Decimal128(_, _, size, builder) => {
930                decode_decimal!(size, buf, builder, 16, i128);
931            }
932            Self::Decimal256(_, _, size, builder) => {
933                decode_decimal!(size, buf, builder, 32, i256);
934            }
935            Self::Enum(indices, _, None) => {
936                indices.push(buf.get_int()?);
937            }
938            Self::Enum(indices, _, Some(res)) => {
939                let raw = buf.get_int()?;
940                let resolved = usize::try_from(raw)
941                    .ok()
942                    .and_then(|idx| res.mapping.get(idx).copied())
943                    .filter(|&idx| idx >= 0)
944                    .unwrap_or(res.default_index);
945                if resolved >= 0 {
946                    indices.push(resolved);
947                } else {
948                    return Err(ArrowError::ParseError(format!(
949                        "Enum symbol index {raw} not resolvable and no default provided",
950                    )));
951                }
952            }
953            Self::Duration(builder) => {
954                let b = buf.get_fixed(12)?;
955                let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
956                let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
957                let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
958                let nanos = (millis as i64) * 1_000_000;
959                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
960            }
961            #[cfg(feature = "avro_custom_types")]
962            Self::RunEndEncoded(_, len, inner) => {
963                *len += 1;
964                inner.decode(buf)?;
965            }
966            Self::Union(u) => u.decode(buf)?,
967            Self::Nullable(order, nb, encoding, plan) => {
968                match *plan {
969                    NullablePlan::FromSingle { promotion } => {
970                        encoding.decode_with_promotion(buf, promotion)?;
971                        nb.append(true);
972                    }
973                    NullablePlan::ReadTag => {
974                        let branch = buf.read_vlq()?;
975                        let is_not_null = match *order {
976                            Nullability::NullFirst => branch != 0,
977                            Nullability::NullSecond => branch == 0,
978                        };
979                        if is_not_null {
980                            // It is important to decode before appending to null buffer in case of decode error
981                            encoding.decode(buf)?;
982                        } else {
983                            encoding.append_null()?;
984                        }
985                        nb.append(is_not_null);
986                    }
987                }
988            }
989        }
990        Ok(())
991    }
992
993    fn decode_with_promotion(
994        &mut self,
995        buf: &mut AvroCursor<'_>,
996        promotion: Promotion,
997    ) -> Result<(), ArrowError> {
998        #[cfg(feature = "avro_custom_types")]
999        if let Self::RunEndEncoded(_, len, inner) = self {
1000            *len += 1;
1001            return inner.decode_with_promotion(buf, promotion);
1002        }
1003
1004        macro_rules! promote_numeric_to {
1005            ($variant:ident, $getter:ident, $to:ty) => {{
1006                match self {
1007                    Self::$variant(v) => {
1008                        let x = buf.$getter()?;
1009                        v.push(x as $to);
1010                        Ok(())
1011                    }
1012                    other => Err(ArrowError::ParseError(format!(
1013                        "Promotion {promotion} target mismatch: expected {}, got {}",
1014                        stringify!($variant),
1015                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1016                    ))),
1017                }
1018            }};
1019        }
1020        match promotion {
1021            Promotion::Direct => self.decode(buf),
1022            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1023            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1024            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1025            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1026            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1027            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1028            Promotion::StringToBytes => match self {
1029                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1030                    let data = buf.get_bytes()?;
1031                    offsets.push_length(data.len());
1032                    values.extend_from_slice(data);
1033                    Ok(())
1034                }
1035                other => Err(ArrowError::ParseError(format!(
1036                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1037                    <Self as AsRef<str>>::as_ref(other)
1038                ))),
1039            },
1040            Promotion::BytesToString => match self {
1041                Self::String(offsets, values)
1042                | Self::StringView(offsets, values)
1043                | Self::BytesToString(offsets, values) => {
1044                    let data = buf.get_bytes()?;
1045                    offsets.push_length(data.len());
1046                    values.extend_from_slice(data);
1047                    Ok(())
1048                }
1049                other => Err(ArrowError::ParseError(format!(
1050                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1051                    <Self as AsRef<str>>::as_ref(other)
1052                ))),
1053            },
1054        }
1055    }
1056
1057    /// Flush decoded records to an [`ArrayRef`]
1058    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1059        Ok(match self {
1060            Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1061            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1062            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1063            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1064            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1065            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1066            Self::TimeMillis(values) => {
1067                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1068            }
1069            Self::TimeMicros(values) => {
1070                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1071            }
1072            Self::TimestampMillis(is_utc, values) => Arc::new(
1073                flush_primitive::<TimestampMillisecondType>(values, nulls)
1074                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1075            ),
1076            Self::TimestampMicros(is_utc, values) => Arc::new(
1077                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1078                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1079            ),
1080            Self::TimestampNanos(is_utc, values) => Arc::new(
1081                flush_primitive::<TimestampNanosecondType>(values, nulls)
1082                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1083            ),
1084            #[cfg(feature = "avro_custom_types")]
1085            Self::DurationSecond(values) => {
1086                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1087            }
1088            #[cfg(feature = "avro_custom_types")]
1089            Self::DurationMillisecond(values) => {
1090                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1091            }
1092            #[cfg(feature = "avro_custom_types")]
1093            Self::DurationMicrosecond(values) => {
1094                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1095            }
1096            #[cfg(feature = "avro_custom_types")]
1097            Self::DurationNanosecond(values) => {
1098                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1099            }
1100            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1101            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1102            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1103            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1104                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1105            }
1106            Self::Int32ToFloat64(values)
1107            | Self::Int64ToFloat64(values)
1108            | Self::Float32ToFloat64(values) => {
1109                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1110            }
1111            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1112                let offsets = flush_offsets(offsets);
1113                let values = flush_values(values).into();
1114                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1115            }
1116            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1117                let offsets = flush_offsets(offsets);
1118                let values = flush_values(values).into();
1119                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1120            }
1121            Self::StringView(offsets, values) => {
1122                let offsets = flush_offsets(offsets);
1123                let values = flush_values(values);
1124                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1125                let values: Vec<&str> = (0..array.len())
1126                    .map(|i| {
1127                        if array.is_valid(i) {
1128                            array.value(i)
1129                        } else {
1130                            ""
1131                        }
1132                    })
1133                    .collect();
1134                Arc::new(StringViewArray::from(values))
1135            }
1136            Self::Array(field, offsets, values) => {
1137                let values = values.flush(None)?;
1138                let offsets = flush_offsets(offsets);
1139                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1140            }
1141            Self::Record(fields, encodings, _) => {
1142                let arrays = encodings
1143                    .iter_mut()
1144                    .map(|x| x.flush(None))
1145                    .collect::<Result<Vec<_>, _>>()?;
1146                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1147            }
1148            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1149                let moff = flush_offsets(m_off);
1150                let koff = flush_offsets(k_off);
1151                let kd = flush_values(kdata).into();
1152                let val_arr = valdec.flush(None)?;
1153                let key_arr = StringArray::try_new(koff, kd, None)?;
1154                if key_arr.len() != val_arr.len() {
1155                    return Err(ArrowError::InvalidArgumentError(format!(
1156                        "Map keys length ({}) != map values length ({})",
1157                        key_arr.len(),
1158                        val_arr.len()
1159                    )));
1160                }
1161                let final_len = moff.len() - 1;
1162                if let Some(n) = &nulls {
1163                    if n.len() != final_len {
1164                        return Err(ArrowError::InvalidArgumentError(format!(
1165                            "Map array null buffer length {} != final map length {final_len}",
1166                            n.len()
1167                        )));
1168                    }
1169                }
1170                let entries_fields = match map_field.data_type() {
1171                    DataType::Struct(fields) => fields.clone(),
1172                    other => {
1173                        return Err(ArrowError::InvalidArgumentError(format!(
1174                            "Map entries field must be a Struct, got {other:?}"
1175                        )));
1176                    }
1177                };
1178                let entries_struct =
1179                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1180                let map_arr =
1181                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1182                Arc::new(map_arr)
1183            }
1184            Self::Fixed(sz, accum) => {
1185                let b: Buffer = flush_values(accum).into();
1186                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1187                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1188                Arc::new(arr)
1189            }
1190            Self::Uuid(values) => {
1191                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1192                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1193                Arc::new(arr)
1194            }
1195            #[cfg(feature = "small_decimals")]
1196            Self::Decimal32(precision, scale, _, builder) => {
1197                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1198            }
1199            #[cfg(feature = "small_decimals")]
1200            Self::Decimal64(precision, scale, _, builder) => {
1201                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1202            }
1203            Self::Decimal128(precision, scale, _, builder) => {
1204                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1205            }
1206            Self::Decimal256(precision, scale, _, builder) => {
1207                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1208            }
1209            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1210            Self::Duration(builder) => {
1211                let (_, vals, _) = builder.finish().into_parts();
1212                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1213                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1214                Arc::new(vals)
1215            }
1216            #[cfg(feature = "avro_custom_types")]
1217            Self::RunEndEncoded(width, len, inner) => {
1218                let values = inner.flush(nulls)?;
1219                let n = *len;
1220                let arr = values.as_ref();
1221                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1222                if n > 0 {
1223                    run_starts.push(0);
1224                    for i in 1..n {
1225                        if !values_equal_at(arr, i - 1, i) {
1226                            run_starts.push(i);
1227                        }
1228                    }
1229                }
1230                if n > (u32::MAX as usize) {
1231                    return Err(ArrowError::InvalidArgumentError(format!(
1232                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1233                    )));
1234                }
1235                let run_count = run_starts.len();
1236                let take_idx: PrimitiveArray<UInt32Type> =
1237                    run_starts.iter().map(|&s| s as u32).collect();
1238                let per_run_values = if run_count == 0 {
1239                    values.slice(0, 0)
1240                } else {
1241                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1242                        ArrowError::ParseError(format!("take() for REE values failed: {e}"))
1243                    })?
1244                };
1245
1246                macro_rules! build_run_array {
1247                    ($Native:ty, $ArrowTy:ty) => {{
1248                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1249                        for (idx, &_start) in run_starts.iter().enumerate() {
1250                            let end = if idx + 1 < run_count {
1251                                run_starts[idx + 1]
1252                            } else {
1253                                n
1254                            };
1255                            ends.push(end as $Native);
1256                        }
1257                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1258                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1259                            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1260                        Arc::new(run_arr) as ArrayRef
1261                    }};
1262                }
1263                match *width {
1264                    2 => {
1265                        if n > i16::MAX as usize {
1266                            return Err(ArrowError::InvalidArgumentError(format!(
1267                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1268                            )));
1269                        }
1270                        build_run_array!(i16, Int16Type)
1271                    }
1272                    4 => build_run_array!(i32, Int32Type),
1273                    8 => build_run_array!(i64, Int64Type),
1274                    other => {
1275                        return Err(ArrowError::InvalidArgumentError(format!(
1276                            "Unsupported run-end width {other} for RunEndEncoded"
1277                        )));
1278                    }
1279                }
1280            }
1281            Self::Union(u) => u.flush(nulls)?,
1282        })
1283    }
1284}
1285
// A lookup table that resolves a writer-side index to the matching reader-side index and
// the `Promotion` to apply when decoding it. It is carried by the `ReaderUnion` and
// `ToSingle` variants of `UnionReadPlan`.
#[derive(Debug)]
struct DispatchLookupTable {
    // Indexed by writer index `w` (see `resolve`); the stored value is the reader index.
    //
    // Semantics:
    // - `to_reader[w] >= 0`: The value is the reader-side index that writer index `w`
    //   resolves to, and `promotion[w]` is the promotion returned alongside it.
    // - `to_reader[w] == NO_SOURCE` (-1): The writer index has no reader-side counterpart;
    //   `resolve` returns `None`.
    //
    // Representation (`i8`):
    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
    // `i8` for union type IDs. This requires that the stored reader indices do not exceed
    // `i8::MAX`; `from_writer_to_reader` rejects larger values with a `SchemaError`.
    //
    // Invariants:
    // - `to_reader.len() == promotion.len()`: `from_writer_to_reader` pushes one entry to
    //   each per element of its input.
    // - If `to_reader[w] == NO_SOURCE`, `promotion[w]` is ignored.
    to_reader: Box<[i8]>,
    // For each writer index `w`, specifies the `Promotion` to apply to the writer's value.
    //
    // This is used when a writer type can be promoted to the reader type
    // (e.g., `Int` to `Long`). It is ignored if `to_reader[w] == NO_SOURCE`, in which
    // case a placeholder `Promotion::Direct` is stored.
    promotion: Box<[Promotion]>,
}
1311
// Sentinel stored in `DispatchLookupTable::to_reader` for an entry that has no mapping:
// `from_writer_to_reader` writes it for every `None` input element, and
// `DispatchLookupTable::resolve` returns `None` for any negative entry.
const NO_SOURCE: i8 = -1;
1315
1316impl DispatchLookupTable {
1317    fn from_writer_to_reader(
1318        promotion_map: &[Option<(usize, Promotion)>],
1319    ) -> Result<Self, ArrowError> {
1320        let mut to_reader = Vec::with_capacity(promotion_map.len());
1321        let mut promotion = Vec::with_capacity(promotion_map.len());
1322        for map in promotion_map {
1323            match *map {
1324                Some((idx, promo)) => {
1325                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1326                        ArrowError::SchemaError(format!(
1327                            "Reader branch index {idx} exceeds i8 range (max {})",
1328                            i8::MAX
1329                        ))
1330                    })?;
1331                    to_reader.push(idx_i8);
1332                    promotion.push(promo);
1333                }
1334                None => {
1335                    to_reader.push(NO_SOURCE);
1336                    promotion.push(Promotion::Direct);
1337                }
1338            }
1339        }
1340        Ok(Self {
1341            to_reader: to_reader.into_boxed_slice(),
1342            promotion: promotion.into_boxed_slice(),
1343        })
1344    }
1345
1346    // Resolve a writer branch index to (reader_idx, promotion)
1347    #[inline]
1348    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1349        let reader_index = *self.to_reader.get(writer_index)?;
1350        (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1351    }
1352}
1353
1354#[derive(Debug)]
1355struct UnionDecoder {
1356    fields: UnionFields,
1357    type_ids: Vec<i8>,
1358    offsets: Vec<i32>,
1359    branches: Vec<Decoder>,
1360    counts: Vec<i32>,
1361    reader_type_codes: Vec<i8>,
1362    default_emit_idx: usize,
1363    null_emit_idx: usize,
1364    plan: UnionReadPlan,
1365}
1366
1367impl Default for UnionDecoder {
1368    fn default() -> Self {
1369        Self {
1370            fields: UnionFields::empty(),
1371            type_ids: Vec::new(),
1372            offsets: Vec::new(),
1373            branches: Vec::new(),
1374            counts: Vec::new(),
1375            reader_type_codes: Vec::new(),
1376            default_emit_idx: 0,
1377            null_emit_idx: 0,
1378            plan: UnionReadPlan::Passthrough,
1379        }
1380    }
1381}
1382
/// Routing strategy used by `UnionDecoder::decode`, selected in
/// `UnionDecoder::plan_from_resolved` / `UnionDecoder::try_new_from_writer_union`.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions: the tag read from the stream is translated
    /// through `lookup_table` into a reader branch index and promotion.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// Writer is not a union but the reader is: no tag is read; every value is decoded
    /// into branch `reader_idx` with `promotion`.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    /// Writer is a union but the reader is a single type: the tag is read, resolved
    /// through `lookup_table` for its promotion, and the value is decoded into `target`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution info: the tag read from the stream is used directly as the reader
    /// branch index with `Promotion::Direct`.
    Passthrough,
}
1398
1399impl UnionDecoder {
1400    fn try_new(
1401        fields: UnionFields,
1402        branches: Vec<Decoder>,
1403        resolved: Option<ResolvedUnion>,
1404    ) -> Result<Self, ArrowError> {
1405        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1406        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1407        let default_emit_idx = 0;
1408        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1409        let branch_len = branches.len().max(reader_type_codes.len());
1410        // Guard against impractically large unions that cannot be indexed by an Avro int
1411        let max_addr = (i32::MAX as usize) + 1;
1412        if branches.len() > max_addr {
1413            return Err(ArrowError::SchemaError(format!(
1414                "Reader union has {} branches, which exceeds the maximum addressable \
1415                 branches by an Avro int tag ({} + 1).",
1416                branches.len(),
1417                i32::MAX
1418            )));
1419        }
1420        Ok(Self {
1421            fields,
1422            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1423            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1424            branches,
1425            counts: vec![0; branch_len],
1426            reader_type_codes,
1427            default_emit_idx,
1428            null_emit_idx,
1429            plan: Self::plan_from_resolved(resolved)?,
1430        })
1431    }
1432
1433    fn try_new_from_writer_union(
1434        info: ResolvedUnion,
1435        target: Box<Decoder>,
1436    ) -> Result<Self, ArrowError> {
1437        // This constructor is only for writer-union to single-type resolution
1438        debug_assert!(info.writer_is_union && !info.reader_is_union);
1439        let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1440        Ok(Self {
1441            plan: UnionReadPlan::ToSingle {
1442                target,
1443                lookup_table,
1444            },
1445            ..Self::default()
1446        })
1447    }
1448
1449    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
1450        let Some(info) = resolved else {
1451            return Ok(UnionReadPlan::Passthrough);
1452        };
1453        match (info.writer_is_union, info.reader_is_union) {
1454            (true, true) => {
1455                let lookup_table =
1456                    DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1457                Ok(UnionReadPlan::ReaderUnion { lookup_table })
1458            }
1459            (false, true) => {
1460                let Some(&(reader_idx, promotion)) =
1461                    info.writer_to_reader.first().and_then(Option::as_ref)
1462                else {
1463                    return Err(ArrowError::SchemaError(
1464                        "Writer type does not match any reader union branch".to_string(),
1465                    ));
1466                };
1467                Ok(UnionReadPlan::FromSingle {
1468                    reader_idx,
1469                    promotion,
1470                })
1471            }
1472            (true, false) => Err(ArrowError::InvalidArgumentError(
1473                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1474                    .to_string(),
1475            )),
1476            // (false, false) is invalid and should never be constructed by the resolver.
1477            _ => Err(ArrowError::SchemaError(
1478                "ResolvedUnion constructed for non-union sides; resolver should return None"
1479                    .to_string(),
1480            )),
1481        }
1482    }
1483
1484    #[inline]
1485    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
1486        // Avro unions are encoded by first writing the zero-based branch index.
1487        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
1488        // but both use zig-zag varint encoding, so decoding as long is compatible
1489        // with either form and widely used in practice.
1490        let raw = buf.get_long()?;
1491        if raw < 0 {
1492            return Err(ArrowError::ParseError(format!(
1493                "Negative union branch index {raw}"
1494            )));
1495        }
1496        usize::try_from(raw).map_err(|_| {
1497            ArrowError::ParseError(format!(
1498                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1499                (usize::BITS as usize)
1500            ))
1501        })
1502    }
1503
1504    #[inline]
1505    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
1506        let branches_len = self.branches.len();
1507        let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1508            return Err(ArrowError::ParseError(format!(
1509                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1510            )));
1511        };
1512        self.type_ids.push(self.reader_type_codes[reader_idx]);
1513        self.offsets.push(self.counts[reader_idx]);
1514        self.counts[reader_idx] += 1;
1515        Ok(reader_branch)
1516    }
1517
1518    #[inline]
1519    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
1520    where
1521        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
1522    {
1523        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1524            return action(target);
1525        }
1526        let reader_idx = match &self.plan {
1527            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1528            _ => fallback_idx,
1529        };
1530        self.emit_to(reader_idx).and_then(action)
1531    }
1532
1533    fn append_null(&mut self) -> Result<(), ArrowError> {
1534        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1535    }
1536
1537    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
1538        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1539    }
1540
1541    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1542        let (reader_idx, promotion) = match &mut self.plan {
1543            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1544            UnionReadPlan::ReaderUnion { lookup_table } => {
1545                let idx = Self::read_tag(buf)?;
1546                lookup_table.resolve(idx).ok_or_else(|| {
1547                    ArrowError::ParseError(format!(
1548                        "Union branch index {idx} not resolvable by reader schema"
1549                    ))
1550                })?
1551            }
1552            UnionReadPlan::FromSingle {
1553                reader_idx,
1554                promotion,
1555            } => (*reader_idx, *promotion),
1556            UnionReadPlan::ToSingle {
1557                target,
1558                lookup_table,
1559            } => {
1560                let idx = Self::read_tag(buf)?;
1561                return match lookup_table.resolve(idx) {
1562                    Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1563                    None => Err(ArrowError::ParseError(format!(
1564                        "Writer union branch {idx} does not resolve to reader type"
1565                    ))),
1566                };
1567            }
1568        };
1569        let decoder = self.emit_to(reader_idx)?;
1570        decoder.decode_with_promotion(buf, promotion)
1571    }
1572
1573    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1574        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1575            return target.flush(nulls);
1576        }
1577        debug_assert!(
1578            nulls.is_none(),
1579            "UnionArray does not accept a validity bitmap; \
1580                     nulls should have been materialized as a Null child during decode"
1581        );
1582        let children = self
1583            .branches
1584            .iter_mut()
1585            .map(|d| d.flush(None))
1586            .collect::<Result<Vec<_>, _>>()?;
1587        let arr = UnionArray::try_new(
1588            self.fields.clone(),
1589            flush_values(&mut self.type_ids).into_iter().collect(),
1590            Some(flush_values(&mut self.offsets).into_iter().collect()),
1591            children,
1592        )
1593        .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1594        Ok(Arc::new(arr))
1595    }
1596}
1597
1598#[derive(Debug, Default)]
1599struct UnionDecoderBuilder {
1600    fields: Option<UnionFields>,
1601    branches: Option<Vec<Decoder>>,
1602    resolved: Option<ResolvedUnion>,
1603    target: Option<Box<Decoder>>,
1604}
1605
1606impl UnionDecoderBuilder {
1607    fn new() -> Self {
1608        Self::default()
1609    }
1610
1611    fn with_fields(mut self, fields: UnionFields) -> Self {
1612        self.fields = Some(fields);
1613        self
1614    }
1615
1616    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1617        self.branches = Some(branches);
1618        self
1619    }
1620
1621    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1622        self.resolved = Some(resolved_union);
1623        self
1624    }
1625
1626    fn with_target(mut self, target: Box<Decoder>) -> Self {
1627        self.target = Some(target);
1628        self
1629    }
1630
1631    fn build(self) -> Result<UnionDecoder, ArrowError> {
1632        match (self.resolved, self.fields, self.branches, self.target) {
1633            (resolved, Some(fields), Some(branches), None) => {
1634                UnionDecoder::try_new(fields, branches, resolved)
1635            }
1636            (Some(info), None, None, Some(target))
1637                if info.writer_is_union && !info.reader_is_union =>
1638            {
1639                UnionDecoder::try_new_from_writer_union(info, target)
1640            }
1641            _ => Err(ArrowError::InvalidArgumentError(
1642                "Invalid UnionDecoderBuilder configuration: expected either \
1643                 (fields + branches + resolved) with no target for reader-unions, or \
1644                 (resolved + target) with no fields/branches for writer-union to single."
1645                    .to_string(),
1646            )),
1647        }
1648    }
1649}
1650
/// How `process_blockwise` handles a block whose item count is negative, i.e. a block
/// that also carries its size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size, then still visit the items one by one.
    ProcessItems,
    /// Read the byte size and jump over that many bytes without visiting items.
    SkipBySize,
}
1656
1657#[inline]
1658fn skip_blocks(
1659    buf: &mut AvroCursor,
1660    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1661) -> Result<usize, ArrowError> {
1662    process_blockwise(
1663        buf,
1664        move |c| skip_item(c),
1665        NegativeBlockBehavior::SkipBySize,
1666    )
1667}
1668
1669#[inline]
1670fn flush_dict(
1671    indices: &mut Vec<i32>,
1672    symbols: &[String],
1673    nulls: Option<NullBuffer>,
1674) -> Result<ArrayRef, ArrowError> {
1675    let keys = flush_primitive::<Int32Type>(indices, nulls);
1676    let values = Arc::new(StringArray::from_iter_values(
1677        symbols.iter().map(|s| s.as_str()),
1678    ));
1679    DictionaryArray::try_new(keys, values)
1680        .map_err(|e| ArrowError::ParseError(e.to_string()))
1681        .map(|arr| Arc::new(arr) as ArrayRef)
1682}
1683
1684#[inline]
1685fn read_blocks(
1686    buf: &mut AvroCursor,
1687    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1688) -> Result<usize, ArrowError> {
1689    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1690}
1691
1692#[inline]
1693fn process_blockwise(
1694    buf: &mut AvroCursor,
1695    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1696    negative_behavior: NegativeBlockBehavior,
1697) -> Result<usize, ArrowError> {
1698    let mut total = 0usize;
1699    loop {
1700        // Read the block count
1701        //  positive = that many items
1702        //  negative = that many items + read block size
1703        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
1704        let block_count = buf.get_long()?;
1705        match block_count.cmp(&0) {
1706            Ordering::Equal => break,
1707            Ordering::Less => {
1708                let count = (-block_count) as usize;
1709                // A negative count is followed by a long of the size in bytes
1710                let size_in_bytes = buf.get_long()? as usize;
1711                match negative_behavior {
1712                    NegativeBlockBehavior::ProcessItems => {
1713                        // Process items one-by-one after reading size
1714                        for _ in 0..count {
1715                            on_item(buf)?;
1716                        }
1717                    }
1718                    NegativeBlockBehavior::SkipBySize => {
1719                        // Skip the entire block payload at once
1720                        let _ = buf.get_fixed(size_in_bytes)?;
1721                    }
1722                }
1723                total += count;
1724            }
1725            Ordering::Greater => {
1726                let count = block_count as usize;
1727                for _ in 0..count {
1728                    on_item(buf)?;
1729                }
1730                total += count;
1731            }
1732        }
1733    }
1734    Ok(total)
1735}
1736
1737#[inline]
1738fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1739    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1740}
1741
1742#[inline]
1743fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1744    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1745}
1746
1747#[inline]
1748fn flush_primitive<T: ArrowPrimitiveType>(
1749    values: &mut Vec<T::Native>,
1750    nulls: Option<NullBuffer>,
1751) -> PrimitiveArray<T> {
1752    PrimitiveArray::new(flush_values(values).into(), nulls)
1753}
1754
1755#[inline]
1756fn read_decimal_bytes_be<const N: usize>(
1757    buf: &mut AvroCursor<'_>,
1758    size: &Option<usize>,
1759) -> Result<[u8; N], ArrowError> {
1760    match size {
1761        Some(n) if *n == N => {
1762            let raw = buf.get_fixed(N)?;
1763            let mut arr = [0u8; N];
1764            arr.copy_from_slice(raw);
1765            Ok(arr)
1766        }
1767        Some(n) => {
1768            let raw = buf.get_fixed(*n)?;
1769            sign_cast_to::<N>(raw)
1770        }
1771        None => {
1772            let raw = buf.get_bytes()?;
1773            sign_cast_to::<N>(raw)
1774        }
1775    }
1776}
1777
1778/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
1779/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
1780/// the payload is a big-endian two's-complement integer, and when narrowing it must
1781/// be representable without changing sign or value.
1782///
1783/// If `raw.len() < N`, the value is sign-extended.
1784/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
1785/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
1786#[inline]
1787fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
1788    let len = raw.len();
1789    // Fast path: exact width, just copy
1790    if len == N {
1791        let mut out = [0u8; N];
1792        out.copy_from_slice(raw);
1793        return Ok(out);
1794    }
1795    // Determine sign byte from MSB of first byte (empty => positive)
1796    let first = raw.first().copied().unwrap_or(0u8);
1797    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1798    // Pre-fill with sign byte to support sign extension
1799    let mut out = [sign_byte; N];
1800    if len > N {
1801        // Validate truncation: all dropped leading bytes must equal sign_byte,
1802        // and the MSB of the first kept byte must match the sign.
1803        let extra = len - N;
1804        // Any non-sign byte in the truncated prefix indicates overflow
1805        if raw[..extra].iter().any(|&b| b != sign_byte) {
1806            return Err(ArrowError::ParseError(format!(
1807                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1808                len, N
1809            )));
1810        }
1811        if N > 0 {
1812            let first_kept = raw[extra];
1813            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1814            if sign_bit_mismatch {
1815                return Err(ArrowError::ParseError(format!(
1816                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1817                    len, N
1818                )));
1819            }
1820        }
1821        out.copy_from_slice(&raw[extra..]);
1822        return Ok(out);
1823    }
1824    out[N - len..].copy_from_slice(raw);
1825    Ok(out)
1826}
1827
/// Compares the values at positions `i` and `j` of `arr`.
///
/// Two nulls are equal, a null never equals a non-null, and two non-null values are
/// compared through their single-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        return left_null && right_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
1841
/// Per-record plan for decoding writer-encoded fields into reader-side decoders.
#[derive(Debug)]
struct Projector {
    /// For each writer field, in encoded order: the reader decoder index to decode
    /// into, or `None` when the field has to be skipped.
    writer_to_reader: Arc<[Option<usize>]>,
    /// Parallel to `writer_to_reader`: the skipper used when the mapping is `None`.
    skip_decoders: Vec<Option<Skipper>>,
    /// One entry per reader field: its default literal, when its resolution is
    /// `ResolutionInfo::DefaultValue`.
    field_defaults: Vec<Option<AvroLiteral>>,
    /// `(reader index, literal)` pairs appended after the writer fields of every
    /// record have been processed.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
1849
1850#[derive(Debug)]
1851struct ProjectorBuilder<'a> {
1852    rec: &'a ResolvedRecord,
1853    reader_fields: Arc<[AvroField]>,
1854}
1855
1856impl<'a> ProjectorBuilder<'a> {
1857    #[inline]
1858    fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1859        Self {
1860            rec,
1861            reader_fields: reader_fields.clone(),
1862        }
1863    }
1864
1865    #[inline]
1866    fn build(self) -> Result<Projector, ArrowError> {
1867        let reader_fields = self.reader_fields;
1868        let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1869        for avro_field in reader_fields.as_ref() {
1870            if let Some(ResolutionInfo::DefaultValue(lit)) =
1871                avro_field.data_type().resolution.as_ref()
1872            {
1873                field_defaults.push(Some(lit.clone()));
1874            } else {
1875                field_defaults.push(None);
1876            }
1877        }
1878        let mut default_injections: Vec<(usize, AvroLiteral)> =
1879            Vec::with_capacity(self.rec.default_fields.len());
1880        for &idx in self.rec.default_fields.as_ref() {
1881            let lit = field_defaults
1882                .get(idx)
1883                .and_then(|lit| lit.clone())
1884                .unwrap_or(AvroLiteral::Null);
1885            default_injections.push((idx, lit));
1886        }
1887        let mut skip_decoders: Vec<Option<Skipper>> =
1888            Vec::with_capacity(self.rec.skip_fields.len());
1889        for datatype in self.rec.skip_fields.as_ref() {
1890            let skipper = match datatype {
1891                Some(datatype) => Some(Skipper::from_avro(datatype)?),
1892                None => None,
1893            };
1894            skip_decoders.push(skipper);
1895        }
1896        Ok(Projector {
1897            writer_to_reader: self.rec.writer_to_reader.clone(),
1898            skip_decoders,
1899            field_defaults,
1900            default_injections: default_injections.into(),
1901        })
1902    }
1903}
1904
1905impl Projector {
1906    #[inline]
1907    fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
1908        // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from
1909        // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in
1910        // `ProjectorBuilder::build` to have exactly one element per reader field.
1911        // Therefore, `index < self.field_defaults.len()` always holds here, so
1912        // `self.field_defaults[index]` cannot panic. We only take an immutable reference
1913        // via `.as_ref()`, and `self` is borrowed immutably.
1914        if let Some(default_literal) = self.field_defaults[index].as_ref() {
1915            decoder.append_default(default_literal)
1916        } else {
1917            decoder.append_null()
1918        }
1919    }
1920
1921    #[inline]
1922    fn project_record(
1923        &mut self,
1924        buf: &mut AvroCursor<'_>,
1925        encodings: &mut [Decoder],
1926    ) -> Result<(), ArrowError> {
1927        debug_assert_eq!(
1928            self.writer_to_reader.len(),
1929            self.skip_decoders.len(),
1930            "internal invariant: mapping and skipper lists must have equal length"
1931        );
1932        for (i, (mapping, skipper_opt)) in self
1933            .writer_to_reader
1934            .iter()
1935            .zip(self.skip_decoders.iter_mut())
1936            .enumerate()
1937        {
1938            match (mapping, skipper_opt.as_mut()) {
1939                (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1940                (None, Some(skipper)) => skipper.skip(buf)?,
1941                (None, None) => {
1942                    return Err(ArrowError::SchemaError(format!(
1943                        "No skipper available for writer-only field at index {i}",
1944                    )));
1945                }
1946            }
1947        }
1948        for (reader_index, lit) in self.default_injections.as_ref() {
1949            encodings[*reader_index].append_default(lit)?;
1950        }
1951        Ok(())
1952    }
1953}
1954
/// Lightweight skipper for non‑projected writer fields
/// (fields present in the writer schema but omitted by the reader/projection);
/// per Avro 1.11.1 schema resolution these fields are ignored.
///
/// Each variant describes how `Skipper::skip` advances the cursor past one value.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
#[derive(Debug)]
enum Skipper {
    /// Reads nothing.
    Null,
    /// Reads one boolean.
    Boolean,
    /// Reads one int (also used for `Date32` and `TimeMillis` codecs).
    Int32,
    /// Reads one long.
    Int64,
    /// Reads one float.
    Float32,
    /// Reads one double.
    Float64,
    /// Reads one length-prefixed bytes value.
    Bytes,
    /// Reads one length-prefixed bytes value.
    String,
    /// Reads one long.
    TimeMicros,
    /// Reads one long.
    TimestampMillis,
    /// Reads one long.
    TimestampMicros,
    /// Reads one long.
    TimestampNanos,
    /// Reads a fixed value of the given byte width.
    Fixed(usize),
    /// Reads a fixed value of the given width, or a length-prefixed bytes value when
    /// the width is `None`.
    Decimal(Option<usize>),
    /// Reads one length-prefixed bytes value.
    UuidString,
    /// Reads one int.
    Enum,
    /// Reads a 12-byte fixed value.
    DurationFixed12,
    /// Skips a block-encoded sequence of items.
    List(Box<Skipper>),
    /// Skips a block-encoded sequence of (length-prefixed key, value) entries.
    Map(Box<Skipper>),
    /// Skips every field skipper in order.
    Struct(Vec<Skipper>),
    /// Reads the branch tag, then skips with the skipper of that branch.
    Union(Vec<Skipper>),
    /// Reads the branch tag and skips the inner value only when the tag selects the
    /// non-null branch for the given null ordering.
    Nullable(Nullability, Box<Skipper>),
    /// Delegates to the inner skipper.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
1987
1988impl Skipper {
1989    fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1990        let mut base = match dt.codec() {
1991            Codec::Null => Self::Null,
1992            Codec::Boolean => Self::Boolean,
1993            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1994            Codec::Int64 => Self::Int64,
1995            Codec::TimeMicros => Self::TimeMicros,
1996            Codec::TimestampMillis(_) => Self::TimestampMillis,
1997            Codec::TimestampMicros(_) => Self::TimestampMicros,
1998            Codec::TimestampNanos(_) => Self::TimestampNanos,
1999            #[cfg(feature = "avro_custom_types")]
2000            Codec::DurationNanos
2001            | Codec::DurationMicros
2002            | Codec::DurationMillis
2003            | Codec::DurationSeconds => Self::Int64,
2004            Codec::Float32 => Self::Float32,
2005            Codec::Float64 => Self::Float64,
2006            Codec::Binary => Self::Bytes,
2007            Codec::Utf8 | Codec::Utf8View => Self::String,
2008            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2009            Codec::Decimal(_, _, size) => Self::Decimal(*size),
2010            Codec::Uuid => Self::UuidString, // encoded as string
2011            Codec::Enum(_) => Self::Enum,
2012            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2013            Codec::Struct(fields) => Self::Struct(
2014                fields
2015                    .iter()
2016                    .map(|f| Skipper::from_avro(f.data_type()))
2017                    .collect::<Result<_, _>>()?,
2018            ),
2019            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2020            Codec::Interval => Self::DurationFixed12,
2021            Codec::Union(encodings, _, _) => {
2022                let max_addr = (i32::MAX as usize) + 1;
2023                if encodings.len() > max_addr {
2024                    return Err(ArrowError::SchemaError(format!(
2025                        "Writer union has {} branches, which exceeds the maximum addressable \
2026                         branches by an Avro int tag ({} + 1).",
2027                        encodings.len(),
2028                        i32::MAX
2029                    )));
2030                }
2031                Self::Union(
2032                    encodings
2033                        .iter()
2034                        .map(Skipper::from_avro)
2035                        .collect::<Result<_, _>>()?,
2036                )
2037            }
2038            #[cfg(feature = "avro_custom_types")]
2039            Codec::RunEndEncoded(inner, _w) => {
2040                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2041            }
2042        };
2043        if let Some(n) = dt.nullability() {
2044            base = Self::Nullable(n, Box::new(base));
2045        }
2046        Ok(base)
2047    }
2048
2049    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
2050        match self {
2051            Self::Null => Ok(()),
2052            Self::Boolean => {
2053                buf.get_bool()?;
2054                Ok(())
2055            }
2056            Self::Int32 => {
2057                buf.get_int()?;
2058                Ok(())
2059            }
2060            Self::Int64
2061            | Self::TimeMicros
2062            | Self::TimestampMillis
2063            | Self::TimestampMicros
2064            | Self::TimestampNanos => {
2065                buf.get_long()?;
2066                Ok(())
2067            }
2068            Self::Float32 => {
2069                buf.get_float()?;
2070                Ok(())
2071            }
2072            Self::Float64 => {
2073                buf.get_double()?;
2074                Ok(())
2075            }
2076            Self::Bytes | Self::String | Self::UuidString => {
2077                buf.get_bytes()?;
2078                Ok(())
2079            }
2080            Self::Fixed(sz) => {
2081                buf.get_fixed(*sz)?;
2082                Ok(())
2083            }
2084            Self::Decimal(size) => {
2085                if let Some(s) = size {
2086                    buf.get_fixed(*s)
2087                } else {
2088                    buf.get_bytes()
2089                }?;
2090                Ok(())
2091            }
2092            Self::Enum => {
2093                buf.get_int()?;
2094                Ok(())
2095            }
2096            Self::DurationFixed12 => {
2097                buf.get_fixed(12)?;
2098                Ok(())
2099            }
2100            Self::List(item) => {
2101                skip_blocks(buf, |c| item.skip(c))?;
2102                Ok(())
2103            }
2104            Self::Map(value) => {
2105                skip_blocks(buf, |c| {
2106                    c.get_bytes()?; // key
2107                    value.skip(c)
2108                })?;
2109                Ok(())
2110            }
2111            Self::Struct(fields) => {
2112                for f in fields.iter_mut() {
2113                    f.skip(buf)?
2114                }
2115                Ok(())
2116            }
2117            Self::Union(encodings) => {
2118                // Union tag must be ZigZag-decoded
2119                let raw = buf.get_long()?;
2120                if raw < 0 {
2121                    return Err(ArrowError::ParseError(format!(
2122                        "Negative union branch index {raw}"
2123                    )));
2124                }
2125                let idx: usize = usize::try_from(raw).map_err(|_| {
2126                    ArrowError::ParseError(format!(
2127                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2128                        (usize::BITS as usize)
2129                    ))
2130                })?;
2131                let Some(encoding) = encodings.get_mut(idx) else {
2132                    return Err(ArrowError::ParseError(format!(
2133                        "Union branch index {idx} out of range for skipper ({} branches)",
2134                        encodings.len()
2135                    )));
2136                };
2137                encoding.skip(buf)
2138            }
2139            Self::Nullable(order, inner) => {
2140                let branch = buf.read_vlq()?;
2141                let is_not_null = match *order {
2142                    Nullability::NullFirst => branch != 0,
2143                    Nullability::NullSecond => branch == 0,
2144                };
2145                if is_not_null {
2146                    inner.skip(buf)?;
2147                }
2148                Ok(())
2149            }
2150            #[cfg(feature = "avro_custom_types")]
2151            Self::RunEndEncoded(inner) => inner.skip(buf),
2152        }
2153    }
2154}
2155
2156#[cfg(test)]
2157mod tests {
2158    use super::*;
2159    use crate::codec::AvroFieldBuilder;
2160    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2161    use arrow_array::cast::AsArray;
2162    use indexmap::IndexMap;
2163    use std::collections::HashMap;
2164
2165    fn encode_avro_int(value: i32) -> Vec<u8> {
2166        let mut buf = Vec::new();
2167        let mut v = (value << 1) ^ (value >> 31);
2168        while v & !0x7F != 0 {
2169            buf.push(((v & 0x7F) | 0x80) as u8);
2170            v >>= 7;
2171        }
2172        buf.push(v as u8);
2173        buf
2174    }
2175
2176    fn encode_avro_long(value: i64) -> Vec<u8> {
2177        let mut buf = Vec::new();
2178        let mut v = (value << 1) ^ (value >> 63);
2179        while v & !0x7F != 0 {
2180            buf.push(((v & 0x7F) | 0x80) as u8);
2181            v >>= 7;
2182        }
2183        buf.push(v as u8);
2184        buf
2185    }
2186
2187    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2188        let mut buf = encode_avro_long(bytes.len() as i64);
2189        buf.extend_from_slice(bytes);
2190        buf
2191    }
2192
2193    fn avro_from_codec(codec: Codec) -> AvroDataType {
2194        AvroDataType::new(codec, Default::default(), None)
2195    }
2196
2197    fn resolved_root_datatype(
2198        writer: Schema<'static>,
2199        reader: Schema<'static>,
2200        use_utf8view: bool,
2201        strict_mode: bool,
2202    ) -> AvroDataType {
2203        // Wrap writer schema in a single-field record
2204        let writer_record = Schema::Complex(ComplexType::Record(Record {
2205            name: "Root",
2206            namespace: None,
2207            doc: None,
2208            aliases: vec![],
2209            fields: vec![Field {
2210                name: "v",
2211                r#type: writer,
2212                default: None,
2213                doc: None,
2214                aliases: vec![],
2215            }],
2216            attributes: Attributes::default(),
2217        }));
2218
2219        // Wrap reader schema in a single-field record
2220        let reader_record = Schema::Complex(ComplexType::Record(Record {
2221            name: "Root",
2222            namespace: None,
2223            doc: None,
2224            aliases: vec![],
2225            fields: vec![Field {
2226                name: "v",
2227                r#type: reader,
2228                default: None,
2229                doc: None,
2230                aliases: vec![],
2231            }],
2232            attributes: Attributes::default(),
2233        }));
2234
2235        // Build resolved record, then extract the inner field's resolved AvroDataType
2236        let field = AvroFieldBuilder::new(&writer_record)
2237            .with_reader_schema(&reader_record)
2238            .with_utf8view(use_utf8view)
2239            .with_strict_mode(strict_mode)
2240            .build()
2241            .expect("schema resolution should succeed");
2242
2243        match field.data_type().codec() {
2244            Codec::Struct(fields) => fields[0].data_type().clone(),
2245            other => panic!("expected wrapper struct, got {other:?}"),
2246        }
2247    }
2248
2249    fn decoder_for_promotion(
2250        writer: PrimitiveType,
2251        reader: PrimitiveType,
2252        use_utf8view: bool,
2253    ) -> Decoder {
2254        let ws = Schema::TypeName(TypeName::Primitive(writer));
2255        let rs = Schema::TypeName(TypeName::Primitive(reader));
2256        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2257        Decoder::try_new(&dt).unwrap()
2258    }
2259
2260    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2261        AvroDataType::new(codec, HashMap::new(), nullability)
2262    }
2263
2264    #[cfg(feature = "avro_custom_types")]
2265    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2266        let mut out = Vec::with_capacity(10);
2267        while x >= 0x80 {
2268            out.push((x as u8) | 0x80);
2269            x >>= 7;
2270        }
2271        out.push(x as u8);
2272        out
2273    }
2274
2275    #[test]
2276    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2277        let ws = Schema::Union(vec![
2278            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2279            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2280        ]);
2281        let rs = Schema::Union(vec![
2282            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2283            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2284        ]);
2285
2286        let dt = resolved_root_datatype(ws, rs, false, false);
2287        let mut dec = Decoder::try_new(&dt).unwrap();
2288
2289        let mut rec1 = encode_avro_long(0);
2290        rec1.extend(encode_avro_int(7));
2291        let mut cur1 = AvroCursor::new(&rec1);
2292        dec.decode(&mut cur1).unwrap();
2293
2294        let mut rec2 = encode_avro_long(1);
2295        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2296        let mut cur2 = AvroCursor::new(&rec2);
2297        dec.decode(&mut cur2).unwrap();
2298
2299        let arr = dec.flush(None).unwrap();
2300        let ua = arr
2301            .as_any()
2302            .downcast_ref::<UnionArray>()
2303            .expect("dense union output");
2304
2305        assert_eq!(
2306            ua.type_id(0),
2307            1,
2308            "first value must select reader 'long' branch"
2309        );
2310        assert_eq!(ua.value_offset(0), 0);
2311
2312        assert_eq!(
2313            ua.type_id(1),
2314            0,
2315            "second value must select reader 'string' branch"
2316        );
2317        assert_eq!(ua.value_offset(1), 0);
2318
2319        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2320        assert_eq!(long_child.len(), 1);
2321        assert_eq!(long_child.value(0), 7);
2322
2323        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2324        assert_eq!(str_child.len(), 1);
2325        assert_eq!(str_child.value(0), "abc");
2326    }
2327
2328    #[test]
2329    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2330        let ws = Schema::Union(vec![
2331            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2332            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2333        ]);
2334        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2335
2336        let dt = resolved_root_datatype(ws, rs, false, false);
2337        let mut dec = Decoder::try_new(&dt).unwrap();
2338
2339        let mut data = encode_avro_long(0);
2340        data.extend(encode_avro_int(5));
2341        let mut cur = AvroCursor::new(&data);
2342        dec.decode(&mut cur).unwrap();
2343
2344        let arr = dec.flush(None).unwrap();
2345        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2346        assert_eq!(out.len(), 1);
2347        assert_eq!(out.value(0), 5);
2348    }
2349
2350    #[test]
2351    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2352        let ws = Schema::Union(vec![
2353            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2354            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2355        ]);
2356        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2357
2358        let dt = resolved_root_datatype(ws, rs, false, false);
2359        let mut dec = Decoder::try_new(&dt).unwrap();
2360
2361        let mut data = encode_avro_long(1);
2362        data.extend(encode_avro_bytes("z".as_bytes()));
2363        let mut cur = AvroCursor::new(&data);
2364        let res = dec.decode(&mut cur);
2365        assert!(
2366            res.is_err(),
2367            "expected error when writer union branch does not resolve to reader non-union type"
2368        );
2369    }
2370
2371    #[test]
2372    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2373        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2374        let rs = Schema::Union(vec![
2375            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2376            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2377        ]);
2378
2379        let dt = resolved_root_datatype(ws, rs, false, false);
2380        let mut dec = Decoder::try_new(&dt).unwrap();
2381
2382        let data = encode_avro_int(6);
2383        let mut cur = AvroCursor::new(&data);
2384        dec.decode(&mut cur).unwrap();
2385
2386        let arr = dec.flush(None).unwrap();
2387        let ua = arr
2388            .as_any()
2389            .downcast_ref::<UnionArray>()
2390            .expect("dense union output");
2391        assert_eq!(ua.len(), 1);
2392        assert_eq!(
2393            ua.type_id(0),
2394            1,
2395            "must resolve to reader 'long' branch (type_id 1)"
2396        );
2397        assert_eq!(ua.value_offset(0), 0);
2398
2399        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2400        assert_eq!(long_child.len(), 1);
2401        assert_eq!(long_child.value(0), 6);
2402
2403        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2404        assert_eq!(str_child.len(), 0, "string branch must be empty");
2405    }
2406
2407    #[test]
2408    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2409        let ws = Schema::Union(vec![
2410            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2411            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2412        ]);
2413        let rs = Schema::Union(vec![
2414            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2415            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2416        ]);
2417
2418        let dt = resolved_root_datatype(ws, rs, false, false);
2419        let mut dec = Decoder::try_new(&dt).unwrap();
2420
2421        let mut data = encode_avro_long(1);
2422        data.push(1);
2423        let mut cur = AvroCursor::new(&data);
2424        let res = dec.decode(&mut cur);
2425        assert!(
2426            res.is_err(),
2427            "expected error for unmapped writer 'boolean' branch"
2428        );
2429    }
2430
2431    #[test]
2432    fn test_schema_resolution_promotion_int_to_long() {
2433        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2434        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2435        for v in [0, 1, -2, 123456] {
2436            let data = encode_avro_int(v);
2437            let mut cur = AvroCursor::new(&data);
2438            dec.decode(&mut cur).unwrap();
2439        }
2440        let arr = dec.flush(None).unwrap();
2441        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2442        assert_eq!(a.value(0), 0);
2443        assert_eq!(a.value(1), 1);
2444        assert_eq!(a.value(2), -2);
2445        assert_eq!(a.value(3), 123456);
2446    }
2447
2448    #[test]
2449    fn test_schema_resolution_promotion_int_to_float() {
2450        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2451        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2452        for v in [0, 42, -7] {
2453            let data = encode_avro_int(v);
2454            let mut cur = AvroCursor::new(&data);
2455            dec.decode(&mut cur).unwrap();
2456        }
2457        let arr = dec.flush(None).unwrap();
2458        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2459        assert_eq!(a.value(0), 0.0);
2460        assert_eq!(a.value(1), 42.0);
2461        assert_eq!(a.value(2), -7.0);
2462    }
2463
2464    #[test]
2465    fn test_schema_resolution_promotion_int_to_double() {
2466        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2467        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2468        for v in [1, -1, 10_000] {
2469            let data = encode_avro_int(v);
2470            let mut cur = AvroCursor::new(&data);
2471            dec.decode(&mut cur).unwrap();
2472        }
2473        let arr = dec.flush(None).unwrap();
2474        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2475        assert_eq!(a.value(0), 1.0);
2476        assert_eq!(a.value(1), -1.0);
2477        assert_eq!(a.value(2), 10_000.0);
2478    }
2479
2480    #[test]
2481    fn test_schema_resolution_promotion_long_to_float() {
2482        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2483        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2484        for v in [0_i64, 1_000_000_i64, -123_i64] {
2485            let data = encode_avro_long(v);
2486            let mut cur = AvroCursor::new(&data);
2487            dec.decode(&mut cur).unwrap();
2488        }
2489        let arr = dec.flush(None).unwrap();
2490        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2491        assert_eq!(a.value(0), 0.0);
2492        assert_eq!(a.value(1), 1_000_000.0);
2493        assert_eq!(a.value(2), -123.0);
2494    }
2495
2496    #[test]
2497    fn test_schema_resolution_promotion_long_to_double() {
2498        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2499        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2500        for v in [2_i64, -2_i64, 9_223_372_i64] {
2501            let data = encode_avro_long(v);
2502            let mut cur = AvroCursor::new(&data);
2503            dec.decode(&mut cur).unwrap();
2504        }
2505        let arr = dec.flush(None).unwrap();
2506        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2507        assert_eq!(a.value(0), 2.0);
2508        assert_eq!(a.value(1), -2.0);
2509        assert_eq!(a.value(2), 9_223_372.0);
2510    }
2511
2512    #[test]
2513    fn test_schema_resolution_promotion_float_to_double() {
2514        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2515        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2516        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2517            let data = v.to_le_bytes().to_vec();
2518            let mut cur = AvroCursor::new(&data);
2519            dec.decode(&mut cur).unwrap();
2520        }
2521        let arr = dec.flush(None).unwrap();
2522        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2523        assert_eq!(a.value(0), 0.5_f64);
2524        assert_eq!(a.value(1), -3.25_f64);
2525        assert_eq!(a.value(2), 1.0e6_f64);
2526    }
2527
2528    #[test]
2529    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2530        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2531        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2532        for s in ["hello", "world", "héllo"] {
2533            let data = encode_avro_bytes(s.as_bytes());
2534            let mut cur = AvroCursor::new(&data);
2535            dec.decode(&mut cur).unwrap();
2536        }
2537        let arr = dec.flush(None).unwrap();
2538        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2539        assert_eq!(a.value(0), "hello");
2540        assert_eq!(a.value(1), "world");
2541        assert_eq!(a.value(2), "héllo");
2542    }
2543
2544    #[test]
2545    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2546        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2547        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2548        let data = encode_avro_bytes("abc".as_bytes());
2549        let mut cur = AvroCursor::new(&data);
2550        dec.decode(&mut cur).unwrap();
2551        let arr = dec.flush(None).unwrap();
2552        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2553        assert_eq!(a.value(0), "abc");
2554    }
2555
2556    #[test]
2557    fn test_schema_resolution_promotion_string_to_bytes() {
2558        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2559        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2560        for s in ["", "abc", "data"] {
2561            let data = encode_avro_bytes(s.as_bytes());
2562            let mut cur = AvroCursor::new(&data);
2563            dec.decode(&mut cur).unwrap();
2564        }
2565        let arr = dec.flush(None).unwrap();
2566        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2567        assert_eq!(a.value(0), b"");
2568        assert_eq!(a.value(1), b"abc");
2569        assert_eq!(a.value(2), "data".as_bytes());
2570    }
2571
2572    #[test]
2573    fn test_schema_resolution_no_promotion_passthrough_int() {
2574        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2575        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2576        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
2577        let writer_record = Schema::Complex(ComplexType::Record(Record {
2578            name: "Root",
2579            namespace: None,
2580            doc: None,
2581            aliases: vec![],
2582            fields: vec![Field {
2583                name: "v",
2584                r#type: ws,
2585                default: None,
2586                doc: None,
2587                aliases: vec![],
2588            }],
2589            attributes: Attributes::default(),
2590        }));
2591        let reader_record = Schema::Complex(ComplexType::Record(Record {
2592            name: "Root",
2593            namespace: None,
2594            doc: None,
2595            aliases: vec![],
2596            fields: vec![Field {
2597                name: "v",
2598                r#type: rs,
2599                default: None,
2600                doc: None,
2601                aliases: vec![],
2602            }],
2603            attributes: Attributes::default(),
2604        }));
2605        let field = AvroFieldBuilder::new(&writer_record)
2606            .with_reader_schema(&reader_record)
2607            .with_utf8view(false)
2608            .with_strict_mode(false)
2609            .build()
2610            .unwrap();
2611        // Extract the resolved inner field's AvroDataType
2612        let dt = match field.data_type().codec() {
2613            Codec::Struct(fields) => fields[0].data_type().clone(),
2614            other => panic!("expected wrapper struct, got {other:?}"),
2615        };
2616        let mut dec = Decoder::try_new(&dt).unwrap();
2617        assert!(matches!(dec, Decoder::Int32(_)));
2618        for v in [7, -9] {
2619            let data = encode_avro_int(v);
2620            let mut cur = AvroCursor::new(&data);
2621            dec.decode(&mut cur).unwrap();
2622        }
2623        let arr = dec.flush(None).unwrap();
2624        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2625        assert_eq!(a.value(0), 7);
2626        assert_eq!(a.value(1), -9);
2627    }
2628
2629    #[test]
2630    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
2631        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2632        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
2633        let writer_record = Schema::Complex(ComplexType::Record(Record {
2634            name: "Root",
2635            namespace: None,
2636            doc: None,
2637            aliases: vec![],
2638            fields: vec![Field {
2639                name: "v",
2640                r#type: ws,
2641                default: None,
2642                doc: None,
2643                aliases: vec![],
2644            }],
2645            attributes: Attributes::default(),
2646        }));
2647        let reader_record = Schema::Complex(ComplexType::Record(Record {
2648            name: "Root",
2649            namespace: None,
2650            doc: None,
2651            aliases: vec![],
2652            fields: vec![Field {
2653                name: "v",
2654                r#type: rs,
2655                default: None,
2656                doc: None,
2657                aliases: vec![],
2658            }],
2659            attributes: Attributes::default(),
2660        }));
2661        let res = AvroFieldBuilder::new(&writer_record)
2662            .with_reader_schema(&reader_record)
2663            .with_utf8view(false)
2664            .with_strict_mode(false)
2665            .build();
2666        assert!(res.is_err(), "expected error for illegal promotion");
2667    }
2668
2669    #[test]
2670    fn test_map_decoding_one_entry() {
2671        let value_type = avro_from_codec(Codec::Utf8);
2672        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2673        let mut decoder = Decoder::try_new(&map_type).unwrap();
2674        // Encode a single map with one entry: {"hello": "world"}
2675        let mut data = Vec::new();
2676        data.extend_from_slice(&encode_avro_long(1));
2677        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
2678        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
2679        data.extend_from_slice(&encode_avro_long(0));
2680        let mut cursor = AvroCursor::new(&data);
2681        decoder.decode(&mut cursor).unwrap();
2682        let array = decoder.flush(None).unwrap();
2683        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2684        assert_eq!(map_arr.len(), 1); // one map
2685        assert_eq!(map_arr.value_length(0), 1);
2686        let entries = map_arr.value(0);
2687        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
2688        assert_eq!(struct_entries.len(), 1);
2689        let key_arr = struct_entries
2690            .column_by_name("key")
2691            .unwrap()
2692            .as_any()
2693            .downcast_ref::<StringArray>()
2694            .unwrap();
2695        let val_arr = struct_entries
2696            .column_by_name("value")
2697            .unwrap()
2698            .as_any()
2699            .downcast_ref::<StringArray>()
2700            .unwrap();
2701        assert_eq!(key_arr.value(0), "hello");
2702        assert_eq!(val_arr.value(0), "world");
2703    }
2704
2705    #[test]
2706    fn test_map_decoding_empty() {
2707        let value_type = avro_from_codec(Codec::Utf8);
2708        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2709        let mut decoder = Decoder::try_new(&map_type).unwrap();
2710        let data = encode_avro_long(0);
2711        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2712        let array = decoder.flush(None).unwrap();
2713        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2714        assert_eq!(map_arr.len(), 1);
2715        assert_eq!(map_arr.value_length(0), 0);
2716    }
2717
2718    #[test]
2719    fn test_fixed_decoding() {
2720        let avro_type = avro_from_codec(Codec::Fixed(3));
2721        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2722
2723        let data1 = [1u8, 2, 3];
2724        let mut cursor1 = AvroCursor::new(&data1);
2725        decoder
2726            .decode(&mut cursor1)
2727            .expect("Failed to decode data1");
2728        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
2729        let data2 = [4u8, 5, 6];
2730        let mut cursor2 = AvroCursor::new(&data2);
2731        decoder
2732            .decode(&mut cursor2)
2733            .expect("Failed to decode data2");
2734        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
2735        let array = decoder.flush(None).expect("Failed to flush decoder");
2736        assert_eq!(array.len(), 2, "Array should contain two items");
2737        let fixed_size_binary_array = array
2738            .as_any()
2739            .downcast_ref::<FixedSizeBinaryArray>()
2740            .expect("Failed to downcast to FixedSizeBinaryArray");
2741        assert_eq!(
2742            fixed_size_binary_array.value_length(),
2743            3,
2744            "Fixed size of binary values should be 3"
2745        );
2746        assert_eq!(
2747            fixed_size_binary_array.value(0),
2748            &[1, 2, 3],
2749            "First item mismatch"
2750        );
2751        assert_eq!(
2752            fixed_size_binary_array.value(1),
2753            &[4, 5, 6],
2754            "Second item mismatch"
2755        );
2756    }
2757
2758    #[test]
2759    fn test_fixed_decoding_empty() {
2760        let avro_type = avro_from_codec(Codec::Fixed(5));
2761        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2762
2763        let array = decoder
2764            .flush(None)
2765            .expect("Failed to flush decoder for empty input");
2766
2767        assert_eq!(array.len(), 0, "Array should be empty");
2768        let fixed_size_binary_array = array
2769            .as_any()
2770            .downcast_ref::<FixedSizeBinaryArray>()
2771            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
2772
2773        assert_eq!(
2774            fixed_size_binary_array.value_length(),
2775            5,
2776            "Fixed size of binary values should be 5 as per type"
2777        );
2778    }
2779
2780    #[test]
2781    fn test_uuid_decoding() {
2782        let avro_type = avro_from_codec(Codec::Uuid);
2783        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2784        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
2785        let data = encode_avro_bytes(uuid_str.as_bytes());
2786        let mut cursor = AvroCursor::new(&data);
2787        decoder.decode(&mut cursor).expect("Failed to decode data");
2788        assert_eq!(
2789            cursor.position(),
2790            data.len(),
2791            "Cursor should advance by varint size + data size"
2792        );
2793        let array = decoder.flush(None).expect("Failed to flush decoder");
2794        let fixed_size_binary_array = array
2795            .as_any()
2796            .downcast_ref::<FixedSizeBinaryArray>()
2797            .expect("Array should be a FixedSizeBinaryArray");
2798        assert_eq!(fixed_size_binary_array.len(), 1);
2799        assert_eq!(fixed_size_binary_array.value_length(), 16);
2800        let expected_bytes = [
2801            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
2802            0x6b, 0xf6,
2803        ];
2804        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
2805    }
2806
2807    #[test]
2808    fn test_array_decoding() {
2809        let item_dt = avro_from_codec(Codec::Int32);
2810        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2811        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2812        let mut row1 = Vec::new();
2813        row1.extend_from_slice(&encode_avro_long(2));
2814        row1.extend_from_slice(&encode_avro_int(10));
2815        row1.extend_from_slice(&encode_avro_int(20));
2816        row1.extend_from_slice(&encode_avro_long(0));
2817        let row2 = encode_avro_long(0);
2818        let mut cursor = AvroCursor::new(&row1);
2819        decoder.decode(&mut cursor).unwrap();
2820        let mut cursor2 = AvroCursor::new(&row2);
2821        decoder.decode(&mut cursor2).unwrap();
2822        let array = decoder.flush(None).unwrap();
2823        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2824        assert_eq!(list_arr.len(), 2);
2825        let offsets = list_arr.value_offsets();
2826        assert_eq!(offsets, &[0, 2, 2]);
2827        let values = list_arr.values();
2828        let int_arr = values.as_primitive::<Int32Type>();
2829        assert_eq!(int_arr.len(), 2);
2830        assert_eq!(int_arr.value(0), 10);
2831        assert_eq!(int_arr.value(1), 20);
2832    }
2833
2834    #[test]
2835    fn test_array_decoding_with_negative_block_count() {
2836        let item_dt = avro_from_codec(Codec::Int32);
2837        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2838        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2839        let mut data = encode_avro_long(-3);
2840        data.extend_from_slice(&encode_avro_long(12));
2841        data.extend_from_slice(&encode_avro_int(1));
2842        data.extend_from_slice(&encode_avro_int(2));
2843        data.extend_from_slice(&encode_avro_int(3));
2844        data.extend_from_slice(&encode_avro_long(0));
2845        let mut cursor = AvroCursor::new(&data);
2846        decoder.decode(&mut cursor).unwrap();
2847        let array = decoder.flush(None).unwrap();
2848        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2849        assert_eq!(list_arr.len(), 1);
2850        assert_eq!(list_arr.value_length(0), 3);
2851        let values = list_arr.values().as_primitive::<Int32Type>();
2852        assert_eq!(values.len(), 3);
2853        assert_eq!(values.value(0), 1);
2854        assert_eq!(values.value(1), 2);
2855        assert_eq!(values.value(2), 3);
2856    }
2857
2858    #[test]
2859    fn test_nested_array_decoding() {
2860        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
2861        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
2862        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
2863        let mut buf = Vec::new();
2864        buf.extend(encode_avro_long(1));
2865        buf.extend(encode_avro_long(2));
2866        buf.extend(encode_avro_int(5));
2867        buf.extend(encode_avro_int(6));
2868        buf.extend(encode_avro_long(0));
2869        buf.extend(encode_avro_long(0));
2870        let mut cursor = AvroCursor::new(&buf);
2871        decoder.decode(&mut cursor).unwrap();
2872        let arr = decoder.flush(None).unwrap();
2873        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
2874        assert_eq!(outer.len(), 1);
2875        assert_eq!(outer.value_length(0), 1);
2876        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
2877        assert_eq!(inner.len(), 1);
2878        assert_eq!(inner.value_length(0), 2);
2879        let values = inner
2880            .values()
2881            .as_any()
2882            .downcast_ref::<Int32Array>()
2883            .unwrap();
2884        assert_eq!(values.values(), &[5, 6]);
2885    }
2886
2887    #[test]
2888    fn test_array_decoding_empty_array() {
2889        let value_type = avro_from_codec(Codec::Utf8);
2890        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
2891        let mut decoder = Decoder::try_new(&map_type).unwrap();
2892        let data = encode_avro_long(0);
2893        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2894        let array = decoder.flush(None).unwrap();
2895        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2896        assert_eq!(list_arr.len(), 1);
2897        assert_eq!(list_arr.value_length(0), 0);
2898    }
2899
2900    #[test]
2901    fn test_decimal_decoding_fixed256() {
2902        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
2903        let mut decoder = Decoder::try_new(&dt).unwrap();
2904        let row1 = [
2905            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2906            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2907            0x00, 0x00, 0x30, 0x39,
2908        ];
2909        let row2 = [
2910            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2911            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2912            0xFF, 0xFF, 0xFF, 0x85,
2913        ];
2914        let mut data = Vec::new();
2915        data.extend_from_slice(&row1);
2916        data.extend_from_slice(&row2);
2917        let mut cursor = AvroCursor::new(&data);
2918        decoder.decode(&mut cursor).unwrap();
2919        decoder.decode(&mut cursor).unwrap();
2920        let arr = decoder.flush(None).unwrap();
2921        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
2922        assert_eq!(dec.len(), 2);
2923        assert_eq!(dec.value_as_string(0), "123.45");
2924        assert_eq!(dec.value_as_string(1), "-1.23");
2925    }
2926
2927    #[test]
2928    fn test_decimal_decoding_fixed128() {
2929        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
2930        let mut decoder = Decoder::try_new(&dt).unwrap();
2931        let row1 = [
2932            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2933            0x30, 0x39,
2934        ];
2935        let row2 = [
2936            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2937            0xFF, 0x85,
2938        ];
2939        let mut data = Vec::new();
2940        data.extend_from_slice(&row1);
2941        data.extend_from_slice(&row2);
2942        let mut cursor = AvroCursor::new(&data);
2943        decoder.decode(&mut cursor).unwrap();
2944        decoder.decode(&mut cursor).unwrap();
2945        let arr = decoder.flush(None).unwrap();
2946        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2947        assert_eq!(dec.len(), 2);
2948        assert_eq!(dec.value_as_string(0), "123.45");
2949        assert_eq!(dec.value_as_string(1), "-1.23");
2950    }
2951
2952    #[test]
2953    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
2954        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
2955        let mut decoder = Decoder::try_new(&dt).unwrap();
2956        let row1 = [
2957            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2958            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2959            0x00, 0x00, 0x30, 0x39,
2960        ];
2961        let row2 = [
2962            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2963            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2964            0xFF, 0xFF, 0xFF, 0x85,
2965        ];
2966        let mut data = Vec::new();
2967        data.extend_from_slice(&row1);
2968        data.extend_from_slice(&row2);
2969        let mut cursor = AvroCursor::new(&data);
2970        decoder.decode(&mut cursor).unwrap();
2971        decoder.decode(&mut cursor).unwrap();
2972        let arr = decoder.flush(None).unwrap();
2973        #[cfg(feature = "small_decimals")]
2974        {
2975            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2976            assert_eq!(dec.len(), 2);
2977            assert_eq!(dec.value_as_string(0), "123.45");
2978            assert_eq!(dec.value_as_string(1), "-1.23");
2979        }
2980        #[cfg(not(feature = "small_decimals"))]
2981        {
2982            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2983            assert_eq!(dec.len(), 2);
2984            assert_eq!(dec.value_as_string(0), "123.45");
2985            assert_eq!(dec.value_as_string(1), "-1.23");
2986        }
2987    }
2988
2989    #[test]
2990    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
2991        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
2992        let mut decoder = Decoder::try_new(&dt).unwrap();
2993        let row1 = [
2994            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2995            0x30, 0x39,
2996        ];
2997        let row2 = [
2998            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2999            0xFF, 0x85,
3000        ];
3001        let mut data = Vec::new();
3002        data.extend_from_slice(&row1);
3003        data.extend_from_slice(&row2);
3004        let mut cursor = AvroCursor::new(&data);
3005        decoder.decode(&mut cursor).unwrap();
3006        decoder.decode(&mut cursor).unwrap();
3007
3008        let arr = decoder.flush(None).unwrap();
3009        #[cfg(feature = "small_decimals")]
3010        {
3011            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3012            assert_eq!(dec.len(), 2);
3013            assert_eq!(dec.value_as_string(0), "123.45");
3014            assert_eq!(dec.value_as_string(1), "-1.23");
3015        }
3016        #[cfg(not(feature = "small_decimals"))]
3017        {
3018            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3019            assert_eq!(dec.len(), 2);
3020            assert_eq!(dec.value_as_string(0), "123.45");
3021            assert_eq!(dec.value_as_string(1), "-1.23");
3022        }
3023    }
3024
3025    #[test]
3026    fn test_decimal_decoding_bytes_with_nulls() {
3027        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
3028        let inner = Decoder::try_new(&dt).unwrap();
3029        let mut decoder = Decoder::Nullable(
3030            Nullability::NullSecond,
3031            NullBufferBuilder::new(DEFAULT_CAPACITY),
3032            Box::new(inner),
3033            NullablePlan::ReadTag,
3034        );
3035        let mut data = Vec::new();
3036        data.extend_from_slice(&encode_avro_int(0));
3037        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
3038        data.extend_from_slice(&encode_avro_int(1));
3039        data.extend_from_slice(&encode_avro_int(0));
3040        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
3041        let mut cursor = AvroCursor::new(&data);
3042        decoder.decode(&mut cursor).unwrap();
3043        decoder.decode(&mut cursor).unwrap();
3044        decoder.decode(&mut cursor).unwrap();
3045        let arr = decoder.flush(None).unwrap();
3046        #[cfg(feature = "small_decimals")]
3047        {
3048            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3049            assert_eq!(dec_arr.len(), 3);
3050            assert!(dec_arr.is_valid(0));
3051            assert!(!dec_arr.is_valid(1));
3052            assert!(dec_arr.is_valid(2));
3053            assert_eq!(dec_arr.value_as_string(0), "123.4");
3054            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3055        }
3056        #[cfg(not(feature = "small_decimals"))]
3057        {
3058            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3059            assert_eq!(dec_arr.len(), 3);
3060            assert!(dec_arr.is_valid(0));
3061            assert!(!dec_arr.is_valid(1));
3062            assert!(dec_arr.is_valid(2));
3063            assert_eq!(dec_arr.value_as_string(0), "123.4");
3064            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3065        }
3066    }
3067
3068    #[test]
3069    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
3070        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
3071        let inner = Decoder::try_new(&dt).unwrap();
3072        let mut decoder = Decoder::Nullable(
3073            Nullability::NullSecond,
3074            NullBufferBuilder::new(DEFAULT_CAPACITY),
3075            Box::new(inner),
3076            NullablePlan::ReadTag,
3077        );
3078        let row1 = [
3079            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
3080            0xE2, 0x40,
3081        ];
3082        let row3 = [
3083            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
3084            0x1D, 0xC0,
3085        ];
3086        let mut data = Vec::new();
3087        data.extend_from_slice(&encode_avro_int(0));
3088        data.extend_from_slice(&row1);
3089        data.extend_from_slice(&encode_avro_int(1));
3090        data.extend_from_slice(&encode_avro_int(0));
3091        data.extend_from_slice(&row3);
3092        let mut cursor = AvroCursor::new(&data);
3093        decoder.decode(&mut cursor).unwrap();
3094        decoder.decode(&mut cursor).unwrap();
3095        decoder.decode(&mut cursor).unwrap();
3096        let arr = decoder.flush(None).unwrap();
3097        #[cfg(feature = "small_decimals")]
3098        {
3099            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3100            assert_eq!(dec_arr.len(), 3);
3101            assert!(dec_arr.is_valid(0));
3102            assert!(!dec_arr.is_valid(1));
3103            assert!(dec_arr.is_valid(2));
3104            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3105            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3106        }
3107        #[cfg(not(feature = "small_decimals"))]
3108        {
3109            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3110            assert_eq!(dec_arr.len(), 3);
3111            assert!(dec_arr.is_valid(0));
3112            assert!(!dec_arr.is_valid(1));
3113            assert!(dec_arr.is_valid(2));
3114            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3115            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3116        }
3117    }
3118
3119    #[test]
3120    fn test_enum_decoding() {
3121        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
3122        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
3123        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3124        let mut data = Vec::new();
3125        data.extend_from_slice(&encode_avro_int(2));
3126        data.extend_from_slice(&encode_avro_int(0));
3127        data.extend_from_slice(&encode_avro_int(1));
3128        let mut cursor = AvroCursor::new(&data);
3129        decoder.decode(&mut cursor).unwrap();
3130        decoder.decode(&mut cursor).unwrap();
3131        decoder.decode(&mut cursor).unwrap();
3132        let array = decoder.flush(None).unwrap();
3133        let dict_array = array
3134            .as_any()
3135            .downcast_ref::<DictionaryArray<Int32Type>>()
3136            .unwrap();
3137        assert_eq!(dict_array.len(), 3);
3138        let values = dict_array
3139            .values()
3140            .as_any()
3141            .downcast_ref::<StringArray>()
3142            .unwrap();
3143        assert_eq!(values.value(0), "A");
3144        assert_eq!(values.value(1), "B");
3145        assert_eq!(values.value(2), "C");
3146        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
3147    }
3148
3149    #[test]
3150    fn test_enum_decoding_with_nulls() {
3151        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
3152        let enum_codec = Codec::Enum(symbols.clone());
3153        let avro_type =
3154            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
3155        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3156        let mut data = Vec::new();
3157        data.extend_from_slice(&encode_avro_long(1));
3158        data.extend_from_slice(&encode_avro_int(1));
3159        data.extend_from_slice(&encode_avro_long(0));
3160        data.extend_from_slice(&encode_avro_long(1));
3161        data.extend_from_slice(&encode_avro_int(0));
3162        let mut cursor = AvroCursor::new(&data);
3163        decoder.decode(&mut cursor).unwrap();
3164        decoder.decode(&mut cursor).unwrap();
3165        decoder.decode(&mut cursor).unwrap();
3166        let array = decoder.flush(None).unwrap();
3167        let dict_array = array
3168            .as_any()
3169            .downcast_ref::<DictionaryArray<Int32Type>>()
3170            .unwrap();
3171        assert_eq!(dict_array.len(), 3);
3172        assert!(dict_array.is_valid(0));
3173        assert!(dict_array.is_null(1));
3174        assert!(dict_array.is_valid(2));
3175        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
3176        assert_eq!(dict_array.keys(), &expected_keys);
3177        let values = dict_array
3178            .values()
3179            .as_any()
3180            .downcast_ref::<StringArray>()
3181            .unwrap();
3182        assert_eq!(values.value(0), "X");
3183        assert_eq!(values.value(1), "Y");
3184    }
3185
3186    #[test]
3187    fn test_duration_decoding_with_nulls() {
3188        let duration_codec = Codec::Interval;
3189        let avro_type = AvroDataType::new(
3190            duration_codec,
3191            Default::default(),
3192            Some(Nullability::NullFirst),
3193        );
3194        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3195        let mut data = Vec::new();
3196        // First value: 1 month, 2 days, 3 millis
3197        data.extend_from_slice(&encode_avro_long(1)); // not null
3198        let mut duration1 = Vec::new();
3199        duration1.extend_from_slice(&1u32.to_le_bytes());
3200        duration1.extend_from_slice(&2u32.to_le_bytes());
3201        duration1.extend_from_slice(&3u32.to_le_bytes());
3202        data.extend_from_slice(&duration1);
3203        // Second value: null
3204        data.extend_from_slice(&encode_avro_long(0)); // null
3205        data.extend_from_slice(&encode_avro_long(1)); // not null
3206        let mut duration2 = Vec::new();
3207        duration2.extend_from_slice(&4u32.to_le_bytes());
3208        duration2.extend_from_slice(&5u32.to_le_bytes());
3209        duration2.extend_from_slice(&6u32.to_le_bytes());
3210        data.extend_from_slice(&duration2);
3211        let mut cursor = AvroCursor::new(&data);
3212        decoder.decode(&mut cursor).unwrap();
3213        decoder.decode(&mut cursor).unwrap();
3214        decoder.decode(&mut cursor).unwrap();
3215        let array = decoder.flush(None).unwrap();
3216        let interval_array = array
3217            .as_any()
3218            .downcast_ref::<IntervalMonthDayNanoArray>()
3219            .unwrap();
3220        assert_eq!(interval_array.len(), 3);
3221        assert!(interval_array.is_valid(0));
3222        assert!(interval_array.is_null(1));
3223        assert!(interval_array.is_valid(2));
3224        let expected = IntervalMonthDayNanoArray::from(vec![
3225            Some(IntervalMonthDayNano {
3226                months: 1,
3227                days: 2,
3228                nanoseconds: 3_000_000,
3229            }),
3230            None,
3231            Some(IntervalMonthDayNano {
3232                months: 4,
3233                days: 5,
3234                nanoseconds: 6_000_000,
3235            }),
3236        ]);
3237        assert_eq!(interval_array, &expected);
3238    }
3239
3240    #[test]
3241    fn test_duration_decoding_empty() {
3242        let duration_codec = Codec::Interval;
3243        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
3244        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3245        let array = decoder.flush(None).unwrap();
3246        assert_eq!(array.len(), 0);
3247    }
3248
3249    #[test]
3250    #[cfg(feature = "avro_custom_types")]
3251    fn test_duration_seconds_decoding() {
3252        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
3253        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3254        let mut data = Vec::new();
3255        // Three values: 0, -1, 2
3256        data.extend_from_slice(&encode_avro_long(0));
3257        data.extend_from_slice(&encode_avro_long(-1));
3258        data.extend_from_slice(&encode_avro_long(2));
3259        let mut cursor = AvroCursor::new(&data);
3260        decoder.decode(&mut cursor).unwrap();
3261        decoder.decode(&mut cursor).unwrap();
3262        decoder.decode(&mut cursor).unwrap();
3263        let array = decoder.flush(None).unwrap();
3264        let dur = array
3265            .as_any()
3266            .downcast_ref::<DurationSecondArray>()
3267            .unwrap();
3268        assert_eq!(dur.values(), &[0, -1, 2]);
3269    }
3270
3271    #[test]
3272    #[cfg(feature = "avro_custom_types")]
3273    fn test_duration_milliseconds_decoding() {
3274        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3275        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3276        let mut data = Vec::new();
3277        for v in [1i64, 0, -2] {
3278            data.extend_from_slice(&encode_avro_long(v));
3279        }
3280        let mut cursor = AvroCursor::new(&data);
3281        for _ in 0..3 {
3282            decoder.decode(&mut cursor).unwrap();
3283        }
3284        let array = decoder.flush(None).unwrap();
3285        let dur = array
3286            .as_any()
3287            .downcast_ref::<DurationMillisecondArray>()
3288            .unwrap();
3289        assert_eq!(dur.values(), &[1, 0, -2]);
3290    }
3291
3292    #[test]
3293    #[cfg(feature = "avro_custom_types")]
3294    fn test_duration_microseconds_decoding() {
3295        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3296        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3297        let mut data = Vec::new();
3298        for v in [5i64, -6, 7] {
3299            data.extend_from_slice(&encode_avro_long(v));
3300        }
3301        let mut cursor = AvroCursor::new(&data);
3302        for _ in 0..3 {
3303            decoder.decode(&mut cursor).unwrap();
3304        }
3305        let array = decoder.flush(None).unwrap();
3306        let dur = array
3307            .as_any()
3308            .downcast_ref::<DurationMicrosecondArray>()
3309            .unwrap();
3310        assert_eq!(dur.values(), &[5, -6, 7]);
3311    }
3312
3313    #[test]
3314    #[cfg(feature = "avro_custom_types")]
3315    fn test_duration_nanoseconds_decoding() {
3316        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3317        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3318        let mut data = Vec::new();
3319        for v in [8i64, 9, -10] {
3320            data.extend_from_slice(&encode_avro_long(v));
3321        }
3322        let mut cursor = AvroCursor::new(&data);
3323        for _ in 0..3 {
3324            decoder.decode(&mut cursor).unwrap();
3325        }
3326        let array = decoder.flush(None).unwrap();
3327        let dur = array
3328            .as_any()
3329            .downcast_ref::<DurationNanosecondArray>()
3330            .unwrap();
3331        assert_eq!(dur.values(), &[8, 9, -10]);
3332    }
3333
3334    #[test]
3335    fn test_nullable_decode_error_bitmap_corruption() {
3336        // Nullable Int32 with ['T','null'] encoding (NullSecond)
3337        let avro_type = AvroDataType::new(
3338            Codec::Int32,
3339            Default::default(),
3340            Some(Nullability::NullSecond),
3341        );
3342        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3343
3344        // Row 1: union branch 1 (null)
3345        let mut row1 = Vec::new();
3346        row1.extend_from_slice(&encode_avro_int(1));
3347
3348        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
3349        let mut row2 = Vec::new();
3350        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
3351
3352        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
3353        let mut row3 = Vec::new();
3354        row3.extend_from_slice(&encode_avro_int(0)); // branch
3355        row3.extend_from_slice(&encode_avro_int(42)); // actual value
3356
3357        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
3358        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
3359        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
3360
3361        let array = decoder.flush(None).unwrap();
3362
3363        // Should contain 2 elements: row1 (null) and row3 (42)
3364        assert_eq!(array.len(), 2);
3365        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
3366        assert!(int_array.is_null(0)); // row1 is null
3367        assert_eq!(int_array.value(1), 42); // row3 value is 42
3368    }
3369
3370    #[test]
3371    fn test_enum_mapping_reordered_symbols() {
3372        let reader_symbols: Arc<[String]> =
3373            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
3374        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
3375        let default_index: i32 = -1;
3376        let mut dec = Decoder::Enum(
3377            Vec::with_capacity(DEFAULT_CAPACITY),
3378            reader_symbols.clone(),
3379            Some(EnumResolution {
3380                mapping,
3381                default_index,
3382            }),
3383        );
3384        let mut data = Vec::new();
3385        data.extend_from_slice(&encode_avro_int(0));
3386        data.extend_from_slice(&encode_avro_int(1));
3387        data.extend_from_slice(&encode_avro_int(2));
3388        let mut cur = AvroCursor::new(&data);
3389        dec.decode(&mut cur).unwrap();
3390        dec.decode(&mut cur).unwrap();
3391        dec.decode(&mut cur).unwrap();
3392        let arr = dec.flush(None).unwrap();
3393        let dict = arr
3394            .as_any()
3395            .downcast_ref::<DictionaryArray<Int32Type>>()
3396            .unwrap();
3397        let expected_keys = Int32Array::from(vec![2, 0, 1]);
3398        assert_eq!(dict.keys(), &expected_keys);
3399        let values = dict
3400            .values()
3401            .as_any()
3402            .downcast_ref::<StringArray>()
3403            .unwrap();
3404        assert_eq!(values.value(0), "B");
3405        assert_eq!(values.value(1), "C");
3406        assert_eq!(values.value(2), "A");
3407    }
3408
3409    #[test]
3410    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
3411        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
3412        let default_index: i32 = 1;
3413        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
3414        let mut dec = Decoder::Enum(
3415            Vec::with_capacity(DEFAULT_CAPACITY),
3416            reader_symbols.clone(),
3417            Some(EnumResolution {
3418                mapping,
3419                default_index,
3420            }),
3421        );
3422        let mut data = Vec::new();
3423        data.extend_from_slice(&encode_avro_int(0));
3424        data.extend_from_slice(&encode_avro_int(1));
3425        data.extend_from_slice(&encode_avro_int(99));
3426        let mut cur = AvroCursor::new(&data);
3427        dec.decode(&mut cur).unwrap();
3428        dec.decode(&mut cur).unwrap();
3429        dec.decode(&mut cur).unwrap();
3430        let arr = dec.flush(None).unwrap();
3431        let dict = arr
3432            .as_any()
3433            .downcast_ref::<DictionaryArray<Int32Type>>()
3434            .unwrap();
3435        let expected_keys = Int32Array::from(vec![0, 1, 1]);
3436        assert_eq!(dict.keys(), &expected_keys);
3437        let values = dict
3438            .values()
3439            .as_any()
3440            .downcast_ref::<StringArray>()
3441            .unwrap();
3442        assert_eq!(values.value(0), "A");
3443        assert_eq!(values.value(1), "B");
3444    }
3445
3446    #[test]
3447    fn test_enum_mapping_unknown_symbol_without_default_errors() {
3448        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
3449        let default_index: i32 = -1; // indicates no default at type-level
3450        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
3451        let mut dec = Decoder::Enum(
3452            Vec::with_capacity(DEFAULT_CAPACITY),
3453            reader_symbols,
3454            Some(EnumResolution {
3455                mapping,
3456                default_index,
3457            }),
3458        );
3459        let data = encode_avro_int(0);
3460        let mut cur = AvroCursor::new(&data);
3461        let err = dec
3462            .decode(&mut cur)
3463            .expect_err("expected decode error for unresolved enum without default");
3464        let msg = err.to_string();
3465        assert!(
3466            msg.contains("not resolvable") && msg.contains("no default"),
3467            "unexpected error message: {msg}"
3468        );
3469    }
3470
3471    fn make_record_resolved_decoder(
3472        reader_fields: &[(&str, DataType, bool)],
3473        writer_to_reader: Vec<Option<usize>>,
3474        skip_decoders: Vec<Option<Skipper>>,
3475    ) -> Decoder {
3476        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3477        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3478        for (name, dt, nullable) in reader_fields {
3479            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3480            let enc = match dt {
3481                DataType::Int32 => Decoder::Int32(Vec::new()),
3482                DataType::Int64 => Decoder::Int64(Vec::new()),
3483                DataType::Utf8 => {
3484                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3485                }
3486                other => panic!("Unsupported test reader field type: {other:?}"),
3487            };
3488            encodings.push(enc);
3489        }
3490        let fields: Fields = field_refs.into();
3491        Decoder::Record(
3492            fields,
3493            encodings,
3494            Some(Projector {
3495                writer_to_reader: Arc::from(writer_to_reader),
3496                skip_decoders,
3497                field_defaults: vec![None; reader_fields.len()],
3498                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3499            }),
3500        )
3501    }
3502
3503    #[test]
3504    fn test_skip_writer_trailing_field_int32() {
3505        let mut dec = make_record_resolved_decoder(
3506            &[("id", arrow_schema::DataType::Int32, false)],
3507            vec![Some(0), None],
3508            vec![None, Some(super::Skipper::Int32)],
3509        );
3510        let mut data = Vec::new();
3511        data.extend_from_slice(&encode_avro_int(7));
3512        data.extend_from_slice(&encode_avro_int(999));
3513        let mut cur = AvroCursor::new(&data);
3514        dec.decode(&mut cur).unwrap();
3515        assert_eq!(cur.position(), data.len());
3516        let arr = dec.flush(None).unwrap();
3517        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
3518        assert_eq!(struct_arr.len(), 1);
3519        let id = struct_arr
3520            .column_by_name("id")
3521            .unwrap()
3522            .as_any()
3523            .downcast_ref::<Int32Array>()
3524            .unwrap();
3525        assert_eq!(id.value(0), 7);
3526    }
3527
3528    #[test]
3529    fn test_skip_writer_middle_field_string() {
3530        let mut dec = make_record_resolved_decoder(
3531            &[
3532                ("id", DataType::Int32, false),
3533                ("score", DataType::Int64, false),
3534            ],
3535            vec![Some(0), None, Some(1)],
3536            vec![None, Some(Skipper::String), None],
3537        );
3538        let mut data = Vec::new();
3539        data.extend_from_slice(&encode_avro_int(42));
3540        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
3541        data.extend_from_slice(&encode_avro_long(1000));
3542        let mut cur = AvroCursor::new(&data);
3543        dec.decode(&mut cur).unwrap();
3544        assert_eq!(cur.position(), data.len());
3545        let arr = dec.flush(None).unwrap();
3546        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3547        let id = s
3548            .column_by_name("id")
3549            .unwrap()
3550            .as_any()
3551            .downcast_ref::<Int32Array>()
3552            .unwrap();
3553        let score = s
3554            .column_by_name("score")
3555            .unwrap()
3556            .as_any()
3557            .downcast_ref::<Int64Array>()
3558            .unwrap();
3559        assert_eq!(id.value(0), 42);
3560        assert_eq!(score.value(0), 1000);
3561    }
3562
3563    #[test]
3564    fn test_skip_writer_array_with_negative_block_count_fast() {
3565        let mut dec = make_record_resolved_decoder(
3566            &[("id", DataType::Int32, false)],
3567            vec![None, Some(0)],
3568            vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
3569        );
3570        let mut array_payload = Vec::new();
3571        array_payload.extend_from_slice(&encode_avro_int(1));
3572        array_payload.extend_from_slice(&encode_avro_int(2));
3573        array_payload.extend_from_slice(&encode_avro_int(3));
3574        let mut data = Vec::new();
3575        data.extend_from_slice(&encode_avro_long(-3));
3576        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
3577        data.extend_from_slice(&array_payload);
3578        data.extend_from_slice(&encode_avro_long(0));
3579        data.extend_from_slice(&encode_avro_int(5));
3580        let mut cur = AvroCursor::new(&data);
3581        dec.decode(&mut cur).unwrap();
3582        assert_eq!(cur.position(), data.len());
3583        let arr = dec.flush(None).unwrap();
3584        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3585        let id = s
3586            .column_by_name("id")
3587            .unwrap()
3588            .as_any()
3589            .downcast_ref::<Int32Array>()
3590            .unwrap();
3591        assert_eq!(id.len(), 1);
3592        assert_eq!(id.value(0), 5);
3593    }
3594
3595    #[test]
3596    fn test_skip_writer_map_with_negative_block_count_fast() {
3597        let mut dec = make_record_resolved_decoder(
3598            &[("id", DataType::Int32, false)],
3599            vec![None, Some(0)],
3600            vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
3601        );
3602        let mut entries = Vec::new();
3603        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
3604        entries.extend_from_slice(&encode_avro_int(10));
3605        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
3606        entries.extend_from_slice(&encode_avro_int(20));
3607        let mut data = Vec::new();
3608        data.extend_from_slice(&encode_avro_long(-2));
3609        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
3610        data.extend_from_slice(&entries);
3611        data.extend_from_slice(&encode_avro_long(0));
3612        data.extend_from_slice(&encode_avro_int(123));
3613        let mut cur = AvroCursor::new(&data);
3614        dec.decode(&mut cur).unwrap();
3615        assert_eq!(cur.position(), data.len());
3616        let arr = dec.flush(None).unwrap();
3617        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3618        let id = s
3619            .column_by_name("id")
3620            .unwrap()
3621            .as_any()
3622            .downcast_ref::<Int32Array>()
3623            .unwrap();
3624        assert_eq!(id.len(), 1);
3625        assert_eq!(id.value(0), 123);
3626    }
3627
3628    #[test]
3629    fn test_skip_writer_nullable_field_union_nullfirst() {
3630        let mut dec = make_record_resolved_decoder(
3631            &[("id", DataType::Int32, false)],
3632            vec![None, Some(0)],
3633            vec![
3634                Some(super::Skipper::Nullable(
3635                    Nullability::NullFirst,
3636                    Box::new(super::Skipper::Int32),
3637                )),
3638                None,
3639            ],
3640        );
3641        let mut row1 = Vec::new();
3642        row1.extend_from_slice(&encode_avro_long(0));
3643        row1.extend_from_slice(&encode_avro_int(5));
3644        let mut row2 = Vec::new();
3645        row2.extend_from_slice(&encode_avro_long(1));
3646        row2.extend_from_slice(&encode_avro_int(123));
3647        row2.extend_from_slice(&encode_avro_int(7));
3648        let mut cur1 = AvroCursor::new(&row1);
3649        let mut cur2 = AvroCursor::new(&row2);
3650        dec.decode(&mut cur1).unwrap();
3651        dec.decode(&mut cur2).unwrap();
3652        assert_eq!(cur1.position(), row1.len());
3653        assert_eq!(cur2.position(), row2.len());
3654        let arr = dec.flush(None).unwrap();
3655        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3656        let id = s
3657            .column_by_name("id")
3658            .unwrap()
3659            .as_any()
3660            .downcast_ref::<Int32Array>()
3661            .unwrap();
3662        assert_eq!(id.len(), 2);
3663        assert_eq!(id.value(0), 5);
3664        assert_eq!(id.value(1), 7);
3665    }
3666
3667    fn make_dense_union_avro(
3668        children: Vec<(Codec, &'_ str, DataType)>,
3669        type_ids: Vec<i8>,
3670    ) -> AvroDataType {
3671        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3672        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3673        for (codec, name, dt) in children.into_iter() {
3674            avro_children.push(AvroDataType::new(codec, Default::default(), None));
3675            fields.push(arrow_schema::Field::new(name, dt, true));
3676        }
3677        let union_fields = UnionFields::new(type_ids, fields);
3678        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3679        AvroDataType::new(union_codec, Default::default(), None)
3680    }
3681
3682    #[test]
3683    fn test_union_dense_two_children_custom_type_ids() {
3684        let union_dt = make_dense_union_avro(
3685            vec![
3686                (Codec::Int32, "i", DataType::Int32),
3687                (Codec::Utf8, "s", DataType::Utf8),
3688            ],
3689            vec![2, 5],
3690        );
3691        let mut dec = Decoder::try_new(&union_dt).unwrap();
3692        let mut r1 = Vec::new();
3693        r1.extend_from_slice(&encode_avro_long(0));
3694        r1.extend_from_slice(&encode_avro_int(7));
3695        let mut r2 = Vec::new();
3696        r2.extend_from_slice(&encode_avro_long(1));
3697        r2.extend_from_slice(&encode_avro_bytes(b"x"));
3698        let mut r3 = Vec::new();
3699        r3.extend_from_slice(&encode_avro_long(0));
3700        r3.extend_from_slice(&encode_avro_int(-1));
3701        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3702        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3703        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3704        let array = dec.flush(None).unwrap();
3705        let ua = array
3706            .as_any()
3707            .downcast_ref::<UnionArray>()
3708            .expect("expected UnionArray");
3709        assert_eq!(ua.len(), 3);
3710        assert_eq!(ua.type_id(0), 2);
3711        assert_eq!(ua.type_id(1), 5);
3712        assert_eq!(ua.type_id(2), 2);
3713        assert_eq!(ua.value_offset(0), 0);
3714        assert_eq!(ua.value_offset(1), 0);
3715        assert_eq!(ua.value_offset(2), 1);
3716        let int_child = ua
3717            .child(2)
3718            .as_any()
3719            .downcast_ref::<Int32Array>()
3720            .expect("int child");
3721        assert_eq!(int_child.len(), 2);
3722        assert_eq!(int_child.value(0), 7);
3723        assert_eq!(int_child.value(1), -1);
3724        let str_child = ua
3725            .child(5)
3726            .as_any()
3727            .downcast_ref::<StringArray>()
3728            .expect("string child");
3729        assert_eq!(str_child.len(), 1);
3730        assert_eq!(str_child.value(0), "x");
3731    }
3732
3733    #[test]
3734    fn test_union_dense_with_null_and_string_children() {
3735        let union_dt = make_dense_union_avro(
3736            vec![
3737                (Codec::Null, "n", DataType::Null),
3738                (Codec::Utf8, "s", DataType::Utf8),
3739            ],
3740            vec![42, 7],
3741        );
3742        let mut dec = Decoder::try_new(&union_dt).unwrap();
3743        let r1 = encode_avro_long(0);
3744        let mut r2 = Vec::new();
3745        r2.extend_from_slice(&encode_avro_long(1));
3746        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
3747        let r3 = encode_avro_long(0);
3748        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3749        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3750        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3751        let array = dec.flush(None).unwrap();
3752        let ua = array
3753            .as_any()
3754            .downcast_ref::<UnionArray>()
3755            .expect("expected UnionArray");
3756        assert_eq!(ua.len(), 3);
3757        assert_eq!(ua.type_id(0), 42);
3758        assert_eq!(ua.type_id(1), 7);
3759        assert_eq!(ua.type_id(2), 42);
3760        assert_eq!(ua.value_offset(0), 0);
3761        assert_eq!(ua.value_offset(1), 0);
3762        assert_eq!(ua.value_offset(2), 1);
3763        let null_child = ua
3764            .child(42)
3765            .as_any()
3766            .downcast_ref::<NullArray>()
3767            .expect("null child");
3768        assert_eq!(null_child.len(), 2);
3769        let str_child = ua
3770            .child(7)
3771            .as_any()
3772            .downcast_ref::<StringArray>()
3773            .expect("string child");
3774        assert_eq!(str_child.len(), 1);
3775        assert_eq!(str_child.value(0), "abc");
3776    }
3777
3778    #[test]
3779    fn test_union_decode_negative_branch_index_errors() {
3780        let union_dt = make_dense_union_avro(
3781            vec![
3782                (Codec::Int32, "i", DataType::Int32),
3783                (Codec::Utf8, "s", DataType::Utf8),
3784            ],
3785            vec![0, 1],
3786        );
3787        let mut dec = Decoder::try_new(&union_dt).unwrap();
3788        let row = encode_avro_long(-1); // decodes back to -1
3789        let err = dec
3790            .decode(&mut AvroCursor::new(&row))
3791            .expect_err("expected error for negative branch index");
3792        let msg = err.to_string();
3793        assert!(
3794            msg.contains("Negative union branch index"),
3795            "unexpected error message: {msg}"
3796        );
3797    }
3798
3799    #[test]
3800    fn test_union_decode_out_of_range_branch_index_errors() {
3801        let union_dt = make_dense_union_avro(
3802            vec![
3803                (Codec::Int32, "i", DataType::Int32),
3804                (Codec::Utf8, "s", DataType::Utf8),
3805            ],
3806            vec![10, 11],
3807        );
3808        let mut dec = Decoder::try_new(&union_dt).unwrap();
3809        let row = encode_avro_long(2);
3810        let err = dec
3811            .decode(&mut AvroCursor::new(&row))
3812            .expect_err("expected error for out-of-range branch index");
3813        let msg = err.to_string();
3814        assert!(
3815            msg.contains("out of range"),
3816            "unexpected error message: {msg}"
3817        );
3818    }
3819
3820    #[test]
3821    fn test_union_sparse_mode_not_supported() {
3822        let children: Vec<AvroDataType> = vec![
3823            AvroDataType::new(Codec::Int32, Default::default(), None),
3824            AvroDataType::new(Codec::Utf8, Default::default(), None),
3825        ];
3826        let uf = UnionFields::new(
3827            vec![1, 3],
3828            vec![
3829                arrow_schema::Field::new("i", DataType::Int32, true),
3830                arrow_schema::Field::new("s", DataType::Utf8, true),
3831            ],
3832        );
3833        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
3834        let dt = AvroDataType::new(codec, Default::default(), None);
3835        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
3836        let msg = err.to_string();
3837        assert!(
3838            msg.contains("Sparse Arrow unions are not yet supported"),
3839            "unexpected error message: {msg}"
3840        );
3841    }
3842
3843    fn make_record_decoder_with_projector_defaults(
3844        reader_fields: &[(&str, DataType, bool)],
3845        field_defaults: Vec<Option<AvroLiteral>>,
3846        default_injections: Vec<(usize, AvroLiteral)>,
3847        writer_to_reader_len: usize,
3848    ) -> Decoder {
3849        assert_eq!(
3850            field_defaults.len(),
3851            reader_fields.len(),
3852            "field_defaults must have one entry per reader field"
3853        );
3854        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3855        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3856        for (name, dt, nullable) in reader_fields {
3857            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3858            let enc = match dt {
3859                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3860                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3861                DataType::Utf8 => Decoder::String(
3862                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3863                    Vec::with_capacity(DEFAULT_CAPACITY),
3864                ),
3865                other => panic!("Unsupported test field type in helper: {other:?}"),
3866            };
3867            encodings.push(enc);
3868        }
3869        let fields: Fields = field_refs.into();
3870        let skip_decoders: Vec<Option<Skipper>> =
3871            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3872        let projector = Projector {
3873            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3874            skip_decoders,
3875            field_defaults,
3876            default_injections: Arc::from(default_injections),
3877        };
3878        Decoder::Record(fields, encodings, Some(projector))
3879    }
3880
3881    #[test]
3882    fn test_default_append_int32_and_int64_from_int_and_long() {
3883        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3884        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
3885        let arr = d_i32.flush(None).unwrap();
3886        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3887        assert_eq!(a.len(), 1);
3888        assert_eq!(a.value(0), 42);
3889        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
3890        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
3891        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
3892        let arr64 = d_i64.flush(None).unwrap();
3893        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
3894        assert_eq!(a64.len(), 2);
3895        assert_eq!(a64.value(0), 5);
3896        assert_eq!(a64.value(1), 7);
3897    }
3898
3899    #[test]
3900    fn test_default_append_floats_and_doubles() {
3901        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
3902        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
3903        let arr32 = d_f32.flush(None).unwrap();
3904        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
3905        assert_eq!(a.value(0), 1.5);
3906        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
3907        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
3908        let arr64 = d_f64.flush(None).unwrap();
3909        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
3910        assert_eq!(b.value(0), 2.25);
3911    }
3912
3913    #[test]
3914    fn test_default_append_string_and_bytes() {
3915        let mut d_str = Decoder::String(
3916            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3917            Vec::with_capacity(DEFAULT_CAPACITY),
3918        );
3919        d_str
3920            .append_default(&AvroLiteral::String("hi".into()))
3921            .unwrap();
3922        let s_arr = d_str.flush(None).unwrap();
3923        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
3924        assert_eq!(arr.value(0), "hi");
3925        let mut d_bytes = Decoder::Binary(
3926            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3927            Vec::with_capacity(DEFAULT_CAPACITY),
3928        );
3929        d_bytes
3930            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
3931            .unwrap();
3932        let b_arr = d_bytes.flush(None).unwrap();
3933        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3934        assert_eq!(barr.value(0), &[1, 2, 3]);
3935        let mut d_str_err = Decoder::String(
3936            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3937            Vec::with_capacity(DEFAULT_CAPACITY),
3938        );
3939        let err = d_str_err
3940            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
3941            .unwrap_err();
3942        assert!(
3943            err.to_string()
3944                .contains("Default for string must be string"),
3945            "unexpected error: {err:?}"
3946        );
3947    }
3948
3949    #[test]
3950    fn test_default_append_nullable_int32_null_and_value() {
3951        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3952        let mut dec = Decoder::Nullable(
3953            Nullability::NullFirst,
3954            NullBufferBuilder::new(DEFAULT_CAPACITY),
3955            Box::new(inner),
3956            NullablePlan::ReadTag,
3957        );
3958        dec.append_default(&AvroLiteral::Null).unwrap();
3959        dec.append_default(&AvroLiteral::Int(11)).unwrap();
3960        let arr = dec.flush(None).unwrap();
3961        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3962        assert_eq!(a.len(), 2);
3963        assert!(a.is_null(0));
3964        assert_eq!(a.value(1), 11);
3965    }
3966
3967    #[test]
3968    fn test_default_append_array_of_ints() {
3969        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3970        let mut d = Decoder::try_new(&list_dt).unwrap();
3971        let items = vec![
3972            AvroLiteral::Int(1),
3973            AvroLiteral::Int(2),
3974            AvroLiteral::Int(3),
3975        ];
3976        d.append_default(&AvroLiteral::Array(items)).unwrap();
3977        let arr = d.flush(None).unwrap();
3978        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
3979        assert_eq!(list.len(), 1);
3980        assert_eq!(list.value_length(0), 3);
3981        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
3982        assert_eq!(vals.values(), &[1, 2, 3]);
3983    }
3984
3985    #[test]
3986    fn test_default_append_map_string_to_int() {
3987        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
3988        let mut d = Decoder::try_new(&map_dt).unwrap();
3989        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
3990        m.insert("k1".to_string(), AvroLiteral::Int(10));
3991        m.insert("k2".to_string(), AvroLiteral::Int(20));
3992        d.append_default(&AvroLiteral::Map(m)).unwrap();
3993        let arr = d.flush(None).unwrap();
3994        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
3995        assert_eq!(map.len(), 1);
3996        assert_eq!(map.value_length(0), 2);
3997        let binding = map.value(0);
3998        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
3999        let k = entries
4000            .column_by_name("key")
4001            .unwrap()
4002            .as_any()
4003            .downcast_ref::<StringArray>()
4004            .unwrap();
4005        let v = entries
4006            .column_by_name("value")
4007            .unwrap()
4008            .as_any()
4009            .downcast_ref::<Int32Array>()
4010            .unwrap();
4011        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4012        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4013        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4014        assert_eq!(vals, [10, 20].into_iter().collect());
4015    }
4016
4017    #[test]
4018    fn test_default_append_enum_by_symbol() {
4019        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4020        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4021        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4022        let arr = d.flush(None).unwrap();
4023        let dict = arr
4024            .as_any()
4025            .downcast_ref::<DictionaryArray<Int32Type>>()
4026            .unwrap();
4027        assert_eq!(dict.len(), 1);
4028        let expected = Int32Array::from(vec![1]);
4029        assert_eq!(dict.keys(), &expected);
4030        let values = dict
4031            .values()
4032            .as_any()
4033            .downcast_ref::<StringArray>()
4034            .unwrap();
4035        assert_eq!(values.value(1), "B");
4036    }
4037
4038    #[test]
4039    fn test_default_append_uuid_and_type_error() {
4040        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4041        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4042        d.append_default(&AvroLiteral::String(uuid_str.into()))
4043            .unwrap();
4044        let arr_ref = d.flush(None).unwrap();
4045        let arr = arr_ref
4046            .as_any()
4047            .downcast_ref::<FixedSizeBinaryArray>()
4048            .unwrap();
4049        assert_eq!(arr.value_length(), 16);
4050        assert_eq!(arr.len(), 1);
4051        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4052        let err = d2
4053            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4054            .unwrap_err();
4055        assert!(
4056            err.to_string().contains("Default for uuid must be string"),
4057            "unexpected error: {err:?}"
4058        );
4059    }
4060
4061    #[test]
4062    fn test_default_append_fixed_and_length_mismatch() {
4063        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4064        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4065            .unwrap();
4066        let arr_ref = d.flush(None).unwrap();
4067        let arr = arr_ref
4068            .as_any()
4069            .downcast_ref::<FixedSizeBinaryArray>()
4070            .unwrap();
4071        assert_eq!(arr.value_length(), 4);
4072        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4073        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4074        let err = d_err
4075            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4076            .unwrap_err();
4077        assert!(
4078            err.to_string().contains("Fixed default length"),
4079            "unexpected error: {err:?}"
4080        );
4081    }
4082
4083    #[test]
4084    fn test_default_append_duration_and_length_validation() {
4085        let dt = avro_from_codec(Codec::Interval);
4086        let mut d = Decoder::try_new(&dt).unwrap();
4087        let mut bytes = Vec::with_capacity(12);
4088        bytes.extend_from_slice(&1u32.to_le_bytes());
4089        bytes.extend_from_slice(&2u32.to_le_bytes());
4090        bytes.extend_from_slice(&3u32.to_le_bytes());
4091        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4092        let arr_ref = d.flush(None).unwrap();
4093        let arr = arr_ref
4094            .as_any()
4095            .downcast_ref::<IntervalMonthDayNanoArray>()
4096            .unwrap();
4097        assert_eq!(arr.len(), 1);
4098        let v = arr.value(0);
4099        assert_eq!(v.months, 1);
4100        assert_eq!(v.days, 2);
4101        assert_eq!(v.nanoseconds, 3_000_000);
4102        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4103        let err = d_err
4104            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4105            .unwrap_err();
4106        assert!(
4107            err.to_string()
4108                .contains("Duration default must be exactly 12 bytes"),
4109            "unexpected error: {err:?}"
4110        );
4111    }
4112
4113    #[test]
4114    fn test_default_append_decimal256_from_bytes() {
4115        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4116        let mut d = Decoder::try_new(&dt).unwrap();
4117        let pos: [u8; 32] = [
4118            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4119            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4120            0x00, 0x00, 0x30, 0x39,
4121        ];
4122        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4123        let neg: [u8; 32] = [
4124            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4125            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4126            0xFF, 0xFF, 0xFF, 0x85,
4127        ];
4128        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4129        let arr = d.flush(None).unwrap();
4130        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4131        assert_eq!(dec.len(), 2);
4132        assert_eq!(dec.value_as_string(0), "123.45");
4133        assert_eq!(dec.value_as_string(1), "-1.23");
4134    }
4135
4136    #[test]
4137    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4138        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4139        let mut rec = make_record_decoder_with_projector_defaults(
4140            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4141            field_defaults,
4142            vec![],
4143            0,
4144        );
4145        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4146        map.insert("a".to_string(), AvroLiteral::Int(7));
4147        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4148        let arr = rec.flush(None).unwrap();
4149        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4150        let a = s
4151            .column_by_name("a")
4152            .unwrap()
4153            .as_any()
4154            .downcast_ref::<Int32Array>()
4155            .unwrap();
4156        let b = s
4157            .column_by_name("b")
4158            .unwrap()
4159            .as_any()
4160            .downcast_ref::<StringArray>()
4161            .unwrap();
4162        assert_eq!(a.value(0), 7);
4163        assert_eq!(b.value(0), "hi");
4164    }
4165
4166    #[test]
4167    fn test_record_append_default_null_uses_projector_field_defaults() {
4168        let field_defaults = vec![
4169            Some(AvroLiteral::Int(5)),
4170            Some(AvroLiteral::String("x".into())),
4171        ];
4172        let mut rec = make_record_decoder_with_projector_defaults(
4173            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4174            field_defaults,
4175            vec![],
4176            0,
4177        );
4178        rec.append_default(&AvroLiteral::Null).unwrap();
4179        let arr = rec.flush(None).unwrap();
4180        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4181        let a = s
4182            .column_by_name("a")
4183            .unwrap()
4184            .as_any()
4185            .downcast_ref::<Int32Array>()
4186            .unwrap();
4187        let b = s
4188            .column_by_name("b")
4189            .unwrap()
4190            .as_any()
4191            .downcast_ref::<StringArray>()
4192            .unwrap();
4193        assert_eq!(a.value(0), 5);
4194        assert_eq!(b.value(0), "x");
4195    }
4196
4197    #[test]
4198    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
4199     {
4200        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
4201        let mut field_refs: Vec<FieldRef> = Vec::new();
4202        let mut encoders: Vec<Decoder> = Vec::new();
4203        for (name, dt, nullable) in &fields {
4204            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4205        }
4206        let enc_a = Decoder::Nullable(
4207            Nullability::NullSecond,
4208            NullBufferBuilder::new(DEFAULT_CAPACITY),
4209            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
4210            NullablePlan::ReadTag,
4211        );
4212        let enc_b = Decoder::Nullable(
4213            Nullability::NullSecond,
4214            NullBufferBuilder::new(DEFAULT_CAPACITY),
4215            Box::new(Decoder::String(
4216                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4217                Vec::with_capacity(DEFAULT_CAPACITY),
4218            )),
4219            NullablePlan::ReadTag,
4220        );
4221        encoders.push(enc_a);
4222        encoders.push(enc_b);
4223        let projector = Projector {
4224            writer_to_reader: Arc::from(vec![]),
4225            skip_decoders: vec![],
4226            field_defaults: vec![None, None], // no defaults -> append_null
4227            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4228        };
4229        let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector));
4230        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4231        map.insert("a".to_string(), AvroLiteral::Int(9));
4232        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4233        let arr = rec.flush(None).unwrap();
4234        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4235        let a = s
4236            .column_by_name("a")
4237            .unwrap()
4238            .as_any()
4239            .downcast_ref::<Int32Array>()
4240            .unwrap();
4241        let b = s
4242            .column_by_name("b")
4243            .unwrap()
4244            .as_any()
4245            .downcast_ref::<StringArray>()
4246            .unwrap();
4247        assert!(a.is_valid(0));
4248        assert_eq!(a.value(0), 9);
4249        assert!(b.is_null(0));
4250    }
4251
4252    #[test]
4253    fn test_projector_default_injection_when_writer_lacks_fields() {
4254        let defaults = vec![None, None];
4255        let injections = vec![
4256            (0, AvroLiteral::Int(99)),
4257            (1, AvroLiteral::String("alice".into())),
4258        ];
4259        let mut rec = make_record_decoder_with_projector_defaults(
4260            &[
4261                ("id", DataType::Int32, false),
4262                ("name", DataType::Utf8, false),
4263            ],
4264            defaults,
4265            injections,
4266            0,
4267        );
4268        rec.decode(&mut AvroCursor::new(&[])).unwrap();
4269        let arr = rec.flush(None).unwrap();
4270        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4271        let id = s
4272            .column_by_name("id")
4273            .unwrap()
4274            .as_any()
4275            .downcast_ref::<Int32Array>()
4276            .unwrap();
4277        let name = s
4278            .column_by_name("name")
4279            .unwrap()
4280            .as_any()
4281            .downcast_ref::<StringArray>()
4282            .unwrap();
4283        assert_eq!(id.value(0), 99);
4284        assert_eq!(name.value(0), "alice");
4285    }
4286
4287    #[test]
4288    fn union_type_ids_are_not_child_indexes() {
4289        let encodings: Vec<AvroDataType> =
4290            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
4291        let fields: UnionFields = [
4292            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
4293            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
4294        ]
4295        .into_iter()
4296        .collect();
4297        let dt = avro_from_codec(Codec::Union(
4298            encodings.into(),
4299            fields.clone(),
4300            UnionMode::Dense,
4301        ));
4302        let mut dec = Decoder::try_new(&dt).expect("decoder");
4303        let mut b1 = encode_avro_long(1);
4304        b1.extend(encode_avro_bytes("hi".as_bytes()));
4305        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
4306        let mut b0 = encode_avro_long(0);
4307        b0.extend(encode_avro_int(5));
4308        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
4309        let arr = dec.flush(None).expect("flush");
4310        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
4311        assert_eq!(ua.len(), 2);
4312        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
4313        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
4314        assert_eq!(ua.value_offset(0), 0);
4315        assert_eq!(ua.value_offset(1), 0);
4316        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
4317        assert_eq!(utf8_child.len(), 1);
4318        assert_eq!(utf8_child.value(0), "hi");
4319        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
4320        assert_eq!(int_child.len(), 1);
4321        assert_eq!(int_child.value(0), 5);
4322        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
4323        assert_eq!(type_ids, vec![42_i8, 7_i8]);
4324    }
4325
4326    #[cfg(feature = "avro_custom_types")]
4327    #[test]
4328    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> {
4329        for codec in [
4330            Codec::DurationNanos,
4331            Codec::DurationMicros,
4332            Codec::DurationMillis,
4333            Codec::DurationSeconds,
4334        ] {
4335            let dt = make_avro_dt(codec.clone(), None);
4336            let s = Skipper::from_avro(&dt)?;
4337            match s {
4338                Skipper::Int64 => {}
4339                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
4340            }
4341        }
4342        Ok(())
4343    }
4344
4345    #[cfg(feature = "avro_custom_types")]
4346    #[test]
4347    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> {
4348        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
4349        for codec in [
4350            Codec::DurationNanos,
4351            Codec::DurationMicros,
4352            Codec::DurationMillis,
4353            Codec::DurationSeconds,
4354        ] {
4355            let dt = make_avro_dt(codec.clone(), None);
4356            let mut s = Skipper::from_avro(&dt)?;
4357            for &v in &values {
4358                let bytes = encode_avro_long(v);
4359                let mut cursor = AvroCursor::new(&bytes);
4360                s.skip(&mut cursor)?;
4361                assert_eq!(
4362                    cursor.position(),
4363                    bytes.len(),
4364                    "did not consume all bytes for {:?} value {}",
4365                    codec,
4366                    v
4367                );
4368            }
4369        }
4370        Ok(())
4371    }
4372
4373    #[cfg(feature = "avro_custom_types")]
4374    #[test]
4375    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> {
4376        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
4377        let mut s = Skipper::from_avro(&dt)?;
4378        match &s {
4379            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
4380                Skipper::Int64 => {}
4381                ref other => panic!("expected inner Int64, got {:?}", other),
4382            },
4383            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
4384        }
4385        {
4386            let buf = encode_vlq_u64(0);
4387            let mut cursor = AvroCursor::new(&buf);
4388            s.skip(&mut cursor)?;
4389            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
4390        }
4391        {
4392            let mut buf = encode_vlq_u64(1);
4393            buf.extend(encode_avro_long(0));
4394            let mut cursor = AvroCursor::new(&buf);
4395            s.skip(&mut cursor)?;
4396            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
4397        }
4398
4399        Ok(())
4400    }
4401
4402    #[cfg(feature = "avro_custom_types")]
4403    #[test]
4404    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> {
4405        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
4406        let mut s = Skipper::from_avro(&dt)?;
4407        match &s {
4408            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
4409                Skipper::Int64 => {}
4410                ref other => panic!("expected inner Int64, got {:?}", other),
4411            },
4412            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
4413        }
4414        {
4415            let buf = encode_vlq_u64(1);
4416            let mut cursor = AvroCursor::new(&buf);
4417            s.skip(&mut cursor)?;
4418            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
4419        }
4420        {
4421            let mut buf = encode_vlq_u64(0);
4422            buf.extend(encode_avro_long(-1));
4423            let mut cursor = AvroCursor::new(&buf);
4424            s.skip(&mut cursor)?;
4425            assert_eq!(
4426                cursor.position(),
4427                1 + encode_avro_long(-1).len(),
4428                "expected to consume tag=0 + long(-1)"
4429            );
4430        }
4431        Ok(())
4432    }
4433
4434    #[test]
4435    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> {
4436        let dt = make_avro_dt(Codec::Interval, None);
4437        let mut s = Skipper::from_avro(&dt)?;
4438        match s {
4439            Skipper::DurationFixed12 => {}
4440            other => panic!("expected DurationFixed12, got {:?}", other),
4441        }
4442        let payload = vec![0u8; 12];
4443        let mut cursor = AvroCursor::new(&payload);
4444        s.skip(&mut cursor)?;
4445        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
4446        Ok(())
4447    }
4448
4449    #[cfg(feature = "avro_custom_types")]
4450    #[test]
4451    fn test_run_end_encoded_width16_int32_basic_grouping() {
4452        use arrow_array::RunArray;
4453        use std::sync::Arc;
4454        let inner = avro_from_codec(Codec::Int32);
4455        let ree = AvroDataType::new(
4456            Codec::RunEndEncoded(Arc::new(inner), 16),
4457            Default::default(),
4458            None,
4459        );
4460        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4461        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
4462            let bytes = encode_avro_int(v);
4463            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4464        }
4465        let arr = dec.flush(None).expect("flush");
4466        let ra = arr
4467            .as_any()
4468            .downcast_ref::<RunArray<Int16Type>>()
4469            .expect("RunArray<Int16Type>");
4470        assert_eq!(ra.len(), 9);
4471        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
4472        let vals = ra
4473            .values()
4474            .as_ref()
4475            .as_any()
4476            .downcast_ref::<Int32Array>()
4477            .expect("values Int32");
4478        assert_eq!(vals.values(), &[1, 2, 3]);
4479    }
4480
4481    #[cfg(feature = "avro_custom_types")]
4482    #[test]
4483    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
4484        use arrow_array::RunArray;
4485        use std::sync::Arc;
4486        let inner = AvroDataType::new(
4487            Codec::Int32,
4488            Default::default(),
4489            Some(Nullability::NullSecond),
4490        );
4491        let ree = AvroDataType::new(
4492            Codec::RunEndEncoded(Arc::new(inner), 32),
4493            Default::default(),
4494            None,
4495        );
4496        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4497        let seq: [Option<i32>; 8] = [
4498            None,
4499            None,
4500            Some(7),
4501            Some(7),
4502            Some(7),
4503            None,
4504            Some(5),
4505            Some(5),
4506        ];
4507        for item in seq {
4508            let mut bytes = Vec::new();
4509            match item {
4510                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
4511                Some(v) => {
4512                    bytes.extend_from_slice(&encode_vlq_u64(0));
4513                    bytes.extend_from_slice(&encode_avro_int(v));
4514                }
4515            }
4516            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4517        }
4518        let arr = dec.flush(None).expect("flush");
4519        let ra = arr
4520            .as_any()
4521            .downcast_ref::<RunArray<Int32Type>>()
4522            .expect("RunArray<Int32Type>");
4523        assert_eq!(ra.len(), 8);
4524        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
4525        let vals = ra
4526            .values()
4527            .as_ref()
4528            .as_any()
4529            .downcast_ref::<Int32Array>()
4530            .expect("values Int32 (nullable)");
4531        assert_eq!(vals.len(), 4);
4532        assert!(vals.is_null(0));
4533        assert_eq!(vals.value(1), 7);
4534        assert!(vals.is_null(2));
4535        assert_eq!(vals.value(3), 5);
4536    }
4537
4538    #[cfg(feature = "avro_custom_types")]
4539    #[test]
4540    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
4541        use arrow_array::RunArray;
4542        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4543        let ree = Decoder::RunEndEncoded(
4544            8, /* bytes => Int64 run-ends */
4545            0,
4546            Box::new(inner_values),
4547        );
4548        let mut dec = Decoder::Nullable(
4549            Nullability::NullSecond,
4550            NullBufferBuilder::new(DEFAULT_CAPACITY),
4551            Box::new(ree),
4552            NullablePlan::FromSingle {
4553                promotion: Promotion::IntToDouble,
4554            },
4555        );
4556        for v in [1, 1, 2, 2, 2] {
4557            let bytes = encode_avro_int(v);
4558            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4559        }
4560        let arr = dec.flush(None).expect("flush");
4561        let ra = arr
4562            .as_any()
4563            .downcast_ref::<RunArray<Int64Type>>()
4564            .expect("RunArray<Int64Type>");
4565        assert_eq!(ra.len(), 5);
4566        assert_eq!(ra.run_ends().values(), &[2, 5]);
4567        let vals = ra
4568            .values()
4569            .as_ref()
4570            .as_any()
4571            .downcast_ref::<Float64Array>()
4572            .expect("values Float64");
4573        assert_eq!(vals.values(), &[1.0, 2.0]);
4574    }
4575
4576    #[cfg(feature = "avro_custom_types")]
4577    #[test]
4578    fn test_run_end_encoded_unsupported_run_end_width_errors() {
4579        use std::sync::Arc;
4580        let inner = avro_from_codec(Codec::Int32);
4581        let dt = AvroDataType::new(
4582            Codec::RunEndEncoded(Arc::new(inner), 3),
4583            Default::default(),
4584            None,
4585        );
4586        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
4587        let msg = err.to_string();
4588        assert!(
4589            msg.contains("Unsupported run-end width")
4590                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
4591            "unexpected error message: {msg}"
4592        );
4593    }
4594
4595    #[cfg(feature = "avro_custom_types")]
4596    #[test]
4597    fn test_run_end_encoded_empty_input_is_empty_runarray() {
4598        use arrow_array::RunArray;
4599        use std::sync::Arc;
4600        let inner = avro_from_codec(Codec::Utf8);
4601        let dt = AvroDataType::new(
4602            Codec::RunEndEncoded(Arc::new(inner), 4),
4603            Default::default(),
4604            None,
4605        );
4606        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4607        let arr = dec.flush(None).expect("flush");
4608        let ra = arr
4609            .as_any()
4610            .downcast_ref::<RunArray<Int32Type>>()
4611            .expect("RunArray<Int32Type>");
4612        assert_eq!(ra.len(), 0);
4613        assert_eq!(ra.run_ends().len(), 0);
4614        assert_eq!(ra.values().len(), 0);
4615    }
4616
4617    #[cfg(feature = "avro_custom_types")]
4618    #[test]
4619    fn test_run_end_encoded_strings_grouping_width32_bits() {
4620        use arrow_array::RunArray;
4621        use std::sync::Arc;
4622        let inner = avro_from_codec(Codec::Utf8);
4623        let dt = AvroDataType::new(
4624            Codec::RunEndEncoded(Arc::new(inner), 32),
4625            Default::default(),
4626            None,
4627        );
4628        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4629        for s in ["a", "a", "bb", "bb", "bb", "a"] {
4630            let bytes = encode_avro_bytes(s.as_bytes());
4631            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4632        }
4633        let arr = dec.flush(None).expect("flush");
4634        let ra = arr
4635            .as_any()
4636            .downcast_ref::<RunArray<Int32Type>>()
4637            .expect("RunArray<Int32Type>");
4638        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
4639        let vals = ra
4640            .values()
4641            .as_ref()
4642            .as_any()
4643            .downcast_ref::<StringArray>()
4644            .expect("values String");
4645        assert_eq!(vals.len(), 3);
4646        assert_eq!(vals.value(0), "a");
4647        assert_eq!(vals.value(1), "bb");
4648        assert_eq!(vals.value(2), "a");
4649    }
4650
4651    #[cfg(not(feature = "avro_custom_types"))]
4652    #[test]
4653    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
4654        let dt = avro_from_codec(Codec::Int32);
4655        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
4656        for v in [1, 2, 3] {
4657            let bytes = encode_avro_int(v);
4658            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4659        }
4660        let arr = dec.flush(None).expect("flush");
4661        let a = arr
4662            .as_any()
4663            .downcast_ref::<Int32Array>()
4664            .expect("Int32Array");
4665        assert_eq!(a.values(), &[1, 2, 3]);
4666    }
4667
4668    #[test]
4669    fn test_timestamp_nanos_decoding_utc() {
4670        let avro_type = avro_from_codec(Codec::TimestampNanos(true));
4671        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4672        let mut data = Vec::new();
4673        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
4674            data.extend_from_slice(&encode_avro_long(v));
4675        }
4676        let mut cur = AvroCursor::new(&data);
4677        for _ in 0..4 {
4678            decoder.decode(&mut cur).expect("decode nanos ts");
4679        }
4680        let array = decoder.flush(None).expect("flush nanos ts");
4681        let ts = array
4682            .as_any()
4683            .downcast_ref::<TimestampNanosecondArray>()
4684            .expect("TimestampNanosecondArray");
4685        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
4686        match ts.data_type() {
4687            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4688                assert_eq!(tz.as_deref(), Some("+00:00"));
4689            }
4690            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
4691        }
4692    }
4693
4694    #[test]
4695    fn test_timestamp_nanos_decoding_local() {
4696        let avro_type = avro_from_codec(Codec::TimestampNanos(false));
4697        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4698        let mut data = Vec::new();
4699        for v in [10_i64, 20_i64, -30_i64] {
4700            data.extend_from_slice(&encode_avro_long(v));
4701        }
4702        let mut cur = AvroCursor::new(&data);
4703        for _ in 0..3 {
4704            decoder.decode(&mut cur).expect("decode nanos ts");
4705        }
4706        let array = decoder.flush(None).expect("flush nanos ts");
4707        let ts = array
4708            .as_any()
4709            .downcast_ref::<TimestampNanosecondArray>()
4710            .expect("TimestampNanosecondArray");
4711        assert_eq!(ts.values(), &[10, 20, -30]);
4712        match ts.data_type() {
4713            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4714                assert_eq!(tz.as_deref(), None);
4715            }
4716            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4717        }
4718    }
4719
4720    #[test]
4721    fn test_timestamp_nanos_decoding_with_nulls() {
4722        let avro_type = AvroDataType::new(
4723            Codec::TimestampNanos(false),
4724            Default::default(),
4725            Some(Nullability::NullFirst),
4726        );
4727        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
4728        let mut data = Vec::new();
4729        data.extend_from_slice(&encode_avro_long(1));
4730        data.extend_from_slice(&encode_avro_long(42));
4731        data.extend_from_slice(&encode_avro_long(0));
4732        data.extend_from_slice(&encode_avro_long(1));
4733        data.extend_from_slice(&encode_avro_long(-7));
4734        let mut cur = AvroCursor::new(&data);
4735        for _ in 0..3 {
4736            decoder.decode(&mut cur).expect("decode nullable nanos ts");
4737        }
4738        let array = decoder.flush(None).expect("flush nullable nanos ts");
4739        let ts = array
4740            .as_any()
4741            .downcast_ref::<TimestampNanosecondArray>()
4742            .expect("TimestampNanosecondArray");
4743        assert_eq!(ts.len(), 3);
4744        assert!(ts.is_valid(0));
4745        assert!(ts.is_null(1));
4746        assert!(ts.is_valid(2));
4747        assert_eq!(ts.value(0), 42);
4748        assert_eq!(ts.value(2), -7);
4749        match ts.data_type() {
4750            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4751                assert_eq!(tz.as_deref(), None);
4752            }
4753            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4754        }
4755    }
4756}