arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22    ResolvedUnion,
23};
24use crate::reader::cursor::AvroCursor;
25use crate::schema::Nullability;
26#[cfg(feature = "small_decimals")]
27use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
28use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
29use arrow_array::types::*;
30use arrow_array::*;
31use arrow_buffer::*;
32use arrow_schema::{
33    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField,
34    FieldRef, Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
35};
36#[cfg(feature = "small_decimals")]
37use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
38#[cfg(feature = "avro_custom_types")]
39use arrow_select::take::{TakeOptions, take};
40use std::cmp::Ordering;
41use std::sync::Arc;
42use strum_macros::AsRefStr;
43use uuid::Uuid;
44
45const DEFAULT_CAPACITY: usize = 1024;
46
47/// Runtime plan for decoding reader-side `["null", T]` types.
48#[derive(Clone, Copy, Debug)]
49enum NullablePlan {
50    /// Writer actually wrote a union (branch tag present).
51    ReadTag,
52    /// Writer wrote a single (non-union) value resolved to the non-null branch
53    /// of the reader union; do NOT read a branch tag, but apply any promotion.
54    FromSingle { promotion: Promotion },
55}
56
/// Reads one decimal payload from `$buf` as `$N` big-endian bytes (via
/// `read_decimal_bytes_be`, passing `$size` through) and appends it to
/// `$builder` as a `$Int` value.
///
/// Uses `?`, so it must be expanded inside a function returning a compatible `Result`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
        let value = <$Int>::from_be_bytes(be_bytes);
        $builder.append_value(value);
    }};
}
64
/// Finishes a decimal builder and produces an `ArrayRef` of type `$ArrayTy`.
///
/// Only the values buffer of the finished builder is kept; validity comes from
/// `$nulls`, and the array is re-tagged with `$precision` / `$scale`
/// (a missing scale is treated as 0). Precision/scale failures are reported
/// as `ArrowError::ParseError`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        let (_, values, _) = $builder.finish().into_parts();
        let array = <$ArrayTy>::try_new(values, $nulls)?;
        let array = array
            .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)
            .map_err(|err| ArrowError::ParseError(err.to_string()))?;
        Arc::new(array) as ArrayRef
    }};
}
75
/// Appends a default decimal value to `$builder`.
///
/// The literal must be `AvroLiteral::Bytes` holding two's-complement
/// big-endian bytes; they are passed through `sign_cast_to::<$N>` and
/// interpreted as `$Int`. Any other literal yields an
/// `ArrowError::InvalidArgumentError` whose text is assembled at compile time
/// from `$name`.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(ArrowError::InvalidArgumentError(
                concat!(
                    "Default for ",
                    $name,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_owned(),
            ))
        }
    }};
}
98
99/// Decodes avro encoded data into [`RecordBatch`]
100#[derive(Debug)]
101pub(crate) struct RecordDecoder {
102    schema: SchemaRef,
103    fields: Vec<Decoder>,
104    projector: Option<Projector>,
105}
106
107impl RecordDecoder {
108    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
109    ///
110    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
111    ///
112    /// # Arguments
113    /// * `data_type` - The Avro data type to decode.
114    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
115    ///
116    /// # Errors
117    /// This function will return an error if the provided `data_type` is not a `Record`.
118    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, ArrowError> {
119        match data_type.codec() {
120            Codec::Struct(reader_fields) => {
121                // Build Arrow schema fields and per-child decoders
122                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123                let mut encodings = Vec::with_capacity(reader_fields.len());
124                for avro_field in reader_fields.iter() {
125                    arrow_fields.push(avro_field.field());
126                    encodings.push(Decoder::try_new(avro_field.data_type())?);
127                }
128                let projector = match data_type.resolution.as_ref() {
129                    Some(ResolutionInfo::Record(rec)) => {
130                        Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131                    }
132                    _ => None,
133                };
134                Ok(Self {
135                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
136                    fields: encodings,
137                    projector,
138                })
139            }
140            other => Err(ArrowError::ParseError(format!(
141                "Expected record got {other:?}"
142            ))),
143        }
144    }
145
146    /// Returns the decoder's `SchemaRef`
147    pub(crate) fn schema(&self) -> &SchemaRef {
148        &self.schema
149    }
150
151    /// Decode `count` records from `buf`
152    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
153        let mut cursor = AvroCursor::new(buf);
154        match self.projector.as_mut() {
155            Some(proj) => {
156                for _ in 0..count {
157                    proj.project_record(&mut cursor, &mut self.fields)?;
158                }
159            }
160            None => {
161                for _ in 0..count {
162                    for field in &mut self.fields {
163                        field.decode(&mut cursor)?;
164                    }
165                }
166            }
167        }
168        Ok(cursor.position())
169    }
170
171    /// Flush the decoded records into a [`RecordBatch`]
172    pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
173        let arrays = self
174            .fields
175            .iter_mut()
176            .map(|x| x.flush(None))
177            .collect::<Result<Vec<_>, _>>()?;
178        RecordBatch::try_new(self.schema.clone(), arrays)
179    }
180}
181
/// Enum resolution data copied from a `ResolutionInfo::EnumMapping` when an
/// enum decoder is built.
#[derive(Debug)]
struct EnumResolution {
    /// Index mapping table taken from the resolved enum mapping.
    // NOTE(review): presumably indexed by writer symbol position and yielding
    // the reader symbol position -- confirm against the decode path.
    mapping: Arc<[i32]>,
    /// Default index taken from the resolved enum mapping.
    // NOTE(review): presumably the reader index used for unmapped writer
    // symbols -- confirm against the decode path.
    default_index: i32,
}
187
188#[derive(Debug, AsRefStr)]
189enum Decoder {
190    Null(usize),
191    Boolean(BooleanBufferBuilder),
192    Int32(Vec<i32>),
193    Int64(Vec<i64>),
194    #[cfg(feature = "avro_custom_types")]
195    DurationSecond(Vec<i64>),
196    #[cfg(feature = "avro_custom_types")]
197    DurationMillisecond(Vec<i64>),
198    #[cfg(feature = "avro_custom_types")]
199    DurationMicrosecond(Vec<i64>),
200    #[cfg(feature = "avro_custom_types")]
201    DurationNanosecond(Vec<i64>),
202    Float32(Vec<f32>),
203    Float64(Vec<f64>),
204    Date32(Vec<i32>),
205    TimeMillis(Vec<i32>),
206    TimeMicros(Vec<i64>),
207    TimestampMillis(bool, Vec<i64>),
208    TimestampMicros(bool, Vec<i64>),
209    TimestampNanos(bool, Vec<i64>),
210    Int32ToInt64(Vec<i64>),
211    Int32ToFloat32(Vec<f32>),
212    Int32ToFloat64(Vec<f64>),
213    Int64ToFloat32(Vec<f32>),
214    Int64ToFloat64(Vec<f64>),
215    Float32ToFloat64(Vec<f64>),
216    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
217    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
218    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
219    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
220    String(OffsetBufferBuilder<i32>, Vec<u8>),
221    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
222    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
223    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
224    Record(Fields, Vec<Decoder>, Option<Projector>),
225    Map(
226        FieldRef,
227        OffsetBufferBuilder<i32>,
228        OffsetBufferBuilder<i32>,
229        Vec<u8>,
230        Box<Decoder>,
231    ),
232    Fixed(i32, Vec<u8>),
233    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
234    Duration(IntervalMonthDayNanoBuilder),
235    Uuid(Vec<u8>),
236    #[cfg(feature = "small_decimals")]
237    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
238    #[cfg(feature = "small_decimals")]
239    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
240    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
241    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
242    #[cfg(feature = "avro_custom_types")]
243    RunEndEncoded(u8, usize, Box<Decoder>),
244    Union(UnionDecoder),
245    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
246}
247
248impl Decoder {
249    fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
250        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
251            if info.writer_is_union && !info.reader_is_union {
252                let mut clone = data_type.clone();
253                clone.resolution = None; // Build target base decoder without Union resolution
254                let target = Box::new(Self::try_new_internal(&clone)?);
255                let decoder = Self::Union(
256                    UnionDecoderBuilder::new()
257                        .with_resolved_union(info.clone())
258                        .with_target(target)
259                        .build()?,
260                );
261                return Ok(decoder);
262            }
263        }
264        Self::try_new_internal(data_type)
265    }
266
267    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
268        // Extract just the Promotion (if any) to simplify pattern matching
269        let promotion = match data_type.resolution.as_ref() {
270            Some(ResolutionInfo::Promotion(p)) => Some(p),
271            _ => None,
272        };
273        let decoder = match (data_type.codec(), promotion) {
274            (Codec::Int64, Some(Promotion::IntToLong)) => {
275                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
276            }
277            (Codec::Float32, Some(Promotion::IntToFloat)) => {
278                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
279            }
280            (Codec::Float64, Some(Promotion::IntToDouble)) => {
281                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
282            }
283            (Codec::Float32, Some(Promotion::LongToFloat)) => {
284                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
285            }
286            (Codec::Float64, Some(Promotion::LongToDouble)) => {
287                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
288            }
289            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
290                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
291            }
292            (Codec::Utf8, Some(Promotion::BytesToString))
293            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
294                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
295                Vec::with_capacity(DEFAULT_CAPACITY),
296            ),
297            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
298                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
299                Vec::with_capacity(DEFAULT_CAPACITY),
300            ),
301            (Codec::Null, _) => Self::Null(0),
302            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
303            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
304            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
305            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
306            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
307            (Codec::Binary, _) => Self::Binary(
308                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
309                Vec::with_capacity(DEFAULT_CAPACITY),
310            ),
311            (Codec::Utf8, _) => Self::String(
312                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
313                Vec::with_capacity(DEFAULT_CAPACITY),
314            ),
315            (Codec::Utf8View, _) => Self::StringView(
316                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
317                Vec::with_capacity(DEFAULT_CAPACITY),
318            ),
319            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
320            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
321            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
322            (Codec::TimestampMillis(is_utc), _) => {
323                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
324            }
325            (Codec::TimestampMicros(is_utc), _) => {
326                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
327            }
328            (Codec::TimestampNanos(is_utc), _) => {
329                Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
330            }
331            #[cfg(feature = "avro_custom_types")]
332            (Codec::DurationNanos, _) => {
333                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
334            }
335            #[cfg(feature = "avro_custom_types")]
336            (Codec::DurationMicros, _) => {
337                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
338            }
339            #[cfg(feature = "avro_custom_types")]
340            (Codec::DurationMillis, _) => {
341                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
342            }
343            #[cfg(feature = "avro_custom_types")]
344            (Codec::DurationSeconds, _) => {
345                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
346            }
347            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
348            (Codec::Decimal(precision, scale, size), _) => {
349                let p = *precision;
350                let s = *scale;
351                let prec = p as u8;
352                let scl = s.unwrap_or(0) as i8;
353                #[cfg(feature = "small_decimals")]
354                {
355                    if p <= DECIMAL32_MAX_PRECISION as usize {
356                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
357                            .with_precision_and_scale(prec, scl)?;
358                        Self::Decimal32(p, s, *size, builder)
359                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
360                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
361                            .with_precision_and_scale(prec, scl)?;
362                        Self::Decimal64(p, s, *size, builder)
363                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
364                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
365                            .with_precision_and_scale(prec, scl)?;
366                        Self::Decimal128(p, s, *size, builder)
367                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
368                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
369                            .with_precision_and_scale(prec, scl)?;
370                        Self::Decimal256(p, s, *size, builder)
371                    } else {
372                        return Err(ArrowError::ParseError(format!(
373                            "Decimal precision {p} exceeds maximum supported"
374                        )));
375                    }
376                }
377                #[cfg(not(feature = "small_decimals"))]
378                {
379                    if p <= DECIMAL128_MAX_PRECISION as usize {
380                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
381                            .with_precision_and_scale(prec, scl)?;
382                        Self::Decimal128(p, s, *size, builder)
383                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
384                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
385                            .with_precision_and_scale(prec, scl)?;
386                        Self::Decimal256(p, s, *size, builder)
387                    } else {
388                        return Err(ArrowError::ParseError(format!(
389                            "Decimal precision {p} exceeds maximum supported"
390                        )));
391                    }
392                }
393            }
394            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
395            (Codec::List(item), _) => {
396                let decoder = Self::try_new(item)?;
397                Self::Array(
398                    Arc::new(item.field_with_name("item")),
399                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
400                    Box::new(decoder),
401                )
402            }
403            (Codec::Enum(symbols), _) => {
404                let res = match data_type.resolution.as_ref() {
405                    Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
406                        mapping: mapping.mapping.clone(),
407                        default_index: mapping.default_index,
408                    }),
409                    _ => None,
410                };
411                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
412            }
413            (Codec::Struct(fields), _) => {
414                let mut arrow_fields = Vec::with_capacity(fields.len());
415                let mut encodings = Vec::with_capacity(fields.len());
416                for avro_field in fields.iter() {
417                    let encoding = Self::try_new(avro_field.data_type())?;
418                    arrow_fields.push(avro_field.field());
419                    encodings.push(encoding);
420                }
421                let projector =
422                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
423                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
424                    } else {
425                        None
426                    };
427                Self::Record(arrow_fields.into(), encodings, projector)
428            }
429            (Codec::Map(child), _) => {
430                let val_field = child.field_with_name("value");
431                let map_field = Arc::new(ArrowField::new(
432                    "entries",
433                    DataType::Struct(Fields::from(vec![
434                        ArrowField::new("key", DataType::Utf8, false),
435                        val_field,
436                    ])),
437                    false,
438                ));
439                let val_dec = Self::try_new(child)?;
440                Self::Map(
441                    map_field,
442                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
443                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
444                    Vec::with_capacity(DEFAULT_CAPACITY),
445                    Box::new(val_dec),
446                )
447            }
448            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
449            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
450                let decoders = encodings
451                    .iter()
452                    .map(Self::try_new_internal)
453                    .collect::<Result<Vec<_>, _>>()?;
454                if fields.len() != decoders.len() {
455                    return Err(ArrowError::SchemaError(format!(
456                        "Union has {} fields but {} decoders",
457                        fields.len(),
458                        decoders.len()
459                    )));
460                }
461                // Proactive guard: if a user provides a union with more branches than
462                // a 32-bit Avro index can address, fail fast with a clear message.
463                let branch_count = decoders.len();
464                let max_addr = (i32::MAX as usize) + 1;
465                if branch_count > max_addr {
466                    return Err(ArrowError::SchemaError(format!(
467                        "Union has {branch_count} branches, which exceeds the maximum addressable \
468                         branches by an Avro int tag ({} + 1).",
469                        i32::MAX
470                    )));
471                }
472                let mut builder = UnionDecoderBuilder::new()
473                    .with_fields(fields.clone())
474                    .with_branches(decoders);
475                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
476                    if info.reader_is_union {
477                        builder = builder.with_resolved_union(info.clone());
478                    }
479                }
480                Self::Union(builder.build()?)
481            }
482            (Codec::Union(_, _, _), _) => {
483                return Err(ArrowError::NotYetImplemented(
484                    "Sparse Arrow unions are not yet supported".to_string(),
485                ));
486            }
487            #[cfg(feature = "avro_custom_types")]
488            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
489                let inner = Self::try_new(values_dt)?;
490                let byte_width: u8 = match *width_bits_or_bytes {
491                    2 | 4 | 8 => *width_bits_or_bytes,
492                    16 => 2,
493                    32 => 4,
494                    64 => 8,
495                    other => {
496                        return Err(ArrowError::InvalidArgumentError(format!(
497                            "Unsupported run-end width {other} for RunEndEncoded; \
498                             expected 16/32/64 bits or 2/4/8 bytes"
499                        )));
500                    }
501                };
502                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
503            }
504        };
505        Ok(match data_type.nullability() {
506            Some(nullability) => {
507                // Default to reading a union branch tag unless the resolution proves otherwise.
508                let mut plan = NullablePlan::ReadTag;
509                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
510                    if !info.writer_is_union && info.reader_is_union {
511                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
512                            plan = NullablePlan::FromSingle { promotion: *promo };
513                        }
514                    }
515                }
516                Self::Nullable(
517                    nullability,
518                    NullBufferBuilder::new(DEFAULT_CAPACITY),
519                    Box::new(decoder),
520                    plan,
521                )
522            }
523            None => decoder,
524        })
525    }
526
527    /// Append a null record
528    fn append_null(&mut self) -> Result<(), ArrowError> {
529        match self {
530            Self::Null(count) => *count += 1,
531            Self::Boolean(b) => b.append(false),
532            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
533            Self::Int64(v)
534            | Self::Int32ToInt64(v)
535            | Self::TimeMicros(v)
536            | Self::TimestampMillis(_, v)
537            | Self::TimestampMicros(_, v)
538            | Self::TimestampNanos(_, v) => v.push(0),
539            #[cfg(feature = "avro_custom_types")]
540            Self::DurationSecond(v)
541            | Self::DurationMillisecond(v)
542            | Self::DurationMicrosecond(v)
543            | Self::DurationNanosecond(v) => v.push(0),
544            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
545            Self::Float64(v)
546            | Self::Int32ToFloat64(v)
547            | Self::Int64ToFloat64(v)
548            | Self::Float32ToFloat64(v) => v.push(0.),
549            Self::Binary(offsets, _)
550            | Self::String(offsets, _)
551            | Self::StringView(offsets, _)
552            | Self::BytesToString(offsets, _)
553            | Self::StringToBytes(offsets, _) => {
554                offsets.push_length(0);
555            }
556            Self::Uuid(v) => {
557                v.extend([0; 16]);
558            }
559            Self::Array(_, offsets, _) => {
560                offsets.push_length(0);
561            }
562            Self::Record(_, e, _) => {
563                for encoding in e.iter_mut() {
564                    encoding.append_null()?;
565                }
566            }
567            Self::Map(_, _koff, moff, _, _) => {
568                moff.push_length(0);
569            }
570            Self::Fixed(sz, accum) => {
571                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
572            }
573            #[cfg(feature = "small_decimals")]
574            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
575            #[cfg(feature = "small_decimals")]
576            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
577            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
578            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
579            Self::Enum(indices, _, _) => indices.push(0),
580            Self::Duration(builder) => builder.append_null(),
581            #[cfg(feature = "avro_custom_types")]
582            Self::RunEndEncoded(_, len, inner) => {
583                *len += 1;
584                inner.append_null()?;
585            }
586            Self::Union(u) => u.append_null()?,
587            Self::Nullable(_, null_buffer, inner, _) => {
588                null_buffer.append(false);
589                inner.append_null()?;
590            }
591        }
592        Ok(())
593    }
594
595    /// Append a single default literal into the decoder's buffers
596    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
597        match self {
598            Self::Nullable(_, nb, inner, _) => {
599                if matches!(lit, AvroLiteral::Null) {
600                    nb.append(false);
601                    inner.append_null()
602                } else {
603                    nb.append(true);
604                    inner.append_default(lit)
605                }
606            }
607            Self::Null(count) => match lit {
608                AvroLiteral::Null => {
609                    *count += 1;
610                    Ok(())
611                }
612                _ => Err(ArrowError::InvalidArgumentError(
613                    "Non-null default for null type".to_string(),
614                )),
615            },
616            Self::Boolean(b) => match lit {
617                AvroLiteral::Boolean(v) => {
618                    b.append(*v);
619                    Ok(())
620                }
621                _ => Err(ArrowError::InvalidArgumentError(
622                    "Default for boolean must be boolean".to_string(),
623                )),
624            },
625            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
626                AvroLiteral::Int(i) => {
627                    v.push(*i);
628                    Ok(())
629                }
630                _ => Err(ArrowError::InvalidArgumentError(
631                    "Default for int32/date32/time-millis must be int".to_string(),
632                )),
633            },
634            #[cfg(feature = "avro_custom_types")]
635            Self::DurationSecond(v)
636            | Self::DurationMillisecond(v)
637            | Self::DurationMicrosecond(v)
638            | Self::DurationNanosecond(v) => match lit {
639                AvroLiteral::Long(i) => {
640                    v.push(*i);
641                    Ok(())
642                }
643                _ => Err(ArrowError::InvalidArgumentError(
644                    "Default for duration long must be long".to_string(),
645                )),
646            },
647            Self::Int64(v)
648            | Self::Int32ToInt64(v)
649            | Self::TimeMicros(v)
650            | Self::TimestampMillis(_, v)
651            | Self::TimestampMicros(_, v)
652            | Self::TimestampNanos(_, v) => match lit {
653                AvroLiteral::Long(i) => {
654                    v.push(*i);
655                    Ok(())
656                }
657                AvroLiteral::Int(i) => {
658                    v.push(*i as i64);
659                    Ok(())
660                }
661                _ => Err(ArrowError::InvalidArgumentError(
662                    "Default for long/time-micros/timestamp must be long or int".to_string(),
663                )),
664            },
665            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
666                AvroLiteral::Float(f) => {
667                    v.push(*f);
668                    Ok(())
669                }
670                _ => Err(ArrowError::InvalidArgumentError(
671                    "Default for float must be float".to_string(),
672                )),
673            },
674            Self::Float64(v)
675            | Self::Int32ToFloat64(v)
676            | Self::Int64ToFloat64(v)
677            | Self::Float32ToFloat64(v) => match lit {
678                AvroLiteral::Double(f) => {
679                    v.push(*f);
680                    Ok(())
681                }
682                _ => Err(ArrowError::InvalidArgumentError(
683                    "Default for double must be double".to_string(),
684                )),
685            },
686            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
687                AvroLiteral::Bytes(b) => {
688                    offsets.push_length(b.len());
689                    values.extend_from_slice(b);
690                    Ok(())
691                }
692                _ => Err(ArrowError::InvalidArgumentError(
693                    "Default for bytes must be bytes".to_string(),
694                )),
695            },
696            Self::BytesToString(offsets, values)
697            | Self::String(offsets, values)
698            | Self::StringView(offsets, values) => match lit {
699                AvroLiteral::String(s) => {
700                    let b = s.as_bytes();
701                    offsets.push_length(b.len());
702                    values.extend_from_slice(b);
703                    Ok(())
704                }
705                _ => Err(ArrowError::InvalidArgumentError(
706                    "Default for string must be string".to_string(),
707                )),
708            },
709            Self::Uuid(values) => match lit {
710                AvroLiteral::String(s) => {
711                    let uuid = Uuid::try_parse(s).map_err(|e| {
712                        ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})"))
713                    })?;
714                    values.extend_from_slice(uuid.as_bytes());
715                    Ok(())
716                }
717                _ => Err(ArrowError::InvalidArgumentError(
718                    "Default for uuid must be string".to_string(),
719                )),
720            },
721            Self::Fixed(sz, accum) => match lit {
722                AvroLiteral::Bytes(b) => {
723                    if b.len() != *sz as usize {
724                        return Err(ArrowError::InvalidArgumentError(format!(
725                            "Fixed default length {} does not match size {sz}",
726                            b.len(),
727                        )));
728                    }
729                    accum.extend_from_slice(b);
730                    Ok(())
731                }
732                _ => Err(ArrowError::InvalidArgumentError(
733                    "Default for fixed must be bytes".to_string(),
734                )),
735            },
736            #[cfg(feature = "small_decimals")]
737            Self::Decimal32(_, _, _, builder) => {
738                append_decimal_default!(lit, builder, 4, i32, "decimal32")
739            }
740            #[cfg(feature = "small_decimals")]
741            Self::Decimal64(_, _, _, builder) => {
742                append_decimal_default!(lit, builder, 8, i64, "decimal64")
743            }
744            Self::Decimal128(_, _, _, builder) => {
745                append_decimal_default!(lit, builder, 16, i128, "decimal128")
746            }
747            Self::Decimal256(_, _, _, builder) => {
748                append_decimal_default!(lit, builder, 32, i256, "decimal256")
749            }
750            Self::Duration(builder) => match lit {
751                AvroLiteral::Bytes(b) => {
752                    if b.len() != 12 {
753                        return Err(ArrowError::InvalidArgumentError(format!(
754                            "Duration default must be exactly 12 bytes, got {}",
755                            b.len()
756                        )));
757                    }
758                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
759                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
760                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
761                    let nanos = (millis as i64) * 1_000_000;
762                    builder.append_value(IntervalMonthDayNano::new(
763                        months as i32,
764                        days as i32,
765                        nanos,
766                    ));
767                    Ok(())
768                }
769                _ => Err(ArrowError::InvalidArgumentError(
770                    "Default for duration must be 12-byte little-endian months/days/millis"
771                        .to_string(),
772                )),
773            },
774            Self::Array(_, offsets, inner) => match lit {
775                AvroLiteral::Array(items) => {
776                    offsets.push_length(items.len());
777                    for item in items {
778                        inner.append_default(item)?;
779                    }
780                    Ok(())
781                }
782                _ => Err(ArrowError::InvalidArgumentError(
783                    "Default for array must be an array literal".to_string(),
784                )),
785            },
786            Self::Map(_, koff, moff, kdata, valdec) => match lit {
787                AvroLiteral::Map(entries) => {
788                    moff.push_length(entries.len());
789                    for (k, v) in entries {
790                        let kb = k.as_bytes();
791                        koff.push_length(kb.len());
792                        kdata.extend_from_slice(kb);
793                        valdec.append_default(v)?;
794                    }
795                    Ok(())
796                }
797                _ => Err(ArrowError::InvalidArgumentError(
798                    "Default for map must be a map/object literal".to_string(),
799                )),
800            },
801            Self::Enum(indices, symbols, _) => match lit {
802                AvroLiteral::Enum(sym) => {
803                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
804                        ArrowError::InvalidArgumentError(format!(
805                            "Enum default symbol {sym:?} not in reader symbols"
806                        ))
807                    })?;
808                    indices.push(pos as i32);
809                    Ok(())
810                }
811                _ => Err(ArrowError::InvalidArgumentError(
812                    "Default for enum must be a symbol".to_string(),
813                )),
814            },
815            #[cfg(feature = "avro_custom_types")]
816            Self::RunEndEncoded(_, len, inner) => {
817                *len += 1;
818                inner.append_default(lit)
819            }
820            Self::Union(u) => u.append_default(lit),
821            Self::Record(field_meta, decoders, projector) => match lit {
822                AvroLiteral::Map(entries) => {
823                    for (i, dec) in decoders.iter_mut().enumerate() {
824                        let name = field_meta[i].name();
825                        if let Some(sub) = entries.get(name) {
826                            dec.append_default(sub)?;
827                        } else if let Some(proj) = projector.as_ref() {
828                            proj.project_default(dec, i)?;
829                        } else {
830                            dec.append_null()?;
831                        }
832                    }
833                    Ok(())
834                }
835                AvroLiteral::Null => {
836                    for (i, dec) in decoders.iter_mut().enumerate() {
837                        if let Some(proj) = projector.as_ref() {
838                            proj.project_default(dec, i)?;
839                        } else {
840                            dec.append_null()?;
841                        }
842                    }
843                    Ok(())
844                }
845                _ => Err(ArrowError::InvalidArgumentError(
846                    "Default for record must be a map/object or null".to_string(),
847                )),
848            },
849        }
850    }
851
852    /// Decode a single record from `buf`
853    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
854        match self {
855            Self::Null(x) => *x += 1,
856            Self::Boolean(values) => values.append(buf.get_bool()?),
857            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
858                values.push(buf.get_int()?)
859            }
860            Self::Int64(values)
861            | Self::TimeMicros(values)
862            | Self::TimestampMillis(_, values)
863            | Self::TimestampMicros(_, values)
864            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
865            #[cfg(feature = "avro_custom_types")]
866            Self::DurationSecond(values)
867            | Self::DurationMillisecond(values)
868            | Self::DurationMicrosecond(values)
869            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
870            Self::Float32(values) => values.push(buf.get_float()?),
871            Self::Float64(values) => values.push(buf.get_double()?),
872            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
873            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
874            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
875            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
876            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
877            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
878            Self::StringToBytes(offsets, values)
879            | Self::BytesToString(offsets, values)
880            | Self::Binary(offsets, values)
881            | Self::String(offsets, values)
882            | Self::StringView(offsets, values) => {
883                let data = buf.get_bytes()?;
884                offsets.push_length(data.len());
885                values.extend_from_slice(data);
886            }
887            Self::Uuid(values) => {
888                let s_bytes = buf.get_bytes()?;
889                let s = std::str::from_utf8(s_bytes).map_err(|e| {
890                    ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
891                })?;
892                let uuid = Uuid::try_parse(s)
893                    .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
894                values.extend_from_slice(uuid.as_bytes());
895            }
896            Self::Array(_, off, encoding) => {
897                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
898                off.push_length(total_items);
899            }
900            Self::Record(_, encodings, None) => {
901                for encoding in encodings {
902                    encoding.decode(buf)?;
903                }
904            }
905            Self::Record(_, encodings, Some(proj)) => {
906                proj.project_record(buf, encodings)?;
907            }
908            Self::Map(_, koff, moff, kdata, valdec) => {
909                let newly_added = read_blocks(buf, |cur| {
910                    let kb = cur.get_bytes()?;
911                    koff.push_length(kb.len());
912                    kdata.extend_from_slice(kb);
913                    valdec.decode(cur)
914                })?;
915                moff.push_length(newly_added);
916            }
917            Self::Fixed(sz, accum) => {
918                let fx = buf.get_fixed(*sz as usize)?;
919                accum.extend_from_slice(fx);
920            }
921            #[cfg(feature = "small_decimals")]
922            Self::Decimal32(_, _, size, builder) => {
923                decode_decimal!(size, buf, builder, 4, i32);
924            }
925            #[cfg(feature = "small_decimals")]
926            Self::Decimal64(_, _, size, builder) => {
927                decode_decimal!(size, buf, builder, 8, i64);
928            }
929            Self::Decimal128(_, _, size, builder) => {
930                decode_decimal!(size, buf, builder, 16, i128);
931            }
932            Self::Decimal256(_, _, size, builder) => {
933                decode_decimal!(size, buf, builder, 32, i256);
934            }
935            Self::Enum(indices, _, None) => {
936                indices.push(buf.get_int()?);
937            }
938            Self::Enum(indices, _, Some(res)) => {
939                let raw = buf.get_int()?;
940                let resolved = usize::try_from(raw)
941                    .ok()
942                    .and_then(|idx| res.mapping.get(idx).copied())
943                    .filter(|&idx| idx >= 0)
944                    .unwrap_or(res.default_index);
945                if resolved >= 0 {
946                    indices.push(resolved);
947                } else {
948                    return Err(ArrowError::ParseError(format!(
949                        "Enum symbol index {raw} not resolvable and no default provided",
950                    )));
951                }
952            }
953            Self::Duration(builder) => {
954                let b = buf.get_fixed(12)?;
955                let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
956                let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
957                let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
958                let nanos = (millis as i64) * 1_000_000;
959                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
960            }
961            #[cfg(feature = "avro_custom_types")]
962            Self::RunEndEncoded(_, len, inner) => {
963                *len += 1;
964                inner.decode(buf)?;
965            }
966            Self::Union(u) => u.decode(buf)?,
967            Self::Nullable(order, nb, encoding, plan) => {
968                match *plan {
969                    NullablePlan::FromSingle { promotion } => {
970                        encoding.decode_with_promotion(buf, promotion)?;
971                        nb.append(true);
972                    }
973                    NullablePlan::ReadTag => {
974                        let branch = buf.read_vlq()?;
975                        let is_not_null = match *order {
976                            Nullability::NullFirst => branch != 0,
977                            Nullability::NullSecond => branch == 0,
978                        };
979                        if is_not_null {
980                            // It is important to decode before appending to null buffer in case of decode error
981                            encoding.decode(buf)?;
982                        } else {
983                            encoding.append_null()?;
984                        }
985                        nb.append(is_not_null);
986                    }
987                }
988            }
989        }
990        Ok(())
991    }
992
993    fn decode_with_promotion(
994        &mut self,
995        buf: &mut AvroCursor<'_>,
996        promotion: Promotion,
997    ) -> Result<(), ArrowError> {
998        #[cfg(feature = "avro_custom_types")]
999        if let Self::RunEndEncoded(_, len, inner) = self {
1000            *len += 1;
1001            return inner.decode_with_promotion(buf, promotion);
1002        }
1003
1004        macro_rules! promote_numeric_to {
1005            ($variant:ident, $getter:ident, $to:ty) => {{
1006                match self {
1007                    Self::$variant(v) => {
1008                        let x = buf.$getter()?;
1009                        v.push(x as $to);
1010                        Ok(())
1011                    }
1012                    other => Err(ArrowError::ParseError(format!(
1013                        "Promotion {promotion} target mismatch: expected {}, got {}",
1014                        stringify!($variant),
1015                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1016                    ))),
1017                }
1018            }};
1019        }
1020        match promotion {
1021            Promotion::Direct => self.decode(buf),
1022            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1023            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1024            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1025            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1026            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1027            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1028            Promotion::StringToBytes => match self {
1029                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1030                    let data = buf.get_bytes()?;
1031                    offsets.push_length(data.len());
1032                    values.extend_from_slice(data);
1033                    Ok(())
1034                }
1035                other => Err(ArrowError::ParseError(format!(
1036                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1037                    <Self as AsRef<str>>::as_ref(other)
1038                ))),
1039            },
1040            Promotion::BytesToString => match self {
1041                Self::String(offsets, values)
1042                | Self::StringView(offsets, values)
1043                | Self::BytesToString(offsets, values) => {
1044                    let data = buf.get_bytes()?;
1045                    offsets.push_length(data.len());
1046                    values.extend_from_slice(data);
1047                    Ok(())
1048                }
1049                other => Err(ArrowError::ParseError(format!(
1050                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1051                    <Self as AsRef<str>>::as_ref(other)
1052                ))),
1053            },
1054        }
1055    }
1056
1057    /// Flush decoded records to an [`ArrayRef`]
1058    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1059        Ok(match self {
1060            Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1061            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1062            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1063            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1064            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1065            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1066            Self::TimeMillis(values) => {
1067                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1068            }
1069            Self::TimeMicros(values) => {
1070                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1071            }
1072            Self::TimestampMillis(is_utc, values) => Arc::new(
1073                flush_primitive::<TimestampMillisecondType>(values, nulls)
1074                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1075            ),
1076            Self::TimestampMicros(is_utc, values) => Arc::new(
1077                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1078                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1079            ),
1080            Self::TimestampNanos(is_utc, values) => Arc::new(
1081                flush_primitive::<TimestampNanosecondType>(values, nulls)
1082                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1083            ),
1084            #[cfg(feature = "avro_custom_types")]
1085            Self::DurationSecond(values) => {
1086                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1087            }
1088            #[cfg(feature = "avro_custom_types")]
1089            Self::DurationMillisecond(values) => {
1090                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1091            }
1092            #[cfg(feature = "avro_custom_types")]
1093            Self::DurationMicrosecond(values) => {
1094                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1095            }
1096            #[cfg(feature = "avro_custom_types")]
1097            Self::DurationNanosecond(values) => {
1098                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1099            }
1100            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1101            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1102            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1103            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1104                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1105            }
1106            Self::Int32ToFloat64(values)
1107            | Self::Int64ToFloat64(values)
1108            | Self::Float32ToFloat64(values) => {
1109                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1110            }
1111            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1112                let offsets = flush_offsets(offsets);
1113                let values = flush_values(values).into();
1114                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1115            }
1116            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1117                let offsets = flush_offsets(offsets);
1118                let values = flush_values(values).into();
1119                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1120            }
1121            Self::StringView(offsets, values) => {
1122                let offsets = flush_offsets(offsets);
1123                let values = flush_values(values);
1124                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1125                let values: Vec<&str> = (0..array.len())
1126                    .map(|i| {
1127                        if array.is_valid(i) {
1128                            array.value(i)
1129                        } else {
1130                            ""
1131                        }
1132                    })
1133                    .collect();
1134                Arc::new(StringViewArray::from(values))
1135            }
1136            Self::Array(field, offsets, values) => {
1137                let values = values.flush(None)?;
1138                let offsets = flush_offsets(offsets);
1139                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1140            }
1141            Self::Record(fields, encodings, _) => {
1142                let arrays = encodings
1143                    .iter_mut()
1144                    .map(|x| x.flush(None))
1145                    .collect::<Result<Vec<_>, _>>()?;
1146                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1147            }
1148            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1149                let moff = flush_offsets(m_off);
1150                let koff = flush_offsets(k_off);
1151                let kd = flush_values(kdata).into();
1152                let val_arr = valdec.flush(None)?;
1153                let key_arr = StringArray::try_new(koff, kd, None)?;
1154                if key_arr.len() != val_arr.len() {
1155                    return Err(ArrowError::InvalidArgumentError(format!(
1156                        "Map keys length ({}) != map values length ({})",
1157                        key_arr.len(),
1158                        val_arr.len()
1159                    )));
1160                }
1161                let final_len = moff.len() - 1;
1162                if let Some(n) = &nulls {
1163                    if n.len() != final_len {
1164                        return Err(ArrowError::InvalidArgumentError(format!(
1165                            "Map array null buffer length {} != final map length {final_len}",
1166                            n.len()
1167                        )));
1168                    }
1169                }
1170                let entries_fields = match map_field.data_type() {
1171                    DataType::Struct(fields) => fields.clone(),
1172                    other => {
1173                        return Err(ArrowError::InvalidArgumentError(format!(
1174                            "Map entries field must be a Struct, got {other:?}"
1175                        )));
1176                    }
1177                };
1178                let entries_struct =
1179                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1180                let map_arr =
1181                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1182                Arc::new(map_arr)
1183            }
1184            Self::Fixed(sz, accum) => {
1185                let b: Buffer = flush_values(accum).into();
1186                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1187                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1188                Arc::new(arr)
1189            }
1190            Self::Uuid(values) => {
1191                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1192                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1193                Arc::new(arr)
1194            }
1195            #[cfg(feature = "small_decimals")]
1196            Self::Decimal32(precision, scale, _, builder) => {
1197                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1198            }
1199            #[cfg(feature = "small_decimals")]
1200            Self::Decimal64(precision, scale, _, builder) => {
1201                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1202            }
1203            Self::Decimal128(precision, scale, _, builder) => {
1204                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1205            }
1206            Self::Decimal256(precision, scale, _, builder) => {
1207                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1208            }
1209            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1210            Self::Duration(builder) => {
1211                let (_, vals, _) = builder.finish().into_parts();
1212                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1213                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1214                Arc::new(vals)
1215            }
1216            #[cfg(feature = "avro_custom_types")]
1217            Self::RunEndEncoded(width, len, inner) => {
1218                let values = inner.flush(nulls)?;
1219                let n = *len;
1220                let arr = values.as_ref();
1221                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1222                if n > 0 {
1223                    run_starts.push(0);
1224                    for i in 1..n {
1225                        if !values_equal_at(arr, i - 1, i) {
1226                            run_starts.push(i);
1227                        }
1228                    }
1229                }
1230                if n > (u32::MAX as usize) {
1231                    return Err(ArrowError::InvalidArgumentError(format!(
1232                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1233                    )));
1234                }
1235                let run_count = run_starts.len();
1236                let take_idx: PrimitiveArray<UInt32Type> =
1237                    run_starts.iter().map(|&s| s as u32).collect();
1238                let per_run_values = if run_count == 0 {
1239                    values.slice(0, 0)
1240                } else {
1241                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1242                        ArrowError::ParseError(format!("take() for REE values failed: {e}"))
1243                    })?
1244                };
1245
1246                macro_rules! build_run_array {
1247                    ($Native:ty, $ArrowTy:ty) => {{
1248                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1249                        for (idx, &_start) in run_starts.iter().enumerate() {
1250                            let end = if idx + 1 < run_count {
1251                                run_starts[idx + 1]
1252                            } else {
1253                                n
1254                            };
1255                            ends.push(end as $Native);
1256                        }
1257                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1258                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1259                            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1260                        Arc::new(run_arr) as ArrayRef
1261                    }};
1262                }
1263                match *width {
1264                    2 => {
1265                        if n > i16::MAX as usize {
1266                            return Err(ArrowError::InvalidArgumentError(format!(
1267                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1268                            )));
1269                        }
1270                        build_run_array!(i16, Int16Type)
1271                    }
1272                    4 => build_run_array!(i32, Int32Type),
1273                    8 => build_run_array!(i64, Int64Type),
1274                    other => {
1275                        return Err(ArrowError::InvalidArgumentError(format!(
1276                            "Unsupported run-end width {other} for RunEndEncoded"
1277                        )));
1278                    }
1279                }
1280            }
1281            Self::Union(u) => u.flush(nulls)?,
1282        })
1283    }
1284}
1285
// A lookup table that resolves a writer-side index to a reader-side index together with
// the `Promotion` to apply when decoding the writer's value.
//
// NOTE(review): this table is embedded in `UnionReadPlan` and its construction error
// speaks of a "branch index", so the indices look like union branch indices rather than
// record field indices — confirm against the callers.
#[derive(Debug)]
struct DispatchLookupTable {
    // Indexed by writer index `w` (see `resolve`); each entry is the matching reader index.
    //
    // Semantics:
    // - `to_reader[w] >= 0`: The value is a reader index. The writer's value is decoded
    //   into that reader slot and `promotion[w]` is applied.
    // - `to_reader[w] == NO_SOURCE` (-1): The writer index has no reader-side target and
    //   `resolve` returns `None` for it.
    //
    // Representation (`i8`):
    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
    // `i8` for union type IDs. This requires that reader indices do not exceed `i8::MAX`;
    // `from_writer_to_reader` returns a schema error otherwise.
    //
    // Invariants:
    // - `to_reader.len() == promotion.len()`, with one entry per element of the
    //   `promotion_map` passed to `from_writer_to_reader`.
    // - If `to_reader[w] == NO_SOURCE`, `promotion[w]` is ignored.
    to_reader: Box<[i8]>,
    // For each writer index `w`, specifies the `Promotion` to apply to the writer's value.
    //
    // This is used when the writer's type can be promoted to the reader's type
    // (e.g., `Int` to `Long`). It is ignored if `to_reader[w] == NO_SOURCE`.
    promotion: Box<[Promotion]>,
}
1311
// Sentinel stored in `DispatchLookupTable::to_reader` for a writer index that has no
// reader-side target; `DispatchLookupTable::resolve` returns `None` for such entries.
const NO_SOURCE: i8 = -1;
1315
1316impl DispatchLookupTable {
1317    fn from_writer_to_reader(
1318        promotion_map: &[Option<(usize, Promotion)>],
1319    ) -> Result<Self, ArrowError> {
1320        let mut to_reader = Vec::with_capacity(promotion_map.len());
1321        let mut promotion = Vec::with_capacity(promotion_map.len());
1322        for map in promotion_map {
1323            match *map {
1324                Some((idx, promo)) => {
1325                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1326                        ArrowError::SchemaError(format!(
1327                            "Reader branch index {idx} exceeds i8 range (max {})",
1328                            i8::MAX
1329                        ))
1330                    })?;
1331                    to_reader.push(idx_i8);
1332                    promotion.push(promo);
1333                }
1334                None => {
1335                    to_reader.push(NO_SOURCE);
1336                    promotion.push(Promotion::Direct);
1337                }
1338            }
1339        }
1340        Ok(Self {
1341            to_reader: to_reader.into_boxed_slice(),
1342            promotion: promotion.into_boxed_slice(),
1343        })
1344    }
1345
1346    // Resolve a writer branch index to (reader_idx, promotion)
1347    #[inline]
1348    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1349        let reader_index = *self.to_reader.get(writer_index)?;
1350        (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1351    }
1352}
1353
/// Decoder for Avro unions.
///
/// Depending on `plan`, values are either collected into a dense `UnionArray` (one child
/// decoder per branch plus per-row type ids and offsets) or forwarded to a single target
/// decoder (`UnionReadPlan::ToSingle`).
#[derive(Debug)]
struct UnionDecoder {
    // Arrow union fields handed to `UnionArray::try_new` on flush.
    fields: UnionFields,
    // Per-row type id, pushed by `emit_to`.
    type_ids: Vec<i8>,
    // Per-row offset into the selected child, pushed by `emit_to`.
    offsets: Vec<i32>,
    // One child decoder per branch.
    branches: Vec<Decoder>,
    // Number of rows emitted to each branch; supplies the next offset for that branch.
    counts: Vec<i32>,
    // Type ids collected from `fields`, indexed by branch position.
    reader_type_codes: Vec<i8>,
    // Branch that receives values from `append_default` (unless the plan overrides it).
    default_emit_idx: usize,
    // Branch that receives values from `append_null` (unless the plan overrides it).
    null_emit_idx: usize,
    // Strategy that maps encoded input onto branches or onto a single target.
    plan: UnionReadPlan,
}
1366
1367impl Default for UnionDecoder {
1368    fn default() -> Self {
1369        Self {
1370            fields: UnionFields::empty(),
1371            type_ids: Vec::new(),
1372            offsets: Vec::new(),
1373            branches: Vec::new(),
1374            counts: Vec::new(),
1375            reader_type_codes: Vec::new(),
1376            default_emit_idx: 0,
1377            null_emit_idx: 0,
1378            plan: UnionReadPlan::Passthrough,
1379        }
1380    }
1381}
1382
/// Strategy used by `UnionDecoder` to turn encoded input into output rows.
#[derive(Debug)]
enum UnionReadPlan {
    /// A branch tag is read and translated through `lookup_table` into a reader branch and
    /// promotion; the value is decoded into that reader branch.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// No tag is read; every value is decoded into reader branch `reader_idx` using
    /// `promotion`.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    /// A branch tag is read and `lookup_table` supplies the promotion; the value is decoded
    /// into `target` instead of a union branch.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// A branch tag is read and used directly as the reader branch index with
    /// `Promotion::Direct`.
    Passthrough,
}
1398
1399impl UnionDecoder {
1400    fn try_new(
1401        fields: UnionFields,
1402        branches: Vec<Decoder>,
1403        resolved: Option<ResolvedUnion>,
1404    ) -> Result<Self, ArrowError> {
1405        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1406        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1407        let default_emit_idx = 0;
1408        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1409        let branch_len = branches.len().max(reader_type_codes.len());
1410        // Guard against impractically large unions that cannot be indexed by an Avro int
1411        let max_addr = (i32::MAX as usize) + 1;
1412        if branches.len() > max_addr {
1413            return Err(ArrowError::SchemaError(format!(
1414                "Reader union has {} branches, which exceeds the maximum addressable \
1415                 branches by an Avro int tag ({} + 1).",
1416                branches.len(),
1417                i32::MAX
1418            )));
1419        }
1420        Ok(Self {
1421            fields,
1422            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1423            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1424            branches,
1425            counts: vec![0; branch_len],
1426            reader_type_codes,
1427            default_emit_idx,
1428            null_emit_idx,
1429            plan: Self::plan_from_resolved(resolved)?,
1430        })
1431    }
1432
1433    fn try_new_from_writer_union(
1434        info: ResolvedUnion,
1435        target: Box<Decoder>,
1436    ) -> Result<Self, ArrowError> {
1437        // This constructor is only for writer-union to single-type resolution
1438        debug_assert!(info.writer_is_union && !info.reader_is_union);
1439        let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1440        Ok(Self {
1441            plan: UnionReadPlan::ToSingle {
1442                target,
1443                lookup_table,
1444            },
1445            ..Self::default()
1446        })
1447    }
1448
1449    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
1450        let Some(info) = resolved else {
1451            return Ok(UnionReadPlan::Passthrough);
1452        };
1453        match (info.writer_is_union, info.reader_is_union) {
1454            (true, true) => {
1455                let lookup_table =
1456                    DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1457                Ok(UnionReadPlan::ReaderUnion { lookup_table })
1458            }
1459            (false, true) => {
1460                let Some(&(reader_idx, promotion)) =
1461                    info.writer_to_reader.first().and_then(Option::as_ref)
1462                else {
1463                    return Err(ArrowError::SchemaError(
1464                        "Writer type does not match any reader union branch".to_string(),
1465                    ));
1466                };
1467                Ok(UnionReadPlan::FromSingle {
1468                    reader_idx,
1469                    promotion,
1470                })
1471            }
1472            (true, false) => Err(ArrowError::InvalidArgumentError(
1473                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1474                    .to_string(),
1475            )),
1476            // (false, false) is invalid and should never be constructed by the resolver.
1477            _ => Err(ArrowError::SchemaError(
1478                "ResolvedUnion constructed for non-union sides; resolver should return None"
1479                    .to_string(),
1480            )),
1481        }
1482    }
1483
1484    #[inline]
1485    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
1486        // Avro unions are encoded by first writing the zero-based branch index.
1487        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
1488        // but both use zig-zag varint encoding, so decoding as long is compatible
1489        // with either form and widely used in practice.
1490        let raw = buf.get_long()?;
1491        if raw < 0 {
1492            return Err(ArrowError::ParseError(format!(
1493                "Negative union branch index {raw}"
1494            )));
1495        }
1496        usize::try_from(raw).map_err(|_| {
1497            ArrowError::ParseError(format!(
1498                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1499                (usize::BITS as usize)
1500            ))
1501        })
1502    }
1503
1504    #[inline]
1505    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
1506        let branches_len = self.branches.len();
1507        let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1508            return Err(ArrowError::ParseError(format!(
1509                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1510            )));
1511        };
1512        self.type_ids.push(self.reader_type_codes[reader_idx]);
1513        self.offsets.push(self.counts[reader_idx]);
1514        self.counts[reader_idx] += 1;
1515        Ok(reader_branch)
1516    }
1517
1518    #[inline]
1519    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
1520    where
1521        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
1522    {
1523        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1524            return action(target);
1525        }
1526        let reader_idx = match &self.plan {
1527            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1528            _ => fallback_idx,
1529        };
1530        self.emit_to(reader_idx).and_then(action)
1531    }
1532
1533    fn append_null(&mut self) -> Result<(), ArrowError> {
1534        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1535    }
1536
1537    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
1538        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1539    }
1540
1541    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1542        let (reader_idx, promotion) = match &mut self.plan {
1543            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1544            UnionReadPlan::ReaderUnion { lookup_table } => {
1545                let idx = Self::read_tag(buf)?;
1546                lookup_table.resolve(idx).ok_or_else(|| {
1547                    ArrowError::ParseError(format!(
1548                        "Union branch index {idx} not resolvable by reader schema"
1549                    ))
1550                })?
1551            }
1552            UnionReadPlan::FromSingle {
1553                reader_idx,
1554                promotion,
1555            } => (*reader_idx, *promotion),
1556            UnionReadPlan::ToSingle {
1557                target,
1558                lookup_table,
1559            } => {
1560                let idx = Self::read_tag(buf)?;
1561                return match lookup_table.resolve(idx) {
1562                    Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1563                    None => Err(ArrowError::ParseError(format!(
1564                        "Writer union branch {idx} does not resolve to reader type"
1565                    ))),
1566                };
1567            }
1568        };
1569        let decoder = self.emit_to(reader_idx)?;
1570        decoder.decode_with_promotion(buf, promotion)
1571    }
1572
1573    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1574        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1575            return target.flush(nulls);
1576        }
1577        debug_assert!(
1578            nulls.is_none(),
1579            "UnionArray does not accept a validity bitmap; \
1580                     nulls should have been materialized as a Null child during decode"
1581        );
1582        let children = self
1583            .branches
1584            .iter_mut()
1585            .map(|d| d.flush(None))
1586            .collect::<Result<Vec<_>, _>>()?;
1587        let arr = UnionArray::try_new(
1588            self.fields.clone(),
1589            flush_values(&mut self.type_ids).into_iter().collect(),
1590            Some(flush_values(&mut self.offsets).into_iter().collect()),
1591            children,
1592        )
1593        .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1594        Ok(Arc::new(arr))
1595    }
1596}
1597
/// Collects the optional pieces needed to construct a `UnionDecoder`.
///
/// `build` accepts exactly two shapes: `fields` + `branches` (optionally `resolved`) without
/// a `target`, or `resolved` + `target` without `fields`/`branches`.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    // Arrow union fields for a reader-side union.
    fields: Option<UnionFields>,
    // Child decoders for a reader-side union.
    branches: Option<Vec<Decoder>>,
    // Resolution info passed on to the `UnionDecoder` constructors.
    resolved: Option<ResolvedUnion>,
    // Target decoder for writer-union to single-type resolution.
    target: Option<Box<Decoder>>,
}
1605
1606impl UnionDecoderBuilder {
1607    fn new() -> Self {
1608        Self::default()
1609    }
1610
1611    fn with_fields(mut self, fields: UnionFields) -> Self {
1612        self.fields = Some(fields);
1613        self
1614    }
1615
1616    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1617        self.branches = Some(branches);
1618        self
1619    }
1620
1621    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1622        self.resolved = Some(resolved_union);
1623        self
1624    }
1625
1626    fn with_target(mut self, target: Box<Decoder>) -> Self {
1627        self.target = Some(target);
1628        self
1629    }
1630
1631    fn build(self) -> Result<UnionDecoder, ArrowError> {
1632        match (self.resolved, self.fields, self.branches, self.target) {
1633            (resolved, Some(fields), Some(branches), None) => {
1634                UnionDecoder::try_new(fields, branches, resolved)
1635            }
1636            (Some(info), None, None, Some(target))
1637                if info.writer_is_union && !info.reader_is_union =>
1638            {
1639                UnionDecoder::try_new_from_writer_union(info, target)
1640            }
1641            _ => Err(ArrowError::InvalidArgumentError(
1642                "Invalid UnionDecoderBuilder configuration: expected either \
1643                 (fields + branches + resolved) with no target for reader-unions, or \
1644                 (resolved + target) with no fields/branches for writer-union to single."
1645                    .to_string(),
1646            )),
1647        }
1648    }
1649}
1650
/// How `process_blockwise` treats a block whose item count is negative, i.e. a block that
/// also carries its size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size, then still invoke the per-item callback for every item.
    ProcessItems,
    /// Read the byte size and skip that many bytes without invoking the callback.
    SkipBySize,
}
1656
1657#[inline]
1658fn skip_blocks(
1659    buf: &mut AvroCursor,
1660    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1661) -> Result<usize, ArrowError> {
1662    process_blockwise(
1663        buf,
1664        move |c| skip_item(c),
1665        NegativeBlockBehavior::SkipBySize,
1666    )
1667}
1668
1669#[inline]
1670fn flush_dict(
1671    indices: &mut Vec<i32>,
1672    symbols: &[String],
1673    nulls: Option<NullBuffer>,
1674) -> Result<ArrayRef, ArrowError> {
1675    let keys = flush_primitive::<Int32Type>(indices, nulls);
1676    let values = Arc::new(StringArray::from_iter_values(
1677        symbols.iter().map(|s| s.as_str()),
1678    ));
1679    DictionaryArray::try_new(keys, values)
1680        .map_err(|e| ArrowError::ParseError(e.to_string()))
1681        .map(|arr| Arc::new(arr) as ArrayRef)
1682}
1683
1684#[inline]
1685fn read_blocks(
1686    buf: &mut AvroCursor,
1687    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1688) -> Result<usize, ArrowError> {
1689    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1690}
1691
1692#[inline]
1693fn process_blockwise(
1694    buf: &mut AvroCursor,
1695    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1696    negative_behavior: NegativeBlockBehavior,
1697) -> Result<usize, ArrowError> {
1698    let mut total = 0usize;
1699    loop {
1700        // Read the block count
1701        //  positive = that many items
1702        //  negative = that many items + read block size
1703        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
1704        let block_count = buf.get_long()?;
1705        match block_count.cmp(&0) {
1706            Ordering::Equal => break,
1707            Ordering::Less => {
1708                let count = (-block_count) as usize;
1709                // A negative count is followed by a long of the size in bytes
1710                let size_in_bytes = buf.get_long()? as usize;
1711                match negative_behavior {
1712                    NegativeBlockBehavior::ProcessItems => {
1713                        // Process items one-by-one after reading size
1714                        for _ in 0..count {
1715                            on_item(buf)?;
1716                        }
1717                    }
1718                    NegativeBlockBehavior::SkipBySize => {
1719                        // Skip the entire block payload at once
1720                        let _ = buf.get_fixed(size_in_bytes)?;
1721                    }
1722                }
1723                total += count;
1724            }
1725            Ordering::Greater => {
1726                let count = block_count as usize;
1727                for _ in 0..count {
1728                    on_item(buf)?;
1729                }
1730                total += count;
1731            }
1732        }
1733    }
1734    Ok(total)
1735}
1736
1737#[inline]
1738fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1739    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1740}
1741
1742#[inline]
1743fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1744    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1745}
1746
1747#[inline]
1748fn flush_primitive<T: ArrowPrimitiveType>(
1749    values: &mut Vec<T::Native>,
1750    nulls: Option<NullBuffer>,
1751) -> PrimitiveArray<T> {
1752    PrimitiveArray::new(flush_values(values).into(), nulls)
1753}
1754
1755#[inline]
1756fn read_decimal_bytes_be<const N: usize>(
1757    buf: &mut AvroCursor<'_>,
1758    size: &Option<usize>,
1759) -> Result<[u8; N], ArrowError> {
1760    match size {
1761        Some(n) if *n == N => {
1762            let raw = buf.get_fixed(N)?;
1763            let mut arr = [0u8; N];
1764            arr.copy_from_slice(raw);
1765            Ok(arr)
1766        }
1767        Some(n) => {
1768            let raw = buf.get_fixed(*n)?;
1769            sign_cast_to::<N>(raw)
1770        }
1771        None => {
1772            let raw = buf.get_bytes()?;
1773            sign_cast_to::<N>(raw)
1774        }
1775    }
1776}
1777
1778/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
1779/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
1780/// the payload is a big-endian two's-complement integer, and when narrowing it must
1781/// be representable without changing sign or value.
1782///
1783/// If `raw.len() < N`, the value is sign-extended.
1784/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
1785/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
1786#[inline]
1787fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
1788    let len = raw.len();
1789    // Fast path: exact width, just copy
1790    if len == N {
1791        let mut out = [0u8; N];
1792        out.copy_from_slice(raw);
1793        return Ok(out);
1794    }
1795    // Determine sign byte from MSB of first byte (empty => positive)
1796    let first = raw.first().copied().unwrap_or(0u8);
1797    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1798    // Pre-fill with sign byte to support sign extension
1799    let mut out = [sign_byte; N];
1800    if len > N {
1801        // Validate truncation: all dropped leading bytes must equal sign_byte,
1802        // and the MSB of the first kept byte must match the sign.
1803        let extra = len - N;
1804        // Any non-sign byte in the truncated prefix indicates overflow
1805        if raw[..extra].iter().any(|&b| b != sign_byte) {
1806            return Err(ArrowError::ParseError(format!(
1807                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1808                len, N
1809            )));
1810        }
1811        if N > 0 {
1812            let first_kept = raw[extra];
1813            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1814            if sign_bit_mismatch {
1815                return Err(ArrowError::ParseError(format!(
1816                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1817                    len, N
1818                )));
1819            }
1820        }
1821        out.copy_from_slice(&raw[extra..]);
1822        return Ok(out);
1823    }
1824    out[N - len..].copy_from_slice(raw);
1825    Ok(out)
1826}
1827
/// Reports whether the elements at positions `i` and `j` of `arr` are equal.
///
/// Two nulls compare equal, a null never equals a non-null, and two non-null elements are
/// compared through their one-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let left_is_null = arr.is_null(i);
    let right_is_null = arr.is_null(j);
    if left_is_null || right_is_null {
        // Equal only when both sides are null.
        return left_is_null && right_is_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
1841
/// Per-record plan that maps writer fields onto reader-side decoders, skipping writer-only
/// fields and injecting defaults afterwards (see `Projector::project_record`).
#[derive(Debug)]
struct Projector {
    // For each writer field in order: the reader decoder index to decode into, or `None`
    // when the field must be skipped.
    writer_to_reader: Arc<[Option<usize>]>,
    // Parallel to `writer_to_reader`: the skipper used when the mapping entry is `None`.
    skip_decoders: Vec<Option<Skipper>>,
    // One entry per reader field: its default literal, when resolution provides one.
    field_defaults: Vec<Option<AvroLiteral>>,
    // `(reader index, literal)` pairs appended after the writer fields of every record.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
1849
/// Builds a `Projector` from a resolved record and the reader's fields.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    // Resolution result providing the writer-to-reader mapping, skip list and default list.
    rec: &'a ResolvedRecord,
    // Reader fields, inspected for per-field default values.
    reader_fields: Arc<[AvroField]>,
}
1855
1856impl<'a> ProjectorBuilder<'a> {
1857    #[inline]
1858    fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1859        Self {
1860            rec,
1861            reader_fields: reader_fields.clone(),
1862        }
1863    }
1864
1865    #[inline]
1866    fn build(self) -> Result<Projector, ArrowError> {
1867        let reader_fields = self.reader_fields;
1868        let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1869        for avro_field in reader_fields.as_ref() {
1870            if let Some(ResolutionInfo::DefaultValue(lit)) =
1871                avro_field.data_type().resolution.as_ref()
1872            {
1873                field_defaults.push(Some(lit.clone()));
1874            } else {
1875                field_defaults.push(None);
1876            }
1877        }
1878        let mut default_injections: Vec<(usize, AvroLiteral)> =
1879            Vec::with_capacity(self.rec.default_fields.len());
1880        for &idx in self.rec.default_fields.as_ref() {
1881            let lit = field_defaults
1882                .get(idx)
1883                .and_then(|lit| lit.clone())
1884                .unwrap_or(AvroLiteral::Null);
1885            default_injections.push((idx, lit));
1886        }
1887        let mut skip_decoders: Vec<Option<Skipper>> =
1888            Vec::with_capacity(self.rec.skip_fields.len());
1889        for datatype in self.rec.skip_fields.as_ref() {
1890            let skipper = match datatype {
1891                Some(datatype) => Some(Skipper::from_avro(datatype)?),
1892                None => None,
1893            };
1894            skip_decoders.push(skipper);
1895        }
1896        Ok(Projector {
1897            writer_to_reader: self.rec.writer_to_reader.clone(),
1898            skip_decoders,
1899            field_defaults,
1900            default_injections: default_injections.into(),
1901        })
1902    }
1903}
1904
1905impl Projector {
1906    #[inline]
1907    fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
1908        // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from
1909        // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in
1910        // `ProjectorBuilder::build` to have exactly one element per reader field.
1911        // Therefore, `index < self.field_defaults.len()` always holds here, so
1912        // `self.field_defaults[index]` cannot panic. We only take an immutable reference
1913        // via `.as_ref()`, and `self` is borrowed immutably.
1914        if let Some(default_literal) = self.field_defaults[index].as_ref() {
1915            decoder.append_default(default_literal)
1916        } else {
1917            decoder.append_null()
1918        }
1919    }
1920
1921    #[inline]
1922    fn project_record(
1923        &mut self,
1924        buf: &mut AvroCursor<'_>,
1925        encodings: &mut [Decoder],
1926    ) -> Result<(), ArrowError> {
1927        debug_assert_eq!(
1928            self.writer_to_reader.len(),
1929            self.skip_decoders.len(),
1930            "internal invariant: mapping and skipper lists must have equal length"
1931        );
1932        for (i, (mapping, skipper_opt)) in self
1933            .writer_to_reader
1934            .iter()
1935            .zip(self.skip_decoders.iter_mut())
1936            .enumerate()
1937        {
1938            match (mapping, skipper_opt.as_mut()) {
1939                (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1940                (None, Some(skipper)) => skipper.skip(buf)?,
1941                (None, None) => {
1942                    return Err(ArrowError::SchemaError(format!(
1943                        "No skipper available for writer-only field at index {i}",
1944                    )));
1945                }
1946            }
1947        }
1948        for (reader_index, lit) in self.default_injections.as_ref() {
1949            encodings[*reader_index].append_default(lit)?;
1950        }
1951        Ok(())
1952    }
1953}
1954
/// Lightweight skipper for non-projected writer fields
/// (fields present in the writer schema but omitted by the reader/projection);
/// per Avro 1.11.1 schema resolution these fields are ignored.
///
/// Each variant records just enough about the writer type for `Skipper::skip` to advance the
/// cursor past one encoded value without materializing it.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
#[derive(Debug)]
enum Skipper {
    // Consumes nothing.
    Null,
    // Consumes one boolean.
    Boolean,
    // Consumes one `int`.
    Int32,
    // The following variants each consume one `long`.
    Int64,
    // Consumes one `float`.
    Float32,
    // Consumes one `double`.
    Float64,
    // The following two variants each consume one length-prefixed byte sequence.
    Bytes,
    String,
    // The following four variants each consume one `long`.
    TimeMicros,
    TimestampMillis,
    TimestampMicros,
    TimestampNanos,
    // Consumes the given number of bytes.
    Fixed(usize),
    // Consumes `n` fixed bytes for `Some(n)`, else one length-prefixed byte sequence.
    Decimal(Option<usize>),
    // Consumes one length-prefixed byte sequence.
    UuidString,
    // Consumes one `int`.
    Enum,
    // Consumes 12 fixed bytes.
    DurationFixed12,
    // Block-encoded sequence of items.
    List(Box<Skipper>),
    // Block-encoded sequence of (key bytes, value) entries.
    Map(Box<Skipper>),
    // Each field is skipped in order.
    Struct(Vec<Skipper>),
    // A branch tag followed by the value of the selected branch.
    Union(Vec<Skipper>),
    // A branch marker, followed by the inner value when the marker selects the non-null side.
    Nullable(Nullability, Box<Skipper>),
    // Skipped exactly like the inner value.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
1987
1988impl Skipper {
1989    fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1990        let mut base = match dt.codec() {
1991            Codec::Null => Self::Null,
1992            Codec::Boolean => Self::Boolean,
1993            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1994            Codec::Int64 => Self::Int64,
1995            Codec::TimeMicros => Self::TimeMicros,
1996            Codec::TimestampMillis(_) => Self::TimestampMillis,
1997            Codec::TimestampMicros(_) => Self::TimestampMicros,
1998            Codec::TimestampNanos(_) => Self::TimestampNanos,
1999            #[cfg(feature = "avro_custom_types")]
2000            Codec::DurationNanos
2001            | Codec::DurationMicros
2002            | Codec::DurationMillis
2003            | Codec::DurationSeconds => Self::Int64,
2004            Codec::Float32 => Self::Float32,
2005            Codec::Float64 => Self::Float64,
2006            Codec::Binary => Self::Bytes,
2007            Codec::Utf8 | Codec::Utf8View => Self::String,
2008            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2009            Codec::Decimal(_, _, size) => Self::Decimal(*size),
2010            Codec::Uuid => Self::UuidString, // encoded as string
2011            Codec::Enum(_) => Self::Enum,
2012            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2013            Codec::Struct(fields) => Self::Struct(
2014                fields
2015                    .iter()
2016                    .map(|f| Skipper::from_avro(f.data_type()))
2017                    .collect::<Result<_, _>>()?,
2018            ),
2019            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2020            Codec::Interval => Self::DurationFixed12,
2021            Codec::Union(encodings, _, _) => {
2022                let max_addr = (i32::MAX as usize) + 1;
2023                if encodings.len() > max_addr {
2024                    return Err(ArrowError::SchemaError(format!(
2025                        "Writer union has {} branches, which exceeds the maximum addressable \
2026                         branches by an Avro int tag ({} + 1).",
2027                        encodings.len(),
2028                        i32::MAX
2029                    )));
2030                }
2031                Self::Union(
2032                    encodings
2033                        .iter()
2034                        .map(Skipper::from_avro)
2035                        .collect::<Result<_, _>>()?,
2036                )
2037            }
2038            #[cfg(feature = "avro_custom_types")]
2039            Codec::RunEndEncoded(inner, _w) => {
2040                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2041            }
2042        };
2043        if let Some(n) = dt.nullability() {
2044            base = Self::Nullable(n, Box::new(base));
2045        }
2046        Ok(base)
2047    }
2048
2049    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
2050        match self {
2051            Self::Null => Ok(()),
2052            Self::Boolean => {
2053                buf.get_bool()?;
2054                Ok(())
2055            }
2056            Self::Int32 => {
2057                buf.get_int()?;
2058                Ok(())
2059            }
2060            Self::Int64
2061            | Self::TimeMicros
2062            | Self::TimestampMillis
2063            | Self::TimestampMicros
2064            | Self::TimestampNanos => {
2065                buf.get_long()?;
2066                Ok(())
2067            }
2068            Self::Float32 => {
2069                buf.get_float()?;
2070                Ok(())
2071            }
2072            Self::Float64 => {
2073                buf.get_double()?;
2074                Ok(())
2075            }
2076            Self::Bytes | Self::String | Self::UuidString => {
2077                buf.get_bytes()?;
2078                Ok(())
2079            }
2080            Self::Fixed(sz) => {
2081                buf.get_fixed(*sz)?;
2082                Ok(())
2083            }
2084            Self::Decimal(size) => {
2085                if let Some(s) = size {
2086                    buf.get_fixed(*s)
2087                } else {
2088                    buf.get_bytes()
2089                }?;
2090                Ok(())
2091            }
2092            Self::Enum => {
2093                buf.get_int()?;
2094                Ok(())
2095            }
2096            Self::DurationFixed12 => {
2097                buf.get_fixed(12)?;
2098                Ok(())
2099            }
2100            Self::List(item) => {
2101                skip_blocks(buf, |c| item.skip(c))?;
2102                Ok(())
2103            }
2104            Self::Map(value) => {
2105                skip_blocks(buf, |c| {
2106                    c.get_bytes()?; // key
2107                    value.skip(c)
2108                })?;
2109                Ok(())
2110            }
2111            Self::Struct(fields) => {
2112                for f in fields.iter_mut() {
2113                    f.skip(buf)?
2114                }
2115                Ok(())
2116            }
2117            Self::Union(encodings) => {
2118                // Union tag must be ZigZag-decoded
2119                let raw = buf.get_long()?;
2120                if raw < 0 {
2121                    return Err(ArrowError::ParseError(format!(
2122                        "Negative union branch index {raw}"
2123                    )));
2124                }
2125                let idx: usize = usize::try_from(raw).map_err(|_| {
2126                    ArrowError::ParseError(format!(
2127                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2128                        (usize::BITS as usize)
2129                    ))
2130                })?;
2131                let Some(encoding) = encodings.get_mut(idx) else {
2132                    return Err(ArrowError::ParseError(format!(
2133                        "Union branch index {idx} out of range for skipper ({} branches)",
2134                        encodings.len()
2135                    )));
2136                };
2137                encoding.skip(buf)
2138            }
2139            Self::Nullable(order, inner) => {
2140                let branch = buf.read_vlq()?;
2141                let is_not_null = match *order {
2142                    Nullability::NullFirst => branch != 0,
2143                    Nullability::NullSecond => branch == 0,
2144                };
2145                if is_not_null {
2146                    inner.skip(buf)?;
2147                }
2148                Ok(())
2149            }
2150            #[cfg(feature = "avro_custom_types")]
2151            Self::RunEndEncoded(inner) => inner.skip(buf),
2152        }
2153    }
2154}
2155
2156#[cfg(test)]
2157mod tests {
2158    use super::*;
2159    use crate::codec::AvroFieldBuilder;
2160    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2161    use arrow_array::cast::AsArray;
2162    use indexmap::IndexMap;
2163    use std::collections::HashMap;
2164
2165    fn encode_avro_int(value: i32) -> Vec<u8> {
2166        let mut buf = Vec::new();
2167        let mut v = (value << 1) ^ (value >> 31);
2168        while v & !0x7F != 0 {
2169            buf.push(((v & 0x7F) | 0x80) as u8);
2170            v >>= 7;
2171        }
2172        buf.push(v as u8);
2173        buf
2174    }
2175
2176    fn encode_avro_long(value: i64) -> Vec<u8> {
2177        let mut buf = Vec::new();
2178        let mut v = (value << 1) ^ (value >> 63);
2179        while v & !0x7F != 0 {
2180            buf.push(((v & 0x7F) | 0x80) as u8);
2181            v >>= 7;
2182        }
2183        buf.push(v as u8);
2184        buf
2185    }
2186
2187    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2188        let mut buf = encode_avro_long(bytes.len() as i64);
2189        buf.extend_from_slice(bytes);
2190        buf
2191    }
2192
2193    fn avro_from_codec(codec: Codec) -> AvroDataType {
2194        AvroDataType::new(codec, Default::default(), None)
2195    }
2196
2197    fn resolved_root_datatype(
2198        writer: Schema<'static>,
2199        reader: Schema<'static>,
2200        use_utf8view: bool,
2201        strict_mode: bool,
2202    ) -> AvroDataType {
2203        // Wrap writer schema in a single-field record
2204        let writer_record = Schema::Complex(ComplexType::Record(Record {
2205            name: "Root",
2206            namespace: None,
2207            doc: None,
2208            aliases: vec![],
2209            fields: vec![Field {
2210                name: "v",
2211                r#type: writer,
2212                default: None,
2213                doc: None,
2214                aliases: vec![],
2215            }],
2216            attributes: Attributes::default(),
2217        }));
2218
2219        // Wrap reader schema in a single-field record
2220        let reader_record = Schema::Complex(ComplexType::Record(Record {
2221            name: "Root",
2222            namespace: None,
2223            doc: None,
2224            aliases: vec![],
2225            fields: vec![Field {
2226                name: "v",
2227                r#type: reader,
2228                default: None,
2229                doc: None,
2230                aliases: vec![],
2231            }],
2232            attributes: Attributes::default(),
2233        }));
2234
2235        // Build resolved record, then extract the inner field's resolved AvroDataType
2236        let field = AvroFieldBuilder::new(&writer_record)
2237            .with_reader_schema(&reader_record)
2238            .with_utf8view(use_utf8view)
2239            .with_strict_mode(strict_mode)
2240            .build()
2241            .expect("schema resolution should succeed");
2242
2243        match field.data_type().codec() {
2244            Codec::Struct(fields) => fields[0].data_type().clone(),
2245            other => panic!("expected wrapper struct, got {other:?}"),
2246        }
2247    }
2248
2249    fn decoder_for_promotion(
2250        writer: PrimitiveType,
2251        reader: PrimitiveType,
2252        use_utf8view: bool,
2253    ) -> Decoder {
2254        let ws = Schema::TypeName(TypeName::Primitive(writer));
2255        let rs = Schema::TypeName(TypeName::Primitive(reader));
2256        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2257        Decoder::try_new(&dt).unwrap()
2258    }
2259
2260    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2261        AvroDataType::new(codec, HashMap::new(), nullability)
2262    }
2263
2264    #[cfg(feature = "avro_custom_types")]
2265    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2266        let mut out = Vec::with_capacity(10);
2267        while x >= 0x80 {
2268            out.push((x as u8) | 0x80);
2269            x >>= 7;
2270        }
2271        out.push(x as u8);
2272        out
2273    }
2274
2275    #[test]
2276    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2277        let ws = Schema::Union(vec![
2278            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2279            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2280        ]);
2281        let rs = Schema::Union(vec![
2282            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2283            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2284        ]);
2285
2286        let dt = resolved_root_datatype(ws, rs, false, false);
2287        let mut dec = Decoder::try_new(&dt).unwrap();
2288
2289        let mut rec1 = encode_avro_long(0);
2290        rec1.extend(encode_avro_int(7));
2291        let mut cur1 = AvroCursor::new(&rec1);
2292        dec.decode(&mut cur1).unwrap();
2293
2294        let mut rec2 = encode_avro_long(1);
2295        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2296        let mut cur2 = AvroCursor::new(&rec2);
2297        dec.decode(&mut cur2).unwrap();
2298
2299        let arr = dec.flush(None).unwrap();
2300        let ua = arr
2301            .as_any()
2302            .downcast_ref::<UnionArray>()
2303            .expect("dense union output");
2304
2305        assert_eq!(
2306            ua.type_id(0),
2307            1,
2308            "first value must select reader 'long' branch"
2309        );
2310        assert_eq!(ua.value_offset(0), 0);
2311
2312        assert_eq!(
2313            ua.type_id(1),
2314            0,
2315            "second value must select reader 'string' branch"
2316        );
2317        assert_eq!(ua.value_offset(1), 0);
2318
2319        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2320        assert_eq!(long_child.len(), 1);
2321        assert_eq!(long_child.value(0), 7);
2322
2323        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2324        assert_eq!(str_child.len(), 1);
2325        assert_eq!(str_child.value(0), "abc");
2326    }
2327
2328    #[test]
2329    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2330        let ws = Schema::Union(vec![
2331            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2332            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2333        ]);
2334        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2335
2336        let dt = resolved_root_datatype(ws, rs, false, false);
2337        let mut dec = Decoder::try_new(&dt).unwrap();
2338
2339        let mut data = encode_avro_long(0);
2340        data.extend(encode_avro_int(5));
2341        let mut cur = AvroCursor::new(&data);
2342        dec.decode(&mut cur).unwrap();
2343
2344        let arr = dec.flush(None).unwrap();
2345        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2346        assert_eq!(out.len(), 1);
2347        assert_eq!(out.value(0), 5);
2348    }
2349
2350    #[test]
2351    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2352        let ws = Schema::Union(vec![
2353            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2354            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2355        ]);
2356        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2357
2358        let dt = resolved_root_datatype(ws, rs, false, false);
2359        let mut dec = Decoder::try_new(&dt).unwrap();
2360
2361        let mut data = encode_avro_long(1);
2362        data.extend(encode_avro_bytes("z".as_bytes()));
2363        let mut cur = AvroCursor::new(&data);
2364        let res = dec.decode(&mut cur);
2365        assert!(
2366            res.is_err(),
2367            "expected error when writer union branch does not resolve to reader non-union type"
2368        );
2369    }
2370
2371    #[test]
2372    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2373        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2374        let rs = Schema::Union(vec![
2375            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2376            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2377        ]);
2378
2379        let dt = resolved_root_datatype(ws, rs, false, false);
2380        let mut dec = Decoder::try_new(&dt).unwrap();
2381
2382        let data = encode_avro_int(6);
2383        let mut cur = AvroCursor::new(&data);
2384        dec.decode(&mut cur).unwrap();
2385
2386        let arr = dec.flush(None).unwrap();
2387        let ua = arr
2388            .as_any()
2389            .downcast_ref::<UnionArray>()
2390            .expect("dense union output");
2391        assert_eq!(ua.len(), 1);
2392        assert_eq!(
2393            ua.type_id(0),
2394            1,
2395            "must resolve to reader 'long' branch (type_id 1)"
2396        );
2397        assert_eq!(ua.value_offset(0), 0);
2398
2399        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2400        assert_eq!(long_child.len(), 1);
2401        assert_eq!(long_child.value(0), 6);
2402
2403        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2404        assert_eq!(str_child.len(), 0, "string branch must be empty");
2405    }
2406
2407    #[test]
2408    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2409        let ws = Schema::Union(vec![
2410            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2411            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2412        ]);
2413        let rs = Schema::Union(vec![
2414            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2415            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2416        ]);
2417
2418        let dt = resolved_root_datatype(ws, rs, false, false);
2419        let mut dec = Decoder::try_new(&dt).unwrap();
2420
2421        let mut data = encode_avro_long(1);
2422        data.push(1);
2423        let mut cur = AvroCursor::new(&data);
2424        let res = dec.decode(&mut cur);
2425        assert!(
2426            res.is_err(),
2427            "expected error for unmapped writer 'boolean' branch"
2428        );
2429    }
2430
2431    #[test]
2432    fn test_schema_resolution_promotion_int_to_long() {
2433        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2434        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2435        for v in [0, 1, -2, 123456] {
2436            let data = encode_avro_int(v);
2437            let mut cur = AvroCursor::new(&data);
2438            dec.decode(&mut cur).unwrap();
2439        }
2440        let arr = dec.flush(None).unwrap();
2441        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2442        assert_eq!(a.value(0), 0);
2443        assert_eq!(a.value(1), 1);
2444        assert_eq!(a.value(2), -2);
2445        assert_eq!(a.value(3), 123456);
2446    }
2447
2448    #[test]
2449    fn test_schema_resolution_promotion_int_to_float() {
2450        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2451        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2452        for v in [0, 42, -7] {
2453            let data = encode_avro_int(v);
2454            let mut cur = AvroCursor::new(&data);
2455            dec.decode(&mut cur).unwrap();
2456        }
2457        let arr = dec.flush(None).unwrap();
2458        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2459        assert_eq!(a.value(0), 0.0);
2460        assert_eq!(a.value(1), 42.0);
2461        assert_eq!(a.value(2), -7.0);
2462    }
2463
2464    #[test]
2465    fn test_schema_resolution_promotion_int_to_double() {
2466        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2467        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2468        for v in [1, -1, 10_000] {
2469            let data = encode_avro_int(v);
2470            let mut cur = AvroCursor::new(&data);
2471            dec.decode(&mut cur).unwrap();
2472        }
2473        let arr = dec.flush(None).unwrap();
2474        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2475        assert_eq!(a.value(0), 1.0);
2476        assert_eq!(a.value(1), -1.0);
2477        assert_eq!(a.value(2), 10_000.0);
2478    }
2479
2480    #[test]
2481    fn test_schema_resolution_promotion_long_to_float() {
2482        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2483        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2484        for v in [0_i64, 1_000_000_i64, -123_i64] {
2485            let data = encode_avro_long(v);
2486            let mut cur = AvroCursor::new(&data);
2487            dec.decode(&mut cur).unwrap();
2488        }
2489        let arr = dec.flush(None).unwrap();
2490        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2491        assert_eq!(a.value(0), 0.0);
2492        assert_eq!(a.value(1), 1_000_000.0);
2493        assert_eq!(a.value(2), -123.0);
2494    }
2495
2496    #[test]
2497    fn test_schema_resolution_promotion_long_to_double() {
2498        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2499        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2500        for v in [2_i64, -2_i64, 9_223_372_i64] {
2501            let data = encode_avro_long(v);
2502            let mut cur = AvroCursor::new(&data);
2503            dec.decode(&mut cur).unwrap();
2504        }
2505        let arr = dec.flush(None).unwrap();
2506        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2507        assert_eq!(a.value(0), 2.0);
2508        assert_eq!(a.value(1), -2.0);
2509        assert_eq!(a.value(2), 9_223_372.0);
2510    }
2511
2512    #[test]
2513    fn test_schema_resolution_promotion_float_to_double() {
2514        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2515        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2516        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2517            let data = v.to_le_bytes().to_vec();
2518            let mut cur = AvroCursor::new(&data);
2519            dec.decode(&mut cur).unwrap();
2520        }
2521        let arr = dec.flush(None).unwrap();
2522        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2523        assert_eq!(a.value(0), 0.5_f64);
2524        assert_eq!(a.value(1), -3.25_f64);
2525        assert_eq!(a.value(2), 1.0e6_f64);
2526    }
2527
2528    #[test]
2529    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2530        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2531        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2532        for s in ["hello", "world", "héllo"] {
2533            let data = encode_avro_bytes(s.as_bytes());
2534            let mut cur = AvroCursor::new(&data);
2535            dec.decode(&mut cur).unwrap();
2536        }
2537        let arr = dec.flush(None).unwrap();
2538        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2539        assert_eq!(a.value(0), "hello");
2540        assert_eq!(a.value(1), "world");
2541        assert_eq!(a.value(2), "héllo");
2542    }
2543
2544    #[test]
2545    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2546        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2547        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2548        let data = encode_avro_bytes("abc".as_bytes());
2549        let mut cur = AvroCursor::new(&data);
2550        dec.decode(&mut cur).unwrap();
2551        let arr = dec.flush(None).unwrap();
2552        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2553        assert_eq!(a.value(0), "abc");
2554    }
2555
2556    #[test]
2557    fn test_schema_resolution_promotion_string_to_bytes() {
2558        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2559        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2560        for s in ["", "abc", "data"] {
2561            let data = encode_avro_bytes(s.as_bytes());
2562            let mut cur = AvroCursor::new(&data);
2563            dec.decode(&mut cur).unwrap();
2564        }
2565        let arr = dec.flush(None).unwrap();
2566        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2567        assert_eq!(a.value(0), b"");
2568        assert_eq!(a.value(1), b"abc");
2569        assert_eq!(a.value(2), "data".as_bytes());
2570    }
2571
2572    #[test]
2573    fn test_schema_resolution_no_promotion_passthrough_int() {
2574        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2575        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2576        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
2577        let writer_record = Schema::Complex(ComplexType::Record(Record {
2578            name: "Root",
2579            namespace: None,
2580            doc: None,
2581            aliases: vec![],
2582            fields: vec![Field {
2583                name: "v",
2584                r#type: ws,
2585                default: None,
2586                doc: None,
2587                aliases: vec![],
2588            }],
2589            attributes: Attributes::default(),
2590        }));
2591        let reader_record = Schema::Complex(ComplexType::Record(Record {
2592            name: "Root",
2593            namespace: None,
2594            doc: None,
2595            aliases: vec![],
2596            fields: vec![Field {
2597                name: "v",
2598                r#type: rs,
2599                default: None,
2600                doc: None,
2601                aliases: vec![],
2602            }],
2603            attributes: Attributes::default(),
2604        }));
2605        let field = AvroFieldBuilder::new(&writer_record)
2606            .with_reader_schema(&reader_record)
2607            .with_utf8view(false)
2608            .with_strict_mode(false)
2609            .build()
2610            .unwrap();
2611        // Extract the resolved inner field's AvroDataType
2612        let dt = match field.data_type().codec() {
2613            Codec::Struct(fields) => fields[0].data_type().clone(),
2614            other => panic!("expected wrapper struct, got {other:?}"),
2615        };
2616        let mut dec = Decoder::try_new(&dt).unwrap();
2617        assert!(matches!(dec, Decoder::Int32(_)));
2618        for v in [7, -9] {
2619            let data = encode_avro_int(v);
2620            let mut cur = AvroCursor::new(&data);
2621            dec.decode(&mut cur).unwrap();
2622        }
2623        let arr = dec.flush(None).unwrap();
2624        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2625        assert_eq!(a.value(0), 7);
2626        assert_eq!(a.value(1), -9);
2627    }
2628
2629    #[test]
2630    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
2631        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2632        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
2633        let writer_record = Schema::Complex(ComplexType::Record(Record {
2634            name: "Root",
2635            namespace: None,
2636            doc: None,
2637            aliases: vec![],
2638            fields: vec![Field {
2639                name: "v",
2640                r#type: ws,
2641                default: None,
2642                doc: None,
2643                aliases: vec![],
2644            }],
2645            attributes: Attributes::default(),
2646        }));
2647        let reader_record = Schema::Complex(ComplexType::Record(Record {
2648            name: "Root",
2649            namespace: None,
2650            doc: None,
2651            aliases: vec![],
2652            fields: vec![Field {
2653                name: "v",
2654                r#type: rs,
2655                default: None,
2656                doc: None,
2657                aliases: vec![],
2658            }],
2659            attributes: Attributes::default(),
2660        }));
2661        let res = AvroFieldBuilder::new(&writer_record)
2662            .with_reader_schema(&reader_record)
2663            .with_utf8view(false)
2664            .with_strict_mode(false)
2665            .build();
2666        assert!(res.is_err(), "expected error for illegal promotion");
2667    }
2668
2669    #[test]
2670    fn test_map_decoding_one_entry() {
2671        let value_type = avro_from_codec(Codec::Utf8);
2672        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2673        let mut decoder = Decoder::try_new(&map_type).unwrap();
2674        // Encode a single map with one entry: {"hello": "world"}
2675        let mut data = Vec::new();
2676        data.extend_from_slice(&encode_avro_long(1));
2677        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
2678        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
2679        data.extend_from_slice(&encode_avro_long(0));
2680        let mut cursor = AvroCursor::new(&data);
2681        decoder.decode(&mut cursor).unwrap();
2682        let array = decoder.flush(None).unwrap();
2683        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2684        assert_eq!(map_arr.len(), 1); // one map
2685        assert_eq!(map_arr.value_length(0), 1);
2686        let entries = map_arr.value(0);
2687        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
2688        assert_eq!(struct_entries.len(), 1);
2689        let key_arr = struct_entries
2690            .column_by_name("key")
2691            .unwrap()
2692            .as_any()
2693            .downcast_ref::<StringArray>()
2694            .unwrap();
2695        let val_arr = struct_entries
2696            .column_by_name("value")
2697            .unwrap()
2698            .as_any()
2699            .downcast_ref::<StringArray>()
2700            .unwrap();
2701        assert_eq!(key_arr.value(0), "hello");
2702        assert_eq!(val_arr.value(0), "world");
2703    }
2704
2705    #[test]
2706    fn test_map_decoding_empty() {
2707        let value_type = avro_from_codec(Codec::Utf8);
2708        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2709        let mut decoder = Decoder::try_new(&map_type).unwrap();
2710        let data = encode_avro_long(0);
2711        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2712        let array = decoder.flush(None).unwrap();
2713        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2714        assert_eq!(map_arr.len(), 1);
2715        assert_eq!(map_arr.value_length(0), 0);
2716    }
2717
2718    #[test]
2719    fn test_fixed_decoding() {
2720        let avro_type = avro_from_codec(Codec::Fixed(3));
2721        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2722
2723        let data1 = [1u8, 2, 3];
2724        let mut cursor1 = AvroCursor::new(&data1);
2725        decoder
2726            .decode(&mut cursor1)
2727            .expect("Failed to decode data1");
2728        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
2729        let data2 = [4u8, 5, 6];
2730        let mut cursor2 = AvroCursor::new(&data2);
2731        decoder
2732            .decode(&mut cursor2)
2733            .expect("Failed to decode data2");
2734        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
2735        let array = decoder.flush(None).expect("Failed to flush decoder");
2736        assert_eq!(array.len(), 2, "Array should contain two items");
2737        let fixed_size_binary_array = array
2738            .as_any()
2739            .downcast_ref::<FixedSizeBinaryArray>()
2740            .expect("Failed to downcast to FixedSizeBinaryArray");
2741        assert_eq!(
2742            fixed_size_binary_array.value_length(),
2743            3,
2744            "Fixed size of binary values should be 3"
2745        );
2746        assert_eq!(
2747            fixed_size_binary_array.value(0),
2748            &[1, 2, 3],
2749            "First item mismatch"
2750        );
2751        assert_eq!(
2752            fixed_size_binary_array.value(1),
2753            &[4, 5, 6],
2754            "Second item mismatch"
2755        );
2756    }
2757
2758    #[test]
2759    fn test_fixed_decoding_empty() {
2760        let avro_type = avro_from_codec(Codec::Fixed(5));
2761        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2762
2763        let array = decoder
2764            .flush(None)
2765            .expect("Failed to flush decoder for empty input");
2766
2767        assert_eq!(array.len(), 0, "Array should be empty");
2768        let fixed_size_binary_array = array
2769            .as_any()
2770            .downcast_ref::<FixedSizeBinaryArray>()
2771            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
2772
2773        assert_eq!(
2774            fixed_size_binary_array.value_length(),
2775            5,
2776            "Fixed size of binary values should be 5 as per type"
2777        );
2778    }
2779
2780    #[test]
2781    fn test_uuid_decoding() {
2782        let avro_type = avro_from_codec(Codec::Uuid);
2783        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2784        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
2785        let data = encode_avro_bytes(uuid_str.as_bytes());
2786        let mut cursor = AvroCursor::new(&data);
2787        decoder.decode(&mut cursor).expect("Failed to decode data");
2788        assert_eq!(
2789            cursor.position(),
2790            data.len(),
2791            "Cursor should advance by varint size + data size"
2792        );
2793        let array = decoder.flush(None).expect("Failed to flush decoder");
2794        let fixed_size_binary_array = array
2795            .as_any()
2796            .downcast_ref::<FixedSizeBinaryArray>()
2797            .expect("Array should be a FixedSizeBinaryArray");
2798        assert_eq!(fixed_size_binary_array.len(), 1);
2799        assert_eq!(fixed_size_binary_array.value_length(), 16);
2800        let expected_bytes = [
2801            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
2802            0x6b, 0xf6,
2803        ];
2804        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
2805    }
2806
2807    #[test]
2808    fn test_array_decoding() {
2809        let item_dt = avro_from_codec(Codec::Int32);
2810        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2811        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2812        let mut row1 = Vec::new();
2813        row1.extend_from_slice(&encode_avro_long(2));
2814        row1.extend_from_slice(&encode_avro_int(10));
2815        row1.extend_from_slice(&encode_avro_int(20));
2816        row1.extend_from_slice(&encode_avro_long(0));
2817        let row2 = encode_avro_long(0);
2818        let mut cursor = AvroCursor::new(&row1);
2819        decoder.decode(&mut cursor).unwrap();
2820        let mut cursor2 = AvroCursor::new(&row2);
2821        decoder.decode(&mut cursor2).unwrap();
2822        let array = decoder.flush(None).unwrap();
2823        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2824        assert_eq!(list_arr.len(), 2);
2825        let offsets = list_arr.value_offsets();
2826        assert_eq!(offsets, &[0, 2, 2]);
2827        let values = list_arr.values();
2828        let int_arr = values.as_primitive::<Int32Type>();
2829        assert_eq!(int_arr.len(), 2);
2830        assert_eq!(int_arr.value(0), 10);
2831        assert_eq!(int_arr.value(1), 20);
2832    }
2833
2834    #[test]
2835    fn test_array_decoding_with_negative_block_count() {
2836        let item_dt = avro_from_codec(Codec::Int32);
2837        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2838        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2839        let mut data = encode_avro_long(-3);
2840        data.extend_from_slice(&encode_avro_long(12));
2841        data.extend_from_slice(&encode_avro_int(1));
2842        data.extend_from_slice(&encode_avro_int(2));
2843        data.extend_from_slice(&encode_avro_int(3));
2844        data.extend_from_slice(&encode_avro_long(0));
2845        let mut cursor = AvroCursor::new(&data);
2846        decoder.decode(&mut cursor).unwrap();
2847        let array = decoder.flush(None).unwrap();
2848        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2849        assert_eq!(list_arr.len(), 1);
2850        assert_eq!(list_arr.value_length(0), 3);
2851        let values = list_arr.values().as_primitive::<Int32Type>();
2852        assert_eq!(values.len(), 3);
2853        assert_eq!(values.value(0), 1);
2854        assert_eq!(values.value(1), 2);
2855        assert_eq!(values.value(2), 3);
2856    }
2857
2858    #[test]
2859    fn test_nested_array_decoding() {
2860        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
2861        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
2862        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
2863        let mut buf = Vec::new();
2864        buf.extend(encode_avro_long(1));
2865        buf.extend(encode_avro_long(2));
2866        buf.extend(encode_avro_int(5));
2867        buf.extend(encode_avro_int(6));
2868        buf.extend(encode_avro_long(0));
2869        buf.extend(encode_avro_long(0));
2870        let mut cursor = AvroCursor::new(&buf);
2871        decoder.decode(&mut cursor).unwrap();
2872        let arr = decoder.flush(None).unwrap();
2873        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
2874        assert_eq!(outer.len(), 1);
2875        assert_eq!(outer.value_length(0), 1);
2876        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
2877        assert_eq!(inner.len(), 1);
2878        assert_eq!(inner.value_length(0), 2);
2879        let values = inner
2880            .values()
2881            .as_any()
2882            .downcast_ref::<Int32Array>()
2883            .unwrap();
2884        assert_eq!(values.values(), &[5, 6]);
2885    }
2886
2887    #[test]
2888    fn test_array_decoding_empty_array() {
2889        let value_type = avro_from_codec(Codec::Utf8);
2890        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
2891        let mut decoder = Decoder::try_new(&map_type).unwrap();
2892        let data = encode_avro_long(0);
2893        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2894        let array = decoder.flush(None).unwrap();
2895        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2896        assert_eq!(list_arr.len(), 1);
2897        assert_eq!(list_arr.value_length(0), 0);
2898    }
2899
2900    #[test]
2901    fn test_decimal_decoding_fixed256() {
2902        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
2903        let mut decoder = Decoder::try_new(&dt).unwrap();
2904        let row1 = [
2905            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2906            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2907            0x00, 0x00, 0x30, 0x39,
2908        ];
2909        let row2 = [
2910            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2911            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2912            0xFF, 0xFF, 0xFF, 0x85,
2913        ];
2914        let mut data = Vec::new();
2915        data.extend_from_slice(&row1);
2916        data.extend_from_slice(&row2);
2917        let mut cursor = AvroCursor::new(&data);
2918        decoder.decode(&mut cursor).unwrap();
2919        decoder.decode(&mut cursor).unwrap();
2920        let arr = decoder.flush(None).unwrap();
2921        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
2922        assert_eq!(dec.len(), 2);
2923        assert_eq!(dec.value_as_string(0), "123.45");
2924        assert_eq!(dec.value_as_string(1), "-1.23");
2925    }
2926
2927    #[test]
2928    fn test_decimal_decoding_fixed128() {
2929        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
2930        let mut decoder = Decoder::try_new(&dt).unwrap();
2931        let row1 = [
2932            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2933            0x30, 0x39,
2934        ];
2935        let row2 = [
2936            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2937            0xFF, 0x85,
2938        ];
2939        let mut data = Vec::new();
2940        data.extend_from_slice(&row1);
2941        data.extend_from_slice(&row2);
2942        let mut cursor = AvroCursor::new(&data);
2943        decoder.decode(&mut cursor).unwrap();
2944        decoder.decode(&mut cursor).unwrap();
2945        let arr = decoder.flush(None).unwrap();
2946        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2947        assert_eq!(dec.len(), 2);
2948        assert_eq!(dec.value_as_string(0), "123.45");
2949        assert_eq!(dec.value_as_string(1), "-1.23");
2950    }
2951
2952    #[test]
2953    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
2954        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
2955        let mut decoder = Decoder::try_new(&dt).unwrap();
2956        let row1 = [
2957            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2958            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2959            0x00, 0x00, 0x30, 0x39,
2960        ];
2961        let row2 = [
2962            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2963            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2964            0xFF, 0xFF, 0xFF, 0x85,
2965        ];
2966        let mut data = Vec::new();
2967        data.extend_from_slice(&row1);
2968        data.extend_from_slice(&row2);
2969        let mut cursor = AvroCursor::new(&data);
2970        decoder.decode(&mut cursor).unwrap();
2971        decoder.decode(&mut cursor).unwrap();
2972        let arr = decoder.flush(None).unwrap();
2973        #[cfg(feature = "small_decimals")]
2974        {
2975            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2976            assert_eq!(dec.len(), 2);
2977            assert_eq!(dec.value_as_string(0), "123.45");
2978            assert_eq!(dec.value_as_string(1), "-1.23");
2979        }
2980        #[cfg(not(feature = "small_decimals"))]
2981        {
2982            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2983            assert_eq!(dec.len(), 2);
2984            assert_eq!(dec.value_as_string(0), "123.45");
2985            assert_eq!(dec.value_as_string(1), "-1.23");
2986        }
2987    }
2988
2989    #[test]
2990    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
2991        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
2992        let mut decoder = Decoder::try_new(&dt).unwrap();
2993        let row1 = [
2994            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2995            0x30, 0x39,
2996        ];
2997        let row2 = [
2998            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2999            0xFF, 0x85,
3000        ];
3001        let mut data = Vec::new();
3002        data.extend_from_slice(&row1);
3003        data.extend_from_slice(&row2);
3004        let mut cursor = AvroCursor::new(&data);
3005        decoder.decode(&mut cursor).unwrap();
3006        decoder.decode(&mut cursor).unwrap();
3007
3008        let arr = decoder.flush(None).unwrap();
3009        #[cfg(feature = "small_decimals")]
3010        {
3011            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3012            assert_eq!(dec.len(), 2);
3013            assert_eq!(dec.value_as_string(0), "123.45");
3014            assert_eq!(dec.value_as_string(1), "-1.23");
3015        }
3016        #[cfg(not(feature = "small_decimals"))]
3017        {
3018            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3019            assert_eq!(dec.len(), 2);
3020            assert_eq!(dec.value_as_string(0), "123.45");
3021            assert_eq!(dec.value_as_string(1), "-1.23");
3022        }
3023    }
3024
3025    #[test]
3026    fn test_decimal_decoding_bytes_with_nulls() {
3027        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
3028        let inner = Decoder::try_new(&dt).unwrap();
3029        let mut decoder = Decoder::Nullable(
3030            Nullability::NullSecond,
3031            NullBufferBuilder::new(DEFAULT_CAPACITY),
3032            Box::new(inner),
3033            NullablePlan::ReadTag,
3034        );
3035        let mut data = Vec::new();
3036        data.extend_from_slice(&encode_avro_int(0));
3037        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
3038        data.extend_from_slice(&encode_avro_int(1));
3039        data.extend_from_slice(&encode_avro_int(0));
3040        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
3041        let mut cursor = AvroCursor::new(&data);
3042        decoder.decode(&mut cursor).unwrap();
3043        decoder.decode(&mut cursor).unwrap();
3044        decoder.decode(&mut cursor).unwrap();
3045        let arr = decoder.flush(None).unwrap();
3046        #[cfg(feature = "small_decimals")]
3047        {
3048            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3049            assert_eq!(dec_arr.len(), 3);
3050            assert!(dec_arr.is_valid(0));
3051            assert!(!dec_arr.is_valid(1));
3052            assert!(dec_arr.is_valid(2));
3053            assert_eq!(dec_arr.value_as_string(0), "123.4");
3054            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3055        }
3056        #[cfg(not(feature = "small_decimals"))]
3057        {
3058            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3059            assert_eq!(dec_arr.len(), 3);
3060            assert!(dec_arr.is_valid(0));
3061            assert!(!dec_arr.is_valid(1));
3062            assert!(dec_arr.is_valid(2));
3063            assert_eq!(dec_arr.value_as_string(0), "123.4");
3064            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3065        }
3066    }
3067
3068    #[test]
3069    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
3070        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
3071        let inner = Decoder::try_new(&dt).unwrap();
3072        let mut decoder = Decoder::Nullable(
3073            Nullability::NullSecond,
3074            NullBufferBuilder::new(DEFAULT_CAPACITY),
3075            Box::new(inner),
3076            NullablePlan::ReadTag,
3077        );
3078        let row1 = [
3079            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
3080            0xE2, 0x40,
3081        ];
3082        let row3 = [
3083            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
3084            0x1D, 0xC0,
3085        ];
3086        let mut data = Vec::new();
3087        data.extend_from_slice(&encode_avro_int(0));
3088        data.extend_from_slice(&row1);
3089        data.extend_from_slice(&encode_avro_int(1));
3090        data.extend_from_slice(&encode_avro_int(0));
3091        data.extend_from_slice(&row3);
3092        let mut cursor = AvroCursor::new(&data);
3093        decoder.decode(&mut cursor).unwrap();
3094        decoder.decode(&mut cursor).unwrap();
3095        decoder.decode(&mut cursor).unwrap();
3096        let arr = decoder.flush(None).unwrap();
3097        #[cfg(feature = "small_decimals")]
3098        {
3099            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3100            assert_eq!(dec_arr.len(), 3);
3101            assert!(dec_arr.is_valid(0));
3102            assert!(!dec_arr.is_valid(1));
3103            assert!(dec_arr.is_valid(2));
3104            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3105            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3106        }
3107        #[cfg(not(feature = "small_decimals"))]
3108        {
3109            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3110            assert_eq!(dec_arr.len(), 3);
3111            assert!(dec_arr.is_valid(0));
3112            assert!(!dec_arr.is_valid(1));
3113            assert!(dec_arr.is_valid(2));
3114            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3115            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3116        }
3117    }
3118
3119    #[test]
3120    fn test_enum_decoding() {
3121        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
3122        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
3123        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3124        let mut data = Vec::new();
3125        data.extend_from_slice(&encode_avro_int(2));
3126        data.extend_from_slice(&encode_avro_int(0));
3127        data.extend_from_slice(&encode_avro_int(1));
3128        let mut cursor = AvroCursor::new(&data);
3129        decoder.decode(&mut cursor).unwrap();
3130        decoder.decode(&mut cursor).unwrap();
3131        decoder.decode(&mut cursor).unwrap();
3132        let array = decoder.flush(None).unwrap();
3133        let dict_array = array
3134            .as_any()
3135            .downcast_ref::<DictionaryArray<Int32Type>>()
3136            .unwrap();
3137        assert_eq!(dict_array.len(), 3);
3138        let values = dict_array
3139            .values()
3140            .as_any()
3141            .downcast_ref::<StringArray>()
3142            .unwrap();
3143        assert_eq!(values.value(0), "A");
3144        assert_eq!(values.value(1), "B");
3145        assert_eq!(values.value(2), "C");
3146        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
3147    }
3148
3149    #[test]
3150    fn test_enum_decoding_with_nulls() {
3151        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
3152        let enum_codec = Codec::Enum(symbols.clone());
3153        let avro_type =
3154            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
3155        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3156        let mut data = Vec::new();
3157        data.extend_from_slice(&encode_avro_long(1));
3158        data.extend_from_slice(&encode_avro_int(1));
3159        data.extend_from_slice(&encode_avro_long(0));
3160        data.extend_from_slice(&encode_avro_long(1));
3161        data.extend_from_slice(&encode_avro_int(0));
3162        let mut cursor = AvroCursor::new(&data);
3163        decoder.decode(&mut cursor).unwrap();
3164        decoder.decode(&mut cursor).unwrap();
3165        decoder.decode(&mut cursor).unwrap();
3166        let array = decoder.flush(None).unwrap();
3167        let dict_array = array
3168            .as_any()
3169            .downcast_ref::<DictionaryArray<Int32Type>>()
3170            .unwrap();
3171        assert_eq!(dict_array.len(), 3);
3172        assert!(dict_array.is_valid(0));
3173        assert!(dict_array.is_null(1));
3174        assert!(dict_array.is_valid(2));
3175        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
3176        assert_eq!(dict_array.keys(), &expected_keys);
3177        let values = dict_array
3178            .values()
3179            .as_any()
3180            .downcast_ref::<StringArray>()
3181            .unwrap();
3182        assert_eq!(values.value(0), "X");
3183        assert_eq!(values.value(1), "Y");
3184    }
3185
3186    #[test]
3187    fn test_duration_decoding_with_nulls() {
3188        let duration_codec = Codec::Interval;
3189        let avro_type = AvroDataType::new(
3190            duration_codec,
3191            Default::default(),
3192            Some(Nullability::NullFirst),
3193        );
3194        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3195        let mut data = Vec::new();
3196        // First value: 1 month, 2 days, 3 millis
3197        data.extend_from_slice(&encode_avro_long(1)); // not null
3198        let mut duration1 = Vec::new();
3199        duration1.extend_from_slice(&1u32.to_le_bytes());
3200        duration1.extend_from_slice(&2u32.to_le_bytes());
3201        duration1.extend_from_slice(&3u32.to_le_bytes());
3202        data.extend_from_slice(&duration1);
3203        // Second value: null
3204        data.extend_from_slice(&encode_avro_long(0)); // null
3205        data.extend_from_slice(&encode_avro_long(1)); // not null
3206        let mut duration2 = Vec::new();
3207        duration2.extend_from_slice(&4u32.to_le_bytes());
3208        duration2.extend_from_slice(&5u32.to_le_bytes());
3209        duration2.extend_from_slice(&6u32.to_le_bytes());
3210        data.extend_from_slice(&duration2);
3211        let mut cursor = AvroCursor::new(&data);
3212        decoder.decode(&mut cursor).unwrap();
3213        decoder.decode(&mut cursor).unwrap();
3214        decoder.decode(&mut cursor).unwrap();
3215        let array = decoder.flush(None).unwrap();
3216        let interval_array = array
3217            .as_any()
3218            .downcast_ref::<IntervalMonthDayNanoArray>()
3219            .unwrap();
3220        assert_eq!(interval_array.len(), 3);
3221        assert!(interval_array.is_valid(0));
3222        assert!(interval_array.is_null(1));
3223        assert!(interval_array.is_valid(2));
3224        let expected = IntervalMonthDayNanoArray::from(vec![
3225            Some(IntervalMonthDayNano {
3226                months: 1,
3227                days: 2,
3228                nanoseconds: 3_000_000,
3229            }),
3230            None,
3231            Some(IntervalMonthDayNano {
3232                months: 4,
3233                days: 5,
3234                nanoseconds: 6_000_000,
3235            }),
3236        ]);
3237        assert_eq!(interval_array, &expected);
3238    }
3239
3240    #[test]
3241    fn test_duration_decoding_empty() {
3242        let duration_codec = Codec::Interval;
3243        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
3244        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3245        let array = decoder.flush(None).unwrap();
3246        assert_eq!(array.len(), 0);
3247    }
3248
3249    #[test]
3250    #[cfg(feature = "avro_custom_types")]
3251    fn test_duration_seconds_decoding() {
3252        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
3253        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3254        let mut data = Vec::new();
3255        // Three values: 0, -1, 2
3256        data.extend_from_slice(&encode_avro_long(0));
3257        data.extend_from_slice(&encode_avro_long(-1));
3258        data.extend_from_slice(&encode_avro_long(2));
3259        let mut cursor = AvroCursor::new(&data);
3260        decoder.decode(&mut cursor).unwrap();
3261        decoder.decode(&mut cursor).unwrap();
3262        decoder.decode(&mut cursor).unwrap();
3263        let array = decoder.flush(None).unwrap();
3264        let dur = array
3265            .as_any()
3266            .downcast_ref::<DurationSecondArray>()
3267            .unwrap();
3268        assert_eq!(dur.values(), &[0, -1, 2]);
3269    }
3270
3271    #[test]
3272    #[cfg(feature = "avro_custom_types")]
3273    fn test_duration_milliseconds_decoding() {
3274        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3275        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3276        let mut data = Vec::new();
3277        for v in [1i64, 0, -2] {
3278            data.extend_from_slice(&encode_avro_long(v));
3279        }
3280        let mut cursor = AvroCursor::new(&data);
3281        for _ in 0..3 {
3282            decoder.decode(&mut cursor).unwrap();
3283        }
3284        let array = decoder.flush(None).unwrap();
3285        let dur = array
3286            .as_any()
3287            .downcast_ref::<DurationMillisecondArray>()
3288            .unwrap();
3289        assert_eq!(dur.values(), &[1, 0, -2]);
3290    }
3291
3292    #[test]
3293    #[cfg(feature = "avro_custom_types")]
3294    fn test_duration_microseconds_decoding() {
3295        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3296        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3297        let mut data = Vec::new();
3298        for v in [5i64, -6, 7] {
3299            data.extend_from_slice(&encode_avro_long(v));
3300        }
3301        let mut cursor = AvroCursor::new(&data);
3302        for _ in 0..3 {
3303            decoder.decode(&mut cursor).unwrap();
3304        }
3305        let array = decoder.flush(None).unwrap();
3306        let dur = array
3307            .as_any()
3308            .downcast_ref::<DurationMicrosecondArray>()
3309            .unwrap();
3310        assert_eq!(dur.values(), &[5, -6, 7]);
3311    }
3312
3313    #[test]
3314    #[cfg(feature = "avro_custom_types")]
3315    fn test_duration_nanoseconds_decoding() {
3316        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3317        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3318        let mut data = Vec::new();
3319        for v in [8i64, 9, -10] {
3320            data.extend_from_slice(&encode_avro_long(v));
3321        }
3322        let mut cursor = AvroCursor::new(&data);
3323        for _ in 0..3 {
3324            decoder.decode(&mut cursor).unwrap();
3325        }
3326        let array = decoder.flush(None).unwrap();
3327        let dur = array
3328            .as_any()
3329            .downcast_ref::<DurationNanosecondArray>()
3330            .unwrap();
3331        assert_eq!(dur.values(), &[8, 9, -10]);
3332    }
3333
3334    #[test]
3335    fn test_nullable_decode_error_bitmap_corruption() {
3336        // Nullable Int32 with ['T','null'] encoding (NullSecond)
3337        let avro_type = AvroDataType::new(
3338            Codec::Int32,
3339            Default::default(),
3340            Some(Nullability::NullSecond),
3341        );
3342        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3343
3344        // Row 1: union branch 1 (null)
3345        let mut row1 = Vec::new();
3346        row1.extend_from_slice(&encode_avro_int(1));
3347
3348        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
3349        let mut row2 = Vec::new();
3350        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
3351
3352        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
3353        let mut row3 = Vec::new();
3354        row3.extend_from_slice(&encode_avro_int(0)); // branch
3355        row3.extend_from_slice(&encode_avro_int(42)); // actual value
3356
3357        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
3358        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
3359        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
3360
3361        let array = decoder.flush(None).unwrap();
3362
3363        // Should contain 2 elements: row1 (null) and row3 (42)
3364        assert_eq!(array.len(), 2);
3365        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
3366        assert!(int_array.is_null(0)); // row1 is null
3367        assert_eq!(int_array.value(1), 42); // row3 value is 42
3368    }
3369
3370    #[test]
3371    fn test_enum_mapping_reordered_symbols() {
3372        let reader_symbols: Arc<[String]> =
3373            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
3374        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
3375        let default_index: i32 = -1;
3376        let mut dec = Decoder::Enum(
3377            Vec::with_capacity(DEFAULT_CAPACITY),
3378            reader_symbols.clone(),
3379            Some(EnumResolution {
3380                mapping,
3381                default_index,
3382            }),
3383        );
3384        let mut data = Vec::new();
3385        data.extend_from_slice(&encode_avro_int(0));
3386        data.extend_from_slice(&encode_avro_int(1));
3387        data.extend_from_slice(&encode_avro_int(2));
3388        let mut cur = AvroCursor::new(&data);
3389        dec.decode(&mut cur).unwrap();
3390        dec.decode(&mut cur).unwrap();
3391        dec.decode(&mut cur).unwrap();
3392        let arr = dec.flush(None).unwrap();
3393        let dict = arr
3394            .as_any()
3395            .downcast_ref::<DictionaryArray<Int32Type>>()
3396            .unwrap();
3397        let expected_keys = Int32Array::from(vec![2, 0, 1]);
3398        assert_eq!(dict.keys(), &expected_keys);
3399        let values = dict
3400            .values()
3401            .as_any()
3402            .downcast_ref::<StringArray>()
3403            .unwrap();
3404        assert_eq!(values.value(0), "B");
3405        assert_eq!(values.value(1), "C");
3406        assert_eq!(values.value(2), "A");
3407    }
3408
3409    #[test]
3410    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
3411        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
3412        let default_index: i32 = 1;
3413        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
3414        let mut dec = Decoder::Enum(
3415            Vec::with_capacity(DEFAULT_CAPACITY),
3416            reader_symbols.clone(),
3417            Some(EnumResolution {
3418                mapping,
3419                default_index,
3420            }),
3421        );
3422        let mut data = Vec::new();
3423        data.extend_from_slice(&encode_avro_int(0));
3424        data.extend_from_slice(&encode_avro_int(1));
3425        data.extend_from_slice(&encode_avro_int(99));
3426        let mut cur = AvroCursor::new(&data);
3427        dec.decode(&mut cur).unwrap();
3428        dec.decode(&mut cur).unwrap();
3429        dec.decode(&mut cur).unwrap();
3430        let arr = dec.flush(None).unwrap();
3431        let dict = arr
3432            .as_any()
3433            .downcast_ref::<DictionaryArray<Int32Type>>()
3434            .unwrap();
3435        let expected_keys = Int32Array::from(vec![0, 1, 1]);
3436        assert_eq!(dict.keys(), &expected_keys);
3437        let values = dict
3438            .values()
3439            .as_any()
3440            .downcast_ref::<StringArray>()
3441            .unwrap();
3442        assert_eq!(values.value(0), "A");
3443        assert_eq!(values.value(1), "B");
3444    }
3445
3446    #[test]
3447    fn test_enum_mapping_unknown_symbol_without_default_errors() {
3448        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
3449        let default_index: i32 = -1; // indicates no default at type-level
3450        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
3451        let mut dec = Decoder::Enum(
3452            Vec::with_capacity(DEFAULT_CAPACITY),
3453            reader_symbols,
3454            Some(EnumResolution {
3455                mapping,
3456                default_index,
3457            }),
3458        );
3459        let data = encode_avro_int(0);
3460        let mut cur = AvroCursor::new(&data);
3461        let err = dec
3462            .decode(&mut cur)
3463            .expect_err("expected decode error for unresolved enum without default");
3464        let msg = err.to_string();
3465        assert!(
3466            msg.contains("not resolvable") && msg.contains("no default"),
3467            "unexpected error message: {msg}"
3468        );
3469    }
3470
3471    fn make_record_resolved_decoder(
3472        reader_fields: &[(&str, DataType, bool)],
3473        writer_to_reader: Vec<Option<usize>>,
3474        skip_decoders: Vec<Option<Skipper>>,
3475    ) -> Decoder {
3476        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3477        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3478        for (name, dt, nullable) in reader_fields {
3479            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3480            let enc = match dt {
3481                DataType::Int32 => Decoder::Int32(Vec::new()),
3482                DataType::Int64 => Decoder::Int64(Vec::new()),
3483                DataType::Utf8 => {
3484                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3485                }
3486                other => panic!("Unsupported test reader field type: {other:?}"),
3487            };
3488            encodings.push(enc);
3489        }
3490        let fields: Fields = field_refs.into();
3491        Decoder::Record(
3492            fields,
3493            encodings,
3494            Some(Projector {
3495                writer_to_reader: Arc::from(writer_to_reader),
3496                skip_decoders,
3497                field_defaults: vec![None; reader_fields.len()],
3498                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3499            }),
3500        )
3501    }
3502
3503    #[test]
3504    fn test_skip_writer_trailing_field_int32() {
3505        let mut dec = make_record_resolved_decoder(
3506            &[("id", arrow_schema::DataType::Int32, false)],
3507            vec![Some(0), None],
3508            vec![None, Some(super::Skipper::Int32)],
3509        );
3510        let mut data = Vec::new();
3511        data.extend_from_slice(&encode_avro_int(7));
3512        data.extend_from_slice(&encode_avro_int(999));
3513        let mut cur = AvroCursor::new(&data);
3514        dec.decode(&mut cur).unwrap();
3515        assert_eq!(cur.position(), data.len());
3516        let arr = dec.flush(None).unwrap();
3517        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
3518        assert_eq!(struct_arr.len(), 1);
3519        let id = struct_arr
3520            .column_by_name("id")
3521            .unwrap()
3522            .as_any()
3523            .downcast_ref::<Int32Array>()
3524            .unwrap();
3525        assert_eq!(id.value(0), 7);
3526    }
3527
3528    #[test]
3529    fn test_skip_writer_middle_field_string() {
3530        let mut dec = make_record_resolved_decoder(
3531            &[
3532                ("id", DataType::Int32, false),
3533                ("score", DataType::Int64, false),
3534            ],
3535            vec![Some(0), None, Some(1)],
3536            vec![None, Some(Skipper::String), None],
3537        );
3538        let mut data = Vec::new();
3539        data.extend_from_slice(&encode_avro_int(42));
3540        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
3541        data.extend_from_slice(&encode_avro_long(1000));
3542        let mut cur = AvroCursor::new(&data);
3543        dec.decode(&mut cur).unwrap();
3544        assert_eq!(cur.position(), data.len());
3545        let arr = dec.flush(None).unwrap();
3546        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3547        let id = s
3548            .column_by_name("id")
3549            .unwrap()
3550            .as_any()
3551            .downcast_ref::<Int32Array>()
3552            .unwrap();
3553        let score = s
3554            .column_by_name("score")
3555            .unwrap()
3556            .as_any()
3557            .downcast_ref::<Int64Array>()
3558            .unwrap();
3559        assert_eq!(id.value(0), 42);
3560        assert_eq!(score.value(0), 1000);
3561    }
3562
3563    #[test]
3564    fn test_skip_writer_array_with_negative_block_count_fast() {
3565        let mut dec = make_record_resolved_decoder(
3566            &[("id", DataType::Int32, false)],
3567            vec![None, Some(0)],
3568            vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
3569        );
3570        let mut array_payload = Vec::new();
3571        array_payload.extend_from_slice(&encode_avro_int(1));
3572        array_payload.extend_from_slice(&encode_avro_int(2));
3573        array_payload.extend_from_slice(&encode_avro_int(3));
3574        let mut data = Vec::new();
3575        data.extend_from_slice(&encode_avro_long(-3));
3576        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
3577        data.extend_from_slice(&array_payload);
3578        data.extend_from_slice(&encode_avro_long(0));
3579        data.extend_from_slice(&encode_avro_int(5));
3580        let mut cur = AvroCursor::new(&data);
3581        dec.decode(&mut cur).unwrap();
3582        assert_eq!(cur.position(), data.len());
3583        let arr = dec.flush(None).unwrap();
3584        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3585        let id = s
3586            .column_by_name("id")
3587            .unwrap()
3588            .as_any()
3589            .downcast_ref::<Int32Array>()
3590            .unwrap();
3591        assert_eq!(id.len(), 1);
3592        assert_eq!(id.value(0), 5);
3593    }
3594
3595    #[test]
3596    fn test_skip_writer_map_with_negative_block_count_fast() {
3597        let mut dec = make_record_resolved_decoder(
3598            &[("id", DataType::Int32, false)],
3599            vec![None, Some(0)],
3600            vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
3601        );
3602        let mut entries = Vec::new();
3603        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
3604        entries.extend_from_slice(&encode_avro_int(10));
3605        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
3606        entries.extend_from_slice(&encode_avro_int(20));
3607        let mut data = Vec::new();
3608        data.extend_from_slice(&encode_avro_long(-2));
3609        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
3610        data.extend_from_slice(&entries);
3611        data.extend_from_slice(&encode_avro_long(0));
3612        data.extend_from_slice(&encode_avro_int(123));
3613        let mut cur = AvroCursor::new(&data);
3614        dec.decode(&mut cur).unwrap();
3615        assert_eq!(cur.position(), data.len());
3616        let arr = dec.flush(None).unwrap();
3617        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3618        let id = s
3619            .column_by_name("id")
3620            .unwrap()
3621            .as_any()
3622            .downcast_ref::<Int32Array>()
3623            .unwrap();
3624        assert_eq!(id.len(), 1);
3625        assert_eq!(id.value(0), 123);
3626    }
3627
3628    #[test]
3629    fn test_skip_writer_nullable_field_union_nullfirst() {
3630        let mut dec = make_record_resolved_decoder(
3631            &[("id", DataType::Int32, false)],
3632            vec![None, Some(0)],
3633            vec![
3634                Some(super::Skipper::Nullable(
3635                    Nullability::NullFirst,
3636                    Box::new(super::Skipper::Int32),
3637                )),
3638                None,
3639            ],
3640        );
3641        let mut row1 = Vec::new();
3642        row1.extend_from_slice(&encode_avro_long(0));
3643        row1.extend_from_slice(&encode_avro_int(5));
3644        let mut row2 = Vec::new();
3645        row2.extend_from_slice(&encode_avro_long(1));
3646        row2.extend_from_slice(&encode_avro_int(123));
3647        row2.extend_from_slice(&encode_avro_int(7));
3648        let mut cur1 = AvroCursor::new(&row1);
3649        let mut cur2 = AvroCursor::new(&row2);
3650        dec.decode(&mut cur1).unwrap();
3651        dec.decode(&mut cur2).unwrap();
3652        assert_eq!(cur1.position(), row1.len());
3653        assert_eq!(cur2.position(), row2.len());
3654        let arr = dec.flush(None).unwrap();
3655        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3656        let id = s
3657            .column_by_name("id")
3658            .unwrap()
3659            .as_any()
3660            .downcast_ref::<Int32Array>()
3661            .unwrap();
3662        assert_eq!(id.len(), 2);
3663        assert_eq!(id.value(0), 5);
3664        assert_eq!(id.value(1), 7);
3665    }
3666
3667    fn make_dense_union_avro(
3668        children: Vec<(Codec, &'_ str, DataType)>,
3669        type_ids: Vec<i8>,
3670    ) -> AvroDataType {
3671        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3672        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3673        for (codec, name, dt) in children.into_iter() {
3674            avro_children.push(AvroDataType::new(codec, Default::default(), None));
3675            fields.push(arrow_schema::Field::new(name, dt, true));
3676        }
3677        let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
3678        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3679        AvroDataType::new(union_codec, Default::default(), None)
3680    }
3681
3682    #[test]
3683    fn test_union_dense_two_children_custom_type_ids() {
3684        let union_dt = make_dense_union_avro(
3685            vec![
3686                (Codec::Int32, "i", DataType::Int32),
3687                (Codec::Utf8, "s", DataType::Utf8),
3688            ],
3689            vec![2, 5],
3690        );
3691        let mut dec = Decoder::try_new(&union_dt).unwrap();
3692        let mut r1 = Vec::new();
3693        r1.extend_from_slice(&encode_avro_long(0));
3694        r1.extend_from_slice(&encode_avro_int(7));
3695        let mut r2 = Vec::new();
3696        r2.extend_from_slice(&encode_avro_long(1));
3697        r2.extend_from_slice(&encode_avro_bytes(b"x"));
3698        let mut r3 = Vec::new();
3699        r3.extend_from_slice(&encode_avro_long(0));
3700        r3.extend_from_slice(&encode_avro_int(-1));
3701        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3702        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3703        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3704        let array = dec.flush(None).unwrap();
3705        let ua = array
3706            .as_any()
3707            .downcast_ref::<UnionArray>()
3708            .expect("expected UnionArray");
3709        assert_eq!(ua.len(), 3);
3710        assert_eq!(ua.type_id(0), 2);
3711        assert_eq!(ua.type_id(1), 5);
3712        assert_eq!(ua.type_id(2), 2);
3713        assert_eq!(ua.value_offset(0), 0);
3714        assert_eq!(ua.value_offset(1), 0);
3715        assert_eq!(ua.value_offset(2), 1);
3716        let int_child = ua
3717            .child(2)
3718            .as_any()
3719            .downcast_ref::<Int32Array>()
3720            .expect("int child");
3721        assert_eq!(int_child.len(), 2);
3722        assert_eq!(int_child.value(0), 7);
3723        assert_eq!(int_child.value(1), -1);
3724        let str_child = ua
3725            .child(5)
3726            .as_any()
3727            .downcast_ref::<StringArray>()
3728            .expect("string child");
3729        assert_eq!(str_child.len(), 1);
3730        assert_eq!(str_child.value(0), "x");
3731    }
3732
3733    #[test]
3734    fn test_union_dense_with_null_and_string_children() {
3735        let union_dt = make_dense_union_avro(
3736            vec![
3737                (Codec::Null, "n", DataType::Null),
3738                (Codec::Utf8, "s", DataType::Utf8),
3739            ],
3740            vec![42, 7],
3741        );
3742        let mut dec = Decoder::try_new(&union_dt).unwrap();
3743        let r1 = encode_avro_long(0);
3744        let mut r2 = Vec::new();
3745        r2.extend_from_slice(&encode_avro_long(1));
3746        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
3747        let r3 = encode_avro_long(0);
3748        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3749        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3750        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3751        let array = dec.flush(None).unwrap();
3752        let ua = array
3753            .as_any()
3754            .downcast_ref::<UnionArray>()
3755            .expect("expected UnionArray");
3756        assert_eq!(ua.len(), 3);
3757        assert_eq!(ua.type_id(0), 42);
3758        assert_eq!(ua.type_id(1), 7);
3759        assert_eq!(ua.type_id(2), 42);
3760        assert_eq!(ua.value_offset(0), 0);
3761        assert_eq!(ua.value_offset(1), 0);
3762        assert_eq!(ua.value_offset(2), 1);
3763        let null_child = ua
3764            .child(42)
3765            .as_any()
3766            .downcast_ref::<NullArray>()
3767            .expect("null child");
3768        assert_eq!(null_child.len(), 2);
3769        let str_child = ua
3770            .child(7)
3771            .as_any()
3772            .downcast_ref::<StringArray>()
3773            .expect("string child");
3774        assert_eq!(str_child.len(), 1);
3775        assert_eq!(str_child.value(0), "abc");
3776    }
3777
3778    #[test]
3779    fn test_union_decode_negative_branch_index_errors() {
3780        let union_dt = make_dense_union_avro(
3781            vec![
3782                (Codec::Int32, "i", DataType::Int32),
3783                (Codec::Utf8, "s", DataType::Utf8),
3784            ],
3785            vec![0, 1],
3786        );
3787        let mut dec = Decoder::try_new(&union_dt).unwrap();
3788        let row = encode_avro_long(-1); // decodes back to -1
3789        let err = dec
3790            .decode(&mut AvroCursor::new(&row))
3791            .expect_err("expected error for negative branch index");
3792        let msg = err.to_string();
3793        assert!(
3794            msg.contains("Negative union branch index"),
3795            "unexpected error message: {msg}"
3796        );
3797    }
3798
3799    #[test]
3800    fn test_union_decode_out_of_range_branch_index_errors() {
3801        let union_dt = make_dense_union_avro(
3802            vec![
3803                (Codec::Int32, "i", DataType::Int32),
3804                (Codec::Utf8, "s", DataType::Utf8),
3805            ],
3806            vec![10, 11],
3807        );
3808        let mut dec = Decoder::try_new(&union_dt).unwrap();
3809        let row = encode_avro_long(2);
3810        let err = dec
3811            .decode(&mut AvroCursor::new(&row))
3812            .expect_err("expected error for out-of-range branch index");
3813        let msg = err.to_string();
3814        assert!(
3815            msg.contains("out of range"),
3816            "unexpected error message: {msg}"
3817        );
3818    }
3819
3820    #[test]
3821    fn test_union_sparse_mode_not_supported() {
3822        let children: Vec<AvroDataType> = vec![
3823            AvroDataType::new(Codec::Int32, Default::default(), None),
3824            AvroDataType::new(Codec::Utf8, Default::default(), None),
3825        ];
3826        let uf = UnionFields::try_new(
3827            vec![1, 3],
3828            vec![
3829                arrow_schema::Field::new("i", DataType::Int32, true),
3830                arrow_schema::Field::new("s", DataType::Utf8, true),
3831            ],
3832        )
3833        .unwrap();
3834        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
3835        let dt = AvroDataType::new(codec, Default::default(), None);
3836        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
3837        let msg = err.to_string();
3838        assert!(
3839            msg.contains("Sparse Arrow unions are not yet supported"),
3840            "unexpected error message: {msg}"
3841        );
3842    }
3843
3844    fn make_record_decoder_with_projector_defaults(
3845        reader_fields: &[(&str, DataType, bool)],
3846        field_defaults: Vec<Option<AvroLiteral>>,
3847        default_injections: Vec<(usize, AvroLiteral)>,
3848        writer_to_reader_len: usize,
3849    ) -> Decoder {
3850        assert_eq!(
3851            field_defaults.len(),
3852            reader_fields.len(),
3853            "field_defaults must have one entry per reader field"
3854        );
3855        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3856        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3857        for (name, dt, nullable) in reader_fields {
3858            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3859            let enc = match dt {
3860                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3861                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3862                DataType::Utf8 => Decoder::String(
3863                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3864                    Vec::with_capacity(DEFAULT_CAPACITY),
3865                ),
3866                other => panic!("Unsupported test field type in helper: {other:?}"),
3867            };
3868            encodings.push(enc);
3869        }
3870        let fields: Fields = field_refs.into();
3871        let skip_decoders: Vec<Option<Skipper>> =
3872            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3873        let projector = Projector {
3874            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3875            skip_decoders,
3876            field_defaults,
3877            default_injections: Arc::from(default_injections),
3878        };
3879        Decoder::Record(fields, encodings, Some(projector))
3880    }
3881
3882    #[test]
3883    fn test_default_append_int32_and_int64_from_int_and_long() {
3884        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3885        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
3886        let arr = d_i32.flush(None).unwrap();
3887        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3888        assert_eq!(a.len(), 1);
3889        assert_eq!(a.value(0), 42);
3890        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
3891        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
3892        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
3893        let arr64 = d_i64.flush(None).unwrap();
3894        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
3895        assert_eq!(a64.len(), 2);
3896        assert_eq!(a64.value(0), 5);
3897        assert_eq!(a64.value(1), 7);
3898    }
3899
3900    #[test]
3901    fn test_default_append_floats_and_doubles() {
3902        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
3903        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
3904        let arr32 = d_f32.flush(None).unwrap();
3905        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
3906        assert_eq!(a.value(0), 1.5);
3907        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
3908        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
3909        let arr64 = d_f64.flush(None).unwrap();
3910        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
3911        assert_eq!(b.value(0), 2.25);
3912    }
3913
3914    #[test]
3915    fn test_default_append_string_and_bytes() {
3916        let mut d_str = Decoder::String(
3917            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3918            Vec::with_capacity(DEFAULT_CAPACITY),
3919        );
3920        d_str
3921            .append_default(&AvroLiteral::String("hi".into()))
3922            .unwrap();
3923        let s_arr = d_str.flush(None).unwrap();
3924        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
3925        assert_eq!(arr.value(0), "hi");
3926        let mut d_bytes = Decoder::Binary(
3927            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3928            Vec::with_capacity(DEFAULT_CAPACITY),
3929        );
3930        d_bytes
3931            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
3932            .unwrap();
3933        let b_arr = d_bytes.flush(None).unwrap();
3934        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3935        assert_eq!(barr.value(0), &[1, 2, 3]);
3936        let mut d_str_err = Decoder::String(
3937            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3938            Vec::with_capacity(DEFAULT_CAPACITY),
3939        );
3940        let err = d_str_err
3941            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
3942            .unwrap_err();
3943        assert!(
3944            err.to_string()
3945                .contains("Default for string must be string"),
3946            "unexpected error: {err:?}"
3947        );
3948    }
3949
3950    #[test]
3951    fn test_default_append_nullable_int32_null_and_value() {
3952        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3953        let mut dec = Decoder::Nullable(
3954            Nullability::NullFirst,
3955            NullBufferBuilder::new(DEFAULT_CAPACITY),
3956            Box::new(inner),
3957            NullablePlan::ReadTag,
3958        );
3959        dec.append_default(&AvroLiteral::Null).unwrap();
3960        dec.append_default(&AvroLiteral::Int(11)).unwrap();
3961        let arr = dec.flush(None).unwrap();
3962        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3963        assert_eq!(a.len(), 2);
3964        assert!(a.is_null(0));
3965        assert_eq!(a.value(1), 11);
3966    }
3967
3968    #[test]
3969    fn test_default_append_array_of_ints() {
3970        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3971        let mut d = Decoder::try_new(&list_dt).unwrap();
3972        let items = vec![
3973            AvroLiteral::Int(1),
3974            AvroLiteral::Int(2),
3975            AvroLiteral::Int(3),
3976        ];
3977        d.append_default(&AvroLiteral::Array(items)).unwrap();
3978        let arr = d.flush(None).unwrap();
3979        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
3980        assert_eq!(list.len(), 1);
3981        assert_eq!(list.value_length(0), 3);
3982        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
3983        assert_eq!(vals.values(), &[1, 2, 3]);
3984    }
3985
3986    #[test]
3987    fn test_default_append_map_string_to_int() {
3988        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
3989        let mut d = Decoder::try_new(&map_dt).unwrap();
3990        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
3991        m.insert("k1".to_string(), AvroLiteral::Int(10));
3992        m.insert("k2".to_string(), AvroLiteral::Int(20));
3993        d.append_default(&AvroLiteral::Map(m)).unwrap();
3994        let arr = d.flush(None).unwrap();
3995        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
3996        assert_eq!(map.len(), 1);
3997        assert_eq!(map.value_length(0), 2);
3998        let binding = map.value(0);
3999        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
4000        let k = entries
4001            .column_by_name("key")
4002            .unwrap()
4003            .as_any()
4004            .downcast_ref::<StringArray>()
4005            .unwrap();
4006        let v = entries
4007            .column_by_name("value")
4008            .unwrap()
4009            .as_any()
4010            .downcast_ref::<Int32Array>()
4011            .unwrap();
4012        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4013        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4014        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4015        assert_eq!(vals, [10, 20].into_iter().collect());
4016    }
4017
4018    #[test]
4019    fn test_default_append_enum_by_symbol() {
4020        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4021        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4022        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4023        let arr = d.flush(None).unwrap();
4024        let dict = arr
4025            .as_any()
4026            .downcast_ref::<DictionaryArray<Int32Type>>()
4027            .unwrap();
4028        assert_eq!(dict.len(), 1);
4029        let expected = Int32Array::from(vec![1]);
4030        assert_eq!(dict.keys(), &expected);
4031        let values = dict
4032            .values()
4033            .as_any()
4034            .downcast_ref::<StringArray>()
4035            .unwrap();
4036        assert_eq!(values.value(1), "B");
4037    }
4038
4039    #[test]
4040    fn test_default_append_uuid_and_type_error() {
4041        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4042        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4043        d.append_default(&AvroLiteral::String(uuid_str.into()))
4044            .unwrap();
4045        let arr_ref = d.flush(None).unwrap();
4046        let arr = arr_ref
4047            .as_any()
4048            .downcast_ref::<FixedSizeBinaryArray>()
4049            .unwrap();
4050        assert_eq!(arr.value_length(), 16);
4051        assert_eq!(arr.len(), 1);
4052        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4053        let err = d2
4054            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4055            .unwrap_err();
4056        assert!(
4057            err.to_string().contains("Default for uuid must be string"),
4058            "unexpected error: {err:?}"
4059        );
4060    }
4061
4062    #[test]
4063    fn test_default_append_fixed_and_length_mismatch() {
4064        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4065        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4066            .unwrap();
4067        let arr_ref = d.flush(None).unwrap();
4068        let arr = arr_ref
4069            .as_any()
4070            .downcast_ref::<FixedSizeBinaryArray>()
4071            .unwrap();
4072        assert_eq!(arr.value_length(), 4);
4073        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4074        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4075        let err = d_err
4076            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4077            .unwrap_err();
4078        assert!(
4079            err.to_string().contains("Fixed default length"),
4080            "unexpected error: {err:?}"
4081        );
4082    }
4083
4084    #[test]
4085    fn test_default_append_duration_and_length_validation() {
4086        let dt = avro_from_codec(Codec::Interval);
4087        let mut d = Decoder::try_new(&dt).unwrap();
4088        let mut bytes = Vec::with_capacity(12);
4089        bytes.extend_from_slice(&1u32.to_le_bytes());
4090        bytes.extend_from_slice(&2u32.to_le_bytes());
4091        bytes.extend_from_slice(&3u32.to_le_bytes());
4092        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4093        let arr_ref = d.flush(None).unwrap();
4094        let arr = arr_ref
4095            .as_any()
4096            .downcast_ref::<IntervalMonthDayNanoArray>()
4097            .unwrap();
4098        assert_eq!(arr.len(), 1);
4099        let v = arr.value(0);
4100        assert_eq!(v.months, 1);
4101        assert_eq!(v.days, 2);
4102        assert_eq!(v.nanoseconds, 3_000_000);
4103        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4104        let err = d_err
4105            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4106            .unwrap_err();
4107        assert!(
4108            err.to_string()
4109                .contains("Duration default must be exactly 12 bytes"),
4110            "unexpected error: {err:?}"
4111        );
4112    }
4113
4114    #[test]
4115    fn test_default_append_decimal256_from_bytes() {
4116        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4117        let mut d = Decoder::try_new(&dt).unwrap();
4118        let pos: [u8; 32] = [
4119            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4120            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4121            0x00, 0x00, 0x30, 0x39,
4122        ];
4123        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4124        let neg: [u8; 32] = [
4125            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4126            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4127            0xFF, 0xFF, 0xFF, 0x85,
4128        ];
4129        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4130        let arr = d.flush(None).unwrap();
4131        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4132        assert_eq!(dec.len(), 2);
4133        assert_eq!(dec.value_as_string(0), "123.45");
4134        assert_eq!(dec.value_as_string(1), "-1.23");
4135    }
4136
4137    #[test]
4138    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4139        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4140        let mut rec = make_record_decoder_with_projector_defaults(
4141            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4142            field_defaults,
4143            vec![],
4144            0,
4145        );
4146        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4147        map.insert("a".to_string(), AvroLiteral::Int(7));
4148        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4149        let arr = rec.flush(None).unwrap();
4150        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4151        let a = s
4152            .column_by_name("a")
4153            .unwrap()
4154            .as_any()
4155            .downcast_ref::<Int32Array>()
4156            .unwrap();
4157        let b = s
4158            .column_by_name("b")
4159            .unwrap()
4160            .as_any()
4161            .downcast_ref::<StringArray>()
4162            .unwrap();
4163        assert_eq!(a.value(0), 7);
4164        assert_eq!(b.value(0), "hi");
4165    }
4166
4167    #[test]
4168    fn test_record_append_default_null_uses_projector_field_defaults() {
4169        let field_defaults = vec![
4170            Some(AvroLiteral::Int(5)),
4171            Some(AvroLiteral::String("x".into())),
4172        ];
4173        let mut rec = make_record_decoder_with_projector_defaults(
4174            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4175            field_defaults,
4176            vec![],
4177            0,
4178        );
4179        rec.append_default(&AvroLiteral::Null).unwrap();
4180        let arr = rec.flush(None).unwrap();
4181        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4182        let a = s
4183            .column_by_name("a")
4184            .unwrap()
4185            .as_any()
4186            .downcast_ref::<Int32Array>()
4187            .unwrap();
4188        let b = s
4189            .column_by_name("b")
4190            .unwrap()
4191            .as_any()
4192            .downcast_ref::<StringArray>()
4193            .unwrap();
4194        assert_eq!(a.value(0), 5);
4195        assert_eq!(b.value(0), "x");
4196    }
4197
4198    #[test]
4199    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
4200     {
4201        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
4202        let mut field_refs: Vec<FieldRef> = Vec::new();
4203        let mut encoders: Vec<Decoder> = Vec::new();
4204        for (name, dt, nullable) in &fields {
4205            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4206        }
4207        let enc_a = Decoder::Nullable(
4208            Nullability::NullSecond,
4209            NullBufferBuilder::new(DEFAULT_CAPACITY),
4210            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
4211            NullablePlan::ReadTag,
4212        );
4213        let enc_b = Decoder::Nullable(
4214            Nullability::NullSecond,
4215            NullBufferBuilder::new(DEFAULT_CAPACITY),
4216            Box::new(Decoder::String(
4217                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4218                Vec::with_capacity(DEFAULT_CAPACITY),
4219            )),
4220            NullablePlan::ReadTag,
4221        );
4222        encoders.push(enc_a);
4223        encoders.push(enc_b);
4224        let projector = Projector {
4225            writer_to_reader: Arc::from(vec![]),
4226            skip_decoders: vec![],
4227            field_defaults: vec![None, None], // no defaults -> append_null
4228            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4229        };
4230        let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector));
4231        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4232        map.insert("a".to_string(), AvroLiteral::Int(9));
4233        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4234        let arr = rec.flush(None).unwrap();
4235        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4236        let a = s
4237            .column_by_name("a")
4238            .unwrap()
4239            .as_any()
4240            .downcast_ref::<Int32Array>()
4241            .unwrap();
4242        let b = s
4243            .column_by_name("b")
4244            .unwrap()
4245            .as_any()
4246            .downcast_ref::<StringArray>()
4247            .unwrap();
4248        assert!(a.is_valid(0));
4249        assert_eq!(a.value(0), 9);
4250        assert!(b.is_null(0));
4251    }
4252
4253    #[test]
4254    fn test_projector_default_injection_when_writer_lacks_fields() {
4255        let defaults = vec![None, None];
4256        let injections = vec![
4257            (0, AvroLiteral::Int(99)),
4258            (1, AvroLiteral::String("alice".into())),
4259        ];
4260        let mut rec = make_record_decoder_with_projector_defaults(
4261            &[
4262                ("id", DataType::Int32, false),
4263                ("name", DataType::Utf8, false),
4264            ],
4265            defaults,
4266            injections,
4267            0,
4268        );
4269        rec.decode(&mut AvroCursor::new(&[])).unwrap();
4270        let arr = rec.flush(None).unwrap();
4271        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4272        let id = s
4273            .column_by_name("id")
4274            .unwrap()
4275            .as_any()
4276            .downcast_ref::<Int32Array>()
4277            .unwrap();
4278        let name = s
4279            .column_by_name("name")
4280            .unwrap()
4281            .as_any()
4282            .downcast_ref::<StringArray>()
4283            .unwrap();
4284        assert_eq!(id.value(0), 99);
4285        assert_eq!(name.value(0), "alice");
4286    }
4287
4288    #[test]
4289    fn union_type_ids_are_not_child_indexes() {
4290        let encodings: Vec<AvroDataType> =
4291            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
4292        let fields: UnionFields = [
4293            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
4294            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
4295        ]
4296        .into_iter()
4297        .collect();
4298        let dt = avro_from_codec(Codec::Union(
4299            encodings.into(),
4300            fields.clone(),
4301            UnionMode::Dense,
4302        ));
4303        let mut dec = Decoder::try_new(&dt).expect("decoder");
4304        let mut b1 = encode_avro_long(1);
4305        b1.extend(encode_avro_bytes("hi".as_bytes()));
4306        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
4307        let mut b0 = encode_avro_long(0);
4308        b0.extend(encode_avro_int(5));
4309        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
4310        let arr = dec.flush(None).expect("flush");
4311        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
4312        assert_eq!(ua.len(), 2);
4313        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
4314        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
4315        assert_eq!(ua.value_offset(0), 0);
4316        assert_eq!(ua.value_offset(1), 0);
4317        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
4318        assert_eq!(utf8_child.len(), 1);
4319        assert_eq!(utf8_child.value(0), "hi");
4320        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
4321        assert_eq!(int_child.len(), 1);
4322        assert_eq!(int_child.value(0), 5);
4323        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
4324        assert_eq!(type_ids, vec![42_i8, 7_i8]);
4325    }
4326
4327    #[cfg(feature = "avro_custom_types")]
4328    #[test]
4329    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> {
4330        for codec in [
4331            Codec::DurationNanos,
4332            Codec::DurationMicros,
4333            Codec::DurationMillis,
4334            Codec::DurationSeconds,
4335        ] {
4336            let dt = make_avro_dt(codec.clone(), None);
4337            let s = Skipper::from_avro(&dt)?;
4338            match s {
4339                Skipper::Int64 => {}
4340                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
4341            }
4342        }
4343        Ok(())
4344    }
4345
4346    #[cfg(feature = "avro_custom_types")]
4347    #[test]
4348    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> {
4349        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
4350        for codec in [
4351            Codec::DurationNanos,
4352            Codec::DurationMicros,
4353            Codec::DurationMillis,
4354            Codec::DurationSeconds,
4355        ] {
4356            let dt = make_avro_dt(codec.clone(), None);
4357            let mut s = Skipper::from_avro(&dt)?;
4358            for &v in &values {
4359                let bytes = encode_avro_long(v);
4360                let mut cursor = AvroCursor::new(&bytes);
4361                s.skip(&mut cursor)?;
4362                assert_eq!(
4363                    cursor.position(),
4364                    bytes.len(),
4365                    "did not consume all bytes for {:?} value {}",
4366                    codec,
4367                    v
4368                );
4369            }
4370        }
4371        Ok(())
4372    }
4373
4374    #[cfg(feature = "avro_custom_types")]
4375    #[test]
4376    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> {
4377        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
4378        let mut s = Skipper::from_avro(&dt)?;
4379        match &s {
4380            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
4381                Skipper::Int64 => {}
4382                ref other => panic!("expected inner Int64, got {:?}", other),
4383            },
4384            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
4385        }
4386        {
4387            let buf = encode_vlq_u64(0);
4388            let mut cursor = AvroCursor::new(&buf);
4389            s.skip(&mut cursor)?;
4390            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
4391        }
4392        {
4393            let mut buf = encode_vlq_u64(1);
4394            buf.extend(encode_avro_long(0));
4395            let mut cursor = AvroCursor::new(&buf);
4396            s.skip(&mut cursor)?;
4397            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
4398        }
4399
4400        Ok(())
4401    }
4402
4403    #[cfg(feature = "avro_custom_types")]
4404    #[test]
4405    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> {
4406        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
4407        let mut s = Skipper::from_avro(&dt)?;
4408        match &s {
4409            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
4410                Skipper::Int64 => {}
4411                ref other => panic!("expected inner Int64, got {:?}", other),
4412            },
4413            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
4414        }
4415        {
4416            let buf = encode_vlq_u64(1);
4417            let mut cursor = AvroCursor::new(&buf);
4418            s.skip(&mut cursor)?;
4419            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
4420        }
4421        {
4422            let mut buf = encode_vlq_u64(0);
4423            buf.extend(encode_avro_long(-1));
4424            let mut cursor = AvroCursor::new(&buf);
4425            s.skip(&mut cursor)?;
4426            assert_eq!(
4427                cursor.position(),
4428                1 + encode_avro_long(-1).len(),
4429                "expected to consume tag=0 + long(-1)"
4430            );
4431        }
4432        Ok(())
4433    }
4434
4435    #[test]
4436    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> {
4437        let dt = make_avro_dt(Codec::Interval, None);
4438        let mut s = Skipper::from_avro(&dt)?;
4439        match s {
4440            Skipper::DurationFixed12 => {}
4441            other => panic!("expected DurationFixed12, got {:?}", other),
4442        }
4443        let payload = vec![0u8; 12];
4444        let mut cursor = AvroCursor::new(&payload);
4445        s.skip(&mut cursor)?;
4446        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
4447        Ok(())
4448    }
4449
4450    #[cfg(feature = "avro_custom_types")]
4451    #[test]
4452    fn test_run_end_encoded_width16_int32_basic_grouping() {
4453        use arrow_array::RunArray;
4454        use std::sync::Arc;
4455        let inner = avro_from_codec(Codec::Int32);
4456        let ree = AvroDataType::new(
4457            Codec::RunEndEncoded(Arc::new(inner), 16),
4458            Default::default(),
4459            None,
4460        );
4461        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4462        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
4463            let bytes = encode_avro_int(v);
4464            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4465        }
4466        let arr = dec.flush(None).expect("flush");
4467        let ra = arr
4468            .as_any()
4469            .downcast_ref::<RunArray<Int16Type>>()
4470            .expect("RunArray<Int16Type>");
4471        assert_eq!(ra.len(), 9);
4472        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
4473        let vals = ra
4474            .values()
4475            .as_ref()
4476            .as_any()
4477            .downcast_ref::<Int32Array>()
4478            .expect("values Int32");
4479        assert_eq!(vals.values(), &[1, 2, 3]);
4480    }
4481
4482    #[cfg(feature = "avro_custom_types")]
4483    #[test]
4484    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
4485        use arrow_array::RunArray;
4486        use std::sync::Arc;
4487        let inner = AvroDataType::new(
4488            Codec::Int32,
4489            Default::default(),
4490            Some(Nullability::NullSecond),
4491        );
4492        let ree = AvroDataType::new(
4493            Codec::RunEndEncoded(Arc::new(inner), 32),
4494            Default::default(),
4495            None,
4496        );
4497        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4498        let seq: [Option<i32>; 8] = [
4499            None,
4500            None,
4501            Some(7),
4502            Some(7),
4503            Some(7),
4504            None,
4505            Some(5),
4506            Some(5),
4507        ];
4508        for item in seq {
4509            let mut bytes = Vec::new();
4510            match item {
4511                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
4512                Some(v) => {
4513                    bytes.extend_from_slice(&encode_vlq_u64(0));
4514                    bytes.extend_from_slice(&encode_avro_int(v));
4515                }
4516            }
4517            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4518        }
4519        let arr = dec.flush(None).expect("flush");
4520        let ra = arr
4521            .as_any()
4522            .downcast_ref::<RunArray<Int32Type>>()
4523            .expect("RunArray<Int32Type>");
4524        assert_eq!(ra.len(), 8);
4525        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
4526        let vals = ra
4527            .values()
4528            .as_ref()
4529            .as_any()
4530            .downcast_ref::<Int32Array>()
4531            .expect("values Int32 (nullable)");
4532        assert_eq!(vals.len(), 4);
4533        assert!(vals.is_null(0));
4534        assert_eq!(vals.value(1), 7);
4535        assert!(vals.is_null(2));
4536        assert_eq!(vals.value(3), 5);
4537    }
4538
4539    #[cfg(feature = "avro_custom_types")]
4540    #[test]
4541    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
4542        use arrow_array::RunArray;
4543        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4544        let ree = Decoder::RunEndEncoded(
4545            8, /* bytes => Int64 run-ends */
4546            0,
4547            Box::new(inner_values),
4548        );
4549        let mut dec = Decoder::Nullable(
4550            Nullability::NullSecond,
4551            NullBufferBuilder::new(DEFAULT_CAPACITY),
4552            Box::new(ree),
4553            NullablePlan::FromSingle {
4554                promotion: Promotion::IntToDouble,
4555            },
4556        );
4557        for v in [1, 1, 2, 2, 2] {
4558            let bytes = encode_avro_int(v);
4559            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4560        }
4561        let arr = dec.flush(None).expect("flush");
4562        let ra = arr
4563            .as_any()
4564            .downcast_ref::<RunArray<Int64Type>>()
4565            .expect("RunArray<Int64Type>");
4566        assert_eq!(ra.len(), 5);
4567        assert_eq!(ra.run_ends().values(), &[2, 5]);
4568        let vals = ra
4569            .values()
4570            .as_ref()
4571            .as_any()
4572            .downcast_ref::<Float64Array>()
4573            .expect("values Float64");
4574        assert_eq!(vals.values(), &[1.0, 2.0]);
4575    }
4576
4577    #[cfg(feature = "avro_custom_types")]
4578    #[test]
4579    fn test_run_end_encoded_unsupported_run_end_width_errors() {
4580        use std::sync::Arc;
4581        let inner = avro_from_codec(Codec::Int32);
4582        let dt = AvroDataType::new(
4583            Codec::RunEndEncoded(Arc::new(inner), 3),
4584            Default::default(),
4585            None,
4586        );
4587        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
4588        let msg = err.to_string();
4589        assert!(
4590            msg.contains("Unsupported run-end width")
4591                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
4592            "unexpected error message: {msg}"
4593        );
4594    }
4595
4596    #[cfg(feature = "avro_custom_types")]
4597    #[test]
4598    fn test_run_end_encoded_empty_input_is_empty_runarray() {
4599        use arrow_array::RunArray;
4600        use std::sync::Arc;
4601        let inner = avro_from_codec(Codec::Utf8);
4602        let dt = AvroDataType::new(
4603            Codec::RunEndEncoded(Arc::new(inner), 4),
4604            Default::default(),
4605            None,
4606        );
4607        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4608        let arr = dec.flush(None).expect("flush");
4609        let ra = arr
4610            .as_any()
4611            .downcast_ref::<RunArray<Int32Type>>()
4612            .expect("RunArray<Int32Type>");
4613        assert_eq!(ra.len(), 0);
4614        assert_eq!(ra.run_ends().len(), 0);
4615        assert_eq!(ra.values().len(), 0);
4616    }
4617
4618    #[cfg(feature = "avro_custom_types")]
4619    #[test]
4620    fn test_run_end_encoded_strings_grouping_width32_bits() {
4621        use arrow_array::RunArray;
4622        use std::sync::Arc;
4623        let inner = avro_from_codec(Codec::Utf8);
4624        let dt = AvroDataType::new(
4625            Codec::RunEndEncoded(Arc::new(inner), 32),
4626            Default::default(),
4627            None,
4628        );
4629        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4630        for s in ["a", "a", "bb", "bb", "bb", "a"] {
4631            let bytes = encode_avro_bytes(s.as_bytes());
4632            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4633        }
4634        let arr = dec.flush(None).expect("flush");
4635        let ra = arr
4636            .as_any()
4637            .downcast_ref::<RunArray<Int32Type>>()
4638            .expect("RunArray<Int32Type>");
4639        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
4640        let vals = ra
4641            .values()
4642            .as_ref()
4643            .as_any()
4644            .downcast_ref::<StringArray>()
4645            .expect("values String");
4646        assert_eq!(vals.len(), 3);
4647        assert_eq!(vals.value(0), "a");
4648        assert_eq!(vals.value(1), "bb");
4649        assert_eq!(vals.value(2), "a");
4650    }
4651
4652    #[cfg(not(feature = "avro_custom_types"))]
4653    #[test]
4654    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
4655        let dt = avro_from_codec(Codec::Int32);
4656        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
4657        for v in [1, 2, 3] {
4658            let bytes = encode_avro_int(v);
4659            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4660        }
4661        let arr = dec.flush(None).expect("flush");
4662        let a = arr
4663            .as_any()
4664            .downcast_ref::<Int32Array>()
4665            .expect("Int32Array");
4666        assert_eq!(a.values(), &[1, 2, 3]);
4667    }
4668
4669    #[test]
4670    fn test_timestamp_nanos_decoding_utc() {
4671        let avro_type = avro_from_codec(Codec::TimestampNanos(true));
4672        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4673        let mut data = Vec::new();
4674        for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
4675            data.extend_from_slice(&encode_avro_long(v));
4676        }
4677        let mut cur = AvroCursor::new(&data);
4678        for _ in 0..4 {
4679            decoder.decode(&mut cur).expect("decode nanos ts");
4680        }
4681        let array = decoder.flush(None).expect("flush nanos ts");
4682        let ts = array
4683            .as_any()
4684            .downcast_ref::<TimestampNanosecondArray>()
4685            .expect("TimestampNanosecondArray");
4686        assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
4687        match ts.data_type() {
4688            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4689                assert_eq!(tz.as_deref(), Some("+00:00"));
4690            }
4691            other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
4692        }
4693    }
4694
4695    #[test]
4696    fn test_timestamp_nanos_decoding_local() {
4697        let avro_type = avro_from_codec(Codec::TimestampNanos(false));
4698        let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4699        let mut data = Vec::new();
4700        for v in [10_i64, 20_i64, -30_i64] {
4701            data.extend_from_slice(&encode_avro_long(v));
4702        }
4703        let mut cur = AvroCursor::new(&data);
4704        for _ in 0..3 {
4705            decoder.decode(&mut cur).expect("decode nanos ts");
4706        }
4707        let array = decoder.flush(None).expect("flush nanos ts");
4708        let ts = array
4709            .as_any()
4710            .downcast_ref::<TimestampNanosecondArray>()
4711            .expect("TimestampNanosecondArray");
4712        assert_eq!(ts.values(), &[10, 20, -30]);
4713        match ts.data_type() {
4714            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4715                assert_eq!(tz.as_deref(), None);
4716            }
4717            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4718        }
4719    }
4720
4721    #[test]
4722    fn test_timestamp_nanos_decoding_with_nulls() {
4723        let avro_type = AvroDataType::new(
4724            Codec::TimestampNanos(false),
4725            Default::default(),
4726            Some(Nullability::NullFirst),
4727        );
4728        let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
4729        let mut data = Vec::new();
4730        data.extend_from_slice(&encode_avro_long(1));
4731        data.extend_from_slice(&encode_avro_long(42));
4732        data.extend_from_slice(&encode_avro_long(0));
4733        data.extend_from_slice(&encode_avro_long(1));
4734        data.extend_from_slice(&encode_avro_long(-7));
4735        let mut cur = AvroCursor::new(&data);
4736        for _ in 0..3 {
4737            decoder.decode(&mut cur).expect("decode nullable nanos ts");
4738        }
4739        let array = decoder.flush(None).expect("flush nullable nanos ts");
4740        let ts = array
4741            .as_any()
4742            .downcast_ref::<TimestampNanosecondArray>()
4743            .expect("TimestampNanosecondArray");
4744        assert_eq!(ts.len(), 3);
4745        assert!(ts.is_valid(0));
4746        assert!(ts.is_null(1));
4747        assert!(ts.is_valid(2));
4748        assert_eq!(ts.value(0), 42);
4749        assert_eq!(ts.value(2), -7);
4750        match ts.data_type() {
4751            DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4752                assert_eq!(tz.as_deref(), None);
4753            }
4754            other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4755        }
4756    }
4757}