arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::codec::{
19    AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
20    ResolvedUnion,
21};
22use crate::reader::cursor::AvroCursor;
23use crate::schema::Nullability;
24use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
25#[cfg(feature = "small_decimals")]
26use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
27use arrow_array::types::*;
28use arrow_array::*;
29use arrow_buffer::*;
30use arrow_schema::{
31    ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as ArrowSchema, SchemaRef,
32    UnionFields, UnionMode, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
33};
34#[cfg(feature = "small_decimals")]
35use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
36use std::cmp::Ordering;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39use uuid::Uuid;
40
41const DEFAULT_CAPACITY: usize = 1024;
42
/// Runtime plan for decoding reader-side `["null", T]` types.
///
/// The plan is selected once, when a `Decoder::Nullable` is constructed, and is
/// stored as the last element of that variant.
#[derive(Clone, Copy, Debug)]
enum NullablePlan {
    /// Writer actually wrote a union (branch tag present).
    ///
    /// This is the plan used unless the union resolution info shows that the
    /// writer was not a union while the reader is.
    ReadTag,
    /// Writer wrote a single (non-union) value resolved to the non-null branch
    /// of the reader union; do NOT read a branch tag, but apply any promotion.
    FromSingle { promotion: Promotion },
}
52
/// Reads one decimal payload through `read_decimal_bytes_be` (width `$WIDTH`
/// bytes) and appends it to `$dst` as a big-endian `$Native` integer.
///
/// The expansion uses `?`, so it must be invoked inside a function whose
/// return type can absorb the error produced by `read_decimal_bytes_be`.
macro_rules! decode_decimal {
    ($len:expr, $cursor:expr, $dst:expr, $WIDTH:expr, $Native:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $WIDTH }>($cursor, $len)?;
        let native = <$Native>::from_be_bytes(be_bytes);
        $dst.append_value(native);
    }};
}
60
/// Drains a decimal builder and re-wraps its values as a `$Array` carrying the
/// supplied validity buffer, precision and scale.
///
/// A missing scale is treated as `0`. Failures while applying precision/scale
/// are reported as `ArrowError::ParseError`; the expansion uses `?`, so it must
/// sit inside a function returning a compatible `Result`.
macro_rules! flush_decimal {
    ($src:expr, $prec:expr, $scl:expr, $validity:expr, $Array:ty) => {{
        // Only the value buffer is kept; the validity comes from `$validity`.
        let (_, values, _) = $src.finish().into_parts();
        let unscaled = <$Array>::try_new(values, $validity)?;
        let scaled = unscaled
            .with_precision_and_scale(*$prec as u8, $scl.unwrap_or(0) as i8)
            .map_err(|err| ArrowError::ParseError(err.to_string()))?;
        Arc::new(scaled) as ArrayRef
    }};
}
71
/// Appends a default decimal value to `$dst`.
///
/// The default must be an `AvroLiteral::Bytes` holding two's-complement
/// big-endian bytes; it is passed through `sign_cast_to::<$N>` and then
/// interpreted as a big-endian `$Native`. Any other literal kind evaluates to an
/// `ArrowError::InvalidArgumentError` whose text is assembled at compile time
/// from `$label`.
///
/// The expansion evaluates to `Result<(), ArrowError>` and also uses `?` on the
/// `sign_cast_to` call.
macro_rules! append_decimal_default {
    ($default:expr, $dst:expr, $N:literal, $Native:ty, $label:literal) => {{
        if let AvroLiteral::Bytes(raw) = $default {
            let widened = sign_cast_to::<$N>(raw)?;
            $dst.append_value(<$Native>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(ArrowError::InvalidArgumentError(
                concat!(
                    "Default for ",
                    $label,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
94
/// Builder that gathers the options used to construct a [`RecordDecoder`].
#[derive(Debug)]
pub(crate) struct RecordDecoderBuilder<'a> {
    /// Avro data type the decoder will be built from.
    data_type: &'a AvroDataType,
    /// Value passed on as the `use_utf8view` option when the decoder is built.
    use_utf8view: bool,
}
100
101impl<'a> RecordDecoderBuilder<'a> {
102    pub(crate) fn new(data_type: &'a AvroDataType) -> Self {
103        Self {
104            data_type,
105            use_utf8view: false,
106        }
107    }
108
109    pub(crate) fn with_utf8_view(mut self, use_utf8view: bool) -> Self {
110        self.use_utf8view = use_utf8view;
111        self
112    }
113
114    /// Builds the `RecordDecoder`.
115    pub(crate) fn build(self) -> Result<RecordDecoder, ArrowError> {
116        RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view)
117    }
118}
119
/// Decodes avro encoded data into [`RecordBatch`]
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema assembled from the reader record's fields.
    schema: SchemaRef,
    /// One decoder per reader field, in the same order as `schema`.
    fields: Vec<Decoder>,
    /// The `use_utf8view` option exactly as supplied at construction time.
    use_utf8view: bool,
    /// Present only when the data type carries record resolution info; when
    /// set, `decode` routes every record through it instead of decoding the
    /// fields directly.
    projector: Option<Projector>,
}
128
129impl RecordDecoder {
130    /// Creates a new `RecordDecoderBuilder` for configuring a `RecordDecoder`.
131    pub(crate) fn new(data_type: &'_ AvroDataType) -> Self {
132        RecordDecoderBuilder::new(data_type).build().unwrap()
133    }
134
135    /// Create a new [`RecordDecoder`] from the provided [`AvroDataType`] with default options
136    pub(crate) fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
137        RecordDecoderBuilder::new(data_type)
138            .with_utf8_view(true)
139            .build()
140    }
141
142    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
143    ///
144    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
145    ///
146    /// # Arguments
147    /// * `data_type` - The Avro data type to decode.
148    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
149    ///
150    /// # Errors
151    /// This function will return an error if the provided `data_type` is not a `Record`.
152    pub(crate) fn try_new_with_options(
153        data_type: &AvroDataType,
154        use_utf8view: bool,
155    ) -> Result<Self, ArrowError> {
156        match data_type.codec() {
157            Codec::Struct(reader_fields) => {
158                // Build Arrow schema fields and per-child decoders
159                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
160                let mut encodings = Vec::with_capacity(reader_fields.len());
161                for avro_field in reader_fields.iter() {
162                    arrow_fields.push(avro_field.field());
163                    encodings.push(Decoder::try_new(avro_field.data_type())?);
164                }
165                let projector = match data_type.resolution.as_ref() {
166                    Some(ResolutionInfo::Record(rec)) => {
167                        Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
168                    }
169                    _ => None,
170                };
171                Ok(Self {
172                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
173                    fields: encodings,
174                    use_utf8view,
175                    projector,
176                })
177            }
178            other => Err(ArrowError::ParseError(format!(
179                "Expected record got {other:?}"
180            ))),
181        }
182    }
183
184    /// Returns the decoder's `SchemaRef`
185    pub(crate) fn schema(&self) -> &SchemaRef {
186        &self.schema
187    }
188
189    /// Decode `count` records from `buf`
190    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
191        let mut cursor = AvroCursor::new(buf);
192        match self.projector.as_mut() {
193            Some(proj) => {
194                for _ in 0..count {
195                    proj.project_record(&mut cursor, &mut self.fields)?;
196                }
197            }
198            None => {
199                for _ in 0..count {
200                    for field in &mut self.fields {
201                        field.decode(&mut cursor)?;
202                    }
203                }
204            }
205        }
206        Ok(cursor.position())
207    }
208
209    /// Flush the decoded records into a [`RecordBatch`]
210    pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
211        let arrays = self
212            .fields
213            .iter_mut()
214            .map(|x| x.flush(None))
215            .collect::<Result<Vec<_>, _>>()?;
216        RecordBatch::try_new(self.schema.clone(), arrays)
217    }
218}
219
/// Enum resolution data copied from `ResolutionInfo::EnumMapping` when an enum
/// decoder is built.
#[derive(Debug)]
struct EnumResolution {
    /// Index mapping table taken verbatim from the resolution info.
    // NOTE(review): presumably writer symbol index -> reader symbol index;
    // confirm against the enum decode path.
    mapping: Arc<[i32]>,
    /// Default index taken verbatim from the resolution info.
    // NOTE(review): presumably the reader index used for unmapped writer
    // symbols; confirm against the enum decode path.
    default_index: i32,
}
225
/// Accumulates decoded values for a single field until they are flushed.
///
/// Each variant owns the buffers or builders for one target representation.
/// The `XToY` variants are the ones selected when the reader type carries the
/// matching `Promotion`.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Number of null values appended so far.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// The flag is the `is_utc` value copied from the codec.
    TimestampMillis(bool, Vec<i64>),
    /// The flag is the `is_utc` value copied from the codec.
    TimestampMicros(bool, Vec<i64>),
    Int32ToInt64(Vec<i64>),
    Int32ToFloat32(Vec<f32>),
    Int32ToFloat64(Vec<f64>),
    Int64ToFloat32(Vec<f32>),
    Int64ToFloat64(Vec<f64>),
    Float32ToFloat64(Vec<f64>),
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Value offsets plus the concatenated value bytes.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Item field, list offsets, and the decoder for the items.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Arrow fields, one child decoder per field, and an optional projector
    /// that is set when record resolution info is present.
    Record(Fields, Vec<Decoder>, Option<Projector>),
    /// Entries field, two offset builders, a byte buffer, and the value decoder.
    // NOTE(review): `append_null` binds the offset builders as `_koff` and
    // `moff`, which suggests key offsets, map offsets and key bytes — confirm
    // against the decode path.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Size taken from `Codec::Fixed` plus the concatenated value bytes.
    Fixed(i32, Vec<u8>),
    /// Decoded indices, the symbol list, and optional enum resolution data.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Built for `Codec::Interval`.
    Duration(IntervalMonthDayNanoBuilder),
    /// Concatenated UUID bytes; 16 bytes are appended per value.
    Uuid(Vec<u8>),
    /// Precision, scale and size as taken from `Codec::Decimal`, plus the builder.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// Precision, scale and size as taken from `Codec::Decimal`, plus the builder.
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// Precision, scale and size as taken from `Codec::Decimal`, plus the builder.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// Precision, scale and size as taken from `Codec::Decimal`, plus the builder.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    Union(UnionDecoder),
    /// Nullability, validity builder, the decoder for the non-null value, and
    /// the plan describing whether a branch tag has to be read.
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
282
283impl Decoder {
284    fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
285        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
286            if info.writer_is_union && !info.reader_is_union {
287                let mut clone = data_type.clone();
288                clone.resolution = None; // Build target base decoder without Union resolution
289                let target = Box::new(Self::try_new_internal(&clone)?);
290                let decoder = Self::Union(
291                    UnionDecoderBuilder::new()
292                        .with_resolved_union(info.clone())
293                        .with_target(target)
294                        .build()?,
295                );
296                return Ok(decoder);
297            }
298        }
299        Self::try_new_internal(data_type)
300    }
301
302    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
303        // Extract just the Promotion (if any) to simplify pattern matching
304        let promotion = match data_type.resolution.as_ref() {
305            Some(ResolutionInfo::Promotion(p)) => Some(p),
306            _ => None,
307        };
308        let decoder = match (data_type.codec(), promotion) {
309            (Codec::Int64, Some(Promotion::IntToLong)) => {
310                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
311            }
312            (Codec::Float32, Some(Promotion::IntToFloat)) => {
313                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
314            }
315            (Codec::Float64, Some(Promotion::IntToDouble)) => {
316                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
317            }
318            (Codec::Float32, Some(Promotion::LongToFloat)) => {
319                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
320            }
321            (Codec::Float64, Some(Promotion::LongToDouble)) => {
322                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
323            }
324            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
325                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
326            }
327            (Codec::Utf8, Some(Promotion::BytesToString))
328            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
329                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
330                Vec::with_capacity(DEFAULT_CAPACITY),
331            ),
332            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
333                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
334                Vec::with_capacity(DEFAULT_CAPACITY),
335            ),
336            (Codec::Null, _) => Self::Null(0),
337            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
338            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
339            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
340            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
341            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
342            (Codec::Binary, _) => Self::Binary(
343                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
344                Vec::with_capacity(DEFAULT_CAPACITY),
345            ),
346            (Codec::Utf8, _) => Self::String(
347                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
348                Vec::with_capacity(DEFAULT_CAPACITY),
349            ),
350            (Codec::Utf8View, _) => Self::StringView(
351                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
352                Vec::with_capacity(DEFAULT_CAPACITY),
353            ),
354            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
355            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
356            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
357            (Codec::TimestampMillis(is_utc), _) => {
358                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
359            }
360            (Codec::TimestampMicros(is_utc), _) => {
361                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
362            }
363            #[cfg(feature = "avro_custom_types")]
364            (Codec::DurationNanos, _) => {
365                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
366            }
367            #[cfg(feature = "avro_custom_types")]
368            (Codec::DurationMicros, _) => {
369                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
370            }
371            #[cfg(feature = "avro_custom_types")]
372            (Codec::DurationMillis, _) => {
373                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
374            }
375            #[cfg(feature = "avro_custom_types")]
376            (Codec::DurationSeconds, _) => {
377                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
378            }
379            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
380            (Codec::Decimal(precision, scale, size), _) => {
381                let p = *precision;
382                let s = *scale;
383                let prec = p as u8;
384                let scl = s.unwrap_or(0) as i8;
385                #[cfg(feature = "small_decimals")]
386                {
387                    if p <= DECIMAL32_MAX_PRECISION as usize {
388                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
389                            .with_precision_and_scale(prec, scl)?;
390                        Self::Decimal32(p, s, *size, builder)
391                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
392                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
393                            .with_precision_and_scale(prec, scl)?;
394                        Self::Decimal64(p, s, *size, builder)
395                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
396                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
397                            .with_precision_and_scale(prec, scl)?;
398                        Self::Decimal128(p, s, *size, builder)
399                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
400                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
401                            .with_precision_and_scale(prec, scl)?;
402                        Self::Decimal256(p, s, *size, builder)
403                    } else {
404                        return Err(ArrowError::ParseError(format!(
405                            "Decimal precision {p} exceeds maximum supported"
406                        )));
407                    }
408                }
409                #[cfg(not(feature = "small_decimals"))]
410                {
411                    if p <= DECIMAL128_MAX_PRECISION as usize {
412                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
413                            .with_precision_and_scale(prec, scl)?;
414                        Self::Decimal128(p, s, *size, builder)
415                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
416                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
417                            .with_precision_and_scale(prec, scl)?;
418                        Self::Decimal256(p, s, *size, builder)
419                    } else {
420                        return Err(ArrowError::ParseError(format!(
421                            "Decimal precision {p} exceeds maximum supported"
422                        )));
423                    }
424                }
425            }
426            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
427            (Codec::List(item), _) => {
428                let decoder = Self::try_new(item)?;
429                Self::Array(
430                    Arc::new(item.field_with_name("item")),
431                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
432                    Box::new(decoder),
433                )
434            }
435            (Codec::Enum(symbols), _) => {
436                let res = match data_type.resolution.as_ref() {
437                    Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
438                        mapping: mapping.mapping.clone(),
439                        default_index: mapping.default_index,
440                    }),
441                    _ => None,
442                };
443                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
444            }
445            (Codec::Struct(fields), _) => {
446                let mut arrow_fields = Vec::with_capacity(fields.len());
447                let mut encodings = Vec::with_capacity(fields.len());
448                for avro_field in fields.iter() {
449                    let encoding = Self::try_new(avro_field.data_type())?;
450                    arrow_fields.push(avro_field.field());
451                    encodings.push(encoding);
452                }
453                let projector =
454                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
455                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
456                    } else {
457                        None
458                    };
459                Self::Record(arrow_fields.into(), encodings, projector)
460            }
461            (Codec::Map(child), _) => {
462                let val_field = child.field_with_name("value");
463                let map_field = Arc::new(ArrowField::new(
464                    "entries",
465                    DataType::Struct(Fields::from(vec![
466                        ArrowField::new("key", DataType::Utf8, false),
467                        val_field,
468                    ])),
469                    false,
470                ));
471                let val_dec = Self::try_new(child)?;
472                Self::Map(
473                    map_field,
474                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
475                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
476                    Vec::with_capacity(DEFAULT_CAPACITY),
477                    Box::new(val_dec),
478                )
479            }
480            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
481            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
482                let decoders = encodings
483                    .iter()
484                    .map(Self::try_new_internal)
485                    .collect::<Result<Vec<_>, _>>()?;
486                if fields.len() != decoders.len() {
487                    return Err(ArrowError::SchemaError(format!(
488                        "Union has {} fields but {} decoders",
489                        fields.len(),
490                        decoders.len()
491                    )));
492                }
493                // Proactive guard: if a user provides a union with more branches than
494                // a 32-bit Avro index can address, fail fast with a clear message.
495                let branch_count = decoders.len();
496                let max_addr = (i32::MAX as usize) + 1;
497                if branch_count > max_addr {
498                    return Err(ArrowError::SchemaError(format!(
499                        "Union has {branch_count} branches, which exceeds the maximum addressable \
500                         branches by an Avro int tag ({} + 1).",
501                        i32::MAX
502                    )));
503                }
504                let mut builder = UnionDecoderBuilder::new()
505                    .with_fields(fields.clone())
506                    .with_branches(decoders);
507                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
508                    if info.reader_is_union {
509                        builder = builder.with_resolved_union(info.clone());
510                    }
511                }
512                Self::Union(builder.build()?)
513            }
514            (Codec::Union(_, _, _), _) => {
515                return Err(ArrowError::NotYetImplemented(
516                    "Sparse Arrow unions are not yet supported".to_string(),
517                ));
518            }
519        };
520        Ok(match data_type.nullability() {
521            Some(nullability) => {
522                // Default to reading a union branch tag unless the resolution proves otherwise.
523                let mut plan = NullablePlan::ReadTag;
524                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
525                    if !info.writer_is_union && info.reader_is_union {
526                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
527                            plan = NullablePlan::FromSingle { promotion: *promo };
528                        }
529                    }
530                }
531                Self::Nullable(
532                    nullability,
533                    NullBufferBuilder::new(DEFAULT_CAPACITY),
534                    Box::new(decoder),
535                    plan,
536                )
537            }
538            None => decoder,
539        })
540    }
541
542    /// Append a null record
543    fn append_null(&mut self) -> Result<(), ArrowError> {
544        match self {
545            Self::Null(count) => *count += 1,
546            Self::Boolean(b) => b.append(false),
547            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
548            Self::Int64(v)
549            | Self::Int32ToInt64(v)
550            | Self::TimeMicros(v)
551            | Self::TimestampMillis(_, v)
552            | Self::TimestampMicros(_, v) => v.push(0),
553            #[cfg(feature = "avro_custom_types")]
554            Self::DurationSecond(v)
555            | Self::DurationMillisecond(v)
556            | Self::DurationMicrosecond(v)
557            | Self::DurationNanosecond(v) => v.push(0),
558            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
559            Self::Float64(v)
560            | Self::Int32ToFloat64(v)
561            | Self::Int64ToFloat64(v)
562            | Self::Float32ToFloat64(v) => v.push(0.),
563            Self::Binary(offsets, _)
564            | Self::String(offsets, _)
565            | Self::StringView(offsets, _)
566            | Self::BytesToString(offsets, _)
567            | Self::StringToBytes(offsets, _) => {
568                offsets.push_length(0);
569            }
570            Self::Uuid(v) => {
571                v.extend([0; 16]);
572            }
573            Self::Array(_, offsets, _) => {
574                offsets.push_length(0);
575            }
576            Self::Record(_, e, _) => {
577                for encoding in e.iter_mut() {
578                    encoding.append_null();
579                }
580            }
581            Self::Map(_, _koff, moff, _, _) => {
582                moff.push_length(0);
583            }
584            Self::Fixed(sz, accum) => {
585                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
586            }
587            #[cfg(feature = "small_decimals")]
588            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
589            #[cfg(feature = "small_decimals")]
590            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
591            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
592            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
593            Self::Enum(indices, _, _) => indices.push(0),
594            Self::Duration(builder) => builder.append_null(),
595            Self::Union(u) => u.append_null()?,
596            Self::Nullable(_, null_buffer, inner, _) => {
597                null_buffer.append(false);
598                inner.append_null();
599            }
600        }
601        Ok(())
602    }
603
604    /// Append a single default literal into the decoder's buffers
605    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
606        match self {
607            Self::Nullable(_, nb, inner, _) => {
608                if matches!(lit, AvroLiteral::Null) {
609                    nb.append(false);
610                    inner.append_null()
611                } else {
612                    nb.append(true);
613                    inner.append_default(lit)
614                }
615            }
616            Self::Null(count) => match lit {
617                AvroLiteral::Null => {
618                    *count += 1;
619                    Ok(())
620                }
621                _ => Err(ArrowError::InvalidArgumentError(
622                    "Non-null default for null type".to_string(),
623                )),
624            },
625            Self::Boolean(b) => match lit {
626                AvroLiteral::Boolean(v) => {
627                    b.append(*v);
628                    Ok(())
629                }
630                _ => Err(ArrowError::InvalidArgumentError(
631                    "Default for boolean must be boolean".to_string(),
632                )),
633            },
634            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
635                AvroLiteral::Int(i) => {
636                    v.push(*i);
637                    Ok(())
638                }
639                _ => Err(ArrowError::InvalidArgumentError(
640                    "Default for int32/date32/time-millis must be int".to_string(),
641                )),
642            },
643            #[cfg(feature = "avro_custom_types")]
644            Self::DurationSecond(v)
645            | Self::DurationMillisecond(v)
646            | Self::DurationMicrosecond(v)
647            | Self::DurationNanosecond(v) => match lit {
648                AvroLiteral::Long(i) => {
649                    v.push(*i);
650                    Ok(())
651                }
652                _ => Err(ArrowError::InvalidArgumentError(
653                    "Default for duration long must be long".to_string(),
654                )),
655            },
656            Self::Int64(v)
657            | Self::Int32ToInt64(v)
658            | Self::TimeMicros(v)
659            | Self::TimestampMillis(_, v)
660            | Self::TimestampMicros(_, v) => match lit {
661                AvroLiteral::Long(i) => {
662                    v.push(*i);
663                    Ok(())
664                }
665                AvroLiteral::Int(i) => {
666                    v.push(*i as i64);
667                    Ok(())
668                }
669                _ => Err(ArrowError::InvalidArgumentError(
670                    "Default for long/time-micros/timestamp must be long or int".to_string(),
671                )),
672            },
673            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
674                AvroLiteral::Float(f) => {
675                    v.push(*f);
676                    Ok(())
677                }
678                _ => Err(ArrowError::InvalidArgumentError(
679                    "Default for float must be float".to_string(),
680                )),
681            },
682            Self::Float64(v)
683            | Self::Int32ToFloat64(v)
684            | Self::Int64ToFloat64(v)
685            | Self::Float32ToFloat64(v) => match lit {
686                AvroLiteral::Double(f) => {
687                    v.push(*f);
688                    Ok(())
689                }
690                _ => Err(ArrowError::InvalidArgumentError(
691                    "Default for double must be double".to_string(),
692                )),
693            },
694            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
695                AvroLiteral::Bytes(b) => {
696                    offsets.push_length(b.len());
697                    values.extend_from_slice(b);
698                    Ok(())
699                }
700                _ => Err(ArrowError::InvalidArgumentError(
701                    "Default for bytes must be bytes".to_string(),
702                )),
703            },
704            Self::BytesToString(offsets, values)
705            | Self::String(offsets, values)
706            | Self::StringView(offsets, values) => match lit {
707                AvroLiteral::String(s) => {
708                    let b = s.as_bytes();
709                    offsets.push_length(b.len());
710                    values.extend_from_slice(b);
711                    Ok(())
712                }
713                _ => Err(ArrowError::InvalidArgumentError(
714                    "Default for string must be string".to_string(),
715                )),
716            },
717            Self::Uuid(values) => match lit {
718                AvroLiteral::String(s) => {
719                    let uuid = Uuid::try_parse(s).map_err(|e| {
720                        ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})"))
721                    })?;
722                    values.extend_from_slice(uuid.as_bytes());
723                    Ok(())
724                }
725                _ => Err(ArrowError::InvalidArgumentError(
726                    "Default for uuid must be string".to_string(),
727                )),
728            },
729            Self::Fixed(sz, accum) => match lit {
730                AvroLiteral::Bytes(b) => {
731                    if b.len() != *sz as usize {
732                        return Err(ArrowError::InvalidArgumentError(format!(
733                            "Fixed default length {} does not match size {sz}",
734                            b.len(),
735                        )));
736                    }
737                    accum.extend_from_slice(b);
738                    Ok(())
739                }
740                _ => Err(ArrowError::InvalidArgumentError(
741                    "Default for fixed must be bytes".to_string(),
742                )),
743            },
744            #[cfg(feature = "small_decimals")]
745            Self::Decimal32(_, _, _, builder) => {
746                append_decimal_default!(lit, builder, 4, i32, "decimal32")
747            }
748            #[cfg(feature = "small_decimals")]
749            Self::Decimal64(_, _, _, builder) => {
750                append_decimal_default!(lit, builder, 8, i64, "decimal64")
751            }
752            Self::Decimal128(_, _, _, builder) => {
753                append_decimal_default!(lit, builder, 16, i128, "decimal128")
754            }
755            Self::Decimal256(_, _, _, builder) => {
756                append_decimal_default!(lit, builder, 32, i256, "decimal256")
757            }
758            Self::Duration(builder) => match lit {
759                AvroLiteral::Bytes(b) => {
760                    if b.len() != 12 {
761                        return Err(ArrowError::InvalidArgumentError(format!(
762                            "Duration default must be exactly 12 bytes, got {}",
763                            b.len()
764                        )));
765                    }
766                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
767                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
768                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
769                    let nanos = (millis as i64) * 1_000_000;
770                    builder.append_value(IntervalMonthDayNano::new(
771                        months as i32,
772                        days as i32,
773                        nanos,
774                    ));
775                    Ok(())
776                }
777                _ => Err(ArrowError::InvalidArgumentError(
778                    "Default for duration must be 12-byte little-endian months/days/millis"
779                        .to_string(),
780                )),
781            },
782            Self::Array(_, offsets, inner) => match lit {
783                AvroLiteral::Array(items) => {
784                    offsets.push_length(items.len());
785                    for item in items {
786                        inner.append_default(item)?;
787                    }
788                    Ok(())
789                }
790                _ => Err(ArrowError::InvalidArgumentError(
791                    "Default for array must be an array literal".to_string(),
792                )),
793            },
794            Self::Map(_, koff, moff, kdata, valdec) => match lit {
795                AvroLiteral::Map(entries) => {
796                    moff.push_length(entries.len());
797                    for (k, v) in entries {
798                        let kb = k.as_bytes();
799                        koff.push_length(kb.len());
800                        kdata.extend_from_slice(kb);
801                        valdec.append_default(v)?;
802                    }
803                    Ok(())
804                }
805                _ => Err(ArrowError::InvalidArgumentError(
806                    "Default for map must be a map/object literal".to_string(),
807                )),
808            },
809            Self::Enum(indices, symbols, _) => match lit {
810                AvroLiteral::Enum(sym) => {
811                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
812                        ArrowError::InvalidArgumentError(format!(
813                            "Enum default symbol {sym:?} not in reader symbols"
814                        ))
815                    })?;
816                    indices.push(pos as i32);
817                    Ok(())
818                }
819                _ => Err(ArrowError::InvalidArgumentError(
820                    "Default for enum must be a symbol".to_string(),
821                )),
822            },
823            Self::Union(u) => u.append_default(lit),
824            Self::Record(field_meta, decoders, projector) => match lit {
825                AvroLiteral::Map(entries) => {
826                    for (i, dec) in decoders.iter_mut().enumerate() {
827                        let name = field_meta[i].name();
828                        if let Some(sub) = entries.get(name) {
829                            dec.append_default(sub)?;
830                        } else if let Some(proj) = projector.as_ref() {
831                            proj.project_default(dec, i)?;
832                        } else {
833                            dec.append_null();
834                        }
835                    }
836                    Ok(())
837                }
838                AvroLiteral::Null => {
839                    for (i, dec) in decoders.iter_mut().enumerate() {
840                        if let Some(proj) = projector.as_ref() {
841                            proj.project_default(dec, i)?;
842                        } else {
843                            dec.append_null();
844                        }
845                    }
846                    Ok(())
847                }
848                _ => Err(ArrowError::InvalidArgumentError(
849                    "Default for record must be a map/object or null".to_string(),
850                )),
851            },
852        }
853    }
854
855    /// Decode a single record from `buf`
856    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
857        match self {
858            Self::Null(x) => *x += 1,
859            Self::Boolean(values) => values.append(buf.get_bool()?),
860            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
861                values.push(buf.get_int()?)
862            }
863            Self::Int64(values)
864            | Self::TimeMicros(values)
865            | Self::TimestampMillis(_, values)
866            | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
867            #[cfg(feature = "avro_custom_types")]
868            Self::DurationSecond(values)
869            | Self::DurationMillisecond(values)
870            | Self::DurationMicrosecond(values)
871            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
872            Self::Float32(values) => values.push(buf.get_float()?),
873            Self::Float64(values) => values.push(buf.get_double()?),
874            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
875            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
876            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
877            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
878            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
879            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
880            Self::StringToBytes(offsets, values)
881            | Self::BytesToString(offsets, values)
882            | Self::Binary(offsets, values)
883            | Self::String(offsets, values)
884            | Self::StringView(offsets, values) => {
885                let data = buf.get_bytes()?;
886                offsets.push_length(data.len());
887                values.extend_from_slice(data);
888            }
889            Self::Uuid(values) => {
890                let s_bytes = buf.get_bytes()?;
891                let s = std::str::from_utf8(s_bytes).map_err(|e| {
892                    ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
893                })?;
894                let uuid = Uuid::try_parse(s)
895                    .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
896                values.extend_from_slice(uuid.as_bytes());
897            }
898            Self::Array(_, off, encoding) => {
899                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
900                off.push_length(total_items);
901            }
902            Self::Record(_, encodings, None) => {
903                for encoding in encodings {
904                    encoding.decode(buf)?;
905                }
906            }
907            Self::Record(_, encodings, Some(proj)) => {
908                proj.project_record(buf, encodings)?;
909            }
910            Self::Map(_, koff, moff, kdata, valdec) => {
911                let newly_added = read_blocks(buf, |cur| {
912                    let kb = cur.get_bytes()?;
913                    koff.push_length(kb.len());
914                    kdata.extend_from_slice(kb);
915                    valdec.decode(cur)
916                })?;
917                moff.push_length(newly_added);
918            }
919            Self::Fixed(sz, accum) => {
920                let fx = buf.get_fixed(*sz as usize)?;
921                accum.extend_from_slice(fx);
922            }
923            #[cfg(feature = "small_decimals")]
924            Self::Decimal32(_, _, size, builder) => {
925                decode_decimal!(size, buf, builder, 4, i32);
926            }
927            #[cfg(feature = "small_decimals")]
928            Self::Decimal64(_, _, size, builder) => {
929                decode_decimal!(size, buf, builder, 8, i64);
930            }
931            Self::Decimal128(_, _, size, builder) => {
932                decode_decimal!(size, buf, builder, 16, i128);
933            }
934            Self::Decimal256(_, _, size, builder) => {
935                decode_decimal!(size, buf, builder, 32, i256);
936            }
937            Self::Enum(indices, _, None) => {
938                indices.push(buf.get_int()?);
939            }
940            Self::Enum(indices, _, Some(res)) => {
941                let raw = buf.get_int()?;
942                let resolved = usize::try_from(raw)
943                    .ok()
944                    .and_then(|idx| res.mapping.get(idx).copied())
945                    .filter(|&idx| idx >= 0)
946                    .unwrap_or(res.default_index);
947                if resolved >= 0 {
948                    indices.push(resolved);
949                } else {
950                    return Err(ArrowError::ParseError(format!(
951                        "Enum symbol index {raw} not resolvable and no default provided",
952                    )));
953                }
954            }
955            Self::Duration(builder) => {
956                let b = buf.get_fixed(12)?;
957                let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
958                let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
959                let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
960                let nanos = (millis as i64) * 1_000_000;
961                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
962            }
963            Self::Union(u) => u.decode(buf)?,
964            Self::Nullable(order, nb, encoding, plan) => {
965                match *plan {
966                    NullablePlan::FromSingle { promotion } => {
967                        encoding.decode_with_promotion(buf, promotion)?;
968                        nb.append(true);
969                    }
970                    NullablePlan::ReadTag => {
971                        let branch = buf.read_vlq()?;
972                        let is_not_null = match *order {
973                            Nullability::NullFirst => branch != 0,
974                            Nullability::NullSecond => branch == 0,
975                        };
976                        if is_not_null {
977                            // It is important to decode before appending to null buffer in case of decode error
978                            encoding.decode(buf)?;
979                        } else {
980                            encoding.append_null();
981                        }
982                        nb.append(is_not_null);
983                    }
984                }
985            }
986        }
987        Ok(())
988    }
989
990    fn decode_with_promotion(
991        &mut self,
992        buf: &mut AvroCursor<'_>,
993        promotion: Promotion,
994    ) -> Result<(), ArrowError> {
995        macro_rules! promote_numeric_to {
996            ($variant:ident, $getter:ident, $to:ty) => {{
997                match self {
998                    Self::$variant(v) => {
999                        let x = buf.$getter()?;
1000                        v.push(x as $to);
1001                        Ok(())
1002                    }
1003                    other => Err(ArrowError::ParseError(format!(
1004                        "Promotion {promotion} target mismatch: expected {}, got {}",
1005                        stringify!($variant),
1006                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1007                    ))),
1008                }
1009            }};
1010        }
1011        match promotion {
1012            Promotion::Direct => self.decode(buf),
1013            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1014            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1015            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1016            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1017            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1018            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1019            Promotion::StringToBytes => match self {
1020                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1021                    let data = buf.get_bytes()?;
1022                    offsets.push_length(data.len());
1023                    values.extend_from_slice(data);
1024                    Ok(())
1025                }
1026                other => Err(ArrowError::ParseError(format!(
1027                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1028                    <Self as AsRef<str>>::as_ref(other)
1029                ))),
1030            },
1031            Promotion::BytesToString => match self {
1032                Self::String(offsets, values)
1033                | Self::StringView(offsets, values)
1034                | Self::BytesToString(offsets, values) => {
1035                    let data = buf.get_bytes()?;
1036                    offsets.push_length(data.len());
1037                    values.extend_from_slice(data);
1038                    Ok(())
1039                }
1040                other => Err(ArrowError::ParseError(format!(
1041                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1042                    <Self as AsRef<str>>::as_ref(other)
1043                ))),
1044            },
1045        }
1046    }
1047
1048    /// Flush decoded records to an [`ArrayRef`]
1049    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1050        Ok(match self {
1051            Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1052            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1053            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1054            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1055            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1056            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1057            Self::TimeMillis(values) => {
1058                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1059            }
1060            Self::TimeMicros(values) => {
1061                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1062            }
1063            Self::TimestampMillis(is_utc, values) => Arc::new(
1064                flush_primitive::<TimestampMillisecondType>(values, nulls)
1065                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1066            ),
1067            Self::TimestampMicros(is_utc, values) => Arc::new(
1068                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1069                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1070            ),
1071            #[cfg(feature = "avro_custom_types")]
1072            Self::DurationSecond(values) => {
1073                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1074            }
1075            #[cfg(feature = "avro_custom_types")]
1076            Self::DurationMillisecond(values) => {
1077                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1078            }
1079            #[cfg(feature = "avro_custom_types")]
1080            Self::DurationMicrosecond(values) => {
1081                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1082            }
1083            #[cfg(feature = "avro_custom_types")]
1084            Self::DurationNanosecond(values) => {
1085                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1086            }
1087            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1088            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1089            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1090            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1091                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1092            }
1093            Self::Int32ToFloat64(values)
1094            | Self::Int64ToFloat64(values)
1095            | Self::Float32ToFloat64(values) => {
1096                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1097            }
1098            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1099                let offsets = flush_offsets(offsets);
1100                let values = flush_values(values).into();
1101                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1102            }
1103            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1104                let offsets = flush_offsets(offsets);
1105                let values = flush_values(values).into();
1106                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1107            }
1108            Self::StringView(offsets, values) => {
1109                let offsets = flush_offsets(offsets);
1110                let values = flush_values(values);
1111                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1112                let values: Vec<&str> = (0..array.len())
1113                    .map(|i| {
1114                        if array.is_valid(i) {
1115                            array.value(i)
1116                        } else {
1117                            ""
1118                        }
1119                    })
1120                    .collect();
1121                Arc::new(StringViewArray::from(values))
1122            }
1123            Self::Array(field, offsets, values) => {
1124                let values = values.flush(None)?;
1125                let offsets = flush_offsets(offsets);
1126                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1127            }
1128            Self::Record(fields, encodings, _) => {
1129                let arrays = encodings
1130                    .iter_mut()
1131                    .map(|x| x.flush(None))
1132                    .collect::<Result<Vec<_>, _>>()?;
1133                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1134            }
1135            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1136                let moff = flush_offsets(m_off);
1137                let koff = flush_offsets(k_off);
1138                let kd = flush_values(kdata).into();
1139                let val_arr = valdec.flush(None)?;
1140                let key_arr = StringArray::try_new(koff, kd, None)?;
1141                if key_arr.len() != val_arr.len() {
1142                    return Err(ArrowError::InvalidArgumentError(format!(
1143                        "Map keys length ({}) != map values length ({})",
1144                        key_arr.len(),
1145                        val_arr.len()
1146                    )));
1147                }
1148                let final_len = moff.len() - 1;
1149                if let Some(n) = &nulls {
1150                    if n.len() != final_len {
1151                        return Err(ArrowError::InvalidArgumentError(format!(
1152                            "Map array null buffer length {} != final map length {final_len}",
1153                            n.len()
1154                        )));
1155                    }
1156                }
1157                let entries_fields = match map_field.data_type() {
1158                    DataType::Struct(fields) => fields.clone(),
1159                    other => {
1160                        return Err(ArrowError::InvalidArgumentError(format!(
1161                            "Map entries field must be a Struct, got {other:?}"
1162                        )));
1163                    }
1164                };
1165                let entries_struct =
1166                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1167                let map_arr =
1168                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1169                Arc::new(map_arr)
1170            }
1171            Self::Fixed(sz, accum) => {
1172                let b: Buffer = flush_values(accum).into();
1173                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1174                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1175                Arc::new(arr)
1176            }
1177            Self::Uuid(values) => {
1178                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1179                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1180                Arc::new(arr)
1181            }
1182            #[cfg(feature = "small_decimals")]
1183            Self::Decimal32(precision, scale, _, builder) => {
1184                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1185            }
1186            #[cfg(feature = "small_decimals")]
1187            Self::Decimal64(precision, scale, _, builder) => {
1188                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1189            }
1190            Self::Decimal128(precision, scale, _, builder) => {
1191                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1192            }
1193            Self::Decimal256(precision, scale, _, builder) => {
1194                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1195            }
1196            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1197            Self::Duration(builder) => {
1198                let (_, vals, _) = builder.finish().into_parts();
1199                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1200                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1201                Arc::new(vals)
1202            }
1203            Self::Union(u) => u.flush(nulls)?,
1204        })
1205    }
1206}
1207
// A lookup table that resolves a writer-side index to a reader-side index plus the
// `Promotion` to apply. It is held by the `UnionReadPlan::ReaderUnion` and
// `UnionReadPlan::ToSingle` plans.
#[derive(Debug)]
struct DispatchLookupTable {
    // Maps each writer index `w` to the corresponding reader index.
    //
    // Semantics:
    // - `to_reader[w] >= 0`: The value is the reader index that writer entry `w` resolves
    //   to; `resolve(w)` returns it together with `promotion[w]`.
    // - `to_reader[w] == NO_SOURCE` (-1): Writer entry `w` has no reader counterpart and
    //   `resolve(w)` returns `None`. How that is handled is up to the caller.
    //
    // Representation (`i8`):
    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
    // `i8` for union type IDs. This requires that the stored reader indices do not exceed
    // `i8::MAX`; `from_writer_to_reader` returns a schema error otherwise.
    //
    // Invariants:
    // - `to_reader.len() == promotion.len()`, one entry per element of the map passed to
    //   `from_writer_to_reader`.
    // - If `to_reader[w] == NO_SOURCE`, `promotion[w]` is ignored.
    to_reader: Box<[i8]>,
    // For each writer index `w`, specifies the `Promotion` to apply to the writer's value.
    //
    // This is used when a writer type can be promoted to the reader type
    // (e.g., `Int` to `Long`). It is ignored if `to_reader[w] == NO_SOURCE`, in which case
    // the slot holds the placeholder `Promotion::Direct`.
    promotion: Box<[Promotion]>,
}
1233
// Sentinel stored in `DispatchLookupTable::to_reader` for a slot that has no mapping.
// `DispatchLookupTable::resolve` returns `None` for any negative slot, this one included.
const NO_SOURCE: i8 = -1;
1237
1238impl DispatchLookupTable {
1239    fn from_writer_to_reader(
1240        promotion_map: &[Option<(usize, Promotion)>],
1241    ) -> Result<Self, ArrowError> {
1242        let mut to_reader = Vec::with_capacity(promotion_map.len());
1243        let mut promotion = Vec::with_capacity(promotion_map.len());
1244        for map in promotion_map {
1245            match *map {
1246                Some((idx, promo)) => {
1247                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1248                        ArrowError::SchemaError(format!(
1249                            "Reader branch index {idx} exceeds i8 range (max {})",
1250                            i8::MAX
1251                        ))
1252                    })?;
1253                    to_reader.push(idx_i8);
1254                    promotion.push(promo);
1255                }
1256                None => {
1257                    to_reader.push(NO_SOURCE);
1258                    promotion.push(Promotion::Direct);
1259                }
1260            }
1261        }
1262        Ok(Self {
1263            to_reader: to_reader.into_boxed_slice(),
1264            promotion: promotion.into_boxed_slice(),
1265        })
1266    }
1267
1268    // Resolve a writer branch index to (reader_idx, promotion)
1269    #[inline]
1270    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1271        let reader_index = *self.to_reader.get(writer_index)?;
1272        (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1273    }
1274}
1275
/// Decoder state for Avro union values.
///
/// Built by `try_new` (reader-side union) or `try_new_from_writer_union` (writer union
/// resolved to a single reader type; every field except `plan` keeps its default there).
#[derive(Debug)]
struct UnionDecoder {
    /// Union fields; `try_new` takes `reader_type_codes` from their type ids.
    fields: UnionFields,
    /// NOTE(review): presumably the per-value type ids of the output union — confirm
    /// against the decode/flush code.
    type_ids: Vec<i8>,
    /// NOTE(review): presumably the per-value offsets into the chosen branch — confirm
    /// against the decode/flush code.
    offsets: Vec<i32>,
    /// One decoder per reader branch, as passed to `try_new`.
    branches: Vec<Decoder>,
    /// Per-branch counters, zero-initialised in `try_new` with
    /// `max(branches.len(), reader_type_codes.len())` entries.
    counts: Vec<i32>,
    /// Type ids collected from `fields`, in iteration order.
    reader_type_codes: Vec<i8>,
    /// Position of the first `Decoder::Null` branch, if any.
    null_branch: Option<usize>,
    /// Set to `0` by both `try_new` and `Default`.
    /// NOTE(review): presumably the branch used when a default must be emitted — confirm.
    default_emit_idx: usize,
    /// `null_branch` when present, otherwise `default_emit_idx`.
    null_emit_idx: usize,
    /// How writer data is mapped onto this decoder; see `UnionReadPlan`.
    plan: UnionReadPlan,
}
1289
1290impl Default for UnionDecoder {
1291    fn default() -> Self {
1292        Self {
1293            fields: UnionFields::empty(),
1294            type_ids: Vec::new(),
1295            offsets: Vec::new(),
1296            branches: Vec::new(),
1297            counts: Vec::new(),
1298            reader_type_codes: Vec::new(),
1299            null_branch: None,
1300            default_emit_idx: 0,
1301            null_emit_idx: 0,
1302            plan: UnionReadPlan::Passthrough,
1303        }
1304    }
1305}
1306
/// Strategy a `UnionDecoder` uses to map writer data onto the reader side.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions; `lookup_table` is built from the resolved
    /// `writer_to_reader` map.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// Writer is a single type and the reader is a union. `reader_idx` and `promotion`
    /// come from the first entry of the resolved `writer_to_reader` map.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    /// Writer is a union and the reader is a single type decoded by `target`.
    /// Constructed by `UnionDecoder::try_new_from_writer_union`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution information was supplied (`resolved` was `None`).
    Passthrough,
}
1322
1323impl UnionDecoder {
1324    fn try_new(
1325        fields: UnionFields,
1326        branches: Vec<Decoder>,
1327        resolved: Option<ResolvedUnion>,
1328    ) -> Result<Self, ArrowError> {
1329        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1330        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1331        let default_emit_idx = 0;
1332        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1333        let branch_len = branches.len().max(reader_type_codes.len());
1334        // Guard against impractically large unions that cannot be indexed by an Avro int
1335        let max_addr = (i32::MAX as usize) + 1;
1336        if branches.len() > max_addr {
1337            return Err(ArrowError::SchemaError(format!(
1338                "Reader union has {} branches, which exceeds the maximum addressable \
1339                 branches by an Avro int tag ({} + 1).",
1340                branches.len(),
1341                i32::MAX
1342            )));
1343        }
1344        Ok(Self {
1345            fields,
1346            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1347            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1348            branches,
1349            counts: vec![0; branch_len],
1350            reader_type_codes,
1351            null_branch,
1352            default_emit_idx,
1353            null_emit_idx,
1354            plan: Self::plan_from_resolved(resolved)?,
1355        })
1356    }
1357
1358    fn try_new_from_writer_union(
1359        info: ResolvedUnion,
1360        target: Box<Decoder>,
1361    ) -> Result<Self, ArrowError> {
1362        // This constructor is only for writer-union to single-type resolution
1363        debug_assert!(info.writer_is_union && !info.reader_is_union);
1364        let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1365        Ok(Self {
1366            plan: UnionReadPlan::ToSingle {
1367                target,
1368                lookup_table,
1369            },
1370            ..Self::default()
1371        })
1372    }
1373
1374    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
1375        let Some(info) = resolved else {
1376            return Ok(UnionReadPlan::Passthrough);
1377        };
1378        match (info.writer_is_union, info.reader_is_union) {
1379            (true, true) => {
1380                let lookup_table =
1381                    DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1382                Ok(UnionReadPlan::ReaderUnion { lookup_table })
1383            }
1384            (false, true) => {
1385                let Some(&(reader_idx, promotion)) =
1386                    info.writer_to_reader.first().and_then(Option::as_ref)
1387                else {
1388                    return Err(ArrowError::SchemaError(
1389                        "Writer type does not match any reader union branch".to_string(),
1390                    ));
1391                };
1392                Ok(UnionReadPlan::FromSingle {
1393                    reader_idx,
1394                    promotion,
1395                })
1396            }
1397            (true, false) => Err(ArrowError::InvalidArgumentError(
1398                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1399                    .to_string(),
1400            )),
1401            // (false, false) is invalid and should never be constructed by the resolver.
1402            _ => Err(ArrowError::SchemaError(
1403                "ResolvedUnion constructed for non-union sides; resolver should return None"
1404                    .to_string(),
1405            )),
1406        }
1407    }
1408
1409    #[inline]
1410    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
1411        // Avro unions are encoded by first writing the zero-based branch index.
1412        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
1413        // but both use zig-zag varint encoding, so decoding as long is compatible
1414        // with either form and widely used in practice.
1415        let raw = buf.get_long()?;
1416        if raw < 0 {
1417            return Err(ArrowError::ParseError(format!(
1418                "Negative union branch index {raw}"
1419            )));
1420        }
1421        usize::try_from(raw).map_err(|_| {
1422            ArrowError::ParseError(format!(
1423                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1424                (usize::BITS as usize)
1425            ))
1426        })
1427    }
1428
1429    #[inline]
1430    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
1431        let branches_len = self.branches.len();
1432        let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1433            return Err(ArrowError::ParseError(format!(
1434                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1435            )));
1436        };
1437        self.type_ids.push(self.reader_type_codes[reader_idx]);
1438        self.offsets.push(self.counts[reader_idx]);
1439        self.counts[reader_idx] += 1;
1440        Ok(reader_branch)
1441    }
1442
1443    #[inline]
1444    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
1445    where
1446        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
1447    {
1448        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1449            return action(target);
1450        }
1451        let reader_idx = match &self.plan {
1452            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1453            _ => fallback_idx,
1454        };
1455        self.emit_to(reader_idx).and_then(action)
1456    }
1457
1458    fn append_null(&mut self) -> Result<(), ArrowError> {
1459        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1460    }
1461
1462    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
1463        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1464    }
1465
1466    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1467        let (reader_idx, promotion) = match &mut self.plan {
1468            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1469            UnionReadPlan::ReaderUnion { lookup_table } => {
1470                let idx = Self::read_tag(buf)?;
1471                lookup_table.resolve(idx).ok_or_else(|| {
1472                    ArrowError::ParseError(format!(
1473                        "Union branch index {idx} not resolvable by reader schema"
1474                    ))
1475                })?
1476            }
1477            UnionReadPlan::FromSingle {
1478                reader_idx,
1479                promotion,
1480            } => (*reader_idx, *promotion),
1481            UnionReadPlan::ToSingle {
1482                target,
1483                lookup_table,
1484            } => {
1485                let idx = Self::read_tag(buf)?;
1486                return match lookup_table.resolve(idx) {
1487                    Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1488                    None => Err(ArrowError::ParseError(format!(
1489                        "Writer union branch {idx} does not resolve to reader type"
1490                    ))),
1491                };
1492            }
1493        };
1494        let decoder = self.emit_to(reader_idx)?;
1495        decoder.decode_with_promotion(buf, promotion)
1496    }
1497
1498    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1499        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1500            return target.flush(nulls);
1501        }
1502        debug_assert!(
1503            nulls.is_none(),
1504            "UnionArray does not accept a validity bitmap; \
1505                     nulls should have been materialized as a Null child during decode"
1506        );
1507        let children = self
1508            .branches
1509            .iter_mut()
1510            .map(|d| d.flush(None))
1511            .collect::<Result<Vec<_>, _>>()?;
1512        let arr = UnionArray::try_new(
1513            self.fields.clone(),
1514            flush_values(&mut self.type_ids).into_iter().collect(),
1515            Some(flush_values(&mut self.offsets).into_iter().collect()),
1516            children,
1517        )
1518        .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1519        Ok(Arc::new(arr))
1520    }
1521}
1522
/// Collects the pieces needed to construct a `UnionDecoder`.
///
/// `build` accepts two configurations: `fields` + `branches` (with optional
/// `resolved`) and no `target`, or `resolved` + `target` with neither `fields`
/// nor `branches`.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    /// Arrow union fields of the output array (reader-union configuration).
    fields: Option<UnionFields>,
    /// One decoder per reader union branch (reader-union configuration).
    branches: Option<Vec<Decoder>>,
    /// Writer/reader resolution info, when schema resolution produced any.
    resolved: Option<ResolvedUnion>,
    /// Destination decoder for writer-union to single-type resolution.
    target: Option<Box<Decoder>>,
}
1530
1531impl UnionDecoderBuilder {
1532    fn new() -> Self {
1533        Self::default()
1534    }
1535
1536    fn with_fields(mut self, fields: UnionFields) -> Self {
1537        self.fields = Some(fields);
1538        self
1539    }
1540
1541    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1542        self.branches = Some(branches);
1543        self
1544    }
1545
1546    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1547        self.resolved = Some(resolved_union);
1548        self
1549    }
1550
1551    fn with_target(mut self, target: Box<Decoder>) -> Self {
1552        self.target = Some(target);
1553        self
1554    }
1555
1556    fn build(self) -> Result<UnionDecoder, ArrowError> {
1557        match (self.resolved, self.fields, self.branches, self.target) {
1558            (resolved, Some(fields), Some(branches), None) => {
1559                UnionDecoder::try_new(fields, branches, resolved)
1560            }
1561            (Some(info), None, None, Some(target))
1562                if info.writer_is_union && !info.reader_is_union =>
1563            {
1564                UnionDecoder::try_new_from_writer_union(info, target)
1565            }
1566            _ => Err(ArrowError::InvalidArgumentError(
1567                "Invalid UnionDecoderBuilder configuration: expected either \
1568                 (fields + branches + resolved) with no target for reader-unions, or \
1569                 (resolved + target) with no fields/branches for writer-union to single."
1570                    .to_string(),
1571            )),
1572        }
1573    }
1574}
1575
/// How `process_blockwise` treats a block whose item count is negative, i.e. a
/// block that is followed by its payload size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the size, then still visit the items one by one.
    ProcessItems,
    /// Read the size and jump over the whole block payload in one step.
    SkipBySize,
}
1581
1582#[inline]
1583fn skip_blocks(
1584    buf: &mut AvroCursor,
1585    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1586) -> Result<usize, ArrowError> {
1587    process_blockwise(
1588        buf,
1589        move |c| skip_item(c),
1590        NegativeBlockBehavior::SkipBySize,
1591    )
1592}
1593
1594#[inline]
1595fn flush_dict(
1596    indices: &mut Vec<i32>,
1597    symbols: &[String],
1598    nulls: Option<NullBuffer>,
1599) -> Result<ArrayRef, ArrowError> {
1600    let keys = flush_primitive::<Int32Type>(indices, nulls);
1601    let values = Arc::new(StringArray::from_iter_values(
1602        symbols.iter().map(|s| s.as_str()),
1603    ));
1604    DictionaryArray::try_new(keys, values)
1605        .map_err(|e| ArrowError::ParseError(e.to_string()))
1606        .map(|arr| Arc::new(arr) as ArrayRef)
1607}
1608
1609#[inline]
1610fn read_blocks(
1611    buf: &mut AvroCursor,
1612    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1613) -> Result<usize, ArrowError> {
1614    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1615}
1616
1617#[inline]
1618fn process_blockwise(
1619    buf: &mut AvroCursor,
1620    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1621    negative_behavior: NegativeBlockBehavior,
1622) -> Result<usize, ArrowError> {
1623    let mut total = 0usize;
1624    loop {
1625        // Read the block count
1626        //  positive = that many items
1627        //  negative = that many items + read block size
1628        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
1629        let block_count = buf.get_long()?;
1630        match block_count.cmp(&0) {
1631            Ordering::Equal => break,
1632            Ordering::Less => {
1633                let count = (-block_count) as usize;
1634                // A negative count is followed by a long of the size in bytes
1635                let size_in_bytes = buf.get_long()? as usize;
1636                match negative_behavior {
1637                    NegativeBlockBehavior::ProcessItems => {
1638                        // Process items one-by-one after reading size
1639                        for _ in 0..count {
1640                            on_item(buf)?;
1641                        }
1642                    }
1643                    NegativeBlockBehavior::SkipBySize => {
1644                        // Skip the entire block payload at once
1645                        let _ = buf.get_fixed(size_in_bytes)?;
1646                    }
1647                }
1648                total += count;
1649            }
1650            Ordering::Greater => {
1651                let count = block_count as usize;
1652                for _ in 0..count {
1653                    on_item(buf)?;
1654                }
1655                total += count;
1656            }
1657        }
1658    }
1659    Ok(total)
1660}
1661
1662#[inline]
1663fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1664    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1665}
1666
1667#[inline]
1668fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1669    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1670}
1671
1672#[inline]
1673fn flush_primitive<T: ArrowPrimitiveType>(
1674    values: &mut Vec<T::Native>,
1675    nulls: Option<NullBuffer>,
1676) -> PrimitiveArray<T> {
1677    PrimitiveArray::new(flush_values(values).into(), nulls)
1678}
1679
1680#[inline]
1681fn read_decimal_bytes_be<const N: usize>(
1682    buf: &mut AvroCursor<'_>,
1683    size: &Option<usize>,
1684) -> Result<[u8; N], ArrowError> {
1685    match size {
1686        Some(n) if *n == N => {
1687            let raw = buf.get_fixed(N)?;
1688            let mut arr = [0u8; N];
1689            arr.copy_from_slice(raw);
1690            Ok(arr)
1691        }
1692        Some(n) => {
1693            let raw = buf.get_fixed(*n)?;
1694            sign_cast_to::<N>(raw)
1695        }
1696        None => {
1697            let raw = buf.get_bytes()?;
1698            sign_cast_to::<N>(raw)
1699        }
1700    }
1701}
1702
1703/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
1704/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
1705/// the payload is a big-endian two's-complement integer, and when narrowing it must
1706/// be representable without changing sign or value.
1707///
1708/// If `raw.len() < N`, the value is sign-extended.
1709/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
1710/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
1711#[inline]
1712fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
1713    let len = raw.len();
1714    // Fast path: exact width, just copy
1715    if len == N {
1716        let mut out = [0u8; N];
1717        out.copy_from_slice(raw);
1718        return Ok(out);
1719    }
1720    // Determine sign byte from MSB of first byte (empty => positive)
1721    let first = raw.first().copied().unwrap_or(0u8);
1722    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1723    // Pre-fill with sign byte to support sign extension
1724    let mut out = [sign_byte; N];
1725    if len > N {
1726        // Validate truncation: all dropped leading bytes must equal sign_byte,
1727        // and the MSB of the first kept byte must match the sign.
1728        let extra = len - N;
1729        // Any non-sign byte in the truncated prefix indicates overflow
1730        if raw[..extra].iter().any(|&b| b != sign_byte) {
1731            return Err(ArrowError::ParseError(format!(
1732                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1733                len, N
1734            )));
1735        }
1736        if N > 0 {
1737            let first_kept = raw[extra];
1738            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1739            if sign_bit_mismatch {
1740                return Err(ArrowError::ParseError(format!(
1741                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1742                    len, N
1743                )));
1744            }
1745        }
1746        out.copy_from_slice(&raw[extra..]);
1747        return Ok(out);
1748    }
1749    out[N - len..].copy_from_slice(raw);
1750    Ok(out)
1751}
1752
/// Per-record projection plan used when the writer and reader record schemas
/// differ: which writer fields to decode, which to skip, and which reader fields
/// to fill from defaults.
#[derive(Debug)]
struct Projector {
    /// For each writer field, in writer order: the index of the reader decoder that
    /// receives it, or `None` when the field must be skipped.
    writer_to_reader: Arc<[Option<usize>]>,
    /// Parallel to `writer_to_reader`: the skipper for writer fields that are not
    /// decoded.
    skip_decoders: Vec<Option<Skipper>>,
    /// One entry per reader field: its default literal, if it has one.
    field_defaults: Vec<Option<AvroLiteral>>,
    /// `(reader index, literal)` pairs appended after every projected record.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
1760
/// Builds a `Projector` from a resolved record and the reader's fields.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    /// Resolution result supplying the writer-to-reader mapping, the fields to
    /// skip and the fields to default.
    rec: &'a ResolvedRecord,
    /// Reader-side fields, consulted for their default values.
    reader_fields: Arc<[AvroField]>,
}
1766
1767impl<'a> ProjectorBuilder<'a> {
1768    #[inline]
1769    fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1770        Self {
1771            rec,
1772            reader_fields: reader_fields.clone(),
1773        }
1774    }
1775
1776    #[inline]
1777    fn build(self) -> Result<Projector, ArrowError> {
1778        let reader_fields = self.reader_fields;
1779        let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1780        for avro_field in reader_fields.as_ref() {
1781            if let Some(ResolutionInfo::DefaultValue(lit)) =
1782                avro_field.data_type().resolution.as_ref()
1783            {
1784                field_defaults.push(Some(lit.clone()));
1785            } else {
1786                field_defaults.push(None);
1787            }
1788        }
1789        let mut default_injections: Vec<(usize, AvroLiteral)> =
1790            Vec::with_capacity(self.rec.default_fields.len());
1791        for &idx in self.rec.default_fields.as_ref() {
1792            let lit = field_defaults
1793                .get(idx)
1794                .and_then(|lit| lit.clone())
1795                .unwrap_or(AvroLiteral::Null);
1796            default_injections.push((idx, lit));
1797        }
1798        let mut skip_decoders: Vec<Option<Skipper>> =
1799            Vec::with_capacity(self.rec.skip_fields.len());
1800        for datatype in self.rec.skip_fields.as_ref() {
1801            let skipper = match datatype {
1802                Some(datatype) => Some(Skipper::from_avro(datatype)?),
1803                None => None,
1804            };
1805            skip_decoders.push(skipper);
1806        }
1807        Ok(Projector {
1808            writer_to_reader: self.rec.writer_to_reader.clone(),
1809            skip_decoders,
1810            field_defaults,
1811            default_injections: default_injections.into(),
1812        })
1813    }
1814}
1815
1816impl Projector {
1817    #[inline]
1818    fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
1819        // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from
1820        // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in
1821        // `ProjectorBuilder::build` to have exactly one element per reader field.
1822        // Therefore, `index < self.field_defaults.len()` always holds here, so
1823        // `self.field_defaults[index]` cannot panic. We only take an immutable reference
1824        // via `.as_ref()`, and `self` is borrowed immutably.
1825        if let Some(default_literal) = self.field_defaults[index].as_ref() {
1826            decoder.append_default(default_literal)
1827        } else {
1828            decoder.append_null()
1829        }
1830    }
1831
1832    #[inline]
1833    fn project_record(
1834        &mut self,
1835        buf: &mut AvroCursor<'_>,
1836        encodings: &mut [Decoder],
1837    ) -> Result<(), ArrowError> {
1838        debug_assert_eq!(
1839            self.writer_to_reader.len(),
1840            self.skip_decoders.len(),
1841            "internal invariant: mapping and skipper lists must have equal length"
1842        );
1843        for (i, (mapping, skipper_opt)) in self
1844            .writer_to_reader
1845            .iter()
1846            .zip(self.skip_decoders.iter_mut())
1847            .enumerate()
1848        {
1849            match (mapping, skipper_opt.as_mut()) {
1850                (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1851                (None, Some(skipper)) => skipper.skip(buf)?,
1852                (None, None) => {
1853                    return Err(ArrowError::SchemaError(format!(
1854                        "No skipper available for writer-only field at index {i}",
1855                    )));
1856                }
1857            }
1858        }
1859        for (reader_index, lit) in self.default_injections.as_ref() {
1860            encodings[*reader_index].append_default(lit)?;
1861        }
1862        Ok(())
1863    }
1864}
1865
/// Lightweight skipper for non-projected writer fields
/// (fields present in the writer schema but omitted by the reader/projection);
/// per Avro 1.11.1 schema resolution these fields are ignored.
///
/// Each variant knows how to advance the cursor past one encoded value of its
/// type without materializing it.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
#[derive(Debug)]
enum Skipper {
    /// Occupies no bytes.
    Null,
    /// Skipped by reading a boolean.
    Boolean,
    /// Skipped by reading an `int`.
    Int32,
    /// Skipped by reading a `long`.
    Int64,
    /// Skipped by reading a `float`.
    Float32,
    /// Skipped by reading a `double`.
    Float64,
    /// Skipped by reading length-prefixed bytes.
    Bytes,
    /// Skipped by reading length-prefixed bytes.
    String,
    /// Skipped by reading an `int`.
    Date32,
    /// Skipped by reading an `int`.
    TimeMillis,
    /// Skipped by reading a `long`.
    TimeMicros,
    /// Skipped by reading a `long`.
    TimestampMillis,
    /// Skipped by reading a `long`.
    TimestampMicros,
    /// Fixed-size value of the given byte width.
    Fixed(usize),
    /// Decimal: fixed width when a size is given, length-prefixed bytes otherwise.
    Decimal(Option<usize>),
    /// UUID, skipped as length-prefixed bytes (string encoding).
    UuidString,
    /// Enum symbol index, skipped by reading an `int`.
    Enum,
    /// Duration, skipped as 12 fixed bytes.
    DurationFixed12,
    /// Block-encoded array of the inner item type.
    List(Box<Skipper>),
    /// Block-encoded map; each entry is a key (length-prefixed bytes) and a value.
    Map(Box<Skipper>),
    /// Record: its fields are skipped in order.
    Struct(Vec<Skipper>),
    /// General union: a branch tag followed by the selected branch's value.
    Union(Vec<Skipper>),
    /// Two-branch nullable type; the `Nullability` tells which tag means null.
    Nullable(Nullability, Box<Skipper>),
}
1897
1898impl Skipper {
1899    fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1900        let mut base = match dt.codec() {
1901            Codec::Null => Self::Null,
1902            Codec::Boolean => Self::Boolean,
1903            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1904            Codec::Int64 => Self::Int64,
1905            Codec::TimeMicros => Self::TimeMicros,
1906            Codec::TimestampMillis(_) => Self::TimestampMillis,
1907            Codec::TimestampMicros(_) => Self::TimestampMicros,
1908            Codec::Float32 => Self::Float32,
1909            Codec::Float64 => Self::Float64,
1910            Codec::Binary => Self::Bytes,
1911            Codec::Utf8 | Codec::Utf8View => Self::String,
1912            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
1913            Codec::Decimal(_, _, size) => Self::Decimal(*size),
1914            Codec::Uuid => Self::UuidString, // encoded as string
1915            Codec::Enum(_) => Self::Enum,
1916            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
1917            Codec::Struct(fields) => Self::Struct(
1918                fields
1919                    .iter()
1920                    .map(|f| Skipper::from_avro(f.data_type()))
1921                    .collect::<Result<_, _>>()?,
1922            ),
1923            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
1924            Codec::Interval => Self::DurationFixed12,
1925            Codec::Union(encodings, _, _) => {
1926                let max_addr = (i32::MAX as usize) + 1;
1927                if encodings.len() > max_addr {
1928                    return Err(ArrowError::SchemaError(format!(
1929                        "Writer union has {} branches, which exceeds the maximum addressable \
1930                         branches by an Avro int tag ({} + 1).",
1931                        encodings.len(),
1932                        i32::MAX
1933                    )));
1934                }
1935                Self::Union(
1936                    encodings
1937                        .iter()
1938                        .map(Skipper::from_avro)
1939                        .collect::<Result<_, _>>()?,
1940                )
1941            }
1942            _ => {
1943                return Err(ArrowError::NotYetImplemented(format!(
1944                    "Skipper not implemented for codec {:?}",
1945                    dt.codec()
1946                )));
1947            }
1948        };
1949        if let Some(n) = dt.nullability() {
1950            base = Self::Nullable(n, Box::new(base));
1951        }
1952        Ok(base)
1953    }
1954
1955    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1956        match self {
1957            Self::Null => Ok(()),
1958            Self::Boolean => {
1959                buf.get_bool()?;
1960                Ok(())
1961            }
1962            Self::Int32 | Self::Date32 | Self::TimeMillis => {
1963                buf.get_int()?;
1964                Ok(())
1965            }
1966            Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
1967                buf.get_long()?;
1968                Ok(())
1969            }
1970            Self::Float32 => {
1971                buf.get_float()?;
1972                Ok(())
1973            }
1974            Self::Float64 => {
1975                buf.get_double()?;
1976                Ok(())
1977            }
1978            Self::Bytes | Self::String | Self::UuidString => {
1979                buf.get_bytes()?;
1980                Ok(())
1981            }
1982            Self::Fixed(sz) => {
1983                buf.get_fixed(*sz)?;
1984                Ok(())
1985            }
1986            Self::Decimal(size) => {
1987                if let Some(s) = size {
1988                    buf.get_fixed(*s)
1989                } else {
1990                    buf.get_bytes()
1991                }?;
1992                Ok(())
1993            }
1994            Self::Enum => {
1995                buf.get_int()?;
1996                Ok(())
1997            }
1998            Self::DurationFixed12 => {
1999                buf.get_fixed(12)?;
2000                Ok(())
2001            }
2002            Self::List(item) => {
2003                skip_blocks(buf, |c| item.skip(c))?;
2004                Ok(())
2005            }
2006            Self::Map(value) => {
2007                skip_blocks(buf, |c| {
2008                    c.get_bytes()?; // key
2009                    value.skip(c)
2010                })?;
2011                Ok(())
2012            }
2013            Self::Struct(fields) => {
2014                for f in fields.iter_mut() {
2015                    f.skip(buf)?
2016                }
2017                Ok(())
2018            }
2019            Self::Union(encodings) => {
2020                // Union tag must be ZigZag-decoded
2021                let raw = buf.get_long()?;
2022                if raw < 0 {
2023                    return Err(ArrowError::ParseError(format!(
2024                        "Negative union branch index {raw}"
2025                    )));
2026                }
2027                let idx: usize = usize::try_from(raw).map_err(|_| {
2028                    ArrowError::ParseError(format!(
2029                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2030                        (usize::BITS as usize)
2031                    ))
2032                })?;
2033                let Some(encoding) = encodings.get_mut(idx) else {
2034                    return Err(ArrowError::ParseError(format!(
2035                        "Union branch index {idx} out of range for skipper ({} branches)",
2036                        encodings.len()
2037                    )));
2038                };
2039                encoding.skip(buf)
2040            }
2041            Self::Nullable(order, inner) => {
2042                let branch = buf.read_vlq()?;
2043                let is_not_null = match *order {
2044                    Nullability::NullFirst => branch != 0,
2045                    Nullability::NullSecond => branch == 0,
2046                };
2047                if is_not_null {
2048                    inner.skip(buf)?;
2049                }
2050                Ok(())
2051            }
2052        }
2053    }
2054}
2055
2056#[cfg(test)]
2057mod tests {
2058    use super::*;
2059    use crate::codec::AvroField;
2060    use crate::schema::{PrimitiveType, Schema, TypeName};
2061    use arrow_array::cast::AsArray;
2062    use indexmap::IndexMap;
2063
2064    fn encode_avro_int(value: i32) -> Vec<u8> {
2065        let mut buf = Vec::new();
2066        let mut v = (value << 1) ^ (value >> 31);
2067        while v & !0x7F != 0 {
2068            buf.push(((v & 0x7F) | 0x80) as u8);
2069            v >>= 7;
2070        }
2071        buf.push(v as u8);
2072        buf
2073    }
2074
2075    fn encode_avro_long(value: i64) -> Vec<u8> {
2076        let mut buf = Vec::new();
2077        let mut v = (value << 1) ^ (value >> 63);
2078        while v & !0x7F != 0 {
2079            buf.push(((v & 0x7F) | 0x80) as u8);
2080            v >>= 7;
2081        }
2082        buf.push(v as u8);
2083        buf
2084    }
2085
2086    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2087        let mut buf = encode_avro_long(bytes.len() as i64);
2088        buf.extend_from_slice(bytes);
2089        buf
2090    }
2091
2092    fn avro_from_codec(codec: Codec) -> AvroDataType {
2093        AvroDataType::new(codec, Default::default(), None)
2094    }
2095
2096    fn decoder_for_promotion(
2097        writer: PrimitiveType,
2098        reader: PrimitiveType,
2099        use_utf8view: bool,
2100    ) -> Decoder {
2101        let ws = Schema::TypeName(TypeName::Primitive(writer));
2102        let rs = Schema::TypeName(TypeName::Primitive(reader));
2103        let field =
2104            AvroField::resolve_from_writer_and_reader(&ws, &rs, use_utf8view, false).unwrap();
2105        Decoder::try_new(field.data_type()).unwrap()
2106    }
2107
2108    #[test]
2109    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2110        let ws = Schema::Union(vec![
2111            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2112            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2113        ]);
2114        let rs = Schema::Union(vec![
2115            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2116            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2117        ]);
2118        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
2119        let mut dec = Decoder::try_new(field.data_type()).unwrap();
2120        let mut rec1 = encode_avro_long(0);
2121        rec1.extend(encode_avro_int(7));
2122        let mut cur1 = AvroCursor::new(&rec1);
2123        dec.decode(&mut cur1).unwrap();
2124        let mut rec2 = encode_avro_long(1);
2125        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2126        let mut cur2 = AvroCursor::new(&rec2);
2127        dec.decode(&mut cur2).unwrap();
2128        let arr = dec.flush(None).unwrap();
2129        let ua = arr
2130            .as_any()
2131            .downcast_ref::<UnionArray>()
2132            .expect("dense union output");
2133        assert_eq!(
2134            ua.type_id(0),
2135            1,
2136            "first value must select reader 'long' branch"
2137        );
2138        assert_eq!(ua.value_offset(0), 0);
2139        assert_eq!(
2140            ua.type_id(1),
2141            0,
2142            "second value must select reader 'string' branch"
2143        );
2144        assert_eq!(ua.value_offset(1), 0);
2145        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2146        assert_eq!(long_child.len(), 1);
2147        assert_eq!(long_child.value(0), 7);
2148        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2149        assert_eq!(str_child.len(), 1);
2150        assert_eq!(str_child.value(0), "abc");
2151    }
2152
2153    #[test]
2154    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2155        let ws = Schema::Union(vec![
2156            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2157            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2158        ]);
2159        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2160        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
2161        let mut dec = Decoder::try_new(field.data_type()).unwrap();
2162        let mut data = encode_avro_long(0);
2163        data.extend(encode_avro_int(5));
2164        let mut cur = AvroCursor::new(&data);
2165        dec.decode(&mut cur).unwrap();
2166        let arr = dec.flush(None).unwrap();
2167        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2168        assert_eq!(out.len(), 1);
2169        assert_eq!(out.value(0), 5);
2170    }
2171
2172    #[test]
2173    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2174        let ws = Schema::Union(vec![
2175            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2176            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2177        ]);
2178        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2179        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
2180        let mut dec = Decoder::try_new(field.data_type()).unwrap();
2181        let mut data = encode_avro_long(1);
2182        data.extend(encode_avro_bytes("z".as_bytes()));
2183        let mut cur = AvroCursor::new(&data);
2184        let res = dec.decode(&mut cur);
2185        assert!(
2186            res.is_err(),
2187            "expected error when writer union branch does not resolve to reader non-union type"
2188        );
2189    }
2190
2191    #[test]
2192    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2193        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2194        let rs = Schema::Union(vec![
2195            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2196            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2197        ]);
2198        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
2199        let mut dec = Decoder::try_new(field.data_type()).unwrap();
2200        let data = encode_avro_int(6);
2201        let mut cur = AvroCursor::new(&data);
2202        dec.decode(&mut cur).unwrap();
2203        let arr = dec.flush(None).unwrap();
2204        let ua = arr
2205            .as_any()
2206            .downcast_ref::<UnionArray>()
2207            .expect("dense union output");
2208        assert_eq!(ua.len(), 1);
2209        assert_eq!(
2210            ua.type_id(0),
2211            1,
2212            "must resolve to reader 'long' branch (type_id 1)"
2213        );
2214        assert_eq!(ua.value_offset(0), 0);
2215        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2216        assert_eq!(long_child.len(), 1);
2217        assert_eq!(long_child.value(0), 6);
2218        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2219        assert_eq!(str_child.len(), 0, "string branch must be empty");
2220    }
2221
2222    #[test]
2223    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2224        let ws = Schema::Union(vec![
2225            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2226            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2227        ]);
2228        let rs = Schema::Union(vec![
2229            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2230            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2231        ]);
2232        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
2233        let mut dec = Decoder::try_new(field.data_type()).unwrap();
2234        let mut data = encode_avro_long(1);
2235        data.push(1);
2236        let mut cur = AvroCursor::new(&data);
2237        let res = dec.decode(&mut cur);
2238        assert!(
2239            res.is_err(),
2240            "expected error for unmapped writer 'boolean' branch"
2241        );
2242    }
2243
2244    #[test]
2245    fn test_schema_resolution_promotion_int_to_long() {
2246        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2247        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2248        for v in [0, 1, -2, 123456] {
2249            let data = encode_avro_int(v);
2250            let mut cur = AvroCursor::new(&data);
2251            dec.decode(&mut cur).unwrap();
2252        }
2253        let arr = dec.flush(None).unwrap();
2254        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2255        assert_eq!(a.value(0), 0);
2256        assert_eq!(a.value(1), 1);
2257        assert_eq!(a.value(2), -2);
2258        assert_eq!(a.value(3), 123456);
2259    }
2260
2261    #[test]
2262    fn test_schema_resolution_promotion_int_to_float() {
2263        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2264        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2265        for v in [0, 42, -7] {
2266            let data = encode_avro_int(v);
2267            let mut cur = AvroCursor::new(&data);
2268            dec.decode(&mut cur).unwrap();
2269        }
2270        let arr = dec.flush(None).unwrap();
2271        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2272        assert_eq!(a.value(0), 0.0);
2273        assert_eq!(a.value(1), 42.0);
2274        assert_eq!(a.value(2), -7.0);
2275    }
2276
2277    #[test]
2278    fn test_schema_resolution_promotion_int_to_double() {
2279        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2280        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2281        for v in [1, -1, 10_000] {
2282            let data = encode_avro_int(v);
2283            let mut cur = AvroCursor::new(&data);
2284            dec.decode(&mut cur).unwrap();
2285        }
2286        let arr = dec.flush(None).unwrap();
2287        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2288        assert_eq!(a.value(0), 1.0);
2289        assert_eq!(a.value(1), -1.0);
2290        assert_eq!(a.value(2), 10_000.0);
2291    }
2292
2293    #[test]
2294    fn test_schema_resolution_promotion_long_to_float() {
2295        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2296        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2297        for v in [0_i64, 1_000_000_i64, -123_i64] {
2298            let data = encode_avro_long(v);
2299            let mut cur = AvroCursor::new(&data);
2300            dec.decode(&mut cur).unwrap();
2301        }
2302        let arr = dec.flush(None).unwrap();
2303        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2304        assert_eq!(a.value(0), 0.0);
2305        assert_eq!(a.value(1), 1_000_000.0);
2306        assert_eq!(a.value(2), -123.0);
2307    }
2308
2309    #[test]
2310    fn test_schema_resolution_promotion_long_to_double() {
2311        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2312        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2313        for v in [2_i64, -2_i64, 9_223_372_i64] {
2314            let data = encode_avro_long(v);
2315            let mut cur = AvroCursor::new(&data);
2316            dec.decode(&mut cur).unwrap();
2317        }
2318        let arr = dec.flush(None).unwrap();
2319        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2320        assert_eq!(a.value(0), 2.0);
2321        assert_eq!(a.value(1), -2.0);
2322        assert_eq!(a.value(2), 9_223_372.0);
2323    }
2324
2325    #[test]
2326    fn test_schema_resolution_promotion_float_to_double() {
2327        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2328        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2329        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2330            let data = v.to_le_bytes().to_vec();
2331            let mut cur = AvroCursor::new(&data);
2332            dec.decode(&mut cur).unwrap();
2333        }
2334        let arr = dec.flush(None).unwrap();
2335        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2336        assert_eq!(a.value(0), 0.5_f64);
2337        assert_eq!(a.value(1), -3.25_f64);
2338        assert_eq!(a.value(2), 1.0e6_f64);
2339    }
2340
2341    #[test]
2342    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2343        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2344        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2345        for s in ["hello", "world", "héllo"] {
2346            let data = encode_avro_bytes(s.as_bytes());
2347            let mut cur = AvroCursor::new(&data);
2348            dec.decode(&mut cur).unwrap();
2349        }
2350        let arr = dec.flush(None).unwrap();
2351        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2352        assert_eq!(a.value(0), "hello");
2353        assert_eq!(a.value(1), "world");
2354        assert_eq!(a.value(2), "héllo");
2355    }
2356
2357    #[test]
2358    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2359        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2360        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2361        let data = encode_avro_bytes("abc".as_bytes());
2362        let mut cur = AvroCursor::new(&data);
2363        dec.decode(&mut cur).unwrap();
2364        let arr = dec.flush(None).unwrap();
2365        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2366        assert_eq!(a.value(0), "abc");
2367    }
2368
2369    #[test]
2370    fn test_schema_resolution_promotion_string_to_bytes() {
2371        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2372        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2373        for s in ["", "abc", "data"] {
2374            let data = encode_avro_bytes(s.as_bytes());
2375            let mut cur = AvroCursor::new(&data);
2376            dec.decode(&mut cur).unwrap();
2377        }
2378        let arr = dec.flush(None).unwrap();
2379        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2380        assert_eq!(a.value(0), b"");
2381        assert_eq!(a.value(1), b"abc");
2382        assert_eq!(a.value(2), "data".as_bytes());
2383    }
2384
2385    #[test]
2386    fn test_schema_resolution_no_promotion_passthrough_int() {
2387        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2388        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2389        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
2390        let mut dec = Decoder::try_new(field.data_type()).unwrap();
2391        assert!(matches!(dec, Decoder::Int32(_)));
2392        for v in [7, -9] {
2393            let data = encode_avro_int(v);
2394            let mut cur = AvroCursor::new(&data);
2395            dec.decode(&mut cur).unwrap();
2396        }
2397        let arr = dec.flush(None).unwrap();
2398        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2399        assert_eq!(a.value(0), 7);
2400        assert_eq!(a.value(1), -9);
2401    }
2402
2403    #[test]
2404    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
2405        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2406        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
2407        let res = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false);
2408        assert!(res.is_err(), "expected error for illegal promotion");
2409    }
2410
2411    #[test]
2412    fn test_map_decoding_one_entry() {
2413        let value_type = avro_from_codec(Codec::Utf8);
2414        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2415        let mut decoder = Decoder::try_new(&map_type).unwrap();
2416        // Encode a single map with one entry: {"hello": "world"}
2417        let mut data = Vec::new();
2418        data.extend_from_slice(&encode_avro_long(1));
2419        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
2420        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
2421        data.extend_from_slice(&encode_avro_long(0));
2422        let mut cursor = AvroCursor::new(&data);
2423        decoder.decode(&mut cursor).unwrap();
2424        let array = decoder.flush(None).unwrap();
2425        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2426        assert_eq!(map_arr.len(), 1); // one map
2427        assert_eq!(map_arr.value_length(0), 1);
2428        let entries = map_arr.value(0);
2429        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
2430        assert_eq!(struct_entries.len(), 1);
2431        let key_arr = struct_entries
2432            .column_by_name("key")
2433            .unwrap()
2434            .as_any()
2435            .downcast_ref::<StringArray>()
2436            .unwrap();
2437        let val_arr = struct_entries
2438            .column_by_name("value")
2439            .unwrap()
2440            .as_any()
2441            .downcast_ref::<StringArray>()
2442            .unwrap();
2443        assert_eq!(key_arr.value(0), "hello");
2444        assert_eq!(val_arr.value(0), "world");
2445    }
2446
2447    #[test]
2448    fn test_map_decoding_empty() {
2449        let value_type = avro_from_codec(Codec::Utf8);
2450        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2451        let mut decoder = Decoder::try_new(&map_type).unwrap();
2452        let data = encode_avro_long(0);
2453        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2454        let array = decoder.flush(None).unwrap();
2455        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2456        assert_eq!(map_arr.len(), 1);
2457        assert_eq!(map_arr.value_length(0), 0);
2458    }
2459
2460    #[test]
2461    fn test_fixed_decoding() {
2462        let avro_type = avro_from_codec(Codec::Fixed(3));
2463        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2464
2465        let data1 = [1u8, 2, 3];
2466        let mut cursor1 = AvroCursor::new(&data1);
2467        decoder
2468            .decode(&mut cursor1)
2469            .expect("Failed to decode data1");
2470        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
2471        let data2 = [4u8, 5, 6];
2472        let mut cursor2 = AvroCursor::new(&data2);
2473        decoder
2474            .decode(&mut cursor2)
2475            .expect("Failed to decode data2");
2476        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
2477        let array = decoder.flush(None).expect("Failed to flush decoder");
2478        assert_eq!(array.len(), 2, "Array should contain two items");
2479        let fixed_size_binary_array = array
2480            .as_any()
2481            .downcast_ref::<FixedSizeBinaryArray>()
2482            .expect("Failed to downcast to FixedSizeBinaryArray");
2483        assert_eq!(
2484            fixed_size_binary_array.value_length(),
2485            3,
2486            "Fixed size of binary values should be 3"
2487        );
2488        assert_eq!(
2489            fixed_size_binary_array.value(0),
2490            &[1, 2, 3],
2491            "First item mismatch"
2492        );
2493        assert_eq!(
2494            fixed_size_binary_array.value(1),
2495            &[4, 5, 6],
2496            "Second item mismatch"
2497        );
2498    }
2499
2500    #[test]
2501    fn test_fixed_decoding_empty() {
2502        let avro_type = avro_from_codec(Codec::Fixed(5));
2503        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2504
2505        let array = decoder
2506            .flush(None)
2507            .expect("Failed to flush decoder for empty input");
2508
2509        assert_eq!(array.len(), 0, "Array should be empty");
2510        let fixed_size_binary_array = array
2511            .as_any()
2512            .downcast_ref::<FixedSizeBinaryArray>()
2513            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
2514
2515        assert_eq!(
2516            fixed_size_binary_array.value_length(),
2517            5,
2518            "Fixed size of binary values should be 5 as per type"
2519        );
2520    }
2521
2522    #[test]
2523    fn test_uuid_decoding() {
2524        let avro_type = avro_from_codec(Codec::Uuid);
2525        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2526        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
2527        let data = encode_avro_bytes(uuid_str.as_bytes());
2528        let mut cursor = AvroCursor::new(&data);
2529        decoder.decode(&mut cursor).expect("Failed to decode data");
2530        assert_eq!(
2531            cursor.position(),
2532            data.len(),
2533            "Cursor should advance by varint size + data size"
2534        );
2535        let array = decoder.flush(None).expect("Failed to flush decoder");
2536        let fixed_size_binary_array = array
2537            .as_any()
2538            .downcast_ref::<FixedSizeBinaryArray>()
2539            .expect("Array should be a FixedSizeBinaryArray");
2540        assert_eq!(fixed_size_binary_array.len(), 1);
2541        assert_eq!(fixed_size_binary_array.value_length(), 16);
2542        let expected_bytes = [
2543            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
2544            0x6b, 0xf6,
2545        ];
2546        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
2547    }
2548
2549    #[test]
2550    fn test_array_decoding() {
2551        let item_dt = avro_from_codec(Codec::Int32);
2552        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2553        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2554        let mut row1 = Vec::new();
2555        row1.extend_from_slice(&encode_avro_long(2));
2556        row1.extend_from_slice(&encode_avro_int(10));
2557        row1.extend_from_slice(&encode_avro_int(20));
2558        row1.extend_from_slice(&encode_avro_long(0));
2559        let row2 = encode_avro_long(0);
2560        let mut cursor = AvroCursor::new(&row1);
2561        decoder.decode(&mut cursor).unwrap();
2562        let mut cursor2 = AvroCursor::new(&row2);
2563        decoder.decode(&mut cursor2).unwrap();
2564        let array = decoder.flush(None).unwrap();
2565        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2566        assert_eq!(list_arr.len(), 2);
2567        let offsets = list_arr.value_offsets();
2568        assert_eq!(offsets, &[0, 2, 2]);
2569        let values = list_arr.values();
2570        let int_arr = values.as_primitive::<Int32Type>();
2571        assert_eq!(int_arr.len(), 2);
2572        assert_eq!(int_arr.value(0), 10);
2573        assert_eq!(int_arr.value(1), 20);
2574    }
2575
2576    #[test]
2577    fn test_array_decoding_with_negative_block_count() {
2578        let item_dt = avro_from_codec(Codec::Int32);
2579        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2580        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2581        let mut data = encode_avro_long(-3);
2582        data.extend_from_slice(&encode_avro_long(12));
2583        data.extend_from_slice(&encode_avro_int(1));
2584        data.extend_from_slice(&encode_avro_int(2));
2585        data.extend_from_slice(&encode_avro_int(3));
2586        data.extend_from_slice(&encode_avro_long(0));
2587        let mut cursor = AvroCursor::new(&data);
2588        decoder.decode(&mut cursor).unwrap();
2589        let array = decoder.flush(None).unwrap();
2590        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2591        assert_eq!(list_arr.len(), 1);
2592        assert_eq!(list_arr.value_length(0), 3);
2593        let values = list_arr.values().as_primitive::<Int32Type>();
2594        assert_eq!(values.len(), 3);
2595        assert_eq!(values.value(0), 1);
2596        assert_eq!(values.value(1), 2);
2597        assert_eq!(values.value(2), 3);
2598    }
2599
2600    #[test]
2601    fn test_nested_array_decoding() {
2602        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
2603        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
2604        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
2605        let mut buf = Vec::new();
2606        buf.extend(encode_avro_long(1));
2607        buf.extend(encode_avro_long(2));
2608        buf.extend(encode_avro_int(5));
2609        buf.extend(encode_avro_int(6));
2610        buf.extend(encode_avro_long(0));
2611        buf.extend(encode_avro_long(0));
2612        let mut cursor = AvroCursor::new(&buf);
2613        decoder.decode(&mut cursor).unwrap();
2614        let arr = decoder.flush(None).unwrap();
2615        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
2616        assert_eq!(outer.len(), 1);
2617        assert_eq!(outer.value_length(0), 1);
2618        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
2619        assert_eq!(inner.len(), 1);
2620        assert_eq!(inner.value_length(0), 2);
2621        let values = inner
2622            .values()
2623            .as_any()
2624            .downcast_ref::<Int32Array>()
2625            .unwrap();
2626        assert_eq!(values.values(), &[5, 6]);
2627    }
2628
2629    #[test]
2630    fn test_array_decoding_empty_array() {
2631        let value_type = avro_from_codec(Codec::Utf8);
2632        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
2633        let mut decoder = Decoder::try_new(&map_type).unwrap();
2634        let data = encode_avro_long(0);
2635        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2636        let array = decoder.flush(None).unwrap();
2637        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2638        assert_eq!(list_arr.len(), 1);
2639        assert_eq!(list_arr.value_length(0), 0);
2640    }
2641
2642    #[test]
2643    fn test_decimal_decoding_fixed256() {
2644        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
2645        let mut decoder = Decoder::try_new(&dt).unwrap();
2646        let row1 = [
2647            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2648            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2649            0x00, 0x00, 0x30, 0x39,
2650        ];
2651        let row2 = [
2652            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2653            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2654            0xFF, 0xFF, 0xFF, 0x85,
2655        ];
2656        let mut data = Vec::new();
2657        data.extend_from_slice(&row1);
2658        data.extend_from_slice(&row2);
2659        let mut cursor = AvroCursor::new(&data);
2660        decoder.decode(&mut cursor).unwrap();
2661        decoder.decode(&mut cursor).unwrap();
2662        let arr = decoder.flush(None).unwrap();
2663        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
2664        assert_eq!(dec.len(), 2);
2665        assert_eq!(dec.value_as_string(0), "123.45");
2666        assert_eq!(dec.value_as_string(1), "-1.23");
2667    }
2668
2669    #[test]
2670    fn test_decimal_decoding_fixed128() {
2671        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
2672        let mut decoder = Decoder::try_new(&dt).unwrap();
2673        let row1 = [
2674            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2675            0x30, 0x39,
2676        ];
2677        let row2 = [
2678            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2679            0xFF, 0x85,
2680        ];
2681        let mut data = Vec::new();
2682        data.extend_from_slice(&row1);
2683        data.extend_from_slice(&row2);
2684        let mut cursor = AvroCursor::new(&data);
2685        decoder.decode(&mut cursor).unwrap();
2686        decoder.decode(&mut cursor).unwrap();
2687        let arr = decoder.flush(None).unwrap();
2688        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2689        assert_eq!(dec.len(), 2);
2690        assert_eq!(dec.value_as_string(0), "123.45");
2691        assert_eq!(dec.value_as_string(1), "-1.23");
2692    }
2693
2694    #[test]
2695    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
2696        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
2697        let mut decoder = Decoder::try_new(&dt).unwrap();
2698        let row1 = [
2699            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2700            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2701            0x00, 0x00, 0x30, 0x39,
2702        ];
2703        let row2 = [
2704            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2705            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2706            0xFF, 0xFF, 0xFF, 0x85,
2707        ];
2708        let mut data = Vec::new();
2709        data.extend_from_slice(&row1);
2710        data.extend_from_slice(&row2);
2711        let mut cursor = AvroCursor::new(&data);
2712        decoder.decode(&mut cursor).unwrap();
2713        decoder.decode(&mut cursor).unwrap();
2714        let arr = decoder.flush(None).unwrap();
2715        #[cfg(feature = "small_decimals")]
2716        {
2717            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2718            assert_eq!(dec.len(), 2);
2719            assert_eq!(dec.value_as_string(0), "123.45");
2720            assert_eq!(dec.value_as_string(1), "-1.23");
2721        }
2722        #[cfg(not(feature = "small_decimals"))]
2723        {
2724            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2725            assert_eq!(dec.len(), 2);
2726            assert_eq!(dec.value_as_string(0), "123.45");
2727            assert_eq!(dec.value_as_string(1), "-1.23");
2728        }
2729    }
2730
2731    #[test]
2732    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
2733        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
2734        let mut decoder = Decoder::try_new(&dt).unwrap();
2735        let row1 = [
2736            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2737            0x30, 0x39,
2738        ];
2739        let row2 = [
2740            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2741            0xFF, 0x85,
2742        ];
2743        let mut data = Vec::new();
2744        data.extend_from_slice(&row1);
2745        data.extend_from_slice(&row2);
2746        let mut cursor = AvroCursor::new(&data);
2747        decoder.decode(&mut cursor).unwrap();
2748        decoder.decode(&mut cursor).unwrap();
2749
2750        let arr = decoder.flush(None).unwrap();
2751        #[cfg(feature = "small_decimals")]
2752        {
2753            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2754            assert_eq!(dec.len(), 2);
2755            assert_eq!(dec.value_as_string(0), "123.45");
2756            assert_eq!(dec.value_as_string(1), "-1.23");
2757        }
2758        #[cfg(not(feature = "small_decimals"))]
2759        {
2760            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2761            assert_eq!(dec.len(), 2);
2762            assert_eq!(dec.value_as_string(0), "123.45");
2763            assert_eq!(dec.value_as_string(1), "-1.23");
2764        }
2765    }
2766
2767    #[test]
2768    fn test_decimal_decoding_bytes_with_nulls() {
2769        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
2770        let inner = Decoder::try_new(&dt).unwrap();
2771        let mut decoder = Decoder::Nullable(
2772            Nullability::NullSecond,
2773            NullBufferBuilder::new(DEFAULT_CAPACITY),
2774            Box::new(inner),
2775            NullablePlan::ReadTag,
2776        );
2777        let mut data = Vec::new();
2778        data.extend_from_slice(&encode_avro_int(0));
2779        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
2780        data.extend_from_slice(&encode_avro_int(1));
2781        data.extend_from_slice(&encode_avro_int(0));
2782        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
2783        let mut cursor = AvroCursor::new(&data);
2784        decoder.decode(&mut cursor).unwrap();
2785        decoder.decode(&mut cursor).unwrap();
2786        decoder.decode(&mut cursor).unwrap();
2787        let arr = decoder.flush(None).unwrap();
2788        #[cfg(feature = "small_decimals")]
2789        {
2790            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2791            assert_eq!(dec_arr.len(), 3);
2792            assert!(dec_arr.is_valid(0));
2793            assert!(!dec_arr.is_valid(1));
2794            assert!(dec_arr.is_valid(2));
2795            assert_eq!(dec_arr.value_as_string(0), "123.4");
2796            assert_eq!(dec_arr.value_as_string(2), "-123.4");
2797        }
2798        #[cfg(not(feature = "small_decimals"))]
2799        {
2800            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2801            assert_eq!(dec_arr.len(), 3);
2802            assert!(dec_arr.is_valid(0));
2803            assert!(!dec_arr.is_valid(1));
2804            assert!(dec_arr.is_valid(2));
2805            assert_eq!(dec_arr.value_as_string(0), "123.4");
2806            assert_eq!(dec_arr.value_as_string(2), "-123.4");
2807        }
2808    }
2809
2810    #[test]
2811    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
2812        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
2813        let inner = Decoder::try_new(&dt).unwrap();
2814        let mut decoder = Decoder::Nullable(
2815            Nullability::NullSecond,
2816            NullBufferBuilder::new(DEFAULT_CAPACITY),
2817            Box::new(inner),
2818            NullablePlan::ReadTag,
2819        );
2820        let row1 = [
2821            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
2822            0xE2, 0x40,
2823        ];
2824        let row3 = [
2825            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
2826            0x1D, 0xC0,
2827        ];
2828        let mut data = Vec::new();
2829        data.extend_from_slice(&encode_avro_int(0));
2830        data.extend_from_slice(&row1);
2831        data.extend_from_slice(&encode_avro_int(1));
2832        data.extend_from_slice(&encode_avro_int(0));
2833        data.extend_from_slice(&row3);
2834        let mut cursor = AvroCursor::new(&data);
2835        decoder.decode(&mut cursor).unwrap();
2836        decoder.decode(&mut cursor).unwrap();
2837        decoder.decode(&mut cursor).unwrap();
2838        let arr = decoder.flush(None).unwrap();
2839        #[cfg(feature = "small_decimals")]
2840        {
2841            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2842            assert_eq!(dec_arr.len(), 3);
2843            assert!(dec_arr.is_valid(0));
2844            assert!(!dec_arr.is_valid(1));
2845            assert!(dec_arr.is_valid(2));
2846            assert_eq!(dec_arr.value_as_string(0), "1234.56");
2847            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
2848        }
2849        #[cfg(not(feature = "small_decimals"))]
2850        {
2851            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2852            assert_eq!(dec_arr.len(), 3);
2853            assert!(dec_arr.is_valid(0));
2854            assert!(!dec_arr.is_valid(1));
2855            assert!(dec_arr.is_valid(2));
2856            assert_eq!(dec_arr.value_as_string(0), "1234.56");
2857            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
2858        }
2859    }
2860
2861    #[test]
2862    fn test_enum_decoding() {
2863        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
2864        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
2865        let mut decoder = Decoder::try_new(&avro_type).unwrap();
2866        let mut data = Vec::new();
2867        data.extend_from_slice(&encode_avro_int(2));
2868        data.extend_from_slice(&encode_avro_int(0));
2869        data.extend_from_slice(&encode_avro_int(1));
2870        let mut cursor = AvroCursor::new(&data);
2871        decoder.decode(&mut cursor).unwrap();
2872        decoder.decode(&mut cursor).unwrap();
2873        decoder.decode(&mut cursor).unwrap();
2874        let array = decoder.flush(None).unwrap();
2875        let dict_array = array
2876            .as_any()
2877            .downcast_ref::<DictionaryArray<Int32Type>>()
2878            .unwrap();
2879        assert_eq!(dict_array.len(), 3);
2880        let values = dict_array
2881            .values()
2882            .as_any()
2883            .downcast_ref::<StringArray>()
2884            .unwrap();
2885        assert_eq!(values.value(0), "A");
2886        assert_eq!(values.value(1), "B");
2887        assert_eq!(values.value(2), "C");
2888        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
2889    }
2890
2891    #[test]
2892    fn test_enum_decoding_with_nulls() {
2893        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
2894        let enum_codec = Codec::Enum(symbols.clone());
2895        let avro_type =
2896            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
2897        let mut decoder = Decoder::try_new(&avro_type).unwrap();
2898        let mut data = Vec::new();
2899        data.extend_from_slice(&encode_avro_long(1));
2900        data.extend_from_slice(&encode_avro_int(1));
2901        data.extend_from_slice(&encode_avro_long(0));
2902        data.extend_from_slice(&encode_avro_long(1));
2903        data.extend_from_slice(&encode_avro_int(0));
2904        let mut cursor = AvroCursor::new(&data);
2905        decoder.decode(&mut cursor).unwrap();
2906        decoder.decode(&mut cursor).unwrap();
2907        decoder.decode(&mut cursor).unwrap();
2908        let array = decoder.flush(None).unwrap();
2909        let dict_array = array
2910            .as_any()
2911            .downcast_ref::<DictionaryArray<Int32Type>>()
2912            .unwrap();
2913        assert_eq!(dict_array.len(), 3);
2914        assert!(dict_array.is_valid(0));
2915        assert!(dict_array.is_null(1));
2916        assert!(dict_array.is_valid(2));
2917        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
2918        assert_eq!(dict_array.keys(), &expected_keys);
2919        let values = dict_array
2920            .values()
2921            .as_any()
2922            .downcast_ref::<StringArray>()
2923            .unwrap();
2924        assert_eq!(values.value(0), "X");
2925        assert_eq!(values.value(1), "Y");
2926    }
2927
2928    #[test]
2929    fn test_duration_decoding_with_nulls() {
2930        let duration_codec = Codec::Interval;
2931        let avro_type = AvroDataType::new(
2932            duration_codec,
2933            Default::default(),
2934            Some(Nullability::NullFirst),
2935        );
2936        let mut decoder = Decoder::try_new(&avro_type).unwrap();
2937        let mut data = Vec::new();
2938        // First value: 1 month, 2 days, 3 millis
2939        data.extend_from_slice(&encode_avro_long(1)); // not null
2940        let mut duration1 = Vec::new();
2941        duration1.extend_from_slice(&1u32.to_le_bytes());
2942        duration1.extend_from_slice(&2u32.to_le_bytes());
2943        duration1.extend_from_slice(&3u32.to_le_bytes());
2944        data.extend_from_slice(&duration1);
2945        // Second value: null
2946        data.extend_from_slice(&encode_avro_long(0)); // null
2947        data.extend_from_slice(&encode_avro_long(1)); // not null
2948        let mut duration2 = Vec::new();
2949        duration2.extend_from_slice(&4u32.to_le_bytes());
2950        duration2.extend_from_slice(&5u32.to_le_bytes());
2951        duration2.extend_from_slice(&6u32.to_le_bytes());
2952        data.extend_from_slice(&duration2);
2953        let mut cursor = AvroCursor::new(&data);
2954        decoder.decode(&mut cursor).unwrap();
2955        decoder.decode(&mut cursor).unwrap();
2956        decoder.decode(&mut cursor).unwrap();
2957        let array = decoder.flush(None).unwrap();
2958        let interval_array = array
2959            .as_any()
2960            .downcast_ref::<IntervalMonthDayNanoArray>()
2961            .unwrap();
2962        assert_eq!(interval_array.len(), 3);
2963        assert!(interval_array.is_valid(0));
2964        assert!(interval_array.is_null(1));
2965        assert!(interval_array.is_valid(2));
2966        let expected = IntervalMonthDayNanoArray::from(vec![
2967            Some(IntervalMonthDayNano {
2968                months: 1,
2969                days: 2,
2970                nanoseconds: 3_000_000,
2971            }),
2972            None,
2973            Some(IntervalMonthDayNano {
2974                months: 4,
2975                days: 5,
2976                nanoseconds: 6_000_000,
2977            }),
2978        ]);
2979        assert_eq!(interval_array, &expected);
2980    }
2981
2982    #[test]
2983    fn test_duration_decoding_empty() {
2984        let duration_codec = Codec::Interval;
2985        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
2986        let mut decoder = Decoder::try_new(&avro_type).unwrap();
2987        let array = decoder.flush(None).unwrap();
2988        assert_eq!(array.len(), 0);
2989    }
2990
2991    #[test]
2992    #[cfg(feature = "avro_custom_types")]
2993    fn test_duration_seconds_decoding() {
2994        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
2995        let mut decoder = Decoder::try_new(&avro_type).unwrap();
2996        let mut data = Vec::new();
2997        // Three values: 0, -1, 2
2998        data.extend_from_slice(&encode_avro_long(0));
2999        data.extend_from_slice(&encode_avro_long(-1));
3000        data.extend_from_slice(&encode_avro_long(2));
3001        let mut cursor = AvroCursor::new(&data);
3002        decoder.decode(&mut cursor).unwrap();
3003        decoder.decode(&mut cursor).unwrap();
3004        decoder.decode(&mut cursor).unwrap();
3005        let array = decoder.flush(None).unwrap();
3006        let dur = array
3007            .as_any()
3008            .downcast_ref::<DurationSecondArray>()
3009            .unwrap();
3010        assert_eq!(dur.values(), &[0, -1, 2]);
3011    }
3012
3013    #[test]
3014    #[cfg(feature = "avro_custom_types")]
3015    fn test_duration_milliseconds_decoding() {
3016        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3017        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3018        let mut data = Vec::new();
3019        for v in [1i64, 0, -2] {
3020            data.extend_from_slice(&encode_avro_long(v));
3021        }
3022        let mut cursor = AvroCursor::new(&data);
3023        for _ in 0..3 {
3024            decoder.decode(&mut cursor).unwrap();
3025        }
3026        let array = decoder.flush(None).unwrap();
3027        let dur = array
3028            .as_any()
3029            .downcast_ref::<DurationMillisecondArray>()
3030            .unwrap();
3031        assert_eq!(dur.values(), &[1, 0, -2]);
3032    }
3033
3034    #[test]
3035    #[cfg(feature = "avro_custom_types")]
3036    fn test_duration_microseconds_decoding() {
3037        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3038        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3039        let mut data = Vec::new();
3040        for v in [5i64, -6, 7] {
3041            data.extend_from_slice(&encode_avro_long(v));
3042        }
3043        let mut cursor = AvroCursor::new(&data);
3044        for _ in 0..3 {
3045            decoder.decode(&mut cursor).unwrap();
3046        }
3047        let array = decoder.flush(None).unwrap();
3048        let dur = array
3049            .as_any()
3050            .downcast_ref::<DurationMicrosecondArray>()
3051            .unwrap();
3052        assert_eq!(dur.values(), &[5, -6, 7]);
3053    }
3054
3055    #[test]
3056    #[cfg(feature = "avro_custom_types")]
3057    fn test_duration_nanoseconds_decoding() {
3058        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3059        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3060        let mut data = Vec::new();
3061        for v in [8i64, 9, -10] {
3062            data.extend_from_slice(&encode_avro_long(v));
3063        }
3064        let mut cursor = AvroCursor::new(&data);
3065        for _ in 0..3 {
3066            decoder.decode(&mut cursor).unwrap();
3067        }
3068        let array = decoder.flush(None).unwrap();
3069        let dur = array
3070            .as_any()
3071            .downcast_ref::<DurationNanosecondArray>()
3072            .unwrap();
3073        assert_eq!(dur.values(), &[8, 9, -10]);
3074    }
3075
3076    #[test]
3077    fn test_nullable_decode_error_bitmap_corruption() {
3078        // Nullable Int32 with ['T','null'] encoding (NullSecond)
3079        let avro_type = AvroDataType::new(
3080            Codec::Int32,
3081            Default::default(),
3082            Some(Nullability::NullSecond),
3083        );
3084        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3085
3086        // Row 1: union branch 1 (null)
3087        let mut row1 = Vec::new();
3088        row1.extend_from_slice(&encode_avro_int(1));
3089
3090        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
3091        let mut row2 = Vec::new();
3092        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
3093
3094        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
3095        let mut row3 = Vec::new();
3096        row3.extend_from_slice(&encode_avro_int(0)); // branch
3097        row3.extend_from_slice(&encode_avro_int(42)); // actual value
3098
3099        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
3100        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
3101        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
3102
3103        let array = decoder.flush(None).unwrap();
3104
3105        // Should contain 2 elements: row1 (null) and row3 (42)
3106        assert_eq!(array.len(), 2);
3107        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
3108        assert!(int_array.is_null(0)); // row1 is null
3109        assert_eq!(int_array.value(1), 42); // row3 value is 42
3110    }
3111
3112    #[test]
3113    fn test_enum_mapping_reordered_symbols() {
3114        let reader_symbols: Arc<[String]> =
3115            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
3116        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
3117        let default_index: i32 = -1;
3118        let mut dec = Decoder::Enum(
3119            Vec::with_capacity(DEFAULT_CAPACITY),
3120            reader_symbols.clone(),
3121            Some(EnumResolution {
3122                mapping,
3123                default_index,
3124            }),
3125        );
3126        let mut data = Vec::new();
3127        data.extend_from_slice(&encode_avro_int(0));
3128        data.extend_from_slice(&encode_avro_int(1));
3129        data.extend_from_slice(&encode_avro_int(2));
3130        let mut cur = AvroCursor::new(&data);
3131        dec.decode(&mut cur).unwrap();
3132        dec.decode(&mut cur).unwrap();
3133        dec.decode(&mut cur).unwrap();
3134        let arr = dec.flush(None).unwrap();
3135        let dict = arr
3136            .as_any()
3137            .downcast_ref::<DictionaryArray<Int32Type>>()
3138            .unwrap();
3139        let expected_keys = Int32Array::from(vec![2, 0, 1]);
3140        assert_eq!(dict.keys(), &expected_keys);
3141        let values = dict
3142            .values()
3143            .as_any()
3144            .downcast_ref::<StringArray>()
3145            .unwrap();
3146        assert_eq!(values.value(0), "B");
3147        assert_eq!(values.value(1), "C");
3148        assert_eq!(values.value(2), "A");
3149    }
3150
3151    #[test]
3152    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
3153        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
3154        let default_index: i32 = 1;
3155        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
3156        let mut dec = Decoder::Enum(
3157            Vec::with_capacity(DEFAULT_CAPACITY),
3158            reader_symbols.clone(),
3159            Some(EnumResolution {
3160                mapping,
3161                default_index,
3162            }),
3163        );
3164        let mut data = Vec::new();
3165        data.extend_from_slice(&encode_avro_int(0));
3166        data.extend_from_slice(&encode_avro_int(1));
3167        data.extend_from_slice(&encode_avro_int(99));
3168        let mut cur = AvroCursor::new(&data);
3169        dec.decode(&mut cur).unwrap();
3170        dec.decode(&mut cur).unwrap();
3171        dec.decode(&mut cur).unwrap();
3172        let arr = dec.flush(None).unwrap();
3173        let dict = arr
3174            .as_any()
3175            .downcast_ref::<DictionaryArray<Int32Type>>()
3176            .unwrap();
3177        let expected_keys = Int32Array::from(vec![0, 1, 1]);
3178        assert_eq!(dict.keys(), &expected_keys);
3179        let values = dict
3180            .values()
3181            .as_any()
3182            .downcast_ref::<StringArray>()
3183            .unwrap();
3184        assert_eq!(values.value(0), "A");
3185        assert_eq!(values.value(1), "B");
3186    }
3187
3188    #[test]
3189    fn test_enum_mapping_unknown_symbol_without_default_errors() {
3190        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
3191        let default_index: i32 = -1; // indicates no default at type-level
3192        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
3193        let mut dec = Decoder::Enum(
3194            Vec::with_capacity(DEFAULT_CAPACITY),
3195            reader_symbols,
3196            Some(EnumResolution {
3197                mapping,
3198                default_index,
3199            }),
3200        );
3201        let data = encode_avro_int(0);
3202        let mut cur = AvroCursor::new(&data);
3203        let err = dec
3204            .decode(&mut cur)
3205            .expect_err("expected decode error for unresolved enum without default");
3206        let msg = err.to_string();
3207        assert!(
3208            msg.contains("not resolvable") && msg.contains("no default"),
3209            "unexpected error message: {msg}"
3210        );
3211    }
3212
3213    fn make_record_resolved_decoder(
3214        reader_fields: &[(&str, DataType, bool)],
3215        writer_to_reader: Vec<Option<usize>>,
3216        skip_decoders: Vec<Option<Skipper>>,
3217    ) -> Decoder {
3218        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3219        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3220        for (name, dt, nullable) in reader_fields {
3221            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3222            let enc = match dt {
3223                DataType::Int32 => Decoder::Int32(Vec::new()),
3224                DataType::Int64 => Decoder::Int64(Vec::new()),
3225                DataType::Utf8 => {
3226                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3227                }
3228                other => panic!("Unsupported test reader field type: {other:?}"),
3229            };
3230            encodings.push(enc);
3231        }
3232        let fields: Fields = field_refs.into();
3233        Decoder::Record(
3234            fields,
3235            encodings,
3236            Some(Projector {
3237                writer_to_reader: Arc::from(writer_to_reader),
3238                skip_decoders,
3239                field_defaults: vec![None; reader_fields.len()],
3240                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3241            }),
3242        )
3243    }
3244
3245    #[test]
3246    fn test_skip_writer_trailing_field_int32() {
3247        let mut dec = make_record_resolved_decoder(
3248            &[("id", arrow_schema::DataType::Int32, false)],
3249            vec![Some(0), None],
3250            vec![None, Some(super::Skipper::Int32)],
3251        );
3252        let mut data = Vec::new();
3253        data.extend_from_slice(&encode_avro_int(7));
3254        data.extend_from_slice(&encode_avro_int(999));
3255        let mut cur = AvroCursor::new(&data);
3256        dec.decode(&mut cur).unwrap();
3257        assert_eq!(cur.position(), data.len());
3258        let arr = dec.flush(None).unwrap();
3259        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
3260        assert_eq!(struct_arr.len(), 1);
3261        let id = struct_arr
3262            .column_by_name("id")
3263            .unwrap()
3264            .as_any()
3265            .downcast_ref::<Int32Array>()
3266            .unwrap();
3267        assert_eq!(id.value(0), 7);
3268    }
3269
3270    #[test]
3271    fn test_skip_writer_middle_field_string() {
3272        let mut dec = make_record_resolved_decoder(
3273            &[
3274                ("id", DataType::Int32, false),
3275                ("score", DataType::Int64, false),
3276            ],
3277            vec![Some(0), None, Some(1)],
3278            vec![None, Some(Skipper::String), None],
3279        );
3280        let mut data = Vec::new();
3281        data.extend_from_slice(&encode_avro_int(42));
3282        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
3283        data.extend_from_slice(&encode_avro_long(1000));
3284        let mut cur = AvroCursor::new(&data);
3285        dec.decode(&mut cur).unwrap();
3286        assert_eq!(cur.position(), data.len());
3287        let arr = dec.flush(None).unwrap();
3288        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3289        let id = s
3290            .column_by_name("id")
3291            .unwrap()
3292            .as_any()
3293            .downcast_ref::<Int32Array>()
3294            .unwrap();
3295        let score = s
3296            .column_by_name("score")
3297            .unwrap()
3298            .as_any()
3299            .downcast_ref::<Int64Array>()
3300            .unwrap();
3301        assert_eq!(id.value(0), 42);
3302        assert_eq!(score.value(0), 1000);
3303    }
3304
3305    #[test]
3306    fn test_skip_writer_array_with_negative_block_count_fast() {
3307        let mut dec = make_record_resolved_decoder(
3308            &[("id", DataType::Int32, false)],
3309            vec![None, Some(0)],
3310            vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
3311        );
3312        let mut array_payload = Vec::new();
3313        array_payload.extend_from_slice(&encode_avro_int(1));
3314        array_payload.extend_from_slice(&encode_avro_int(2));
3315        array_payload.extend_from_slice(&encode_avro_int(3));
3316        let mut data = Vec::new();
3317        data.extend_from_slice(&encode_avro_long(-3));
3318        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
3319        data.extend_from_slice(&array_payload);
3320        data.extend_from_slice(&encode_avro_long(0));
3321        data.extend_from_slice(&encode_avro_int(5));
3322        let mut cur = AvroCursor::new(&data);
3323        dec.decode(&mut cur).unwrap();
3324        assert_eq!(cur.position(), data.len());
3325        let arr = dec.flush(None).unwrap();
3326        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3327        let id = s
3328            .column_by_name("id")
3329            .unwrap()
3330            .as_any()
3331            .downcast_ref::<Int32Array>()
3332            .unwrap();
3333        assert_eq!(id.len(), 1);
3334        assert_eq!(id.value(0), 5);
3335    }
3336
3337    #[test]
3338    fn test_skip_writer_map_with_negative_block_count_fast() {
3339        let mut dec = make_record_resolved_decoder(
3340            &[("id", DataType::Int32, false)],
3341            vec![None, Some(0)],
3342            vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
3343        );
3344        let mut entries = Vec::new();
3345        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
3346        entries.extend_from_slice(&encode_avro_int(10));
3347        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
3348        entries.extend_from_slice(&encode_avro_int(20));
3349        let mut data = Vec::new();
3350        data.extend_from_slice(&encode_avro_long(-2));
3351        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
3352        data.extend_from_slice(&entries);
3353        data.extend_from_slice(&encode_avro_long(0));
3354        data.extend_from_slice(&encode_avro_int(123));
3355        let mut cur = AvroCursor::new(&data);
3356        dec.decode(&mut cur).unwrap();
3357        assert_eq!(cur.position(), data.len());
3358        let arr = dec.flush(None).unwrap();
3359        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3360        let id = s
3361            .column_by_name("id")
3362            .unwrap()
3363            .as_any()
3364            .downcast_ref::<Int32Array>()
3365            .unwrap();
3366        assert_eq!(id.len(), 1);
3367        assert_eq!(id.value(0), 123);
3368    }
3369
3370    #[test]
3371    fn test_skip_writer_nullable_field_union_nullfirst() {
3372        let mut dec = make_record_resolved_decoder(
3373            &[("id", DataType::Int32, false)],
3374            vec![None, Some(0)],
3375            vec![
3376                Some(super::Skipper::Nullable(
3377                    Nullability::NullFirst,
3378                    Box::new(super::Skipper::Int32),
3379                )),
3380                None,
3381            ],
3382        );
3383        let mut row1 = Vec::new();
3384        row1.extend_from_slice(&encode_avro_long(0));
3385        row1.extend_from_slice(&encode_avro_int(5));
3386        let mut row2 = Vec::new();
3387        row2.extend_from_slice(&encode_avro_long(1));
3388        row2.extend_from_slice(&encode_avro_int(123));
3389        row2.extend_from_slice(&encode_avro_int(7));
3390        let mut cur1 = AvroCursor::new(&row1);
3391        let mut cur2 = AvroCursor::new(&row2);
3392        dec.decode(&mut cur1).unwrap();
3393        dec.decode(&mut cur2).unwrap();
3394        assert_eq!(cur1.position(), row1.len());
3395        assert_eq!(cur2.position(), row2.len());
3396        let arr = dec.flush(None).unwrap();
3397        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3398        let id = s
3399            .column_by_name("id")
3400            .unwrap()
3401            .as_any()
3402            .downcast_ref::<Int32Array>()
3403            .unwrap();
3404        assert_eq!(id.len(), 2);
3405        assert_eq!(id.value(0), 5);
3406        assert_eq!(id.value(1), 7);
3407    }
3408
3409    fn make_dense_union_avro(
3410        children: Vec<(Codec, &'_ str, DataType)>,
3411        type_ids: Vec<i8>,
3412    ) -> AvroDataType {
3413        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3414        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3415        for (codec, name, dt) in children.into_iter() {
3416            avro_children.push(AvroDataType::new(codec, Default::default(), None));
3417            fields.push(arrow_schema::Field::new(name, dt, true));
3418        }
3419        let union_fields = UnionFields::new(type_ids, fields);
3420        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3421        AvroDataType::new(union_codec, Default::default(), None)
3422    }
3423
3424    #[test]
3425    fn test_union_dense_two_children_custom_type_ids() {
3426        let union_dt = make_dense_union_avro(
3427            vec![
3428                (Codec::Int32, "i", DataType::Int32),
3429                (Codec::Utf8, "s", DataType::Utf8),
3430            ],
3431            vec![2, 5],
3432        );
3433        let mut dec = Decoder::try_new(&union_dt).unwrap();
3434        let mut r1 = Vec::new();
3435        r1.extend_from_slice(&encode_avro_long(0));
3436        r1.extend_from_slice(&encode_avro_int(7));
3437        let mut r2 = Vec::new();
3438        r2.extend_from_slice(&encode_avro_long(1));
3439        r2.extend_from_slice(&encode_avro_bytes(b"x"));
3440        let mut r3 = Vec::new();
3441        r3.extend_from_slice(&encode_avro_long(0));
3442        r3.extend_from_slice(&encode_avro_int(-1));
3443        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3444        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3445        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3446        let array = dec.flush(None).unwrap();
3447        let ua = array
3448            .as_any()
3449            .downcast_ref::<UnionArray>()
3450            .expect("expected UnionArray");
3451        assert_eq!(ua.len(), 3);
3452        assert_eq!(ua.type_id(0), 2);
3453        assert_eq!(ua.type_id(1), 5);
3454        assert_eq!(ua.type_id(2), 2);
3455        assert_eq!(ua.value_offset(0), 0);
3456        assert_eq!(ua.value_offset(1), 0);
3457        assert_eq!(ua.value_offset(2), 1);
3458        let int_child = ua
3459            .child(2)
3460            .as_any()
3461            .downcast_ref::<Int32Array>()
3462            .expect("int child");
3463        assert_eq!(int_child.len(), 2);
3464        assert_eq!(int_child.value(0), 7);
3465        assert_eq!(int_child.value(1), -1);
3466        let str_child = ua
3467            .child(5)
3468            .as_any()
3469            .downcast_ref::<StringArray>()
3470            .expect("string child");
3471        assert_eq!(str_child.len(), 1);
3472        assert_eq!(str_child.value(0), "x");
3473    }
3474
3475    #[test]
3476    fn test_union_dense_with_null_and_string_children() {
3477        let union_dt = make_dense_union_avro(
3478            vec![
3479                (Codec::Null, "n", DataType::Null),
3480                (Codec::Utf8, "s", DataType::Utf8),
3481            ],
3482            vec![42, 7],
3483        );
3484        let mut dec = Decoder::try_new(&union_dt).unwrap();
3485        let r1 = encode_avro_long(0);
3486        let mut r2 = Vec::new();
3487        r2.extend_from_slice(&encode_avro_long(1));
3488        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
3489        let r3 = encode_avro_long(0);
3490        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3491        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3492        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3493        let array = dec.flush(None).unwrap();
3494        let ua = array
3495            .as_any()
3496            .downcast_ref::<UnionArray>()
3497            .expect("expected UnionArray");
3498        assert_eq!(ua.len(), 3);
3499        assert_eq!(ua.type_id(0), 42);
3500        assert_eq!(ua.type_id(1), 7);
3501        assert_eq!(ua.type_id(2), 42);
3502        assert_eq!(ua.value_offset(0), 0);
3503        assert_eq!(ua.value_offset(1), 0);
3504        assert_eq!(ua.value_offset(2), 1);
3505        let null_child = ua
3506            .child(42)
3507            .as_any()
3508            .downcast_ref::<NullArray>()
3509            .expect("null child");
3510        assert_eq!(null_child.len(), 2);
3511        let str_child = ua
3512            .child(7)
3513            .as_any()
3514            .downcast_ref::<StringArray>()
3515            .expect("string child");
3516        assert_eq!(str_child.len(), 1);
3517        assert_eq!(str_child.value(0), "abc");
3518    }
3519
3520    #[test]
3521    fn test_union_decode_negative_branch_index_errors() {
3522        let union_dt = make_dense_union_avro(
3523            vec![
3524                (Codec::Int32, "i", DataType::Int32),
3525                (Codec::Utf8, "s", DataType::Utf8),
3526            ],
3527            vec![0, 1],
3528        );
3529        let mut dec = Decoder::try_new(&union_dt).unwrap();
3530        let row = encode_avro_long(-1); // decodes back to -1
3531        let err = dec
3532            .decode(&mut AvroCursor::new(&row))
3533            .expect_err("expected error for negative branch index");
3534        let msg = err.to_string();
3535        assert!(
3536            msg.contains("Negative union branch index"),
3537            "unexpected error message: {msg}"
3538        );
3539    }
3540
3541    #[test]
3542    fn test_union_decode_out_of_range_branch_index_errors() {
3543        let union_dt = make_dense_union_avro(
3544            vec![
3545                (Codec::Int32, "i", DataType::Int32),
3546                (Codec::Utf8, "s", DataType::Utf8),
3547            ],
3548            vec![10, 11],
3549        );
3550        let mut dec = Decoder::try_new(&union_dt).unwrap();
3551        let row = encode_avro_long(2);
3552        let err = dec
3553            .decode(&mut AvroCursor::new(&row))
3554            .expect_err("expected error for out-of-range branch index");
3555        let msg = err.to_string();
3556        assert!(
3557            msg.contains("out of range"),
3558            "unexpected error message: {msg}"
3559        );
3560    }
3561
3562    #[test]
3563    fn test_union_sparse_mode_not_supported() {
3564        let children: Vec<AvroDataType> = vec![
3565            AvroDataType::new(Codec::Int32, Default::default(), None),
3566            AvroDataType::new(Codec::Utf8, Default::default(), None),
3567        ];
3568        let uf = UnionFields::new(
3569            vec![1, 3],
3570            vec![
3571                arrow_schema::Field::new("i", DataType::Int32, true),
3572                arrow_schema::Field::new("s", DataType::Utf8, true),
3573            ],
3574        );
3575        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
3576        let dt = AvroDataType::new(codec, Default::default(), None);
3577        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
3578        let msg = err.to_string();
3579        assert!(
3580            msg.contains("Sparse Arrow unions are not yet supported"),
3581            "unexpected error message: {msg}"
3582        );
3583    }
3584
3585    fn make_record_decoder_with_projector_defaults(
3586        reader_fields: &[(&str, DataType, bool)],
3587        field_defaults: Vec<Option<AvroLiteral>>,
3588        default_injections: Vec<(usize, AvroLiteral)>,
3589        writer_to_reader_len: usize,
3590    ) -> Decoder {
3591        assert_eq!(
3592            field_defaults.len(),
3593            reader_fields.len(),
3594            "field_defaults must have one entry per reader field"
3595        );
3596        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3597        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3598        for (name, dt, nullable) in reader_fields {
3599            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3600            let enc = match dt {
3601                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3602                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3603                DataType::Utf8 => Decoder::String(
3604                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3605                    Vec::with_capacity(DEFAULT_CAPACITY),
3606                ),
3607                other => panic!("Unsupported test field type in helper: {other:?}"),
3608            };
3609            encodings.push(enc);
3610        }
3611        let fields: Fields = field_refs.into();
3612        let skip_decoders: Vec<Option<Skipper>> =
3613            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3614        let projector = Projector {
3615            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3616            skip_decoders,
3617            field_defaults,
3618            default_injections: Arc::from(default_injections),
3619        };
3620        Decoder::Record(fields, encodings, Some(projector))
3621    }
3622
3623    #[test]
3624    fn test_default_append_int32_and_int64_from_int_and_long() {
3625        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3626        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
3627        let arr = d_i32.flush(None).unwrap();
3628        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3629        assert_eq!(a.len(), 1);
3630        assert_eq!(a.value(0), 42);
3631        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
3632        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
3633        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
3634        let arr64 = d_i64.flush(None).unwrap();
3635        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
3636        assert_eq!(a64.len(), 2);
3637        assert_eq!(a64.value(0), 5);
3638        assert_eq!(a64.value(1), 7);
3639    }
3640
3641    #[test]
3642    fn test_default_append_floats_and_doubles() {
3643        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
3644        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
3645        let arr32 = d_f32.flush(None).unwrap();
3646        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
3647        assert_eq!(a.value(0), 1.5);
3648        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
3649        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
3650        let arr64 = d_f64.flush(None).unwrap();
3651        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
3652        assert_eq!(b.value(0), 2.25);
3653    }
3654
3655    #[test]
3656    fn test_default_append_string_and_bytes() {
3657        let mut d_str = Decoder::String(
3658            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3659            Vec::with_capacity(DEFAULT_CAPACITY),
3660        );
3661        d_str
3662            .append_default(&AvroLiteral::String("hi".into()))
3663            .unwrap();
3664        let s_arr = d_str.flush(None).unwrap();
3665        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
3666        assert_eq!(arr.value(0), "hi");
3667        let mut d_bytes = Decoder::Binary(
3668            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3669            Vec::with_capacity(DEFAULT_CAPACITY),
3670        );
3671        d_bytes
3672            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
3673            .unwrap();
3674        let b_arr = d_bytes.flush(None).unwrap();
3675        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3676        assert_eq!(barr.value(0), &[1, 2, 3]);
3677        let mut d_str_err = Decoder::String(
3678            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3679            Vec::with_capacity(DEFAULT_CAPACITY),
3680        );
3681        let err = d_str_err
3682            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
3683            .unwrap_err();
3684        assert!(
3685            err.to_string()
3686                .contains("Default for string must be string"),
3687            "unexpected error: {err:?}"
3688        );
3689    }
3690
3691    #[test]
3692    fn test_default_append_nullable_int32_null_and_value() {
3693        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3694        let mut dec = Decoder::Nullable(
3695            Nullability::NullFirst,
3696            NullBufferBuilder::new(DEFAULT_CAPACITY),
3697            Box::new(inner),
3698            NullablePlan::ReadTag,
3699        );
3700        dec.append_default(&AvroLiteral::Null).unwrap();
3701        dec.append_default(&AvroLiteral::Int(11)).unwrap();
3702        let arr = dec.flush(None).unwrap();
3703        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3704        assert_eq!(a.len(), 2);
3705        assert!(a.is_null(0));
3706        assert_eq!(a.value(1), 11);
3707    }
3708
3709    #[test]
3710    fn test_default_append_array_of_ints() {
3711        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3712        let mut d = Decoder::try_new(&list_dt).unwrap();
3713        let items = vec![
3714            AvroLiteral::Int(1),
3715            AvroLiteral::Int(2),
3716            AvroLiteral::Int(3),
3717        ];
3718        d.append_default(&AvroLiteral::Array(items)).unwrap();
3719        let arr = d.flush(None).unwrap();
3720        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
3721        assert_eq!(list.len(), 1);
3722        assert_eq!(list.value_length(0), 3);
3723        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
3724        assert_eq!(vals.values(), &[1, 2, 3]);
3725    }
3726
3727    #[test]
3728    fn test_default_append_map_string_to_int() {
3729        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
3730        let mut d = Decoder::try_new(&map_dt).unwrap();
3731        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
3732        m.insert("k1".to_string(), AvroLiteral::Int(10));
3733        m.insert("k2".to_string(), AvroLiteral::Int(20));
3734        d.append_default(&AvroLiteral::Map(m)).unwrap();
3735        let arr = d.flush(None).unwrap();
3736        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
3737        assert_eq!(map.len(), 1);
3738        assert_eq!(map.value_length(0), 2);
3739        let binding = map.value(0);
3740        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
3741        let k = entries
3742            .column_by_name("key")
3743            .unwrap()
3744            .as_any()
3745            .downcast_ref::<StringArray>()
3746            .unwrap();
3747        let v = entries
3748            .column_by_name("value")
3749            .unwrap()
3750            .as_any()
3751            .downcast_ref::<Int32Array>()
3752            .unwrap();
3753        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
3754        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
3755        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
3756        assert_eq!(vals, [10, 20].into_iter().collect());
3757    }
3758
3759    #[test]
3760    fn test_default_append_enum_by_symbol() {
3761        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
3762        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
3763        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
3764        let arr = d.flush(None).unwrap();
3765        let dict = arr
3766            .as_any()
3767            .downcast_ref::<DictionaryArray<Int32Type>>()
3768            .unwrap();
3769        assert_eq!(dict.len(), 1);
3770        let expected = Int32Array::from(vec![1]);
3771        assert_eq!(dict.keys(), &expected);
3772        let values = dict
3773            .values()
3774            .as_any()
3775            .downcast_ref::<StringArray>()
3776            .unwrap();
3777        assert_eq!(values.value(1), "B");
3778    }
3779
3780    #[test]
3781    fn test_default_append_uuid_and_type_error() {
3782        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
3783        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
3784        d.append_default(&AvroLiteral::String(uuid_str.into()))
3785            .unwrap();
3786        let arr_ref = d.flush(None).unwrap();
3787        let arr = arr_ref
3788            .as_any()
3789            .downcast_ref::<FixedSizeBinaryArray>()
3790            .unwrap();
3791        assert_eq!(arr.value_length(), 16);
3792        assert_eq!(arr.len(), 1);
3793        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
3794        let err = d2
3795            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
3796            .unwrap_err();
3797        assert!(
3798            err.to_string().contains("Default for uuid must be string"),
3799            "unexpected error: {err:?}"
3800        );
3801    }
3802
3803    #[test]
3804    fn test_default_append_fixed_and_length_mismatch() {
3805        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
3806        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
3807            .unwrap();
3808        let arr_ref = d.flush(None).unwrap();
3809        let arr = arr_ref
3810            .as_any()
3811            .downcast_ref::<FixedSizeBinaryArray>()
3812            .unwrap();
3813        assert_eq!(arr.value_length(), 4);
3814        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
3815        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
3816        let err = d_err
3817            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
3818            .unwrap_err();
3819        assert!(
3820            err.to_string().contains("Fixed default length"),
3821            "unexpected error: {err:?}"
3822        );
3823    }
3824
3825    #[test]
3826    fn test_default_append_duration_and_length_validation() {
3827        let dt = avro_from_codec(Codec::Interval);
3828        let mut d = Decoder::try_new(&dt).unwrap();
3829        let mut bytes = Vec::with_capacity(12);
3830        bytes.extend_from_slice(&1u32.to_le_bytes());
3831        bytes.extend_from_slice(&2u32.to_le_bytes());
3832        bytes.extend_from_slice(&3u32.to_le_bytes());
3833        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
3834        let arr_ref = d.flush(None).unwrap();
3835        let arr = arr_ref
3836            .as_any()
3837            .downcast_ref::<IntervalMonthDayNanoArray>()
3838            .unwrap();
3839        assert_eq!(arr.len(), 1);
3840        let v = arr.value(0);
3841        assert_eq!(v.months, 1);
3842        assert_eq!(v.days, 2);
3843        assert_eq!(v.nanoseconds, 3_000_000);
3844        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
3845        let err = d_err
3846            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
3847            .unwrap_err();
3848        assert!(
3849            err.to_string()
3850                .contains("Duration default must be exactly 12 bytes"),
3851            "unexpected error: {err:?}"
3852        );
3853    }
3854
3855    #[test]
3856    fn test_default_append_decimal256_from_bytes() {
3857        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
3858        let mut d = Decoder::try_new(&dt).unwrap();
3859        let pos: [u8; 32] = [
3860            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3861            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3862            0x00, 0x00, 0x30, 0x39,
3863        ];
3864        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
3865        let neg: [u8; 32] = [
3866            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3867            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3868            0xFF, 0xFF, 0xFF, 0x85,
3869        ];
3870        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
3871        let arr = d.flush(None).unwrap();
3872        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
3873        assert_eq!(dec.len(), 2);
3874        assert_eq!(dec.value_as_string(0), "123.45");
3875        assert_eq!(dec.value_as_string(1), "-1.23");
3876    }
3877
3878    #[test]
3879    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
3880        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
3881        let mut rec = make_record_decoder_with_projector_defaults(
3882            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
3883            field_defaults,
3884            vec![],
3885            0,
3886        );
3887        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
3888        map.insert("a".to_string(), AvroLiteral::Int(7));
3889        rec.append_default(&AvroLiteral::Map(map)).unwrap();
3890        let arr = rec.flush(None).unwrap();
3891        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3892        let a = s
3893            .column_by_name("a")
3894            .unwrap()
3895            .as_any()
3896            .downcast_ref::<Int32Array>()
3897            .unwrap();
3898        let b = s
3899            .column_by_name("b")
3900            .unwrap()
3901            .as_any()
3902            .downcast_ref::<StringArray>()
3903            .unwrap();
3904        assert_eq!(a.value(0), 7);
3905        assert_eq!(b.value(0), "hi");
3906    }
3907
3908    #[test]
3909    fn test_record_append_default_null_uses_projector_field_defaults() {
3910        let field_defaults = vec![
3911            Some(AvroLiteral::Int(5)),
3912            Some(AvroLiteral::String("x".into())),
3913        ];
3914        let mut rec = make_record_decoder_with_projector_defaults(
3915            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
3916            field_defaults,
3917            vec![],
3918            0,
3919        );
3920        rec.append_default(&AvroLiteral::Null).unwrap();
3921        let arr = rec.flush(None).unwrap();
3922        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3923        let a = s
3924            .column_by_name("a")
3925            .unwrap()
3926            .as_any()
3927            .downcast_ref::<Int32Array>()
3928            .unwrap();
3929        let b = s
3930            .column_by_name("b")
3931            .unwrap()
3932            .as_any()
3933            .downcast_ref::<StringArray>()
3934            .unwrap();
3935        assert_eq!(a.value(0), 5);
3936        assert_eq!(b.value(0), "x");
3937    }
3938
3939    #[test]
3940    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties(
3941    ) {
3942        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
3943        let mut field_refs: Vec<FieldRef> = Vec::new();
3944        let mut encoders: Vec<Decoder> = Vec::new();
3945        for (name, dt, nullable) in &fields {
3946            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3947        }
3948        let enc_a = Decoder::Nullable(
3949            Nullability::NullSecond,
3950            NullBufferBuilder::new(DEFAULT_CAPACITY),
3951            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
3952            NullablePlan::ReadTag,
3953        );
3954        let enc_b = Decoder::Nullable(
3955            Nullability::NullSecond,
3956            NullBufferBuilder::new(DEFAULT_CAPACITY),
3957            Box::new(Decoder::String(
3958                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3959                Vec::with_capacity(DEFAULT_CAPACITY),
3960            )),
3961            NullablePlan::ReadTag,
3962        );
3963        encoders.push(enc_a);
3964        encoders.push(enc_b);
3965        let projector = Projector {
3966            writer_to_reader: Arc::from(vec![]),
3967            skip_decoders: vec![],
3968            field_defaults: vec![None, None], // no defaults -> append_null
3969            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3970        };
3971        let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector));
3972        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
3973        map.insert("a".to_string(), AvroLiteral::Int(9));
3974        rec.append_default(&AvroLiteral::Map(map)).unwrap();
3975        let arr = rec.flush(None).unwrap();
3976        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3977        let a = s
3978            .column_by_name("a")
3979            .unwrap()
3980            .as_any()
3981            .downcast_ref::<Int32Array>()
3982            .unwrap();
3983        let b = s
3984            .column_by_name("b")
3985            .unwrap()
3986            .as_any()
3987            .downcast_ref::<StringArray>()
3988            .unwrap();
3989        assert!(a.is_valid(0));
3990        assert_eq!(a.value(0), 9);
3991        assert!(b.is_null(0));
3992    }
3993
3994    #[test]
3995    fn test_projector_default_injection_when_writer_lacks_fields() {
3996        let defaults = vec![None, None];
3997        let injections = vec![
3998            (0, AvroLiteral::Int(99)),
3999            (1, AvroLiteral::String("alice".into())),
4000        ];
4001        let mut rec = make_record_decoder_with_projector_defaults(
4002            &[
4003                ("id", DataType::Int32, false),
4004                ("name", DataType::Utf8, false),
4005            ],
4006            defaults,
4007            injections,
4008            0,
4009        );
4010        rec.decode(&mut AvroCursor::new(&[])).unwrap();
4011        let arr = rec.flush(None).unwrap();
4012        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4013        let id = s
4014            .column_by_name("id")
4015            .unwrap()
4016            .as_any()
4017            .downcast_ref::<Int32Array>()
4018            .unwrap();
4019        let name = s
4020            .column_by_name("name")
4021            .unwrap()
4022            .as_any()
4023            .downcast_ref::<StringArray>()
4024            .unwrap();
4025        assert_eq!(id.value(0), 99);
4026        assert_eq!(name.value(0), "alice");
4027    }
4028
4029    #[test]
4030    fn union_type_ids_are_not_child_indexes() {
4031        let encodings: Vec<AvroDataType> =
4032            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
4033        let fields: UnionFields = [
4034            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
4035            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
4036        ]
4037        .into_iter()
4038        .collect();
4039        let dt = avro_from_codec(Codec::Union(
4040            encodings.into(),
4041            fields.clone(),
4042            UnionMode::Dense,
4043        ));
4044        let mut dec = Decoder::try_new(&dt).expect("decoder");
4045        let mut b1 = encode_avro_long(1);
4046        b1.extend(encode_avro_bytes("hi".as_bytes()));
4047        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
4048        let mut b0 = encode_avro_long(0);
4049        b0.extend(encode_avro_int(5));
4050        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
4051        let arr = dec.flush(None).expect("flush");
4052        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
4053        assert_eq!(ua.len(), 2);
4054        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
4055        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
4056        assert_eq!(ua.value_offset(0), 0);
4057        assert_eq!(ua.value_offset(1), 0);
4058        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
4059        assert_eq!(utf8_child.len(), 1);
4060        assert_eq!(utf8_child.value(0), "hi");
4061        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
4062        assert_eq!(int_child.len(), 1);
4063        assert_eq!(int_child.value(0), 5);
4064        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
4065        assert_eq!(type_ids, vec![42_i8, 7_i8]);
4066    }
4067}