arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Decoder for Arrow types.
19
20use crate::codec::{
21    AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22    ResolvedUnion,
23};
24use crate::reader::cursor::AvroCursor;
25use crate::schema::Nullability;
26#[cfg(feature = "small_decimals")]
27use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
28use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
29use arrow_array::types::*;
30use arrow_array::*;
31use arrow_buffer::*;
32use arrow_schema::{
33    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField,
34    FieldRef, Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
35};
36#[cfg(feature = "small_decimals")]
37use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
38#[cfg(feature = "avro_custom_types")]
39use arrow_select::take::{TakeOptions, take};
40use std::cmp::Ordering;
41use std::sync::Arc;
42use strum_macros::AsRefStr;
43use uuid::Uuid;
44
45const DEFAULT_CAPACITY: usize = 1024;
46
/// Runtime plan for decoding reader-side `["null", T]` types.
///
/// The plan is chosen once in `Decoder::try_new_internal` and stored inside
/// `Decoder::Nullable`. `ReadTag` is the default; `FromSingle` is selected only
/// when the union resolution reports a non-union writer resolved against a
/// reader union.
#[derive(Clone, Copy, Debug)]
enum NullablePlan {
    /// Writer actually wrote a union (branch tag present).
    ReadTag,
    /// Writer wrote a single (non-union) value resolved to the non-null branch
    /// of the reader union; do NOT read a branch tag, but apply any promotion.
    ///
    /// `promotion` is copied from the first `writer_to_reader` entry of the
    /// union resolution.
    FromSingle { promotion: Promotion },
}
56
/// Reads one decimal payload and appends it to a decimal builder.
///
/// Arguments, in order: the size argument forwarded to `read_decimal_bytes_be`,
/// the buffer to read from, the destination builder, the byte width `N` of the
/// target integer, and the integer type constructed via `from_be_bytes`.
/// A read failure is propagated with `?` out of the enclosing function.
macro_rules! decode_decimal {
    ($len:expr, $cursor:expr, $dest:expr, $width:expr, $native:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $width }>($cursor, $len)?;
        let value = <$native>::from_be_bytes(be_bytes);
        $dest.append_value(value);
    }};
}
64
/// Finishes a decimal builder and yields an `ArrayRef` of the given array type.
///
/// Only the value buffer of the finished builder is kept; the array is rebuilt
/// with the supplied null buffer and then tagged with the precision and scale
/// (a missing scale is treated as 0). Failures are propagated with `?` out of
/// the enclosing function; a precision/scale failure is re-wrapped as
/// `ArrowError::ParseError`.
macro_rules! flush_decimal {
    ($dest:expr, $prec:expr, $scl:expr, $validity:expr, $Array:ty) => {{
        let finished = $dest.finish();
        let (_, values, _) = finished.into_parts();
        let untyped = <$Array>::try_new(values, $validity)?;
        let typed = untyped
            .with_precision_and_scale(*$prec as u8, $scl.unwrap_or(0) as i8)
            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
        Arc::new(typed) as ArrayRef
    }};
}
75
/// Appends a default decimal value to the matching decimal builder.
///
/// The literal must be `AvroLiteral::Bytes`; the bytes are passed through
/// `sign_cast_to::<N>` and interpreted as a big-endian integer of the given
/// type. The macro evaluates to `Result<(), ArrowError>`; any other literal
/// kind yields an `InvalidArgumentError` whose text is assembled at compile
/// time from the supplied type name.
macro_rules! append_decimal_default {
    ($literal:expr, $dest:expr, $N:literal, $Native:ty, $label:literal) => {{
        if let AvroLiteral::Bytes(raw) = $literal {
            let widened = sign_cast_to::<$N>(raw)?;
            $dest.append_value(<$Native>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(ArrowError::InvalidArgumentError(
                concat!(
                    "Default for ",
                    $label,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
98
/// Decodes avro encoded data into [`RecordBatch`]
///
/// Rows are accumulated by `decode` and turned into a batch by `flush`.
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema assembled from the reader fields at construction time.
    schema: SchemaRef,
    /// One decoder per reader field, in reader-field order.
    fields: Vec<Decoder>,
    /// Present only when the record type carries `ResolutionInfo::Record`;
    /// when set, `decode` hands each record to the projector instead of
    /// decoding the fields directly.
    projector: Option<Projector>,
}
106
107impl RecordDecoder {
108    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
109    ///
110    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
111    ///
112    /// # Arguments
113    /// * `data_type` - The Avro data type to decode.
114    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
115    ///
116    /// # Errors
117    /// This function will return an error if the provided `data_type` is not a `Record`.
118    pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, ArrowError> {
119        match data_type.codec() {
120            Codec::Struct(reader_fields) => {
121                // Build Arrow schema fields and per-child decoders
122                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123                let mut encodings = Vec::with_capacity(reader_fields.len());
124                for avro_field in reader_fields.iter() {
125                    arrow_fields.push(avro_field.field());
126                    encodings.push(Decoder::try_new(avro_field.data_type())?);
127                }
128                let projector = match data_type.resolution.as_ref() {
129                    Some(ResolutionInfo::Record(rec)) => {
130                        Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131                    }
132                    _ => None,
133                };
134                Ok(Self {
135                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
136                    fields: encodings,
137                    projector,
138                })
139            }
140            other => Err(ArrowError::ParseError(format!(
141                "Expected record got {other:?}"
142            ))),
143        }
144    }
145
146    /// Returns the decoder's `SchemaRef`
147    pub(crate) fn schema(&self) -> &SchemaRef {
148        &self.schema
149    }
150
151    /// Decode `count` records from `buf`
152    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
153        let mut cursor = AvroCursor::new(buf);
154        match self.projector.as_mut() {
155            Some(proj) => {
156                for _ in 0..count {
157                    proj.project_record(&mut cursor, &mut self.fields)?;
158                }
159            }
160            None => {
161                for _ in 0..count {
162                    for field in &mut self.fields {
163                        field.decode(&mut cursor)?;
164                    }
165                }
166            }
167        }
168        Ok(cursor.position())
169    }
170
171    /// Flush the decoded records into a [`RecordBatch`]
172    pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
173        let arrays = self
174            .fields
175            .iter_mut()
176            .map(|x| x.flush(None))
177            .collect::<Result<Vec<_>, _>>()?;
178        RecordBatch::try_new(self.schema.clone(), arrays)
179    }
180}
181
/// Enum resolution data carried by `Decoder::Enum`.
///
/// Both fields are copied verbatim from `ResolutionInfo::EnumMapping` when the
/// decoder is built.
#[derive(Debug)]
struct EnumResolution {
    /// Index mapping table; presumably indexed by writer symbol position and
    /// yielding the reader symbol index (how it is consumed is not visible here).
    mapping: Arc<[i32]>,
    /// Default index taken from the enum mapping; presumably the fallback
    /// used for writer symbols with no mapping entry (verify against the decode path).
    default_index: i32,
}
187
/// Per-type decoder state.
///
/// Each variant owns the buffers or builder that accumulate values for one
/// field until they are flushed. Variants named `XToY` are selected when the
/// data type carries the corresponding `Promotion`; their buffers already use
/// the target type `Y`.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Null type; only the number of appended rows is tracked.
    Null(usize),
    /// Boolean values packed into a bit buffer.
    Boolean(BooleanBufferBuilder),
    /// 32-bit integer values.
    Int32(Vec<i32>),
    /// 64-bit integer values.
    Int64(Vec<i64>),
    /// Built for `Codec::DurationSeconds`; values are stored as `i64`.
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    /// Built for `Codec::DurationMillis`; values are stored as `i64`.
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    /// Built for `Codec::DurationMicros`; values are stored as `i64`.
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    /// Built for `Codec::DurationNanos`; values are stored as `i64`.
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    /// 32-bit float values.
    Float32(Vec<f32>),
    /// 64-bit float values.
    Float64(Vec<f64>),
    /// Built for `Codec::Date32`; values are stored as `i32`.
    Date32(Vec<i32>),
    /// Built for `Codec::TimeMillis`; values are stored as `i32`.
    TimeMillis(Vec<i32>),
    /// Built for `Codec::TimeMicros`; values are stored as `i64`.
    TimeMicros(Vec<i64>),
    /// Built for `Codec::TimestampMillis`; the flag is the codec's `is_utc` value.
    TimestampMillis(bool, Vec<i64>),
    /// Built for `Codec::TimestampMicros`; the flag is the codec's `is_utc` value.
    TimestampMicros(bool, Vec<i64>),
    /// `Codec::Int64` with `Promotion::IntToLong`.
    Int32ToInt64(Vec<i64>),
    /// `Codec::Float32` with `Promotion::IntToFloat`.
    Int32ToFloat32(Vec<f32>),
    /// `Codec::Float64` with `Promotion::IntToDouble`.
    Int32ToFloat64(Vec<f64>),
    /// `Codec::Float32` with `Promotion::LongToFloat`.
    Int64ToFloat32(Vec<f32>),
    /// `Codec::Float64` with `Promotion::LongToDouble`.
    Int64ToFloat64(Vec<f64>),
    /// `Codec::Float64` with `Promotion::FloatToDouble`.
    Float32ToFloat64(Vec<f64>),
    /// `Codec::Utf8` or `Codec::Utf8View` with `Promotion::BytesToString`;
    /// value offsets plus the concatenated value bytes.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `Codec::Binary` with `Promotion::StringToBytes`; value offsets plus the
    /// concatenated value bytes.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Binary data: value offsets plus the concatenated value bytes.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// List: the item field (named "item"), the list offsets, and the item decoder.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Nested record: the Arrow fields, one decoder per field, and an optional
    /// projector that is present only when the type carries `ResolutionInfo::Record`.
    Record(Fields, Vec<Decoder>, Option<Projector>),
    /// Map: the "entries" struct field (a non-null Utf8 "key" plus the "value"
    /// field), two offset builders, a byte buffer, and the value decoder.
    /// NOTE(review): the offsets look like key offsets followed by map-entry
    /// offsets, and the byte buffer presumably holds the key bytes; confirm
    /// against the decode path.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Fixed-size binary: the size taken from `Codec::Fixed` and the
    /// concatenated value bytes (a null appends that many zero bytes).
    Fixed(i32, Vec<u8>),
    /// Enum: the decoded indices, the symbol list, and optional resolution data.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Built for `Codec::Interval`; values go into a month/day/nano interval builder.
    Duration(IntervalMonthDayNanoBuilder),
    /// UUID values as concatenated bytes, 16 bytes per value.
    Uuid(Vec<u8>),
    /// Decimal chosen when precision fits `DECIMAL32_MAX_PRECISION`:
    /// precision, optional scale, optional size, and the builder.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// Decimal chosen when precision fits `DECIMAL64_MAX_PRECISION`:
    /// precision, optional scale, optional size, and the builder.
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// Decimal chosen when precision fits `DECIMAL128_MAX_PRECISION`:
    /// precision, optional scale, optional size, and the builder.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// Decimal chosen when precision fits `DECIMAL256_MAX_PRECISION`:
    /// precision, optional scale, optional size, and the builder.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// Run-end encoded values: the run-end width in bytes (2, 4, or 8), a
    /// counter that starts at 0 and is incremented per appended null, and the
    /// decoder for the underlying values.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    /// Union handled by a dedicated `UnionDecoder`.
    Union(UnionDecoder),
    /// Nullable wrapper: the nullability of the type, the validity bits, the
    /// decoder for the wrapped type, and the plan stating whether a union
    /// branch tag is read.
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
246
247impl Decoder {
248    fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
249        if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
250            if info.writer_is_union && !info.reader_is_union {
251                let mut clone = data_type.clone();
252                clone.resolution = None; // Build target base decoder without Union resolution
253                let target = Box::new(Self::try_new_internal(&clone)?);
254                let decoder = Self::Union(
255                    UnionDecoderBuilder::new()
256                        .with_resolved_union(info.clone())
257                        .with_target(target)
258                        .build()?,
259                );
260                return Ok(decoder);
261            }
262        }
263        Self::try_new_internal(data_type)
264    }
265
266    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
267        // Extract just the Promotion (if any) to simplify pattern matching
268        let promotion = match data_type.resolution.as_ref() {
269            Some(ResolutionInfo::Promotion(p)) => Some(p),
270            _ => None,
271        };
272        let decoder = match (data_type.codec(), promotion) {
273            (Codec::Int64, Some(Promotion::IntToLong)) => {
274                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
275            }
276            (Codec::Float32, Some(Promotion::IntToFloat)) => {
277                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
278            }
279            (Codec::Float64, Some(Promotion::IntToDouble)) => {
280                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
281            }
282            (Codec::Float32, Some(Promotion::LongToFloat)) => {
283                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
284            }
285            (Codec::Float64, Some(Promotion::LongToDouble)) => {
286                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
287            }
288            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
289                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
290            }
291            (Codec::Utf8, Some(Promotion::BytesToString))
292            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
293                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
294                Vec::with_capacity(DEFAULT_CAPACITY),
295            ),
296            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
297                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
298                Vec::with_capacity(DEFAULT_CAPACITY),
299            ),
300            (Codec::Null, _) => Self::Null(0),
301            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
302            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
303            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
304            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
305            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
306            (Codec::Binary, _) => Self::Binary(
307                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
308                Vec::with_capacity(DEFAULT_CAPACITY),
309            ),
310            (Codec::Utf8, _) => Self::String(
311                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
312                Vec::with_capacity(DEFAULT_CAPACITY),
313            ),
314            (Codec::Utf8View, _) => Self::StringView(
315                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
316                Vec::with_capacity(DEFAULT_CAPACITY),
317            ),
318            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
319            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
320            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
321            (Codec::TimestampMillis(is_utc), _) => {
322                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
323            }
324            (Codec::TimestampMicros(is_utc), _) => {
325                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
326            }
327            #[cfg(feature = "avro_custom_types")]
328            (Codec::DurationNanos, _) => {
329                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
330            }
331            #[cfg(feature = "avro_custom_types")]
332            (Codec::DurationMicros, _) => {
333                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
334            }
335            #[cfg(feature = "avro_custom_types")]
336            (Codec::DurationMillis, _) => {
337                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
338            }
339            #[cfg(feature = "avro_custom_types")]
340            (Codec::DurationSeconds, _) => {
341                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
342            }
343            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
344            (Codec::Decimal(precision, scale, size), _) => {
345                let p = *precision;
346                let s = *scale;
347                let prec = p as u8;
348                let scl = s.unwrap_or(0) as i8;
349                #[cfg(feature = "small_decimals")]
350                {
351                    if p <= DECIMAL32_MAX_PRECISION as usize {
352                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
353                            .with_precision_and_scale(prec, scl)?;
354                        Self::Decimal32(p, s, *size, builder)
355                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
356                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
357                            .with_precision_and_scale(prec, scl)?;
358                        Self::Decimal64(p, s, *size, builder)
359                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
360                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
361                            .with_precision_and_scale(prec, scl)?;
362                        Self::Decimal128(p, s, *size, builder)
363                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
364                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
365                            .with_precision_and_scale(prec, scl)?;
366                        Self::Decimal256(p, s, *size, builder)
367                    } else {
368                        return Err(ArrowError::ParseError(format!(
369                            "Decimal precision {p} exceeds maximum supported"
370                        )));
371                    }
372                }
373                #[cfg(not(feature = "small_decimals"))]
374                {
375                    if p <= DECIMAL128_MAX_PRECISION as usize {
376                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
377                            .with_precision_and_scale(prec, scl)?;
378                        Self::Decimal128(p, s, *size, builder)
379                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
380                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
381                            .with_precision_and_scale(prec, scl)?;
382                        Self::Decimal256(p, s, *size, builder)
383                    } else {
384                        return Err(ArrowError::ParseError(format!(
385                            "Decimal precision {p} exceeds maximum supported"
386                        )));
387                    }
388                }
389            }
390            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
391            (Codec::List(item), _) => {
392                let decoder = Self::try_new(item)?;
393                Self::Array(
394                    Arc::new(item.field_with_name("item")),
395                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
396                    Box::new(decoder),
397                )
398            }
399            (Codec::Enum(symbols), _) => {
400                let res = match data_type.resolution.as_ref() {
401                    Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
402                        mapping: mapping.mapping.clone(),
403                        default_index: mapping.default_index,
404                    }),
405                    _ => None,
406                };
407                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
408            }
409            (Codec::Struct(fields), _) => {
410                let mut arrow_fields = Vec::with_capacity(fields.len());
411                let mut encodings = Vec::with_capacity(fields.len());
412                for avro_field in fields.iter() {
413                    let encoding = Self::try_new(avro_field.data_type())?;
414                    arrow_fields.push(avro_field.field());
415                    encodings.push(encoding);
416                }
417                let projector =
418                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
419                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
420                    } else {
421                        None
422                    };
423                Self::Record(arrow_fields.into(), encodings, projector)
424            }
425            (Codec::Map(child), _) => {
426                let val_field = child.field_with_name("value");
427                let map_field = Arc::new(ArrowField::new(
428                    "entries",
429                    DataType::Struct(Fields::from(vec![
430                        ArrowField::new("key", DataType::Utf8, false),
431                        val_field,
432                    ])),
433                    false,
434                ));
435                let val_dec = Self::try_new(child)?;
436                Self::Map(
437                    map_field,
438                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
439                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
440                    Vec::with_capacity(DEFAULT_CAPACITY),
441                    Box::new(val_dec),
442                )
443            }
444            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
445            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
446                let decoders = encodings
447                    .iter()
448                    .map(Self::try_new_internal)
449                    .collect::<Result<Vec<_>, _>>()?;
450                if fields.len() != decoders.len() {
451                    return Err(ArrowError::SchemaError(format!(
452                        "Union has {} fields but {} decoders",
453                        fields.len(),
454                        decoders.len()
455                    )));
456                }
457                // Proactive guard: if a user provides a union with more branches than
458                // a 32-bit Avro index can address, fail fast with a clear message.
459                let branch_count = decoders.len();
460                let max_addr = (i32::MAX as usize) + 1;
461                if branch_count > max_addr {
462                    return Err(ArrowError::SchemaError(format!(
463                        "Union has {branch_count} branches, which exceeds the maximum addressable \
464                         branches by an Avro int tag ({} + 1).",
465                        i32::MAX
466                    )));
467                }
468                let mut builder = UnionDecoderBuilder::new()
469                    .with_fields(fields.clone())
470                    .with_branches(decoders);
471                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
472                    if info.reader_is_union {
473                        builder = builder.with_resolved_union(info.clone());
474                    }
475                }
476                Self::Union(builder.build()?)
477            }
478            (Codec::Union(_, _, _), _) => {
479                return Err(ArrowError::NotYetImplemented(
480                    "Sparse Arrow unions are not yet supported".to_string(),
481                ));
482            }
483            #[cfg(feature = "avro_custom_types")]
484            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
485                let inner = Self::try_new(values_dt)?;
486                let byte_width: u8 = match *width_bits_or_bytes {
487                    2 | 4 | 8 => *width_bits_or_bytes,
488                    16 => 2,
489                    32 => 4,
490                    64 => 8,
491                    other => {
492                        return Err(ArrowError::InvalidArgumentError(format!(
493                            "Unsupported run-end width {other} for RunEndEncoded; \
494                             expected 16/32/64 bits or 2/4/8 bytes"
495                        )));
496                    }
497                };
498                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
499            }
500        };
501        Ok(match data_type.nullability() {
502            Some(nullability) => {
503                // Default to reading a union branch tag unless the resolution proves otherwise.
504                let mut plan = NullablePlan::ReadTag;
505                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
506                    if !info.writer_is_union && info.reader_is_union {
507                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
508                            plan = NullablePlan::FromSingle { promotion: *promo };
509                        }
510                    }
511                }
512                Self::Nullable(
513                    nullability,
514                    NullBufferBuilder::new(DEFAULT_CAPACITY),
515                    Box::new(decoder),
516                    plan,
517                )
518            }
519            None => decoder,
520        })
521    }
522
523    /// Append a null record
524    fn append_null(&mut self) -> Result<(), ArrowError> {
525        match self {
526            Self::Null(count) => *count += 1,
527            Self::Boolean(b) => b.append(false),
528            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
529            Self::Int64(v)
530            | Self::Int32ToInt64(v)
531            | Self::TimeMicros(v)
532            | Self::TimestampMillis(_, v)
533            | Self::TimestampMicros(_, v) => v.push(0),
534            #[cfg(feature = "avro_custom_types")]
535            Self::DurationSecond(v)
536            | Self::DurationMillisecond(v)
537            | Self::DurationMicrosecond(v)
538            | Self::DurationNanosecond(v) => v.push(0),
539            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
540            Self::Float64(v)
541            | Self::Int32ToFloat64(v)
542            | Self::Int64ToFloat64(v)
543            | Self::Float32ToFloat64(v) => v.push(0.),
544            Self::Binary(offsets, _)
545            | Self::String(offsets, _)
546            | Self::StringView(offsets, _)
547            | Self::BytesToString(offsets, _)
548            | Self::StringToBytes(offsets, _) => {
549                offsets.push_length(0);
550            }
551            Self::Uuid(v) => {
552                v.extend([0; 16]);
553            }
554            Self::Array(_, offsets, _) => {
555                offsets.push_length(0);
556            }
557            Self::Record(_, e, _) => {
558                for encoding in e.iter_mut() {
559                    encoding.append_null()?;
560                }
561            }
562            Self::Map(_, _koff, moff, _, _) => {
563                moff.push_length(0);
564            }
565            Self::Fixed(sz, accum) => {
566                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
567            }
568            #[cfg(feature = "small_decimals")]
569            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
570            #[cfg(feature = "small_decimals")]
571            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
572            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
573            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
574            Self::Enum(indices, _, _) => indices.push(0),
575            Self::Duration(builder) => builder.append_null(),
576            #[cfg(feature = "avro_custom_types")]
577            Self::RunEndEncoded(_, len, inner) => {
578                *len += 1;
579                inner.append_null()?;
580            }
581            Self::Union(u) => u.append_null()?,
582            Self::Nullable(_, null_buffer, inner, _) => {
583                null_buffer.append(false);
584                inner.append_null()?;
585            }
586        }
587        Ok(())
588    }
589
590    /// Append a single default literal into the decoder's buffers
591    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
592        match self {
593            Self::Nullable(_, nb, inner, _) => {
594                if matches!(lit, AvroLiteral::Null) {
595                    nb.append(false);
596                    inner.append_null()
597                } else {
598                    nb.append(true);
599                    inner.append_default(lit)
600                }
601            }
602            Self::Null(count) => match lit {
603                AvroLiteral::Null => {
604                    *count += 1;
605                    Ok(())
606                }
607                _ => Err(ArrowError::InvalidArgumentError(
608                    "Non-null default for null type".to_string(),
609                )),
610            },
611            Self::Boolean(b) => match lit {
612                AvroLiteral::Boolean(v) => {
613                    b.append(*v);
614                    Ok(())
615                }
616                _ => Err(ArrowError::InvalidArgumentError(
617                    "Default for boolean must be boolean".to_string(),
618                )),
619            },
620            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
621                AvroLiteral::Int(i) => {
622                    v.push(*i);
623                    Ok(())
624                }
625                _ => Err(ArrowError::InvalidArgumentError(
626                    "Default for int32/date32/time-millis must be int".to_string(),
627                )),
628            },
629            #[cfg(feature = "avro_custom_types")]
630            Self::DurationSecond(v)
631            | Self::DurationMillisecond(v)
632            | Self::DurationMicrosecond(v)
633            | Self::DurationNanosecond(v) => match lit {
634                AvroLiteral::Long(i) => {
635                    v.push(*i);
636                    Ok(())
637                }
638                _ => Err(ArrowError::InvalidArgumentError(
639                    "Default for duration long must be long".to_string(),
640                )),
641            },
642            Self::Int64(v)
643            | Self::Int32ToInt64(v)
644            | Self::TimeMicros(v)
645            | Self::TimestampMillis(_, v)
646            | Self::TimestampMicros(_, v) => match lit {
647                AvroLiteral::Long(i) => {
648                    v.push(*i);
649                    Ok(())
650                }
651                AvroLiteral::Int(i) => {
652                    v.push(*i as i64);
653                    Ok(())
654                }
655                _ => Err(ArrowError::InvalidArgumentError(
656                    "Default for long/time-micros/timestamp must be long or int".to_string(),
657                )),
658            },
659            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
660                AvroLiteral::Float(f) => {
661                    v.push(*f);
662                    Ok(())
663                }
664                _ => Err(ArrowError::InvalidArgumentError(
665                    "Default for float must be float".to_string(),
666                )),
667            },
668            Self::Float64(v)
669            | Self::Int32ToFloat64(v)
670            | Self::Int64ToFloat64(v)
671            | Self::Float32ToFloat64(v) => match lit {
672                AvroLiteral::Double(f) => {
673                    v.push(*f);
674                    Ok(())
675                }
676                _ => Err(ArrowError::InvalidArgumentError(
677                    "Default for double must be double".to_string(),
678                )),
679            },
680            Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
681                AvroLiteral::Bytes(b) => {
682                    offsets.push_length(b.len());
683                    values.extend_from_slice(b);
684                    Ok(())
685                }
686                _ => Err(ArrowError::InvalidArgumentError(
687                    "Default for bytes must be bytes".to_string(),
688                )),
689            },
690            Self::BytesToString(offsets, values)
691            | Self::String(offsets, values)
692            | Self::StringView(offsets, values) => match lit {
693                AvroLiteral::String(s) => {
694                    let b = s.as_bytes();
695                    offsets.push_length(b.len());
696                    values.extend_from_slice(b);
697                    Ok(())
698                }
699                _ => Err(ArrowError::InvalidArgumentError(
700                    "Default for string must be string".to_string(),
701                )),
702            },
703            Self::Uuid(values) => match lit {
704                AvroLiteral::String(s) => {
705                    let uuid = Uuid::try_parse(s).map_err(|e| {
706                        ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})"))
707                    })?;
708                    values.extend_from_slice(uuid.as_bytes());
709                    Ok(())
710                }
711                _ => Err(ArrowError::InvalidArgumentError(
712                    "Default for uuid must be string".to_string(),
713                )),
714            },
715            Self::Fixed(sz, accum) => match lit {
716                AvroLiteral::Bytes(b) => {
717                    if b.len() != *sz as usize {
718                        return Err(ArrowError::InvalidArgumentError(format!(
719                            "Fixed default length {} does not match size {sz}",
720                            b.len(),
721                        )));
722                    }
723                    accum.extend_from_slice(b);
724                    Ok(())
725                }
726                _ => Err(ArrowError::InvalidArgumentError(
727                    "Default for fixed must be bytes".to_string(),
728                )),
729            },
730            #[cfg(feature = "small_decimals")]
731            Self::Decimal32(_, _, _, builder) => {
732                append_decimal_default!(lit, builder, 4, i32, "decimal32")
733            }
734            #[cfg(feature = "small_decimals")]
735            Self::Decimal64(_, _, _, builder) => {
736                append_decimal_default!(lit, builder, 8, i64, "decimal64")
737            }
738            Self::Decimal128(_, _, _, builder) => {
739                append_decimal_default!(lit, builder, 16, i128, "decimal128")
740            }
741            Self::Decimal256(_, _, _, builder) => {
742                append_decimal_default!(lit, builder, 32, i256, "decimal256")
743            }
744            Self::Duration(builder) => match lit {
745                AvroLiteral::Bytes(b) => {
746                    if b.len() != 12 {
747                        return Err(ArrowError::InvalidArgumentError(format!(
748                            "Duration default must be exactly 12 bytes, got {}",
749                            b.len()
750                        )));
751                    }
752                    let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
753                    let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
754                    let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
755                    let nanos = (millis as i64) * 1_000_000;
756                    builder.append_value(IntervalMonthDayNano::new(
757                        months as i32,
758                        days as i32,
759                        nanos,
760                    ));
761                    Ok(())
762                }
763                _ => Err(ArrowError::InvalidArgumentError(
764                    "Default for duration must be 12-byte little-endian months/days/millis"
765                        .to_string(),
766                )),
767            },
768            Self::Array(_, offsets, inner) => match lit {
769                AvroLiteral::Array(items) => {
770                    offsets.push_length(items.len());
771                    for item in items {
772                        inner.append_default(item)?;
773                    }
774                    Ok(())
775                }
776                _ => Err(ArrowError::InvalidArgumentError(
777                    "Default for array must be an array literal".to_string(),
778                )),
779            },
780            Self::Map(_, koff, moff, kdata, valdec) => match lit {
781                AvroLiteral::Map(entries) => {
782                    moff.push_length(entries.len());
783                    for (k, v) in entries {
784                        let kb = k.as_bytes();
785                        koff.push_length(kb.len());
786                        kdata.extend_from_slice(kb);
787                        valdec.append_default(v)?;
788                    }
789                    Ok(())
790                }
791                _ => Err(ArrowError::InvalidArgumentError(
792                    "Default for map must be a map/object literal".to_string(),
793                )),
794            },
795            Self::Enum(indices, symbols, _) => match lit {
796                AvroLiteral::Enum(sym) => {
797                    let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
798                        ArrowError::InvalidArgumentError(format!(
799                            "Enum default symbol {sym:?} not in reader symbols"
800                        ))
801                    })?;
802                    indices.push(pos as i32);
803                    Ok(())
804                }
805                _ => Err(ArrowError::InvalidArgumentError(
806                    "Default for enum must be a symbol".to_string(),
807                )),
808            },
809            #[cfg(feature = "avro_custom_types")]
810            Self::RunEndEncoded(_, len, inner) => {
811                *len += 1;
812                inner.append_default(lit)
813            }
814            Self::Union(u) => u.append_default(lit),
815            Self::Record(field_meta, decoders, projector) => match lit {
816                AvroLiteral::Map(entries) => {
817                    for (i, dec) in decoders.iter_mut().enumerate() {
818                        let name = field_meta[i].name();
819                        if let Some(sub) = entries.get(name) {
820                            dec.append_default(sub)?;
821                        } else if let Some(proj) = projector.as_ref() {
822                            proj.project_default(dec, i)?;
823                        } else {
824                            dec.append_null()?;
825                        }
826                    }
827                    Ok(())
828                }
829                AvroLiteral::Null => {
830                    for (i, dec) in decoders.iter_mut().enumerate() {
831                        if let Some(proj) = projector.as_ref() {
832                            proj.project_default(dec, i)?;
833                        } else {
834                            dec.append_null()?;
835                        }
836                    }
837                    Ok(())
838                }
839                _ => Err(ArrowError::InvalidArgumentError(
840                    "Default for record must be a map/object or null".to_string(),
841                )),
842            },
843        }
844    }
845
846    /// Decode a single record from `buf`
847    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
848        match self {
849            Self::Null(x) => *x += 1,
850            Self::Boolean(values) => values.append(buf.get_bool()?),
851            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
852                values.push(buf.get_int()?)
853            }
854            Self::Int64(values)
855            | Self::TimeMicros(values)
856            | Self::TimestampMillis(_, values)
857            | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
858            #[cfg(feature = "avro_custom_types")]
859            Self::DurationSecond(values)
860            | Self::DurationMillisecond(values)
861            | Self::DurationMicrosecond(values)
862            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
863            Self::Float32(values) => values.push(buf.get_float()?),
864            Self::Float64(values) => values.push(buf.get_double()?),
865            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
866            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
867            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
868            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
869            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
870            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
871            Self::StringToBytes(offsets, values)
872            | Self::BytesToString(offsets, values)
873            | Self::Binary(offsets, values)
874            | Self::String(offsets, values)
875            | Self::StringView(offsets, values) => {
876                let data = buf.get_bytes()?;
877                offsets.push_length(data.len());
878                values.extend_from_slice(data);
879            }
880            Self::Uuid(values) => {
881                let s_bytes = buf.get_bytes()?;
882                let s = std::str::from_utf8(s_bytes).map_err(|e| {
883                    ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
884                })?;
885                let uuid = Uuid::try_parse(s)
886                    .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
887                values.extend_from_slice(uuid.as_bytes());
888            }
889            Self::Array(_, off, encoding) => {
890                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
891                off.push_length(total_items);
892            }
893            Self::Record(_, encodings, None) => {
894                for encoding in encodings {
895                    encoding.decode(buf)?;
896                }
897            }
898            Self::Record(_, encodings, Some(proj)) => {
899                proj.project_record(buf, encodings)?;
900            }
901            Self::Map(_, koff, moff, kdata, valdec) => {
902                let newly_added = read_blocks(buf, |cur| {
903                    let kb = cur.get_bytes()?;
904                    koff.push_length(kb.len());
905                    kdata.extend_from_slice(kb);
906                    valdec.decode(cur)
907                })?;
908                moff.push_length(newly_added);
909            }
910            Self::Fixed(sz, accum) => {
911                let fx = buf.get_fixed(*sz as usize)?;
912                accum.extend_from_slice(fx);
913            }
914            #[cfg(feature = "small_decimals")]
915            Self::Decimal32(_, _, size, builder) => {
916                decode_decimal!(size, buf, builder, 4, i32);
917            }
918            #[cfg(feature = "small_decimals")]
919            Self::Decimal64(_, _, size, builder) => {
920                decode_decimal!(size, buf, builder, 8, i64);
921            }
922            Self::Decimal128(_, _, size, builder) => {
923                decode_decimal!(size, buf, builder, 16, i128);
924            }
925            Self::Decimal256(_, _, size, builder) => {
926                decode_decimal!(size, buf, builder, 32, i256);
927            }
928            Self::Enum(indices, _, None) => {
929                indices.push(buf.get_int()?);
930            }
931            Self::Enum(indices, _, Some(res)) => {
932                let raw = buf.get_int()?;
933                let resolved = usize::try_from(raw)
934                    .ok()
935                    .and_then(|idx| res.mapping.get(idx).copied())
936                    .filter(|&idx| idx >= 0)
937                    .unwrap_or(res.default_index);
938                if resolved >= 0 {
939                    indices.push(resolved);
940                } else {
941                    return Err(ArrowError::ParseError(format!(
942                        "Enum symbol index {raw} not resolvable and no default provided",
943                    )));
944                }
945            }
946            Self::Duration(builder) => {
947                let b = buf.get_fixed(12)?;
948                let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
949                let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
950                let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
951                let nanos = (millis as i64) * 1_000_000;
952                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
953            }
954            #[cfg(feature = "avro_custom_types")]
955            Self::RunEndEncoded(_, len, inner) => {
956                *len += 1;
957                inner.decode(buf)?;
958            }
959            Self::Union(u) => u.decode(buf)?,
960            Self::Nullable(order, nb, encoding, plan) => {
961                match *plan {
962                    NullablePlan::FromSingle { promotion } => {
963                        encoding.decode_with_promotion(buf, promotion)?;
964                        nb.append(true);
965                    }
966                    NullablePlan::ReadTag => {
967                        let branch = buf.read_vlq()?;
968                        let is_not_null = match *order {
969                            Nullability::NullFirst => branch != 0,
970                            Nullability::NullSecond => branch == 0,
971                        };
972                        if is_not_null {
973                            // It is important to decode before appending to null buffer in case of decode error
974                            encoding.decode(buf)?;
975                        } else {
976                            encoding.append_null()?;
977                        }
978                        nb.append(is_not_null);
979                    }
980                }
981            }
982        }
983        Ok(())
984    }
985
986    fn decode_with_promotion(
987        &mut self,
988        buf: &mut AvroCursor<'_>,
989        promotion: Promotion,
990    ) -> Result<(), ArrowError> {
991        #[cfg(feature = "avro_custom_types")]
992        if let Self::RunEndEncoded(_, len, inner) = self {
993            *len += 1;
994            return inner.decode_with_promotion(buf, promotion);
995        }
996
997        macro_rules! promote_numeric_to {
998            ($variant:ident, $getter:ident, $to:ty) => {{
999                match self {
1000                    Self::$variant(v) => {
1001                        let x = buf.$getter()?;
1002                        v.push(x as $to);
1003                        Ok(())
1004                    }
1005                    other => Err(ArrowError::ParseError(format!(
1006                        "Promotion {promotion} target mismatch: expected {}, got {}",
1007                        stringify!($variant),
1008                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
1009                    ))),
1010                }
1011            }};
1012        }
1013        match promotion {
1014            Promotion::Direct => self.decode(buf),
1015            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1016            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1017            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1018            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1019            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1020            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1021            Promotion::StringToBytes => match self {
1022                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1023                    let data = buf.get_bytes()?;
1024                    offsets.push_length(data.len());
1025                    values.extend_from_slice(data);
1026                    Ok(())
1027                }
1028                other => Err(ArrowError::ParseError(format!(
1029                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1030                    <Self as AsRef<str>>::as_ref(other)
1031                ))),
1032            },
1033            Promotion::BytesToString => match self {
1034                Self::String(offsets, values)
1035                | Self::StringView(offsets, values)
1036                | Self::BytesToString(offsets, values) => {
1037                    let data = buf.get_bytes()?;
1038                    offsets.push_length(data.len());
1039                    values.extend_from_slice(data);
1040                    Ok(())
1041                }
1042                other => Err(ArrowError::ParseError(format!(
1043                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1044                    <Self as AsRef<str>>::as_ref(other)
1045                ))),
1046            },
1047        }
1048    }
1049
1050    /// Flush decoded records to an [`ArrayRef`]
1051    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1052        Ok(match self {
1053            Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1054            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1055            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1056            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1057            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1058            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1059            Self::TimeMillis(values) => {
1060                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1061            }
1062            Self::TimeMicros(values) => {
1063                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1064            }
1065            Self::TimestampMillis(is_utc, values) => Arc::new(
1066                flush_primitive::<TimestampMillisecondType>(values, nulls)
1067                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1068            ),
1069            Self::TimestampMicros(is_utc, values) => Arc::new(
1070                flush_primitive::<TimestampMicrosecondType>(values, nulls)
1071                    .with_timezone_opt(is_utc.then(|| "+00:00")),
1072            ),
1073            #[cfg(feature = "avro_custom_types")]
1074            Self::DurationSecond(values) => {
1075                Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1076            }
1077            #[cfg(feature = "avro_custom_types")]
1078            Self::DurationMillisecond(values) => {
1079                Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1080            }
1081            #[cfg(feature = "avro_custom_types")]
1082            Self::DurationMicrosecond(values) => {
1083                Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1084            }
1085            #[cfg(feature = "avro_custom_types")]
1086            Self::DurationNanosecond(values) => {
1087                Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1088            }
1089            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1090            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1091            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1092            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1093                Arc::new(flush_primitive::<Float32Type>(values, nulls))
1094            }
1095            Self::Int32ToFloat64(values)
1096            | Self::Int64ToFloat64(values)
1097            | Self::Float32ToFloat64(values) => {
1098                Arc::new(flush_primitive::<Float64Type>(values, nulls))
1099            }
1100            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1101                let offsets = flush_offsets(offsets);
1102                let values = flush_values(values).into();
1103                Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1104            }
1105            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1106                let offsets = flush_offsets(offsets);
1107                let values = flush_values(values).into();
1108                Arc::new(StringArray::try_new(offsets, values, nulls)?)
1109            }
1110            Self::StringView(offsets, values) => {
1111                let offsets = flush_offsets(offsets);
1112                let values = flush_values(values);
1113                let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1114                let values: Vec<&str> = (0..array.len())
1115                    .map(|i| {
1116                        if array.is_valid(i) {
1117                            array.value(i)
1118                        } else {
1119                            ""
1120                        }
1121                    })
1122                    .collect();
1123                Arc::new(StringViewArray::from(values))
1124            }
1125            Self::Array(field, offsets, values) => {
1126                let values = values.flush(None)?;
1127                let offsets = flush_offsets(offsets);
1128                Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1129            }
1130            Self::Record(fields, encodings, _) => {
1131                let arrays = encodings
1132                    .iter_mut()
1133                    .map(|x| x.flush(None))
1134                    .collect::<Result<Vec<_>, _>>()?;
1135                Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1136            }
1137            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1138                let moff = flush_offsets(m_off);
1139                let koff = flush_offsets(k_off);
1140                let kd = flush_values(kdata).into();
1141                let val_arr = valdec.flush(None)?;
1142                let key_arr = StringArray::try_new(koff, kd, None)?;
1143                if key_arr.len() != val_arr.len() {
1144                    return Err(ArrowError::InvalidArgumentError(format!(
1145                        "Map keys length ({}) != map values length ({})",
1146                        key_arr.len(),
1147                        val_arr.len()
1148                    )));
1149                }
1150                let final_len = moff.len() - 1;
1151                if let Some(n) = &nulls {
1152                    if n.len() != final_len {
1153                        return Err(ArrowError::InvalidArgumentError(format!(
1154                            "Map array null buffer length {} != final map length {final_len}",
1155                            n.len()
1156                        )));
1157                    }
1158                }
1159                let entries_fields = match map_field.data_type() {
1160                    DataType::Struct(fields) => fields.clone(),
1161                    other => {
1162                        return Err(ArrowError::InvalidArgumentError(format!(
1163                            "Map entries field must be a Struct, got {other:?}"
1164                        )));
1165                    }
1166                };
1167                let entries_struct =
1168                    StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1169                let map_arr =
1170                    MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1171                Arc::new(map_arr)
1172            }
1173            Self::Fixed(sz, accum) => {
1174                let b: Buffer = flush_values(accum).into();
1175                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1176                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1177                Arc::new(arr)
1178            }
1179            Self::Uuid(values) => {
1180                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1181                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1182                Arc::new(arr)
1183            }
1184            #[cfg(feature = "small_decimals")]
1185            Self::Decimal32(precision, scale, _, builder) => {
1186                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1187            }
1188            #[cfg(feature = "small_decimals")]
1189            Self::Decimal64(precision, scale, _, builder) => {
1190                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1191            }
1192            Self::Decimal128(precision, scale, _, builder) => {
1193                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1194            }
1195            Self::Decimal256(precision, scale, _, builder) => {
1196                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1197            }
1198            Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1199            Self::Duration(builder) => {
1200                let (_, vals, _) = builder.finish().into_parts();
1201                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1202                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1203                Arc::new(vals)
1204            }
1205            #[cfg(feature = "avro_custom_types")]
1206            Self::RunEndEncoded(width, len, inner) => {
1207                let values = inner.flush(nulls)?;
1208                let n = *len;
1209                let arr = values.as_ref();
1210                let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1211                if n > 0 {
1212                    run_starts.push(0);
1213                    for i in 1..n {
1214                        if !values_equal_at(arr, i - 1, i) {
1215                            run_starts.push(i);
1216                        }
1217                    }
1218                }
1219                if n > (u32::MAX as usize) {
1220                    return Err(ArrowError::InvalidArgumentError(format!(
1221                        "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1222                    )));
1223                }
1224                let run_count = run_starts.len();
1225                let take_idx: PrimitiveArray<UInt32Type> =
1226                    run_starts.iter().map(|&s| s as u32).collect();
1227                let per_run_values = if run_count == 0 {
1228                    values.slice(0, 0)
1229                } else {
1230                    take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1231                        ArrowError::ParseError(format!("take() for REE values failed: {e}"))
1232                    })?
1233                };
1234
1235                macro_rules! build_run_array {
1236                    ($Native:ty, $ArrowTy:ty) => {{
1237                        let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1238                        for (idx, &_start) in run_starts.iter().enumerate() {
1239                            let end = if idx + 1 < run_count {
1240                                run_starts[idx + 1]
1241                            } else {
1242                                n
1243                            };
1244                            ends.push(end as $Native);
1245                        }
1246                        let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1247                        let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1248                            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1249                        Arc::new(run_arr) as ArrayRef
1250                    }};
1251                }
1252                match *width {
1253                    2 => {
1254                        if n > i16::MAX as usize {
1255                            return Err(ArrowError::InvalidArgumentError(format!(
1256                                "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1257                            )));
1258                        }
1259                        build_run_array!(i16, Int16Type)
1260                    }
1261                    4 => build_run_array!(i32, Int32Type),
1262                    8 => build_run_array!(i64, Int64Type),
1263                    other => {
1264                        return Err(ArrowError::InvalidArgumentError(format!(
1265                            "Unsupported run-end width {other} for RunEndEncoded"
1266                        )));
1267                    }
1268                }
1269            }
1270            Self::Union(u) => u.flush(nulls)?,
1271        })
1272    }
1273}
1274
// A lookup table that resolves a writer-side index to a reader-side index plus the
// promotion to apply. Within this file it is held by the `UnionReadPlan` variants.
#[derive(Debug)]
struct DispatchLookupTable {
    // Indexed by writer index `w` (see `resolve`); each entry is the reader index that
    // writer entry resolves to.
    //
    // Semantics:
    // - `to_reader[w] >= 0`: The value is the reader index; `resolve(w)` returns it
    //   together with `promotion[w]`.
    // - `to_reader[w] == NO_SOURCE` (-1): The writer entry has no reader target and
    //   `resolve(w)` returns `None`.
    //
    // Representation (`i8`):
    // `i8` is used for a dense, cache-friendly dispatch table, consistent with Arrow's use of
    // `i8` for union type IDs. This requires that reader indices do not exceed `i8::MAX`,
    // which `from_writer_to_reader` enforces.
    //
    // Invariants:
    // - `to_reader.len() == promotion.len()`; both have one entry per element of the
    //   `promotion_map` passed to `from_writer_to_reader`.
    // - If `to_reader[w] == NO_SOURCE`, `promotion[w]` is ignored.
    to_reader: Box<[i8]>,
    // For each writer index `w`, the `Promotion` to apply to the writer's value.
    //
    // This is used when a writer type can be promoted to the reader type
    // (e.g., `Int` to `Long`). It is ignored if `to_reader[w] == NO_SOURCE`
    // (the constructor stores `Promotion::Direct` as a placeholder there).
    promotion: Box<[Promotion]>,
}
1300
// Sentinel stored in `DispatchLookupTable::to_reader` for an entry that has no
// mapping: `from_writer_to_reader` writes it for `None` inputs and
// `DispatchLookupTable::resolve` returns `None` when it reads it back.
const NO_SOURCE: i8 = -1;
1304
1305impl DispatchLookupTable {
1306    fn from_writer_to_reader(
1307        promotion_map: &[Option<(usize, Promotion)>],
1308    ) -> Result<Self, ArrowError> {
1309        let mut to_reader = Vec::with_capacity(promotion_map.len());
1310        let mut promotion = Vec::with_capacity(promotion_map.len());
1311        for map in promotion_map {
1312            match *map {
1313                Some((idx, promo)) => {
1314                    let idx_i8 = i8::try_from(idx).map_err(|_| {
1315                        ArrowError::SchemaError(format!(
1316                            "Reader branch index {idx} exceeds i8 range (max {})",
1317                            i8::MAX
1318                        ))
1319                    })?;
1320                    to_reader.push(idx_i8);
1321                    promotion.push(promo);
1322                }
1323                None => {
1324                    to_reader.push(NO_SOURCE);
1325                    promotion.push(Promotion::Direct);
1326                }
1327            }
1328        }
1329        Ok(Self {
1330            to_reader: to_reader.into_boxed_slice(),
1331            promotion: promotion.into_boxed_slice(),
1332        })
1333    }
1334
1335    // Resolve a writer branch index to (reader_idx, promotion)
1336    #[inline]
1337    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1338        let reader_index = *self.to_reader.get(writer_index)?;
1339        (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1340    }
1341}
1342
/// Decoder state for Avro unions.
///
/// Depending on `plan`, values are either routed into one of the per-branch
/// child decoders in `branches` (and later assembled into a `UnionArray` with
/// offsets), or forwarded to a single non-union target decoder
/// (`UnionReadPlan::ToSingle`).
#[derive(Debug)]
struct UnionDecoder {
    // Union fields handed to `UnionArray::try_new` on flush.
    fields: UnionFields,
    // One type id per emitted value (pushed by `emit_to`).
    type_ids: Vec<i8>,
    // One offset per emitted value: the branch's running count at emit time.
    offsets: Vec<i32>,
    // Child decoders, indexed by reader branch index.
    branches: Vec<Decoder>,
    // Running number of values emitted to each branch.
    counts: Vec<i32>,
    // Type id for each reader branch, collected from `fields`.
    reader_type_codes: Vec<i8>,
    // Branch used by `append_default` unless the plan redirects it
    // (`try_new` always sets this to 0).
    default_emit_idx: usize,
    // Branch used by `append_null` unless the plan redirects it: the first
    // `Decoder::Null` branch, or `default_emit_idx` when there is none.
    null_emit_idx: usize,
    // How writer values are mapped onto the reader side.
    plan: UnionReadPlan,
}
1355
1356impl Default for UnionDecoder {
1357    fn default() -> Self {
1358        Self {
1359            fields: UnionFields::empty(),
1360            type_ids: Vec::new(),
1361            offsets: Vec::new(),
1362            branches: Vec::new(),
1363            counts: Vec::new(),
1364            reader_type_codes: Vec::new(),
1365            default_emit_idx: 0,
1366            null_emit_idx: 0,
1367            plan: UnionReadPlan::Passthrough,
1368        }
1369    }
1370}
1371
/// Strategy used by `UnionDecoder` to map encoded values onto the reader side.
#[derive(Debug)]
enum UnionReadPlan {
    // Writer and reader are both unions: the branch tag read from the data is
    // translated through `lookup_table` to a reader branch and promotion.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    // Writer is not a union but the reader is: no tag is read; every value is
    // decoded into `reader_idx` using `promotion`.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    // Writer is a union but the reader is not: the tag is read and translated
    // through `lookup_table` to a promotion, and the value is decoded by `target`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    // No resolution information: the tag read from the data is used directly as
    // the reader branch index with `Promotion::Direct`.
    Passthrough,
}
1387
1388impl UnionDecoder {
1389    fn try_new(
1390        fields: UnionFields,
1391        branches: Vec<Decoder>,
1392        resolved: Option<ResolvedUnion>,
1393    ) -> Result<Self, ArrowError> {
1394        let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1395        let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1396        let default_emit_idx = 0;
1397        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1398        let branch_len = branches.len().max(reader_type_codes.len());
1399        // Guard against impractically large unions that cannot be indexed by an Avro int
1400        let max_addr = (i32::MAX as usize) + 1;
1401        if branches.len() > max_addr {
1402            return Err(ArrowError::SchemaError(format!(
1403                "Reader union has {} branches, which exceeds the maximum addressable \
1404                 branches by an Avro int tag ({} + 1).",
1405                branches.len(),
1406                i32::MAX
1407            )));
1408        }
1409        Ok(Self {
1410            fields,
1411            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1412            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1413            branches,
1414            counts: vec![0; branch_len],
1415            reader_type_codes,
1416            default_emit_idx,
1417            null_emit_idx,
1418            plan: Self::plan_from_resolved(resolved)?,
1419        })
1420    }
1421
1422    fn try_new_from_writer_union(
1423        info: ResolvedUnion,
1424        target: Box<Decoder>,
1425    ) -> Result<Self, ArrowError> {
1426        // This constructor is only for writer-union to single-type resolution
1427        debug_assert!(info.writer_is_union && !info.reader_is_union);
1428        let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1429        Ok(Self {
1430            plan: UnionReadPlan::ToSingle {
1431                target,
1432                lookup_table,
1433            },
1434            ..Self::default()
1435        })
1436    }
1437
1438    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
1439        let Some(info) = resolved else {
1440            return Ok(UnionReadPlan::Passthrough);
1441        };
1442        match (info.writer_is_union, info.reader_is_union) {
1443            (true, true) => {
1444                let lookup_table =
1445                    DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1446                Ok(UnionReadPlan::ReaderUnion { lookup_table })
1447            }
1448            (false, true) => {
1449                let Some(&(reader_idx, promotion)) =
1450                    info.writer_to_reader.first().and_then(Option::as_ref)
1451                else {
1452                    return Err(ArrowError::SchemaError(
1453                        "Writer type does not match any reader union branch".to_string(),
1454                    ));
1455                };
1456                Ok(UnionReadPlan::FromSingle {
1457                    reader_idx,
1458                    promotion,
1459                })
1460            }
1461            (true, false) => Err(ArrowError::InvalidArgumentError(
1462                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1463                    .to_string(),
1464            )),
1465            // (false, false) is invalid and should never be constructed by the resolver.
1466            _ => Err(ArrowError::SchemaError(
1467                "ResolvedUnion constructed for non-union sides; resolver should return None"
1468                    .to_string(),
1469            )),
1470        }
1471    }
1472
1473    #[inline]
1474    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
1475        // Avro unions are encoded by first writing the zero-based branch index.
1476        // In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
1477        // but both use zig-zag varint encoding, so decoding as long is compatible
1478        // with either form and widely used in practice.
1479        let raw = buf.get_long()?;
1480        if raw < 0 {
1481            return Err(ArrowError::ParseError(format!(
1482                "Negative union branch index {raw}"
1483            )));
1484        }
1485        usize::try_from(raw).map_err(|_| {
1486            ArrowError::ParseError(format!(
1487                "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1488                (usize::BITS as usize)
1489            ))
1490        })
1491    }
1492
1493    #[inline]
1494    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
1495        let branches_len = self.branches.len();
1496        let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1497            return Err(ArrowError::ParseError(format!(
1498                "Union branch index {reader_idx} out of range ({branches_len} branches)"
1499            )));
1500        };
1501        self.type_ids.push(self.reader_type_codes[reader_idx]);
1502        self.offsets.push(self.counts[reader_idx]);
1503        self.counts[reader_idx] += 1;
1504        Ok(reader_branch)
1505    }
1506
1507    #[inline]
1508    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
1509    where
1510        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
1511    {
1512        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1513            return action(target);
1514        }
1515        let reader_idx = match &self.plan {
1516            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1517            _ => fallback_idx,
1518        };
1519        self.emit_to(reader_idx).and_then(action)
1520    }
1521
1522    fn append_null(&mut self) -> Result<(), ArrowError> {
1523        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1524    }
1525
1526    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
1527        self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1528    }
1529
1530    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1531        let (reader_idx, promotion) = match &mut self.plan {
1532            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1533            UnionReadPlan::ReaderUnion { lookup_table } => {
1534                let idx = Self::read_tag(buf)?;
1535                lookup_table.resolve(idx).ok_or_else(|| {
1536                    ArrowError::ParseError(format!(
1537                        "Union branch index {idx} not resolvable by reader schema"
1538                    ))
1539                })?
1540            }
1541            UnionReadPlan::FromSingle {
1542                reader_idx,
1543                promotion,
1544            } => (*reader_idx, *promotion),
1545            UnionReadPlan::ToSingle {
1546                target,
1547                lookup_table,
1548            } => {
1549                let idx = Self::read_tag(buf)?;
1550                return match lookup_table.resolve(idx) {
1551                    Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1552                    None => Err(ArrowError::ParseError(format!(
1553                        "Writer union branch {idx} does not resolve to reader type"
1554                    ))),
1555                };
1556            }
1557        };
1558        let decoder = self.emit_to(reader_idx)?;
1559        decoder.decode_with_promotion(buf, promotion)
1560    }
1561
1562    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1563        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1564            return target.flush(nulls);
1565        }
1566        debug_assert!(
1567            nulls.is_none(),
1568            "UnionArray does not accept a validity bitmap; \
1569                     nulls should have been materialized as a Null child during decode"
1570        );
1571        let children = self
1572            .branches
1573            .iter_mut()
1574            .map(|d| d.flush(None))
1575            .collect::<Result<Vec<_>, _>>()?;
1576        let arr = UnionArray::try_new(
1577            self.fields.clone(),
1578            flush_values(&mut self.type_ids).into_iter().collect(),
1579            Some(flush_values(&mut self.offsets).into_iter().collect()),
1580            children,
1581        )
1582        .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1583        Ok(Arc::new(arr))
1584    }
1585}
1586
/// Builder for `UnionDecoder`.
///
/// Two configurations are accepted by `build`: fields + branches (with optional
/// resolution info and no target) for reader-side unions, or resolution info +
/// target (with no fields/branches) for a writer union read as a single type.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    // Union fields of the reader-side union.
    fields: Option<UnionFields>,
    // Child decoders, one per reader branch.
    branches: Option<Vec<Decoder>>,
    // Writer/reader resolution info, when schema resolution applies.
    resolved: Option<ResolvedUnion>,
    // Decoder for the single reader type (writer-union to single only).
    target: Option<Box<Decoder>>,
}
1594
1595impl UnionDecoderBuilder {
1596    fn new() -> Self {
1597        Self::default()
1598    }
1599
1600    fn with_fields(mut self, fields: UnionFields) -> Self {
1601        self.fields = Some(fields);
1602        self
1603    }
1604
1605    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1606        self.branches = Some(branches);
1607        self
1608    }
1609
1610    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1611        self.resolved = Some(resolved_union);
1612        self
1613    }
1614
1615    fn with_target(mut self, target: Box<Decoder>) -> Self {
1616        self.target = Some(target);
1617        self
1618    }
1619
1620    fn build(self) -> Result<UnionDecoder, ArrowError> {
1621        match (self.resolved, self.fields, self.branches, self.target) {
1622            (resolved, Some(fields), Some(branches), None) => {
1623                UnionDecoder::try_new(fields, branches, resolved)
1624            }
1625            (Some(info), None, None, Some(target))
1626                if info.writer_is_union && !info.reader_is_union =>
1627            {
1628                UnionDecoder::try_new_from_writer_union(info, target)
1629            }
1630            _ => Err(ArrowError::InvalidArgumentError(
1631                "Invalid UnionDecoderBuilder configuration: expected either \
1632                 (fields + branches + resolved) with no target for reader-unions, or \
1633                 (resolved + target) with no fields/branches for writer-union to single."
1634                    .to_string(),
1635            )),
1636        }
1637    }
1638}
1639
/// How `process_blockwise` handles a block announced with a negative item
/// count (which is followed by the block's size in bytes).
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    // Read the byte size, then still invoke the item callback once per item.
    ProcessItems,
    // Read the byte size and skip that many bytes without visiting the items.
    SkipBySize,
}
1645
1646#[inline]
1647fn skip_blocks(
1648    buf: &mut AvroCursor,
1649    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1650) -> Result<usize, ArrowError> {
1651    process_blockwise(
1652        buf,
1653        move |c| skip_item(c),
1654        NegativeBlockBehavior::SkipBySize,
1655    )
1656}
1657
1658#[inline]
1659fn flush_dict(
1660    indices: &mut Vec<i32>,
1661    symbols: &[String],
1662    nulls: Option<NullBuffer>,
1663) -> Result<ArrayRef, ArrowError> {
1664    let keys = flush_primitive::<Int32Type>(indices, nulls);
1665    let values = Arc::new(StringArray::from_iter_values(
1666        symbols.iter().map(|s| s.as_str()),
1667    ));
1668    DictionaryArray::try_new(keys, values)
1669        .map_err(|e| ArrowError::ParseError(e.to_string()))
1670        .map(|arr| Arc::new(arr) as ArrayRef)
1671}
1672
1673#[inline]
1674fn read_blocks(
1675    buf: &mut AvroCursor,
1676    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1677) -> Result<usize, ArrowError> {
1678    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1679}
1680
1681#[inline]
1682fn process_blockwise(
1683    buf: &mut AvroCursor,
1684    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1685    negative_behavior: NegativeBlockBehavior,
1686) -> Result<usize, ArrowError> {
1687    let mut total = 0usize;
1688    loop {
1689        // Read the block count
1690        //  positive = that many items
1691        //  negative = that many items + read block size
1692        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
1693        let block_count = buf.get_long()?;
1694        match block_count.cmp(&0) {
1695            Ordering::Equal => break,
1696            Ordering::Less => {
1697                let count = (-block_count) as usize;
1698                // A negative count is followed by a long of the size in bytes
1699                let size_in_bytes = buf.get_long()? as usize;
1700                match negative_behavior {
1701                    NegativeBlockBehavior::ProcessItems => {
1702                        // Process items one-by-one after reading size
1703                        for _ in 0..count {
1704                            on_item(buf)?;
1705                        }
1706                    }
1707                    NegativeBlockBehavior::SkipBySize => {
1708                        // Skip the entire block payload at once
1709                        let _ = buf.get_fixed(size_in_bytes)?;
1710                    }
1711                }
1712                total += count;
1713            }
1714            Ordering::Greater => {
1715                let count = block_count as usize;
1716                for _ in 0..count {
1717                    on_item(buf)?;
1718                }
1719                total += count;
1720            }
1721        }
1722    }
1723    Ok(total)
1724}
1725
1726#[inline]
1727fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1728    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1729}
1730
1731#[inline]
1732fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1733    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1734}
1735
1736#[inline]
1737fn flush_primitive<T: ArrowPrimitiveType>(
1738    values: &mut Vec<T::Native>,
1739    nulls: Option<NullBuffer>,
1740) -> PrimitiveArray<T> {
1741    PrimitiveArray::new(flush_values(values).into(), nulls)
1742}
1743
1744#[inline]
1745fn read_decimal_bytes_be<const N: usize>(
1746    buf: &mut AvroCursor<'_>,
1747    size: &Option<usize>,
1748) -> Result<[u8; N], ArrowError> {
1749    match size {
1750        Some(n) if *n == N => {
1751            let raw = buf.get_fixed(N)?;
1752            let mut arr = [0u8; N];
1753            arr.copy_from_slice(raw);
1754            Ok(arr)
1755        }
1756        Some(n) => {
1757            let raw = buf.get_fixed(*n)?;
1758            sign_cast_to::<N>(raw)
1759        }
1760        None => {
1761            let raw = buf.get_bytes()?;
1762            sign_cast_to::<N>(raw)
1763        }
1764    }
1765}
1766
1767/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
1768/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
1769/// the payload is a big-endian two's-complement integer, and when narrowing it must
1770/// be representable without changing sign or value.
1771///
1772/// If `raw.len() < N`, the value is sign-extended.
1773/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
1774/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
1775#[inline]
1776fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
1777    let len = raw.len();
1778    // Fast path: exact width, just copy
1779    if len == N {
1780        let mut out = [0u8; N];
1781        out.copy_from_slice(raw);
1782        return Ok(out);
1783    }
1784    // Determine sign byte from MSB of first byte (empty => positive)
1785    let first = raw.first().copied().unwrap_or(0u8);
1786    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1787    // Pre-fill with sign byte to support sign extension
1788    let mut out = [sign_byte; N];
1789    if len > N {
1790        // Validate truncation: all dropped leading bytes must equal sign_byte,
1791        // and the MSB of the first kept byte must match the sign.
1792        let extra = len - N;
1793        // Any non-sign byte in the truncated prefix indicates overflow
1794        if raw[..extra].iter().any(|&b| b != sign_byte) {
1795            return Err(ArrowError::ParseError(format!(
1796                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1797                len, N
1798            )));
1799        }
1800        if N > 0 {
1801            let first_kept = raw[extra];
1802            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1803            if sign_bit_mismatch {
1804                return Err(ArrowError::ParseError(format!(
1805                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1806                    len, N
1807                )));
1808            }
1809        }
1810        out.copy_from_slice(&raw[extra..]);
1811        return Ok(out);
1812    }
1813    out[N - len..].copy_from_slice(raw);
1814    Ok(out)
1815}
1816
/// Compares the values at positions `i` and `j` of `arr`.
///
/// Two nulls are equal, a null never equals a non-null, and two non-null
/// values are compared through their single-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        // Equal only when both sides are null.
        return left_null && right_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
1830
/// Maps the fields of a writer record onto the reader's field decoders.
#[derive(Debug)]
struct Projector {
    // For each writer field (in writer order): the reader field index that
    // decodes it, or `None` when the field is not decoded into a reader field.
    writer_to_reader: Arc<[Option<usize>]>,
    // For each writer field: the skipper used when it has no reader mapping.
    skip_decoders: Vec<Option<Skipper>>,
    // For each reader field: its default literal, if one was resolved.
    field_defaults: Vec<Option<AvroLiteral>>,
    // `(reader index, literal)` pairs appended after every projected record.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
1838
/// Builder that assembles a `Projector` from a resolved record and the
/// reader's fields.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    // Resolution result: supplies the writer-to-reader mapping, the skip
    // datatypes and the indices of reader fields that take defaults.
    rec: &'a ResolvedRecord,
    // Reader-side fields; their resolution info supplies the default literals.
    reader_fields: Arc<[AvroField]>,
}
1844
1845impl<'a> ProjectorBuilder<'a> {
1846    #[inline]
1847    fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1848        Self {
1849            rec,
1850            reader_fields: reader_fields.clone(),
1851        }
1852    }
1853
1854    #[inline]
1855    fn build(self) -> Result<Projector, ArrowError> {
1856        let reader_fields = self.reader_fields;
1857        let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1858        for avro_field in reader_fields.as_ref() {
1859            if let Some(ResolutionInfo::DefaultValue(lit)) =
1860                avro_field.data_type().resolution.as_ref()
1861            {
1862                field_defaults.push(Some(lit.clone()));
1863            } else {
1864                field_defaults.push(None);
1865            }
1866        }
1867        let mut default_injections: Vec<(usize, AvroLiteral)> =
1868            Vec::with_capacity(self.rec.default_fields.len());
1869        for &idx in self.rec.default_fields.as_ref() {
1870            let lit = field_defaults
1871                .get(idx)
1872                .and_then(|lit| lit.clone())
1873                .unwrap_or(AvroLiteral::Null);
1874            default_injections.push((idx, lit));
1875        }
1876        let mut skip_decoders: Vec<Option<Skipper>> =
1877            Vec::with_capacity(self.rec.skip_fields.len());
1878        for datatype in self.rec.skip_fields.as_ref() {
1879            let skipper = match datatype {
1880                Some(datatype) => Some(Skipper::from_avro(datatype)?),
1881                None => None,
1882            };
1883            skip_decoders.push(skipper);
1884        }
1885        Ok(Projector {
1886            writer_to_reader: self.rec.writer_to_reader.clone(),
1887            skip_decoders,
1888            field_defaults,
1889            default_injections: default_injections.into(),
1890        })
1891    }
1892}
1893
1894impl Projector {
1895    #[inline]
1896    fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
1897        // SAFETY: `index` is obtained by listing the reader's record fields (i.e., from
1898        // `decoders.iter_mut().enumerate()`), and `field_defaults` was built in
1899        // `ProjectorBuilder::build` to have exactly one element per reader field.
1900        // Therefore, `index < self.field_defaults.len()` always holds here, so
1901        // `self.field_defaults[index]` cannot panic. We only take an immutable reference
1902        // via `.as_ref()`, and `self` is borrowed immutably.
1903        if let Some(default_literal) = self.field_defaults[index].as_ref() {
1904            decoder.append_default(default_literal)
1905        } else {
1906            decoder.append_null()
1907        }
1908    }
1909
1910    #[inline]
1911    fn project_record(
1912        &mut self,
1913        buf: &mut AvroCursor<'_>,
1914        encodings: &mut [Decoder],
1915    ) -> Result<(), ArrowError> {
1916        debug_assert_eq!(
1917            self.writer_to_reader.len(),
1918            self.skip_decoders.len(),
1919            "internal invariant: mapping and skipper lists must have equal length"
1920        );
1921        for (i, (mapping, skipper_opt)) in self
1922            .writer_to_reader
1923            .iter()
1924            .zip(self.skip_decoders.iter_mut())
1925            .enumerate()
1926        {
1927            match (mapping, skipper_opt.as_mut()) {
1928                (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1929                (None, Some(skipper)) => skipper.skip(buf)?,
1930                (None, None) => {
1931                    return Err(ArrowError::SchemaError(format!(
1932                        "No skipper available for writer-only field at index {i}",
1933                    )));
1934                }
1935            }
1936        }
1937        for (reader_index, lit) in self.default_injections.as_ref() {
1938            encodings[*reader_index].append_default(lit)?;
1939        }
1940        Ok(())
1941    }
1942}
1943
/// Lightweight skipper for non-projected writer fields
/// (fields present in the writer schema but omitted by the reader/projection);
/// per Avro 1.11.1 schema resolution these fields are ignored.
///
/// Each variant records just enough about the writer type to advance the
/// cursor past one encoded value (see `Skipper::skip`).
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
#[derive(Debug)]
enum Skipper {
    // Consumes no bytes.
    Null,
    // Skipped by reading one boolean.
    Boolean,
    // Skipped by reading one int (also used for date and time-millis codecs).
    Int32,
    // Skipped by reading one long (also used for the duration custom codecs).
    Int64,
    // Skipped by reading one float.
    Float32,
    // Skipped by reading one double.
    Float64,
    // Skipped by reading one length-prefixed byte string.
    Bytes,
    // Skipped by reading one length-prefixed byte string.
    String,
    // The next three are skipped by reading one long.
    TimeMicros,
    TimestampMillis,
    TimestampMicros,
    // Fixed-size value of the given byte length.
    Fixed(usize),
    // Decimal: fixed bytes when a size is given, length-prefixed bytes otherwise.
    Decimal(Option<usize>),
    // UUID encoded as a string; skipped like a byte string.
    UuidString,
    // Skipped by reading one int.
    Enum,
    // Skipped by reading 12 fixed bytes.
    DurationFixed12,
    // Block-encoded sequence of items handled by the inner skipper.
    List(Box<Skipper>),
    // Block-encoded sequence of entries: a byte-string key, then a value
    // handled by the inner skipper.
    Map(Box<Skipper>),
    // Fields skipped one after another, in order.
    Struct(Vec<Skipper>),
    // Branch tag followed by a value handled by the selected branch skipper.
    Union(Vec<Skipper>),
    // Branch marker, followed by the inner value only when it is not null.
    Nullable(Nullability, Box<Skipper>),
    // Delegates to the inner skipper.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
1975
1976impl Skipper {
1977    fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1978        let mut base = match dt.codec() {
1979            Codec::Null => Self::Null,
1980            Codec::Boolean => Self::Boolean,
1981            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1982            Codec::Int64 => Self::Int64,
1983            Codec::TimeMicros => Self::TimeMicros,
1984            Codec::TimestampMillis(_) => Self::TimestampMillis,
1985            Codec::TimestampMicros(_) => Self::TimestampMicros,
1986            #[cfg(feature = "avro_custom_types")]
1987            Codec::DurationNanos
1988            | Codec::DurationMicros
1989            | Codec::DurationMillis
1990            | Codec::DurationSeconds => Self::Int64,
1991            Codec::Float32 => Self::Float32,
1992            Codec::Float64 => Self::Float64,
1993            Codec::Binary => Self::Bytes,
1994            Codec::Utf8 | Codec::Utf8View => Self::String,
1995            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
1996            Codec::Decimal(_, _, size) => Self::Decimal(*size),
1997            Codec::Uuid => Self::UuidString, // encoded as string
1998            Codec::Enum(_) => Self::Enum,
1999            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2000            Codec::Struct(fields) => Self::Struct(
2001                fields
2002                    .iter()
2003                    .map(|f| Skipper::from_avro(f.data_type()))
2004                    .collect::<Result<_, _>>()?,
2005            ),
2006            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2007            Codec::Interval => Self::DurationFixed12,
2008            Codec::Union(encodings, _, _) => {
2009                let max_addr = (i32::MAX as usize) + 1;
2010                if encodings.len() > max_addr {
2011                    return Err(ArrowError::SchemaError(format!(
2012                        "Writer union has {} branches, which exceeds the maximum addressable \
2013                         branches by an Avro int tag ({} + 1).",
2014                        encodings.len(),
2015                        i32::MAX
2016                    )));
2017                }
2018                Self::Union(
2019                    encodings
2020                        .iter()
2021                        .map(Skipper::from_avro)
2022                        .collect::<Result<_, _>>()?,
2023                )
2024            }
2025            #[cfg(feature = "avro_custom_types")]
2026            Codec::RunEndEncoded(inner, _w) => {
2027                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2028            }
2029        };
2030        if let Some(n) = dt.nullability() {
2031            base = Self::Nullable(n, Box::new(base));
2032        }
2033        Ok(base)
2034    }
2035
2036    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
2037        match self {
2038            Self::Null => Ok(()),
2039            Self::Boolean => {
2040                buf.get_bool()?;
2041                Ok(())
2042            }
2043            Self::Int32 => {
2044                buf.get_int()?;
2045                Ok(())
2046            }
2047            Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
2048                buf.get_long()?;
2049                Ok(())
2050            }
2051            Self::Float32 => {
2052                buf.get_float()?;
2053                Ok(())
2054            }
2055            Self::Float64 => {
2056                buf.get_double()?;
2057                Ok(())
2058            }
2059            Self::Bytes | Self::String | Self::UuidString => {
2060                buf.get_bytes()?;
2061                Ok(())
2062            }
2063            Self::Fixed(sz) => {
2064                buf.get_fixed(*sz)?;
2065                Ok(())
2066            }
2067            Self::Decimal(size) => {
2068                if let Some(s) = size {
2069                    buf.get_fixed(*s)
2070                } else {
2071                    buf.get_bytes()
2072                }?;
2073                Ok(())
2074            }
2075            Self::Enum => {
2076                buf.get_int()?;
2077                Ok(())
2078            }
2079            Self::DurationFixed12 => {
2080                buf.get_fixed(12)?;
2081                Ok(())
2082            }
2083            Self::List(item) => {
2084                skip_blocks(buf, |c| item.skip(c))?;
2085                Ok(())
2086            }
2087            Self::Map(value) => {
2088                skip_blocks(buf, |c| {
2089                    c.get_bytes()?; // key
2090                    value.skip(c)
2091                })?;
2092                Ok(())
2093            }
2094            Self::Struct(fields) => {
2095                for f in fields.iter_mut() {
2096                    f.skip(buf)?
2097                }
2098                Ok(())
2099            }
2100            Self::Union(encodings) => {
2101                // Union tag must be ZigZag-decoded
2102                let raw = buf.get_long()?;
2103                if raw < 0 {
2104                    return Err(ArrowError::ParseError(format!(
2105                        "Negative union branch index {raw}"
2106                    )));
2107                }
2108                let idx: usize = usize::try_from(raw).map_err(|_| {
2109                    ArrowError::ParseError(format!(
2110                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2111                        (usize::BITS as usize)
2112                    ))
2113                })?;
2114                let Some(encoding) = encodings.get_mut(idx) else {
2115                    return Err(ArrowError::ParseError(format!(
2116                        "Union branch index {idx} out of range for skipper ({} branches)",
2117                        encodings.len()
2118                    )));
2119                };
2120                encoding.skip(buf)
2121            }
2122            Self::Nullable(order, inner) => {
2123                let branch = buf.read_vlq()?;
2124                let is_not_null = match *order {
2125                    Nullability::NullFirst => branch != 0,
2126                    Nullability::NullSecond => branch == 0,
2127                };
2128                if is_not_null {
2129                    inner.skip(buf)?;
2130                }
2131                Ok(())
2132            }
2133            #[cfg(feature = "avro_custom_types")]
2134            Self::RunEndEncoded(inner) => inner.skip(buf),
2135        }
2136    }
2137}
2138
2139#[cfg(test)]
2140mod tests {
2141    use super::*;
2142    use crate::codec::AvroFieldBuilder;
2143    use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2144    use arrow_array::cast::AsArray;
2145    use indexmap::IndexMap;
2146    use std::collections::HashMap;
2147
2148    fn encode_avro_int(value: i32) -> Vec<u8> {
2149        let mut buf = Vec::new();
2150        let mut v = (value << 1) ^ (value >> 31);
2151        while v & !0x7F != 0 {
2152            buf.push(((v & 0x7F) | 0x80) as u8);
2153            v >>= 7;
2154        }
2155        buf.push(v as u8);
2156        buf
2157    }
2158
2159    fn encode_avro_long(value: i64) -> Vec<u8> {
2160        let mut buf = Vec::new();
2161        let mut v = (value << 1) ^ (value >> 63);
2162        while v & !0x7F != 0 {
2163            buf.push(((v & 0x7F) | 0x80) as u8);
2164            v >>= 7;
2165        }
2166        buf.push(v as u8);
2167        buf
2168    }
2169
2170    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2171        let mut buf = encode_avro_long(bytes.len() as i64);
2172        buf.extend_from_slice(bytes);
2173        buf
2174    }
2175
2176    fn avro_from_codec(codec: Codec) -> AvroDataType {
2177        AvroDataType::new(codec, Default::default(), None)
2178    }
2179
2180    fn resolved_root_datatype(
2181        writer: Schema<'static>,
2182        reader: Schema<'static>,
2183        use_utf8view: bool,
2184        strict_mode: bool,
2185    ) -> AvroDataType {
2186        // Wrap writer schema in a single-field record
2187        let writer_record = Schema::Complex(ComplexType::Record(Record {
2188            name: "Root",
2189            namespace: None,
2190            doc: None,
2191            aliases: vec![],
2192            fields: vec![Field {
2193                name: "v",
2194                r#type: writer,
2195                default: None,
2196                doc: None,
2197                aliases: vec![],
2198            }],
2199            attributes: Attributes::default(),
2200        }));
2201
2202        // Wrap reader schema in a single-field record
2203        let reader_record = Schema::Complex(ComplexType::Record(Record {
2204            name: "Root",
2205            namespace: None,
2206            doc: None,
2207            aliases: vec![],
2208            fields: vec![Field {
2209                name: "v",
2210                r#type: reader,
2211                default: None,
2212                doc: None,
2213                aliases: vec![],
2214            }],
2215            attributes: Attributes::default(),
2216        }));
2217
2218        // Build resolved record, then extract the inner field's resolved AvroDataType
2219        let field = AvroFieldBuilder::new(&writer_record)
2220            .with_reader_schema(&reader_record)
2221            .with_utf8view(use_utf8view)
2222            .with_strict_mode(strict_mode)
2223            .build()
2224            .expect("schema resolution should succeed");
2225
2226        match field.data_type().codec() {
2227            Codec::Struct(fields) => fields[0].data_type().clone(),
2228            other => panic!("expected wrapper struct, got {other:?}"),
2229        }
2230    }
2231
2232    fn decoder_for_promotion(
2233        writer: PrimitiveType,
2234        reader: PrimitiveType,
2235        use_utf8view: bool,
2236    ) -> Decoder {
2237        let ws = Schema::TypeName(TypeName::Primitive(writer));
2238        let rs = Schema::TypeName(TypeName::Primitive(reader));
2239        let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2240        Decoder::try_new(&dt).unwrap()
2241    }
2242
2243    fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2244        AvroDataType::new(codec, HashMap::new(), nullability)
2245    }
2246
2247    #[cfg(feature = "avro_custom_types")]
2248    fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2249        let mut out = Vec::with_capacity(10);
2250        while x >= 0x80 {
2251            out.push((x as u8) | 0x80);
2252            x >>= 7;
2253        }
2254        out.push(x as u8);
2255        out
2256    }
2257
2258    #[test]
2259    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2260        let ws = Schema::Union(vec![
2261            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2262            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2263        ]);
2264        let rs = Schema::Union(vec![
2265            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2266            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2267        ]);
2268
2269        let dt = resolved_root_datatype(ws, rs, false, false);
2270        let mut dec = Decoder::try_new(&dt).unwrap();
2271
2272        let mut rec1 = encode_avro_long(0);
2273        rec1.extend(encode_avro_int(7));
2274        let mut cur1 = AvroCursor::new(&rec1);
2275        dec.decode(&mut cur1).unwrap();
2276
2277        let mut rec2 = encode_avro_long(1);
2278        rec2.extend(encode_avro_bytes("abc".as_bytes()));
2279        let mut cur2 = AvroCursor::new(&rec2);
2280        dec.decode(&mut cur2).unwrap();
2281
2282        let arr = dec.flush(None).unwrap();
2283        let ua = arr
2284            .as_any()
2285            .downcast_ref::<UnionArray>()
2286            .expect("dense union output");
2287
2288        assert_eq!(
2289            ua.type_id(0),
2290            1,
2291            "first value must select reader 'long' branch"
2292        );
2293        assert_eq!(ua.value_offset(0), 0);
2294
2295        assert_eq!(
2296            ua.type_id(1),
2297            0,
2298            "second value must select reader 'string' branch"
2299        );
2300        assert_eq!(ua.value_offset(1), 0);
2301
2302        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2303        assert_eq!(long_child.len(), 1);
2304        assert_eq!(long_child.value(0), 7);
2305
2306        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2307        assert_eq!(str_child.len(), 1);
2308        assert_eq!(str_child.value(0), "abc");
2309    }
2310
2311    #[test]
2312    fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2313        let ws = Schema::Union(vec![
2314            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2315            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2316        ]);
2317        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2318
2319        let dt = resolved_root_datatype(ws, rs, false, false);
2320        let mut dec = Decoder::try_new(&dt).unwrap();
2321
2322        let mut data = encode_avro_long(0);
2323        data.extend(encode_avro_int(5));
2324        let mut cur = AvroCursor::new(&data);
2325        dec.decode(&mut cur).unwrap();
2326
2327        let arr = dec.flush(None).unwrap();
2328        let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2329        assert_eq!(out.len(), 1);
2330        assert_eq!(out.value(0), 5);
2331    }
2332
2333    #[test]
2334    fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2335        let ws = Schema::Union(vec![
2336            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2337            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2338        ]);
2339        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2340
2341        let dt = resolved_root_datatype(ws, rs, false, false);
2342        let mut dec = Decoder::try_new(&dt).unwrap();
2343
2344        let mut data = encode_avro_long(1);
2345        data.extend(encode_avro_bytes("z".as_bytes()));
2346        let mut cur = AvroCursor::new(&data);
2347        let res = dec.decode(&mut cur);
2348        assert!(
2349            res.is_err(),
2350            "expected error when writer union branch does not resolve to reader non-union type"
2351        );
2352    }
2353
2354    #[test]
2355    fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2356        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2357        let rs = Schema::Union(vec![
2358            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2359            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2360        ]);
2361
2362        let dt = resolved_root_datatype(ws, rs, false, false);
2363        let mut dec = Decoder::try_new(&dt).unwrap();
2364
2365        let data = encode_avro_int(6);
2366        let mut cur = AvroCursor::new(&data);
2367        dec.decode(&mut cur).unwrap();
2368
2369        let arr = dec.flush(None).unwrap();
2370        let ua = arr
2371            .as_any()
2372            .downcast_ref::<UnionArray>()
2373            .expect("dense union output");
2374        assert_eq!(ua.len(), 1);
2375        assert_eq!(
2376            ua.type_id(0),
2377            1,
2378            "must resolve to reader 'long' branch (type_id 1)"
2379        );
2380        assert_eq!(ua.value_offset(0), 0);
2381
2382        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2383        assert_eq!(long_child.len(), 1);
2384        assert_eq!(long_child.value(0), 6);
2385
2386        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2387        assert_eq!(str_child.len(), 0, "string branch must be empty");
2388    }
2389
2390    #[test]
2391    fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2392        let ws = Schema::Union(vec![
2393            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2394            Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2395        ]);
2396        let rs = Schema::Union(vec![
2397            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2398            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2399        ]);
2400
2401        let dt = resolved_root_datatype(ws, rs, false, false);
2402        let mut dec = Decoder::try_new(&dt).unwrap();
2403
2404        let mut data = encode_avro_long(1);
2405        data.push(1);
2406        let mut cur = AvroCursor::new(&data);
2407        let res = dec.decode(&mut cur);
2408        assert!(
2409            res.is_err(),
2410            "expected error for unmapped writer 'boolean' branch"
2411        );
2412    }
2413
2414    #[test]
2415    fn test_schema_resolution_promotion_int_to_long() {
2416        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2417        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2418        for v in [0, 1, -2, 123456] {
2419            let data = encode_avro_int(v);
2420            let mut cur = AvroCursor::new(&data);
2421            dec.decode(&mut cur).unwrap();
2422        }
2423        let arr = dec.flush(None).unwrap();
2424        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2425        assert_eq!(a.value(0), 0);
2426        assert_eq!(a.value(1), 1);
2427        assert_eq!(a.value(2), -2);
2428        assert_eq!(a.value(3), 123456);
2429    }
2430
2431    #[test]
2432    fn test_schema_resolution_promotion_int_to_float() {
2433        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2434        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2435        for v in [0, 42, -7] {
2436            let data = encode_avro_int(v);
2437            let mut cur = AvroCursor::new(&data);
2438            dec.decode(&mut cur).unwrap();
2439        }
2440        let arr = dec.flush(None).unwrap();
2441        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2442        assert_eq!(a.value(0), 0.0);
2443        assert_eq!(a.value(1), 42.0);
2444        assert_eq!(a.value(2), -7.0);
2445    }
2446
2447    #[test]
2448    fn test_schema_resolution_promotion_int_to_double() {
2449        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2450        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2451        for v in [1, -1, 10_000] {
2452            let data = encode_avro_int(v);
2453            let mut cur = AvroCursor::new(&data);
2454            dec.decode(&mut cur).unwrap();
2455        }
2456        let arr = dec.flush(None).unwrap();
2457        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2458        assert_eq!(a.value(0), 1.0);
2459        assert_eq!(a.value(1), -1.0);
2460        assert_eq!(a.value(2), 10_000.0);
2461    }
2462
2463    #[test]
2464    fn test_schema_resolution_promotion_long_to_float() {
2465        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2466        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2467        for v in [0_i64, 1_000_000_i64, -123_i64] {
2468            let data = encode_avro_long(v);
2469            let mut cur = AvroCursor::new(&data);
2470            dec.decode(&mut cur).unwrap();
2471        }
2472        let arr = dec.flush(None).unwrap();
2473        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2474        assert_eq!(a.value(0), 0.0);
2475        assert_eq!(a.value(1), 1_000_000.0);
2476        assert_eq!(a.value(2), -123.0);
2477    }
2478
2479    #[test]
2480    fn test_schema_resolution_promotion_long_to_double() {
2481        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2482        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2483        for v in [2_i64, -2_i64, 9_223_372_i64] {
2484            let data = encode_avro_long(v);
2485            let mut cur = AvroCursor::new(&data);
2486            dec.decode(&mut cur).unwrap();
2487        }
2488        let arr = dec.flush(None).unwrap();
2489        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2490        assert_eq!(a.value(0), 2.0);
2491        assert_eq!(a.value(1), -2.0);
2492        assert_eq!(a.value(2), 9_223_372.0);
2493    }
2494
2495    #[test]
2496    fn test_schema_resolution_promotion_float_to_double() {
2497        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2498        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2499        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2500            let data = v.to_le_bytes().to_vec();
2501            let mut cur = AvroCursor::new(&data);
2502            dec.decode(&mut cur).unwrap();
2503        }
2504        let arr = dec.flush(None).unwrap();
2505        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2506        assert_eq!(a.value(0), 0.5_f64);
2507        assert_eq!(a.value(1), -3.25_f64);
2508        assert_eq!(a.value(2), 1.0e6_f64);
2509    }
2510
2511    #[test]
2512    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2513        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2514        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2515        for s in ["hello", "world", "héllo"] {
2516            let data = encode_avro_bytes(s.as_bytes());
2517            let mut cur = AvroCursor::new(&data);
2518            dec.decode(&mut cur).unwrap();
2519        }
2520        let arr = dec.flush(None).unwrap();
2521        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2522        assert_eq!(a.value(0), "hello");
2523        assert_eq!(a.value(1), "world");
2524        assert_eq!(a.value(2), "héllo");
2525    }
2526
2527    #[test]
2528    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2529        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2530        assert!(matches!(dec, Decoder::BytesToString(_, _)));
2531        let data = encode_avro_bytes("abc".as_bytes());
2532        let mut cur = AvroCursor::new(&data);
2533        dec.decode(&mut cur).unwrap();
2534        let arr = dec.flush(None).unwrap();
2535        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2536        assert_eq!(a.value(0), "abc");
2537    }
2538
2539    #[test]
2540    fn test_schema_resolution_promotion_string_to_bytes() {
2541        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2542        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2543        for s in ["", "abc", "data"] {
2544            let data = encode_avro_bytes(s.as_bytes());
2545            let mut cur = AvroCursor::new(&data);
2546            dec.decode(&mut cur).unwrap();
2547        }
2548        let arr = dec.flush(None).unwrap();
2549        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2550        assert_eq!(a.value(0), b"");
2551        assert_eq!(a.value(1), b"abc");
2552        assert_eq!(a.value(2), "data".as_bytes());
2553    }
2554
2555    #[test]
2556    fn test_schema_resolution_no_promotion_passthrough_int() {
2557        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2558        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2559        // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
2560        let writer_record = Schema::Complex(ComplexType::Record(Record {
2561            name: "Root",
2562            namespace: None,
2563            doc: None,
2564            aliases: vec![],
2565            fields: vec![Field {
2566                name: "v",
2567                r#type: ws,
2568                default: None,
2569                doc: None,
2570                aliases: vec![],
2571            }],
2572            attributes: Attributes::default(),
2573        }));
2574        let reader_record = Schema::Complex(ComplexType::Record(Record {
2575            name: "Root",
2576            namespace: None,
2577            doc: None,
2578            aliases: vec![],
2579            fields: vec![Field {
2580                name: "v",
2581                r#type: rs,
2582                default: None,
2583                doc: None,
2584                aliases: vec![],
2585            }],
2586            attributes: Attributes::default(),
2587        }));
2588        let field = AvroFieldBuilder::new(&writer_record)
2589            .with_reader_schema(&reader_record)
2590            .with_utf8view(false)
2591            .with_strict_mode(false)
2592            .build()
2593            .unwrap();
2594        // Extract the resolved inner field's AvroDataType
2595        let dt = match field.data_type().codec() {
2596            Codec::Struct(fields) => fields[0].data_type().clone(),
2597            other => panic!("expected wrapper struct, got {other:?}"),
2598        };
2599        let mut dec = Decoder::try_new(&dt).unwrap();
2600        assert!(matches!(dec, Decoder::Int32(_)));
2601        for v in [7, -9] {
2602            let data = encode_avro_int(v);
2603            let mut cur = AvroCursor::new(&data);
2604            dec.decode(&mut cur).unwrap();
2605        }
2606        let arr = dec.flush(None).unwrap();
2607        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2608        assert_eq!(a.value(0), 7);
2609        assert_eq!(a.value(1), -9);
2610    }
2611
2612    #[test]
2613    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
2614        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2615        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
2616        let writer_record = Schema::Complex(ComplexType::Record(Record {
2617            name: "Root",
2618            namespace: None,
2619            doc: None,
2620            aliases: vec![],
2621            fields: vec![Field {
2622                name: "v",
2623                r#type: ws,
2624                default: None,
2625                doc: None,
2626                aliases: vec![],
2627            }],
2628            attributes: Attributes::default(),
2629        }));
2630        let reader_record = Schema::Complex(ComplexType::Record(Record {
2631            name: "Root",
2632            namespace: None,
2633            doc: None,
2634            aliases: vec![],
2635            fields: vec![Field {
2636                name: "v",
2637                r#type: rs,
2638                default: None,
2639                doc: None,
2640                aliases: vec![],
2641            }],
2642            attributes: Attributes::default(),
2643        }));
2644        let res = AvroFieldBuilder::new(&writer_record)
2645            .with_reader_schema(&reader_record)
2646            .with_utf8view(false)
2647            .with_strict_mode(false)
2648            .build();
2649        assert!(res.is_err(), "expected error for illegal promotion");
2650    }
2651
2652    #[test]
2653    fn test_map_decoding_one_entry() {
2654        let value_type = avro_from_codec(Codec::Utf8);
2655        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2656        let mut decoder = Decoder::try_new(&map_type).unwrap();
2657        // Encode a single map with one entry: {"hello": "world"}
2658        let mut data = Vec::new();
2659        data.extend_from_slice(&encode_avro_long(1));
2660        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
2661        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
2662        data.extend_from_slice(&encode_avro_long(0));
2663        let mut cursor = AvroCursor::new(&data);
2664        decoder.decode(&mut cursor).unwrap();
2665        let array = decoder.flush(None).unwrap();
2666        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2667        assert_eq!(map_arr.len(), 1); // one map
2668        assert_eq!(map_arr.value_length(0), 1);
2669        let entries = map_arr.value(0);
2670        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
2671        assert_eq!(struct_entries.len(), 1);
2672        let key_arr = struct_entries
2673            .column_by_name("key")
2674            .unwrap()
2675            .as_any()
2676            .downcast_ref::<StringArray>()
2677            .unwrap();
2678        let val_arr = struct_entries
2679            .column_by_name("value")
2680            .unwrap()
2681            .as_any()
2682            .downcast_ref::<StringArray>()
2683            .unwrap();
2684        assert_eq!(key_arr.value(0), "hello");
2685        assert_eq!(val_arr.value(0), "world");
2686    }
2687
2688    #[test]
2689    fn test_map_decoding_empty() {
2690        let value_type = avro_from_codec(Codec::Utf8);
2691        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2692        let mut decoder = Decoder::try_new(&map_type).unwrap();
2693        let data = encode_avro_long(0);
2694        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2695        let array = decoder.flush(None).unwrap();
2696        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2697        assert_eq!(map_arr.len(), 1);
2698        assert_eq!(map_arr.value_length(0), 0);
2699    }
2700
2701    #[test]
2702    fn test_fixed_decoding() {
2703        let avro_type = avro_from_codec(Codec::Fixed(3));
2704        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2705
2706        let data1 = [1u8, 2, 3];
2707        let mut cursor1 = AvroCursor::new(&data1);
2708        decoder
2709            .decode(&mut cursor1)
2710            .expect("Failed to decode data1");
2711        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
2712        let data2 = [4u8, 5, 6];
2713        let mut cursor2 = AvroCursor::new(&data2);
2714        decoder
2715            .decode(&mut cursor2)
2716            .expect("Failed to decode data2");
2717        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
2718        let array = decoder.flush(None).expect("Failed to flush decoder");
2719        assert_eq!(array.len(), 2, "Array should contain two items");
2720        let fixed_size_binary_array = array
2721            .as_any()
2722            .downcast_ref::<FixedSizeBinaryArray>()
2723            .expect("Failed to downcast to FixedSizeBinaryArray");
2724        assert_eq!(
2725            fixed_size_binary_array.value_length(),
2726            3,
2727            "Fixed size of binary values should be 3"
2728        );
2729        assert_eq!(
2730            fixed_size_binary_array.value(0),
2731            &[1, 2, 3],
2732            "First item mismatch"
2733        );
2734        assert_eq!(
2735            fixed_size_binary_array.value(1),
2736            &[4, 5, 6],
2737            "Second item mismatch"
2738        );
2739    }
2740
2741    #[test]
2742    fn test_fixed_decoding_empty() {
2743        let avro_type = avro_from_codec(Codec::Fixed(5));
2744        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2745
2746        let array = decoder
2747            .flush(None)
2748            .expect("Failed to flush decoder for empty input");
2749
2750        assert_eq!(array.len(), 0, "Array should be empty");
2751        let fixed_size_binary_array = array
2752            .as_any()
2753            .downcast_ref::<FixedSizeBinaryArray>()
2754            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
2755
2756        assert_eq!(
2757            fixed_size_binary_array.value_length(),
2758            5,
2759            "Fixed size of binary values should be 5 as per type"
2760        );
2761    }
2762
2763    #[test]
2764    fn test_uuid_decoding() {
2765        let avro_type = avro_from_codec(Codec::Uuid);
2766        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2767        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
2768        let data = encode_avro_bytes(uuid_str.as_bytes());
2769        let mut cursor = AvroCursor::new(&data);
2770        decoder.decode(&mut cursor).expect("Failed to decode data");
2771        assert_eq!(
2772            cursor.position(),
2773            data.len(),
2774            "Cursor should advance by varint size + data size"
2775        );
2776        let array = decoder.flush(None).expect("Failed to flush decoder");
2777        let fixed_size_binary_array = array
2778            .as_any()
2779            .downcast_ref::<FixedSizeBinaryArray>()
2780            .expect("Array should be a FixedSizeBinaryArray");
2781        assert_eq!(fixed_size_binary_array.len(), 1);
2782        assert_eq!(fixed_size_binary_array.value_length(), 16);
2783        let expected_bytes = [
2784            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
2785            0x6b, 0xf6,
2786        ];
2787        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
2788    }
2789
2790    #[test]
2791    fn test_array_decoding() {
2792        let item_dt = avro_from_codec(Codec::Int32);
2793        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2794        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2795        let mut row1 = Vec::new();
2796        row1.extend_from_slice(&encode_avro_long(2));
2797        row1.extend_from_slice(&encode_avro_int(10));
2798        row1.extend_from_slice(&encode_avro_int(20));
2799        row1.extend_from_slice(&encode_avro_long(0));
2800        let row2 = encode_avro_long(0);
2801        let mut cursor = AvroCursor::new(&row1);
2802        decoder.decode(&mut cursor).unwrap();
2803        let mut cursor2 = AvroCursor::new(&row2);
2804        decoder.decode(&mut cursor2).unwrap();
2805        let array = decoder.flush(None).unwrap();
2806        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2807        assert_eq!(list_arr.len(), 2);
2808        let offsets = list_arr.value_offsets();
2809        assert_eq!(offsets, &[0, 2, 2]);
2810        let values = list_arr.values();
2811        let int_arr = values.as_primitive::<Int32Type>();
2812        assert_eq!(int_arr.len(), 2);
2813        assert_eq!(int_arr.value(0), 10);
2814        assert_eq!(int_arr.value(1), 20);
2815    }
2816
2817    #[test]
2818    fn test_array_decoding_with_negative_block_count() {
2819        let item_dt = avro_from_codec(Codec::Int32);
2820        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2821        let mut decoder = Decoder::try_new(&list_dt).unwrap();
2822        let mut data = encode_avro_long(-3);
2823        data.extend_from_slice(&encode_avro_long(12));
2824        data.extend_from_slice(&encode_avro_int(1));
2825        data.extend_from_slice(&encode_avro_int(2));
2826        data.extend_from_slice(&encode_avro_int(3));
2827        data.extend_from_slice(&encode_avro_long(0));
2828        let mut cursor = AvroCursor::new(&data);
2829        decoder.decode(&mut cursor).unwrap();
2830        let array = decoder.flush(None).unwrap();
2831        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2832        assert_eq!(list_arr.len(), 1);
2833        assert_eq!(list_arr.value_length(0), 3);
2834        let values = list_arr.values().as_primitive::<Int32Type>();
2835        assert_eq!(values.len(), 3);
2836        assert_eq!(values.value(0), 1);
2837        assert_eq!(values.value(1), 2);
2838        assert_eq!(values.value(2), 3);
2839    }
2840
2841    #[test]
2842    fn test_nested_array_decoding() {
2843        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
2844        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
2845        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
2846        let mut buf = Vec::new();
2847        buf.extend(encode_avro_long(1));
2848        buf.extend(encode_avro_long(2));
2849        buf.extend(encode_avro_int(5));
2850        buf.extend(encode_avro_int(6));
2851        buf.extend(encode_avro_long(0));
2852        buf.extend(encode_avro_long(0));
2853        let mut cursor = AvroCursor::new(&buf);
2854        decoder.decode(&mut cursor).unwrap();
2855        let arr = decoder.flush(None).unwrap();
2856        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
2857        assert_eq!(outer.len(), 1);
2858        assert_eq!(outer.value_length(0), 1);
2859        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
2860        assert_eq!(inner.len(), 1);
2861        assert_eq!(inner.value_length(0), 2);
2862        let values = inner
2863            .values()
2864            .as_any()
2865            .downcast_ref::<Int32Array>()
2866            .unwrap();
2867        assert_eq!(values.values(), &[5, 6]);
2868    }
2869
2870    #[test]
2871    fn test_array_decoding_empty_array() {
2872        let value_type = avro_from_codec(Codec::Utf8);
2873        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
2874        let mut decoder = Decoder::try_new(&map_type).unwrap();
2875        let data = encode_avro_long(0);
2876        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2877        let array = decoder.flush(None).unwrap();
2878        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2879        assert_eq!(list_arr.len(), 1);
2880        assert_eq!(list_arr.value_length(0), 0);
2881    }
2882
2883    #[test]
2884    fn test_decimal_decoding_fixed256() {
2885        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
2886        let mut decoder = Decoder::try_new(&dt).unwrap();
2887        let row1 = [
2888            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2889            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2890            0x00, 0x00, 0x30, 0x39,
2891        ];
2892        let row2 = [
2893            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2894            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2895            0xFF, 0xFF, 0xFF, 0x85,
2896        ];
2897        let mut data = Vec::new();
2898        data.extend_from_slice(&row1);
2899        data.extend_from_slice(&row2);
2900        let mut cursor = AvroCursor::new(&data);
2901        decoder.decode(&mut cursor).unwrap();
2902        decoder.decode(&mut cursor).unwrap();
2903        let arr = decoder.flush(None).unwrap();
2904        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
2905        assert_eq!(dec.len(), 2);
2906        assert_eq!(dec.value_as_string(0), "123.45");
2907        assert_eq!(dec.value_as_string(1), "-1.23");
2908    }
2909
2910    #[test]
2911    fn test_decimal_decoding_fixed128() {
2912        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
2913        let mut decoder = Decoder::try_new(&dt).unwrap();
2914        let row1 = [
2915            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2916            0x30, 0x39,
2917        ];
2918        let row2 = [
2919            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2920            0xFF, 0x85,
2921        ];
2922        let mut data = Vec::new();
2923        data.extend_from_slice(&row1);
2924        data.extend_from_slice(&row2);
2925        let mut cursor = AvroCursor::new(&data);
2926        decoder.decode(&mut cursor).unwrap();
2927        decoder.decode(&mut cursor).unwrap();
2928        let arr = decoder.flush(None).unwrap();
2929        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2930        assert_eq!(dec.len(), 2);
2931        assert_eq!(dec.value_as_string(0), "123.45");
2932        assert_eq!(dec.value_as_string(1), "-1.23");
2933    }
2934
2935    #[test]
2936    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
2937        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
2938        let mut decoder = Decoder::try_new(&dt).unwrap();
2939        let row1 = [
2940            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2941            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2942            0x00, 0x00, 0x30, 0x39,
2943        ];
2944        let row2 = [
2945            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2946            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2947            0xFF, 0xFF, 0xFF, 0x85,
2948        ];
2949        let mut data = Vec::new();
2950        data.extend_from_slice(&row1);
2951        data.extend_from_slice(&row2);
2952        let mut cursor = AvroCursor::new(&data);
2953        decoder.decode(&mut cursor).unwrap();
2954        decoder.decode(&mut cursor).unwrap();
2955        let arr = decoder.flush(None).unwrap();
2956        #[cfg(feature = "small_decimals")]
2957        {
2958            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2959            assert_eq!(dec.len(), 2);
2960            assert_eq!(dec.value_as_string(0), "123.45");
2961            assert_eq!(dec.value_as_string(1), "-1.23");
2962        }
2963        #[cfg(not(feature = "small_decimals"))]
2964        {
2965            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2966            assert_eq!(dec.len(), 2);
2967            assert_eq!(dec.value_as_string(0), "123.45");
2968            assert_eq!(dec.value_as_string(1), "-1.23");
2969        }
2970    }
2971
2972    #[test]
2973    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
2974        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
2975        let mut decoder = Decoder::try_new(&dt).unwrap();
2976        let row1 = [
2977            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2978            0x30, 0x39,
2979        ];
2980        let row2 = [
2981            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2982            0xFF, 0x85,
2983        ];
2984        let mut data = Vec::new();
2985        data.extend_from_slice(&row1);
2986        data.extend_from_slice(&row2);
2987        let mut cursor = AvroCursor::new(&data);
2988        decoder.decode(&mut cursor).unwrap();
2989        decoder.decode(&mut cursor).unwrap();
2990
2991        let arr = decoder.flush(None).unwrap();
2992        #[cfg(feature = "small_decimals")]
2993        {
2994            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2995            assert_eq!(dec.len(), 2);
2996            assert_eq!(dec.value_as_string(0), "123.45");
2997            assert_eq!(dec.value_as_string(1), "-1.23");
2998        }
2999        #[cfg(not(feature = "small_decimals"))]
3000        {
3001            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3002            assert_eq!(dec.len(), 2);
3003            assert_eq!(dec.value_as_string(0), "123.45");
3004            assert_eq!(dec.value_as_string(1), "-1.23");
3005        }
3006    }
3007
3008    #[test]
3009    fn test_decimal_decoding_bytes_with_nulls() {
3010        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
3011        let inner = Decoder::try_new(&dt).unwrap();
3012        let mut decoder = Decoder::Nullable(
3013            Nullability::NullSecond,
3014            NullBufferBuilder::new(DEFAULT_CAPACITY),
3015            Box::new(inner),
3016            NullablePlan::ReadTag,
3017        );
3018        let mut data = Vec::new();
3019        data.extend_from_slice(&encode_avro_int(0));
3020        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
3021        data.extend_from_slice(&encode_avro_int(1));
3022        data.extend_from_slice(&encode_avro_int(0));
3023        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
3024        let mut cursor = AvroCursor::new(&data);
3025        decoder.decode(&mut cursor).unwrap();
3026        decoder.decode(&mut cursor).unwrap();
3027        decoder.decode(&mut cursor).unwrap();
3028        let arr = decoder.flush(None).unwrap();
3029        #[cfg(feature = "small_decimals")]
3030        {
3031            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3032            assert_eq!(dec_arr.len(), 3);
3033            assert!(dec_arr.is_valid(0));
3034            assert!(!dec_arr.is_valid(1));
3035            assert!(dec_arr.is_valid(2));
3036            assert_eq!(dec_arr.value_as_string(0), "123.4");
3037            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3038        }
3039        #[cfg(not(feature = "small_decimals"))]
3040        {
3041            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3042            assert_eq!(dec_arr.len(), 3);
3043            assert!(dec_arr.is_valid(0));
3044            assert!(!dec_arr.is_valid(1));
3045            assert!(dec_arr.is_valid(2));
3046            assert_eq!(dec_arr.value_as_string(0), "123.4");
3047            assert_eq!(dec_arr.value_as_string(2), "-123.4");
3048        }
3049    }
3050
3051    #[test]
3052    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
3053        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
3054        let inner = Decoder::try_new(&dt).unwrap();
3055        let mut decoder = Decoder::Nullable(
3056            Nullability::NullSecond,
3057            NullBufferBuilder::new(DEFAULT_CAPACITY),
3058            Box::new(inner),
3059            NullablePlan::ReadTag,
3060        );
3061        let row1 = [
3062            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
3063            0xE2, 0x40,
3064        ];
3065        let row3 = [
3066            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
3067            0x1D, 0xC0,
3068        ];
3069        let mut data = Vec::new();
3070        data.extend_from_slice(&encode_avro_int(0));
3071        data.extend_from_slice(&row1);
3072        data.extend_from_slice(&encode_avro_int(1));
3073        data.extend_from_slice(&encode_avro_int(0));
3074        data.extend_from_slice(&row3);
3075        let mut cursor = AvroCursor::new(&data);
3076        decoder.decode(&mut cursor).unwrap();
3077        decoder.decode(&mut cursor).unwrap();
3078        decoder.decode(&mut cursor).unwrap();
3079        let arr = decoder.flush(None).unwrap();
3080        #[cfg(feature = "small_decimals")]
3081        {
3082            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3083            assert_eq!(dec_arr.len(), 3);
3084            assert!(dec_arr.is_valid(0));
3085            assert!(!dec_arr.is_valid(1));
3086            assert!(dec_arr.is_valid(2));
3087            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3088            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3089        }
3090        #[cfg(not(feature = "small_decimals"))]
3091        {
3092            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3093            assert_eq!(dec_arr.len(), 3);
3094            assert!(dec_arr.is_valid(0));
3095            assert!(!dec_arr.is_valid(1));
3096            assert!(dec_arr.is_valid(2));
3097            assert_eq!(dec_arr.value_as_string(0), "1234.56");
3098            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
3099        }
3100    }
3101
3102    #[test]
3103    fn test_enum_decoding() {
3104        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
3105        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
3106        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3107        let mut data = Vec::new();
3108        data.extend_from_slice(&encode_avro_int(2));
3109        data.extend_from_slice(&encode_avro_int(0));
3110        data.extend_from_slice(&encode_avro_int(1));
3111        let mut cursor = AvroCursor::new(&data);
3112        decoder.decode(&mut cursor).unwrap();
3113        decoder.decode(&mut cursor).unwrap();
3114        decoder.decode(&mut cursor).unwrap();
3115        let array = decoder.flush(None).unwrap();
3116        let dict_array = array
3117            .as_any()
3118            .downcast_ref::<DictionaryArray<Int32Type>>()
3119            .unwrap();
3120        assert_eq!(dict_array.len(), 3);
3121        let values = dict_array
3122            .values()
3123            .as_any()
3124            .downcast_ref::<StringArray>()
3125            .unwrap();
3126        assert_eq!(values.value(0), "A");
3127        assert_eq!(values.value(1), "B");
3128        assert_eq!(values.value(2), "C");
3129        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
3130    }
3131
3132    #[test]
3133    fn test_enum_decoding_with_nulls() {
3134        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
3135        let enum_codec = Codec::Enum(symbols.clone());
3136        let avro_type =
3137            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
3138        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3139        let mut data = Vec::new();
3140        data.extend_from_slice(&encode_avro_long(1));
3141        data.extend_from_slice(&encode_avro_int(1));
3142        data.extend_from_slice(&encode_avro_long(0));
3143        data.extend_from_slice(&encode_avro_long(1));
3144        data.extend_from_slice(&encode_avro_int(0));
3145        let mut cursor = AvroCursor::new(&data);
3146        decoder.decode(&mut cursor).unwrap();
3147        decoder.decode(&mut cursor).unwrap();
3148        decoder.decode(&mut cursor).unwrap();
3149        let array = decoder.flush(None).unwrap();
3150        let dict_array = array
3151            .as_any()
3152            .downcast_ref::<DictionaryArray<Int32Type>>()
3153            .unwrap();
3154        assert_eq!(dict_array.len(), 3);
3155        assert!(dict_array.is_valid(0));
3156        assert!(dict_array.is_null(1));
3157        assert!(dict_array.is_valid(2));
3158        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
3159        assert_eq!(dict_array.keys(), &expected_keys);
3160        let values = dict_array
3161            .values()
3162            .as_any()
3163            .downcast_ref::<StringArray>()
3164            .unwrap();
3165        assert_eq!(values.value(0), "X");
3166        assert_eq!(values.value(1), "Y");
3167    }
3168
3169    #[test]
3170    fn test_duration_decoding_with_nulls() {
3171        let duration_codec = Codec::Interval;
3172        let avro_type = AvroDataType::new(
3173            duration_codec,
3174            Default::default(),
3175            Some(Nullability::NullFirst),
3176        );
3177        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3178        let mut data = Vec::new();
3179        // First value: 1 month, 2 days, 3 millis
3180        data.extend_from_slice(&encode_avro_long(1)); // not null
3181        let mut duration1 = Vec::new();
3182        duration1.extend_from_slice(&1u32.to_le_bytes());
3183        duration1.extend_from_slice(&2u32.to_le_bytes());
3184        duration1.extend_from_slice(&3u32.to_le_bytes());
3185        data.extend_from_slice(&duration1);
3186        // Second value: null
3187        data.extend_from_slice(&encode_avro_long(0)); // null
3188        data.extend_from_slice(&encode_avro_long(1)); // not null
3189        let mut duration2 = Vec::new();
3190        duration2.extend_from_slice(&4u32.to_le_bytes());
3191        duration2.extend_from_slice(&5u32.to_le_bytes());
3192        duration2.extend_from_slice(&6u32.to_le_bytes());
3193        data.extend_from_slice(&duration2);
3194        let mut cursor = AvroCursor::new(&data);
3195        decoder.decode(&mut cursor).unwrap();
3196        decoder.decode(&mut cursor).unwrap();
3197        decoder.decode(&mut cursor).unwrap();
3198        let array = decoder.flush(None).unwrap();
3199        let interval_array = array
3200            .as_any()
3201            .downcast_ref::<IntervalMonthDayNanoArray>()
3202            .unwrap();
3203        assert_eq!(interval_array.len(), 3);
3204        assert!(interval_array.is_valid(0));
3205        assert!(interval_array.is_null(1));
3206        assert!(interval_array.is_valid(2));
3207        let expected = IntervalMonthDayNanoArray::from(vec![
3208            Some(IntervalMonthDayNano {
3209                months: 1,
3210                days: 2,
3211                nanoseconds: 3_000_000,
3212            }),
3213            None,
3214            Some(IntervalMonthDayNano {
3215                months: 4,
3216                days: 5,
3217                nanoseconds: 6_000_000,
3218            }),
3219        ]);
3220        assert_eq!(interval_array, &expected);
3221    }
3222
3223    #[test]
3224    fn test_duration_decoding_empty() {
3225        let duration_codec = Codec::Interval;
3226        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
3227        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3228        let array = decoder.flush(None).unwrap();
3229        assert_eq!(array.len(), 0);
3230    }
3231
3232    #[test]
3233    #[cfg(feature = "avro_custom_types")]
3234    fn test_duration_seconds_decoding() {
3235        let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
3236        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3237        let mut data = Vec::new();
3238        // Three values: 0, -1, 2
3239        data.extend_from_slice(&encode_avro_long(0));
3240        data.extend_from_slice(&encode_avro_long(-1));
3241        data.extend_from_slice(&encode_avro_long(2));
3242        let mut cursor = AvroCursor::new(&data);
3243        decoder.decode(&mut cursor).unwrap();
3244        decoder.decode(&mut cursor).unwrap();
3245        decoder.decode(&mut cursor).unwrap();
3246        let array = decoder.flush(None).unwrap();
3247        let dur = array
3248            .as_any()
3249            .downcast_ref::<DurationSecondArray>()
3250            .unwrap();
3251        assert_eq!(dur.values(), &[0, -1, 2]);
3252    }
3253
3254    #[test]
3255    #[cfg(feature = "avro_custom_types")]
3256    fn test_duration_milliseconds_decoding() {
3257        let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
3258        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3259        let mut data = Vec::new();
3260        for v in [1i64, 0, -2] {
3261            data.extend_from_slice(&encode_avro_long(v));
3262        }
3263        let mut cursor = AvroCursor::new(&data);
3264        for _ in 0..3 {
3265            decoder.decode(&mut cursor).unwrap();
3266        }
3267        let array = decoder.flush(None).unwrap();
3268        let dur = array
3269            .as_any()
3270            .downcast_ref::<DurationMillisecondArray>()
3271            .unwrap();
3272        assert_eq!(dur.values(), &[1, 0, -2]);
3273    }
3274
3275    #[test]
3276    #[cfg(feature = "avro_custom_types")]
3277    fn test_duration_microseconds_decoding() {
3278        let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
3279        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3280        let mut data = Vec::new();
3281        for v in [5i64, -6, 7] {
3282            data.extend_from_slice(&encode_avro_long(v));
3283        }
3284        let mut cursor = AvroCursor::new(&data);
3285        for _ in 0..3 {
3286            decoder.decode(&mut cursor).unwrap();
3287        }
3288        let array = decoder.flush(None).unwrap();
3289        let dur = array
3290            .as_any()
3291            .downcast_ref::<DurationMicrosecondArray>()
3292            .unwrap();
3293        assert_eq!(dur.values(), &[5, -6, 7]);
3294    }
3295
3296    #[test]
3297    #[cfg(feature = "avro_custom_types")]
3298    fn test_duration_nanoseconds_decoding() {
3299        let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3300        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3301        let mut data = Vec::new();
3302        for v in [8i64, 9, -10] {
3303            data.extend_from_slice(&encode_avro_long(v));
3304        }
3305        let mut cursor = AvroCursor::new(&data);
3306        for _ in 0..3 {
3307            decoder.decode(&mut cursor).unwrap();
3308        }
3309        let array = decoder.flush(None).unwrap();
3310        let dur = array
3311            .as_any()
3312            .downcast_ref::<DurationNanosecondArray>()
3313            .unwrap();
3314        assert_eq!(dur.values(), &[8, 9, -10]);
3315    }
3316
3317    #[test]
3318    fn test_nullable_decode_error_bitmap_corruption() {
3319        // Nullable Int32 with ['T','null'] encoding (NullSecond)
3320        let avro_type = AvroDataType::new(
3321            Codec::Int32,
3322            Default::default(),
3323            Some(Nullability::NullSecond),
3324        );
3325        let mut decoder = Decoder::try_new(&avro_type).unwrap();
3326
3327        // Row 1: union branch 1 (null)
3328        let mut row1 = Vec::new();
3329        row1.extend_from_slice(&encode_avro_int(1));
3330
3331        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
3332        let mut row2 = Vec::new();
3333        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
3334
3335        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
3336        let mut row3 = Vec::new();
3337        row3.extend_from_slice(&encode_avro_int(0)); // branch
3338        row3.extend_from_slice(&encode_avro_int(42)); // actual value
3339
3340        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
3341        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
3342        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
3343
3344        let array = decoder.flush(None).unwrap();
3345
3346        // Should contain 2 elements: row1 (null) and row3 (42)
3347        assert_eq!(array.len(), 2);
3348        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
3349        assert!(int_array.is_null(0)); // row1 is null
3350        assert_eq!(int_array.value(1), 42); // row3 value is 42
3351    }
3352
3353    #[test]
3354    fn test_enum_mapping_reordered_symbols() {
3355        let reader_symbols: Arc<[String]> =
3356            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
3357        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
3358        let default_index: i32 = -1;
3359        let mut dec = Decoder::Enum(
3360            Vec::with_capacity(DEFAULT_CAPACITY),
3361            reader_symbols.clone(),
3362            Some(EnumResolution {
3363                mapping,
3364                default_index,
3365            }),
3366        );
3367        let mut data = Vec::new();
3368        data.extend_from_slice(&encode_avro_int(0));
3369        data.extend_from_slice(&encode_avro_int(1));
3370        data.extend_from_slice(&encode_avro_int(2));
3371        let mut cur = AvroCursor::new(&data);
3372        dec.decode(&mut cur).unwrap();
3373        dec.decode(&mut cur).unwrap();
3374        dec.decode(&mut cur).unwrap();
3375        let arr = dec.flush(None).unwrap();
3376        let dict = arr
3377            .as_any()
3378            .downcast_ref::<DictionaryArray<Int32Type>>()
3379            .unwrap();
3380        let expected_keys = Int32Array::from(vec![2, 0, 1]);
3381        assert_eq!(dict.keys(), &expected_keys);
3382        let values = dict
3383            .values()
3384            .as_any()
3385            .downcast_ref::<StringArray>()
3386            .unwrap();
3387        assert_eq!(values.value(0), "B");
3388        assert_eq!(values.value(1), "C");
3389        assert_eq!(values.value(2), "A");
3390    }
3391
3392    #[test]
3393    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
3394        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
3395        let default_index: i32 = 1;
3396        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
3397        let mut dec = Decoder::Enum(
3398            Vec::with_capacity(DEFAULT_CAPACITY),
3399            reader_symbols.clone(),
3400            Some(EnumResolution {
3401                mapping,
3402                default_index,
3403            }),
3404        );
3405        let mut data = Vec::new();
3406        data.extend_from_slice(&encode_avro_int(0));
3407        data.extend_from_slice(&encode_avro_int(1));
3408        data.extend_from_slice(&encode_avro_int(99));
3409        let mut cur = AvroCursor::new(&data);
3410        dec.decode(&mut cur).unwrap();
3411        dec.decode(&mut cur).unwrap();
3412        dec.decode(&mut cur).unwrap();
3413        let arr = dec.flush(None).unwrap();
3414        let dict = arr
3415            .as_any()
3416            .downcast_ref::<DictionaryArray<Int32Type>>()
3417            .unwrap();
3418        let expected_keys = Int32Array::from(vec![0, 1, 1]);
3419        assert_eq!(dict.keys(), &expected_keys);
3420        let values = dict
3421            .values()
3422            .as_any()
3423            .downcast_ref::<StringArray>()
3424            .unwrap();
3425        assert_eq!(values.value(0), "A");
3426        assert_eq!(values.value(1), "B");
3427    }
3428
3429    #[test]
3430    fn test_enum_mapping_unknown_symbol_without_default_errors() {
3431        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
3432        let default_index: i32 = -1; // indicates no default at type-level
3433        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
3434        let mut dec = Decoder::Enum(
3435            Vec::with_capacity(DEFAULT_CAPACITY),
3436            reader_symbols,
3437            Some(EnumResolution {
3438                mapping,
3439                default_index,
3440            }),
3441        );
3442        let data = encode_avro_int(0);
3443        let mut cur = AvroCursor::new(&data);
3444        let err = dec
3445            .decode(&mut cur)
3446            .expect_err("expected decode error for unresolved enum without default");
3447        let msg = err.to_string();
3448        assert!(
3449            msg.contains("not resolvable") && msg.contains("no default"),
3450            "unexpected error message: {msg}"
3451        );
3452    }
3453
3454    fn make_record_resolved_decoder(
3455        reader_fields: &[(&str, DataType, bool)],
3456        writer_to_reader: Vec<Option<usize>>,
3457        skip_decoders: Vec<Option<Skipper>>,
3458    ) -> Decoder {
3459        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3460        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3461        for (name, dt, nullable) in reader_fields {
3462            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3463            let enc = match dt {
3464                DataType::Int32 => Decoder::Int32(Vec::new()),
3465                DataType::Int64 => Decoder::Int64(Vec::new()),
3466                DataType::Utf8 => {
3467                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3468                }
3469                other => panic!("Unsupported test reader field type: {other:?}"),
3470            };
3471            encodings.push(enc);
3472        }
3473        let fields: Fields = field_refs.into();
3474        Decoder::Record(
3475            fields,
3476            encodings,
3477            Some(Projector {
3478                writer_to_reader: Arc::from(writer_to_reader),
3479                skip_decoders,
3480                field_defaults: vec![None; reader_fields.len()],
3481                default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3482            }),
3483        )
3484    }
3485
3486    #[test]
3487    fn test_skip_writer_trailing_field_int32() {
3488        let mut dec = make_record_resolved_decoder(
3489            &[("id", arrow_schema::DataType::Int32, false)],
3490            vec![Some(0), None],
3491            vec![None, Some(super::Skipper::Int32)],
3492        );
3493        let mut data = Vec::new();
3494        data.extend_from_slice(&encode_avro_int(7));
3495        data.extend_from_slice(&encode_avro_int(999));
3496        let mut cur = AvroCursor::new(&data);
3497        dec.decode(&mut cur).unwrap();
3498        assert_eq!(cur.position(), data.len());
3499        let arr = dec.flush(None).unwrap();
3500        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
3501        assert_eq!(struct_arr.len(), 1);
3502        let id = struct_arr
3503            .column_by_name("id")
3504            .unwrap()
3505            .as_any()
3506            .downcast_ref::<Int32Array>()
3507            .unwrap();
3508        assert_eq!(id.value(0), 7);
3509    }
3510
3511    #[test]
3512    fn test_skip_writer_middle_field_string() {
3513        let mut dec = make_record_resolved_decoder(
3514            &[
3515                ("id", DataType::Int32, false),
3516                ("score", DataType::Int64, false),
3517            ],
3518            vec![Some(0), None, Some(1)],
3519            vec![None, Some(Skipper::String), None],
3520        );
3521        let mut data = Vec::new();
3522        data.extend_from_slice(&encode_avro_int(42));
3523        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
3524        data.extend_from_slice(&encode_avro_long(1000));
3525        let mut cur = AvroCursor::new(&data);
3526        dec.decode(&mut cur).unwrap();
3527        assert_eq!(cur.position(), data.len());
3528        let arr = dec.flush(None).unwrap();
3529        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3530        let id = s
3531            .column_by_name("id")
3532            .unwrap()
3533            .as_any()
3534            .downcast_ref::<Int32Array>()
3535            .unwrap();
3536        let score = s
3537            .column_by_name("score")
3538            .unwrap()
3539            .as_any()
3540            .downcast_ref::<Int64Array>()
3541            .unwrap();
3542        assert_eq!(id.value(0), 42);
3543        assert_eq!(score.value(0), 1000);
3544    }
3545
3546    #[test]
3547    fn test_skip_writer_array_with_negative_block_count_fast() {
3548        let mut dec = make_record_resolved_decoder(
3549            &[("id", DataType::Int32, false)],
3550            vec![None, Some(0)],
3551            vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
3552        );
3553        let mut array_payload = Vec::new();
3554        array_payload.extend_from_slice(&encode_avro_int(1));
3555        array_payload.extend_from_slice(&encode_avro_int(2));
3556        array_payload.extend_from_slice(&encode_avro_int(3));
3557        let mut data = Vec::new();
3558        data.extend_from_slice(&encode_avro_long(-3));
3559        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
3560        data.extend_from_slice(&array_payload);
3561        data.extend_from_slice(&encode_avro_long(0));
3562        data.extend_from_slice(&encode_avro_int(5));
3563        let mut cur = AvroCursor::new(&data);
3564        dec.decode(&mut cur).unwrap();
3565        assert_eq!(cur.position(), data.len());
3566        let arr = dec.flush(None).unwrap();
3567        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3568        let id = s
3569            .column_by_name("id")
3570            .unwrap()
3571            .as_any()
3572            .downcast_ref::<Int32Array>()
3573            .unwrap();
3574        assert_eq!(id.len(), 1);
3575        assert_eq!(id.value(0), 5);
3576    }
3577
3578    #[test]
3579    fn test_skip_writer_map_with_negative_block_count_fast() {
3580        let mut dec = make_record_resolved_decoder(
3581            &[("id", DataType::Int32, false)],
3582            vec![None, Some(0)],
3583            vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
3584        );
3585        let mut entries = Vec::new();
3586        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
3587        entries.extend_from_slice(&encode_avro_int(10));
3588        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
3589        entries.extend_from_slice(&encode_avro_int(20));
3590        let mut data = Vec::new();
3591        data.extend_from_slice(&encode_avro_long(-2));
3592        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
3593        data.extend_from_slice(&entries);
3594        data.extend_from_slice(&encode_avro_long(0));
3595        data.extend_from_slice(&encode_avro_int(123));
3596        let mut cur = AvroCursor::new(&data);
3597        dec.decode(&mut cur).unwrap();
3598        assert_eq!(cur.position(), data.len());
3599        let arr = dec.flush(None).unwrap();
3600        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3601        let id = s
3602            .column_by_name("id")
3603            .unwrap()
3604            .as_any()
3605            .downcast_ref::<Int32Array>()
3606            .unwrap();
3607        assert_eq!(id.len(), 1);
3608        assert_eq!(id.value(0), 123);
3609    }
3610
3611    #[test]
3612    fn test_skip_writer_nullable_field_union_nullfirst() {
3613        let mut dec = make_record_resolved_decoder(
3614            &[("id", DataType::Int32, false)],
3615            vec![None, Some(0)],
3616            vec![
3617                Some(super::Skipper::Nullable(
3618                    Nullability::NullFirst,
3619                    Box::new(super::Skipper::Int32),
3620                )),
3621                None,
3622            ],
3623        );
3624        let mut row1 = Vec::new();
3625        row1.extend_from_slice(&encode_avro_long(0));
3626        row1.extend_from_slice(&encode_avro_int(5));
3627        let mut row2 = Vec::new();
3628        row2.extend_from_slice(&encode_avro_long(1));
3629        row2.extend_from_slice(&encode_avro_int(123));
3630        row2.extend_from_slice(&encode_avro_int(7));
3631        let mut cur1 = AvroCursor::new(&row1);
3632        let mut cur2 = AvroCursor::new(&row2);
3633        dec.decode(&mut cur1).unwrap();
3634        dec.decode(&mut cur2).unwrap();
3635        assert_eq!(cur1.position(), row1.len());
3636        assert_eq!(cur2.position(), row2.len());
3637        let arr = dec.flush(None).unwrap();
3638        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3639        let id = s
3640            .column_by_name("id")
3641            .unwrap()
3642            .as_any()
3643            .downcast_ref::<Int32Array>()
3644            .unwrap();
3645        assert_eq!(id.len(), 2);
3646        assert_eq!(id.value(0), 5);
3647        assert_eq!(id.value(1), 7);
3648    }
3649
3650    fn make_dense_union_avro(
3651        children: Vec<(Codec, &'_ str, DataType)>,
3652        type_ids: Vec<i8>,
3653    ) -> AvroDataType {
3654        let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3655        let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3656        for (codec, name, dt) in children.into_iter() {
3657            avro_children.push(AvroDataType::new(codec, Default::default(), None));
3658            fields.push(arrow_schema::Field::new(name, dt, true));
3659        }
3660        let union_fields = UnionFields::new(type_ids, fields);
3661        let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3662        AvroDataType::new(union_codec, Default::default(), None)
3663    }
3664
3665    #[test]
3666    fn test_union_dense_two_children_custom_type_ids() {
3667        let union_dt = make_dense_union_avro(
3668            vec![
3669                (Codec::Int32, "i", DataType::Int32),
3670                (Codec::Utf8, "s", DataType::Utf8),
3671            ],
3672            vec![2, 5],
3673        );
3674        let mut dec = Decoder::try_new(&union_dt).unwrap();
3675        let mut r1 = Vec::new();
3676        r1.extend_from_slice(&encode_avro_long(0));
3677        r1.extend_from_slice(&encode_avro_int(7));
3678        let mut r2 = Vec::new();
3679        r2.extend_from_slice(&encode_avro_long(1));
3680        r2.extend_from_slice(&encode_avro_bytes(b"x"));
3681        let mut r3 = Vec::new();
3682        r3.extend_from_slice(&encode_avro_long(0));
3683        r3.extend_from_slice(&encode_avro_int(-1));
3684        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3685        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3686        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3687        let array = dec.flush(None).unwrap();
3688        let ua = array
3689            .as_any()
3690            .downcast_ref::<UnionArray>()
3691            .expect("expected UnionArray");
3692        assert_eq!(ua.len(), 3);
3693        assert_eq!(ua.type_id(0), 2);
3694        assert_eq!(ua.type_id(1), 5);
3695        assert_eq!(ua.type_id(2), 2);
3696        assert_eq!(ua.value_offset(0), 0);
3697        assert_eq!(ua.value_offset(1), 0);
3698        assert_eq!(ua.value_offset(2), 1);
3699        let int_child = ua
3700            .child(2)
3701            .as_any()
3702            .downcast_ref::<Int32Array>()
3703            .expect("int child");
3704        assert_eq!(int_child.len(), 2);
3705        assert_eq!(int_child.value(0), 7);
3706        assert_eq!(int_child.value(1), -1);
3707        let str_child = ua
3708            .child(5)
3709            .as_any()
3710            .downcast_ref::<StringArray>()
3711            .expect("string child");
3712        assert_eq!(str_child.len(), 1);
3713        assert_eq!(str_child.value(0), "x");
3714    }
3715
3716    #[test]
3717    fn test_union_dense_with_null_and_string_children() {
3718        let union_dt = make_dense_union_avro(
3719            vec![
3720                (Codec::Null, "n", DataType::Null),
3721                (Codec::Utf8, "s", DataType::Utf8),
3722            ],
3723            vec![42, 7],
3724        );
3725        let mut dec = Decoder::try_new(&union_dt).unwrap();
3726        let r1 = encode_avro_long(0);
3727        let mut r2 = Vec::new();
3728        r2.extend_from_slice(&encode_avro_long(1));
3729        r2.extend_from_slice(&encode_avro_bytes(b"abc"));
3730        let r3 = encode_avro_long(0);
3731        dec.decode(&mut AvroCursor::new(&r1)).unwrap();
3732        dec.decode(&mut AvroCursor::new(&r2)).unwrap();
3733        dec.decode(&mut AvroCursor::new(&r3)).unwrap();
3734        let array = dec.flush(None).unwrap();
3735        let ua = array
3736            .as_any()
3737            .downcast_ref::<UnionArray>()
3738            .expect("expected UnionArray");
3739        assert_eq!(ua.len(), 3);
3740        assert_eq!(ua.type_id(0), 42);
3741        assert_eq!(ua.type_id(1), 7);
3742        assert_eq!(ua.type_id(2), 42);
3743        assert_eq!(ua.value_offset(0), 0);
3744        assert_eq!(ua.value_offset(1), 0);
3745        assert_eq!(ua.value_offset(2), 1);
3746        let null_child = ua
3747            .child(42)
3748            .as_any()
3749            .downcast_ref::<NullArray>()
3750            .expect("null child");
3751        assert_eq!(null_child.len(), 2);
3752        let str_child = ua
3753            .child(7)
3754            .as_any()
3755            .downcast_ref::<StringArray>()
3756            .expect("string child");
3757        assert_eq!(str_child.len(), 1);
3758        assert_eq!(str_child.value(0), "abc");
3759    }
3760
3761    #[test]
3762    fn test_union_decode_negative_branch_index_errors() {
3763        let union_dt = make_dense_union_avro(
3764            vec![
3765                (Codec::Int32, "i", DataType::Int32),
3766                (Codec::Utf8, "s", DataType::Utf8),
3767            ],
3768            vec![0, 1],
3769        );
3770        let mut dec = Decoder::try_new(&union_dt).unwrap();
3771        let row = encode_avro_long(-1); // decodes back to -1
3772        let err = dec
3773            .decode(&mut AvroCursor::new(&row))
3774            .expect_err("expected error for negative branch index");
3775        let msg = err.to_string();
3776        assert!(
3777            msg.contains("Negative union branch index"),
3778            "unexpected error message: {msg}"
3779        );
3780    }
3781
3782    #[test]
3783    fn test_union_decode_out_of_range_branch_index_errors() {
3784        let union_dt = make_dense_union_avro(
3785            vec![
3786                (Codec::Int32, "i", DataType::Int32),
3787                (Codec::Utf8, "s", DataType::Utf8),
3788            ],
3789            vec![10, 11],
3790        );
3791        let mut dec = Decoder::try_new(&union_dt).unwrap();
3792        let row = encode_avro_long(2);
3793        let err = dec
3794            .decode(&mut AvroCursor::new(&row))
3795            .expect_err("expected error for out-of-range branch index");
3796        let msg = err.to_string();
3797        assert!(
3798            msg.contains("out of range"),
3799            "unexpected error message: {msg}"
3800        );
3801    }
3802
3803    #[test]
3804    fn test_union_sparse_mode_not_supported() {
3805        let children: Vec<AvroDataType> = vec![
3806            AvroDataType::new(Codec::Int32, Default::default(), None),
3807            AvroDataType::new(Codec::Utf8, Default::default(), None),
3808        ];
3809        let uf = UnionFields::new(
3810            vec![1, 3],
3811            vec![
3812                arrow_schema::Field::new("i", DataType::Int32, true),
3813                arrow_schema::Field::new("s", DataType::Utf8, true),
3814            ],
3815        );
3816        let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
3817        let dt = AvroDataType::new(codec, Default::default(), None);
3818        let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
3819        let msg = err.to_string();
3820        assert!(
3821            msg.contains("Sparse Arrow unions are not yet supported"),
3822            "unexpected error message: {msg}"
3823        );
3824    }
3825
3826    fn make_record_decoder_with_projector_defaults(
3827        reader_fields: &[(&str, DataType, bool)],
3828        field_defaults: Vec<Option<AvroLiteral>>,
3829        default_injections: Vec<(usize, AvroLiteral)>,
3830        writer_to_reader_len: usize,
3831    ) -> Decoder {
3832        assert_eq!(
3833            field_defaults.len(),
3834            reader_fields.len(),
3835            "field_defaults must have one entry per reader field"
3836        );
3837        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3838        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3839        for (name, dt, nullable) in reader_fields {
3840            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3841            let enc = match dt {
3842                DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3843                DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3844                DataType::Utf8 => Decoder::String(
3845                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3846                    Vec::with_capacity(DEFAULT_CAPACITY),
3847                ),
3848                other => panic!("Unsupported test field type in helper: {other:?}"),
3849            };
3850            encodings.push(enc);
3851        }
3852        let fields: Fields = field_refs.into();
3853        let skip_decoders: Vec<Option<Skipper>> =
3854            (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3855        let projector = Projector {
3856            writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3857            skip_decoders,
3858            field_defaults,
3859            default_injections: Arc::from(default_injections),
3860        };
3861        Decoder::Record(fields, encodings, Some(projector))
3862    }
3863
3864    #[test]
3865    fn test_default_append_int32_and_int64_from_int_and_long() {
3866        let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3867        d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
3868        let arr = d_i32.flush(None).unwrap();
3869        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3870        assert_eq!(a.len(), 1);
3871        assert_eq!(a.value(0), 42);
3872        let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
3873        d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
3874        d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
3875        let arr64 = d_i64.flush(None).unwrap();
3876        let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
3877        assert_eq!(a64.len(), 2);
3878        assert_eq!(a64.value(0), 5);
3879        assert_eq!(a64.value(1), 7);
3880    }
3881
3882    #[test]
3883    fn test_default_append_floats_and_doubles() {
3884        let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
3885        d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
3886        let arr32 = d_f32.flush(None).unwrap();
3887        let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
3888        assert_eq!(a.value(0), 1.5);
3889        let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
3890        d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
3891        let arr64 = d_f64.flush(None).unwrap();
3892        let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
3893        assert_eq!(b.value(0), 2.25);
3894    }
3895
3896    #[test]
3897    fn test_default_append_string_and_bytes() {
3898        let mut d_str = Decoder::String(
3899            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3900            Vec::with_capacity(DEFAULT_CAPACITY),
3901        );
3902        d_str
3903            .append_default(&AvroLiteral::String("hi".into()))
3904            .unwrap();
3905        let s_arr = d_str.flush(None).unwrap();
3906        let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
3907        assert_eq!(arr.value(0), "hi");
3908        let mut d_bytes = Decoder::Binary(
3909            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3910            Vec::with_capacity(DEFAULT_CAPACITY),
3911        );
3912        d_bytes
3913            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
3914            .unwrap();
3915        let b_arr = d_bytes.flush(None).unwrap();
3916        let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
3917        assert_eq!(barr.value(0), &[1, 2, 3]);
3918        let mut d_str_err = Decoder::String(
3919            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3920            Vec::with_capacity(DEFAULT_CAPACITY),
3921        );
3922        let err = d_str_err
3923            .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
3924            .unwrap_err();
3925        assert!(
3926            err.to_string()
3927                .contains("Default for string must be string"),
3928            "unexpected error: {err:?}"
3929        );
3930    }
3931
3932    #[test]
3933    fn test_default_append_nullable_int32_null_and_value() {
3934        let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
3935        let mut dec = Decoder::Nullable(
3936            Nullability::NullFirst,
3937            NullBufferBuilder::new(DEFAULT_CAPACITY),
3938            Box::new(inner),
3939            NullablePlan::ReadTag,
3940        );
3941        dec.append_default(&AvroLiteral::Null).unwrap();
3942        dec.append_default(&AvroLiteral::Int(11)).unwrap();
3943        let arr = dec.flush(None).unwrap();
3944        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3945        assert_eq!(a.len(), 2);
3946        assert!(a.is_null(0));
3947        assert_eq!(a.value(1), 11);
3948    }
3949
3950    #[test]
3951    fn test_default_append_array_of_ints() {
3952        let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
3953        let mut d = Decoder::try_new(&list_dt).unwrap();
3954        let items = vec![
3955            AvroLiteral::Int(1),
3956            AvroLiteral::Int(2),
3957            AvroLiteral::Int(3),
3958        ];
3959        d.append_default(&AvroLiteral::Array(items)).unwrap();
3960        let arr = d.flush(None).unwrap();
3961        let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
3962        assert_eq!(list.len(), 1);
3963        assert_eq!(list.value_length(0), 3);
3964        let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
3965        assert_eq!(vals.values(), &[1, 2, 3]);
3966    }
3967
3968    #[test]
3969    fn test_default_append_map_string_to_int() {
3970        let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
3971        let mut d = Decoder::try_new(&map_dt).unwrap();
3972        let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
3973        m.insert("k1".to_string(), AvroLiteral::Int(10));
3974        m.insert("k2".to_string(), AvroLiteral::Int(20));
3975        d.append_default(&AvroLiteral::Map(m)).unwrap();
3976        let arr = d.flush(None).unwrap();
3977        let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
3978        assert_eq!(map.len(), 1);
3979        assert_eq!(map.value_length(0), 2);
3980        let binding = map.value(0);
3981        let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
3982        let k = entries
3983            .column_by_name("key")
3984            .unwrap()
3985            .as_any()
3986            .downcast_ref::<StringArray>()
3987            .unwrap();
3988        let v = entries
3989            .column_by_name("value")
3990            .unwrap()
3991            .as_any()
3992            .downcast_ref::<Int32Array>()
3993            .unwrap();
3994        let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
3995        assert_eq!(keys, ["k1", "k2"].into_iter().collect());
3996        let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
3997        assert_eq!(vals, [10, 20].into_iter().collect());
3998    }
3999
4000    #[test]
4001    fn test_default_append_enum_by_symbol() {
4002        let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4003        let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4004        d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4005        let arr = d.flush(None).unwrap();
4006        let dict = arr
4007            .as_any()
4008            .downcast_ref::<DictionaryArray<Int32Type>>()
4009            .unwrap();
4010        assert_eq!(dict.len(), 1);
4011        let expected = Int32Array::from(vec![1]);
4012        assert_eq!(dict.keys(), &expected);
4013        let values = dict
4014            .values()
4015            .as_any()
4016            .downcast_ref::<StringArray>()
4017            .unwrap();
4018        assert_eq!(values.value(1), "B");
4019    }
4020
4021    #[test]
4022    fn test_default_append_uuid_and_type_error() {
4023        let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4024        let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4025        d.append_default(&AvroLiteral::String(uuid_str.into()))
4026            .unwrap();
4027        let arr_ref = d.flush(None).unwrap();
4028        let arr = arr_ref
4029            .as_any()
4030            .downcast_ref::<FixedSizeBinaryArray>()
4031            .unwrap();
4032        assert_eq!(arr.value_length(), 16);
4033        assert_eq!(arr.len(), 1);
4034        let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4035        let err = d2
4036            .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4037            .unwrap_err();
4038        assert!(
4039            err.to_string().contains("Default for uuid must be string"),
4040            "unexpected error: {err:?}"
4041        );
4042    }
4043
4044    #[test]
4045    fn test_default_append_fixed_and_length_mismatch() {
4046        let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4047        d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4048            .unwrap();
4049        let arr_ref = d.flush(None).unwrap();
4050        let arr = arr_ref
4051            .as_any()
4052            .downcast_ref::<FixedSizeBinaryArray>()
4053            .unwrap();
4054        assert_eq!(arr.value_length(), 4);
4055        assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4056        let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4057        let err = d_err
4058            .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4059            .unwrap_err();
4060        assert!(
4061            err.to_string().contains("Fixed default length"),
4062            "unexpected error: {err:?}"
4063        );
4064    }
4065
4066    #[test]
4067    fn test_default_append_duration_and_length_validation() {
4068        let dt = avro_from_codec(Codec::Interval);
4069        let mut d = Decoder::try_new(&dt).unwrap();
4070        let mut bytes = Vec::with_capacity(12);
4071        bytes.extend_from_slice(&1u32.to_le_bytes());
4072        bytes.extend_from_slice(&2u32.to_le_bytes());
4073        bytes.extend_from_slice(&3u32.to_le_bytes());
4074        d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4075        let arr_ref = d.flush(None).unwrap();
4076        let arr = arr_ref
4077            .as_any()
4078            .downcast_ref::<IntervalMonthDayNanoArray>()
4079            .unwrap();
4080        assert_eq!(arr.len(), 1);
4081        let v = arr.value(0);
4082        assert_eq!(v.months, 1);
4083        assert_eq!(v.days, 2);
4084        assert_eq!(v.nanoseconds, 3_000_000);
4085        let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4086        let err = d_err
4087            .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4088            .unwrap_err();
4089        assert!(
4090            err.to_string()
4091                .contains("Duration default must be exactly 12 bytes"),
4092            "unexpected error: {err:?}"
4093        );
4094    }
4095
4096    #[test]
4097    fn test_default_append_decimal256_from_bytes() {
4098        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4099        let mut d = Decoder::try_new(&dt).unwrap();
4100        let pos: [u8; 32] = [
4101            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4102            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4103            0x00, 0x00, 0x30, 0x39,
4104        ];
4105        d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4106        let neg: [u8; 32] = [
4107            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4108            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4109            0xFF, 0xFF, 0xFF, 0x85,
4110        ];
4111        d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4112        let arr = d.flush(None).unwrap();
4113        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4114        assert_eq!(dec.len(), 2);
4115        assert_eq!(dec.value_as_string(0), "123.45");
4116        assert_eq!(dec.value_as_string(1), "-1.23");
4117    }
4118
4119    #[test]
4120    fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4121        let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4122        let mut rec = make_record_decoder_with_projector_defaults(
4123            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4124            field_defaults,
4125            vec![],
4126            0,
4127        );
4128        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4129        map.insert("a".to_string(), AvroLiteral::Int(7));
4130        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4131        let arr = rec.flush(None).unwrap();
4132        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4133        let a = s
4134            .column_by_name("a")
4135            .unwrap()
4136            .as_any()
4137            .downcast_ref::<Int32Array>()
4138            .unwrap();
4139        let b = s
4140            .column_by_name("b")
4141            .unwrap()
4142            .as_any()
4143            .downcast_ref::<StringArray>()
4144            .unwrap();
4145        assert_eq!(a.value(0), 7);
4146        assert_eq!(b.value(0), "hi");
4147    }
4148
4149    #[test]
4150    fn test_record_append_default_null_uses_projector_field_defaults() {
4151        let field_defaults = vec![
4152            Some(AvroLiteral::Int(5)),
4153            Some(AvroLiteral::String("x".into())),
4154        ];
4155        let mut rec = make_record_decoder_with_projector_defaults(
4156            &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4157            field_defaults,
4158            vec![],
4159            0,
4160        );
4161        rec.append_default(&AvroLiteral::Null).unwrap();
4162        let arr = rec.flush(None).unwrap();
4163        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4164        let a = s
4165            .column_by_name("a")
4166            .unwrap()
4167            .as_any()
4168            .downcast_ref::<Int32Array>()
4169            .unwrap();
4170        let b = s
4171            .column_by_name("b")
4172            .unwrap()
4173            .as_any()
4174            .downcast_ref::<StringArray>()
4175            .unwrap();
4176        assert_eq!(a.value(0), 5);
4177        assert_eq!(b.value(0), "x");
4178    }
4179
4180    #[test]
4181    fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
4182     {
4183        let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
4184        let mut field_refs: Vec<FieldRef> = Vec::new();
4185        let mut encoders: Vec<Decoder> = Vec::new();
4186        for (name, dt, nullable) in &fields {
4187            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4188        }
4189        let enc_a = Decoder::Nullable(
4190            Nullability::NullSecond,
4191            NullBufferBuilder::new(DEFAULT_CAPACITY),
4192            Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
4193            NullablePlan::ReadTag,
4194        );
4195        let enc_b = Decoder::Nullable(
4196            Nullability::NullSecond,
4197            NullBufferBuilder::new(DEFAULT_CAPACITY),
4198            Box::new(Decoder::String(
4199                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4200                Vec::with_capacity(DEFAULT_CAPACITY),
4201            )),
4202            NullablePlan::ReadTag,
4203        );
4204        encoders.push(enc_a);
4205        encoders.push(enc_b);
4206        let projector = Projector {
4207            writer_to_reader: Arc::from(vec![]),
4208            skip_decoders: vec![],
4209            field_defaults: vec![None, None], // no defaults -> append_null
4210            default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4211        };
4212        let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector));
4213        let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4214        map.insert("a".to_string(), AvroLiteral::Int(9));
4215        rec.append_default(&AvroLiteral::Map(map)).unwrap();
4216        let arr = rec.flush(None).unwrap();
4217        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4218        let a = s
4219            .column_by_name("a")
4220            .unwrap()
4221            .as_any()
4222            .downcast_ref::<Int32Array>()
4223            .unwrap();
4224        let b = s
4225            .column_by_name("b")
4226            .unwrap()
4227            .as_any()
4228            .downcast_ref::<StringArray>()
4229            .unwrap();
4230        assert!(a.is_valid(0));
4231        assert_eq!(a.value(0), 9);
4232        assert!(b.is_null(0));
4233    }
4234
4235    #[test]
4236    fn test_projector_default_injection_when_writer_lacks_fields() {
4237        let defaults = vec![None, None];
4238        let injections = vec![
4239            (0, AvroLiteral::Int(99)),
4240            (1, AvroLiteral::String("alice".into())),
4241        ];
4242        let mut rec = make_record_decoder_with_projector_defaults(
4243            &[
4244                ("id", DataType::Int32, false),
4245                ("name", DataType::Utf8, false),
4246            ],
4247            defaults,
4248            injections,
4249            0,
4250        );
4251        rec.decode(&mut AvroCursor::new(&[])).unwrap();
4252        let arr = rec.flush(None).unwrap();
4253        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4254        let id = s
4255            .column_by_name("id")
4256            .unwrap()
4257            .as_any()
4258            .downcast_ref::<Int32Array>()
4259            .unwrap();
4260        let name = s
4261            .column_by_name("name")
4262            .unwrap()
4263            .as_any()
4264            .downcast_ref::<StringArray>()
4265            .unwrap();
4266        assert_eq!(id.value(0), 99);
4267        assert_eq!(name.value(0), "alice");
4268    }
4269
4270    #[test]
4271    fn union_type_ids_are_not_child_indexes() {
4272        let encodings: Vec<AvroDataType> =
4273            vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
4274        let fields: UnionFields = [
4275            (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
4276            (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
4277        ]
4278        .into_iter()
4279        .collect();
4280        let dt = avro_from_codec(Codec::Union(
4281            encodings.into(),
4282            fields.clone(),
4283            UnionMode::Dense,
4284        ));
4285        let mut dec = Decoder::try_new(&dt).expect("decoder");
4286        let mut b1 = encode_avro_long(1);
4287        b1.extend(encode_avro_bytes("hi".as_bytes()));
4288        dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
4289        let mut b0 = encode_avro_long(0);
4290        b0.extend(encode_avro_int(5));
4291        dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
4292        let arr = dec.flush(None).expect("flush");
4293        let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
4294        assert_eq!(ua.len(), 2);
4295        assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
4296        assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
4297        assert_eq!(ua.value_offset(0), 0);
4298        assert_eq!(ua.value_offset(1), 0);
4299        let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
4300        assert_eq!(utf8_child.len(), 1);
4301        assert_eq!(utf8_child.value(0), "hi");
4302        let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
4303        assert_eq!(int_child.len(), 1);
4304        assert_eq!(int_child.value(0), 5);
4305        let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
4306        assert_eq!(type_ids, vec![42_i8, 7_i8]);
4307    }
4308
4309    #[cfg(feature = "avro_custom_types")]
4310    #[test]
4311    fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> {
4312        for codec in [
4313            Codec::DurationNanos,
4314            Codec::DurationMicros,
4315            Codec::DurationMillis,
4316            Codec::DurationSeconds,
4317        ] {
4318            let dt = make_avro_dt(codec.clone(), None);
4319            let s = Skipper::from_avro(&dt)?;
4320            match s {
4321                Skipper::Int64 => {}
4322                other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
4323            }
4324        }
4325        Ok(())
4326    }
4327
4328    #[cfg(feature = "avro_custom_types")]
4329    #[test]
4330    fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> {
4331        let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
4332        for codec in [
4333            Codec::DurationNanos,
4334            Codec::DurationMicros,
4335            Codec::DurationMillis,
4336            Codec::DurationSeconds,
4337        ] {
4338            let dt = make_avro_dt(codec.clone(), None);
4339            let mut s = Skipper::from_avro(&dt)?;
4340            for &v in &values {
4341                let bytes = encode_avro_long(v);
4342                let mut cursor = AvroCursor::new(&bytes);
4343                s.skip(&mut cursor)?;
4344                assert_eq!(
4345                    cursor.position(),
4346                    bytes.len(),
4347                    "did not consume all bytes for {:?} value {}",
4348                    codec,
4349                    v
4350                );
4351            }
4352        }
4353        Ok(())
4354    }
4355
4356    #[cfg(feature = "avro_custom_types")]
4357    #[test]
4358    fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> {
4359        let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
4360        let mut s = Skipper::from_avro(&dt)?;
4361        match &s {
4362            Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
4363                Skipper::Int64 => {}
4364                ref other => panic!("expected inner Int64, got {:?}", other),
4365            },
4366            other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
4367        }
4368        {
4369            let buf = encode_vlq_u64(0);
4370            let mut cursor = AvroCursor::new(&buf);
4371            s.skip(&mut cursor)?;
4372            assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
4373        }
4374        {
4375            let mut buf = encode_vlq_u64(1);
4376            buf.extend(encode_avro_long(0));
4377            let mut cursor = AvroCursor::new(&buf);
4378            s.skip(&mut cursor)?;
4379            assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
4380        }
4381
4382        Ok(())
4383    }
4384
4385    #[cfg(feature = "avro_custom_types")]
4386    #[test]
4387    fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> {
4388        let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
4389        let mut s = Skipper::from_avro(&dt)?;
4390        match &s {
4391            Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
4392                Skipper::Int64 => {}
4393                ref other => panic!("expected inner Int64, got {:?}", other),
4394            },
4395            other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
4396        }
4397        {
4398            let buf = encode_vlq_u64(1);
4399            let mut cursor = AvroCursor::new(&buf);
4400            s.skip(&mut cursor)?;
4401            assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
4402        }
4403        {
4404            let mut buf = encode_vlq_u64(0);
4405            buf.extend(encode_avro_long(-1));
4406            let mut cursor = AvroCursor::new(&buf);
4407            s.skip(&mut cursor)?;
4408            assert_eq!(
4409                cursor.position(),
4410                1 + encode_avro_long(-1).len(),
4411                "expected to consume tag=0 + long(-1)"
4412            );
4413        }
4414        Ok(())
4415    }
4416
4417    #[test]
4418    fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> {
4419        let dt = make_avro_dt(Codec::Interval, None);
4420        let mut s = Skipper::from_avro(&dt)?;
4421        match s {
4422            Skipper::DurationFixed12 => {}
4423            other => panic!("expected DurationFixed12, got {:?}", other),
4424        }
4425        let payload = vec![0u8; 12];
4426        let mut cursor = AvroCursor::new(&payload);
4427        s.skip(&mut cursor)?;
4428        assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
4429        Ok(())
4430    }
4431
4432    #[cfg(feature = "avro_custom_types")]
4433    #[test]
4434    fn test_run_end_encoded_width16_int32_basic_grouping() {
4435        use arrow_array::RunArray;
4436        use std::sync::Arc;
4437        let inner = avro_from_codec(Codec::Int32);
4438        let ree = AvroDataType::new(
4439            Codec::RunEndEncoded(Arc::new(inner), 16),
4440            Default::default(),
4441            None,
4442        );
4443        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4444        for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
4445            let bytes = encode_avro_int(v);
4446            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4447        }
4448        let arr = dec.flush(None).expect("flush");
4449        let ra = arr
4450            .as_any()
4451            .downcast_ref::<RunArray<Int16Type>>()
4452            .expect("RunArray<Int16Type>");
4453        assert_eq!(ra.len(), 9);
4454        assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
4455        let vals = ra
4456            .values()
4457            .as_ref()
4458            .as_any()
4459            .downcast_ref::<Int32Array>()
4460            .expect("values Int32");
4461        assert_eq!(vals.values(), &[1, 2, 3]);
4462    }
4463
4464    #[cfg(feature = "avro_custom_types")]
4465    #[test]
4466    fn test_run_end_encoded_width32_nullable_values_group_nulls() {
4467        use arrow_array::RunArray;
4468        use std::sync::Arc;
4469        let inner = AvroDataType::new(
4470            Codec::Int32,
4471            Default::default(),
4472            Some(Nullability::NullSecond),
4473        );
4474        let ree = AvroDataType::new(
4475            Codec::RunEndEncoded(Arc::new(inner), 32),
4476            Default::default(),
4477            None,
4478        );
4479        let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4480        let seq: [Option<i32>; 8] = [
4481            None,
4482            None,
4483            Some(7),
4484            Some(7),
4485            Some(7),
4486            None,
4487            Some(5),
4488            Some(5),
4489        ];
4490        for item in seq {
4491            let mut bytes = Vec::new();
4492            match item {
4493                None => bytes.extend_from_slice(&encode_vlq_u64(1)),
4494                Some(v) => {
4495                    bytes.extend_from_slice(&encode_vlq_u64(0));
4496                    bytes.extend_from_slice(&encode_avro_int(v));
4497                }
4498            }
4499            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4500        }
4501        let arr = dec.flush(None).expect("flush");
4502        let ra = arr
4503            .as_any()
4504            .downcast_ref::<RunArray<Int32Type>>()
4505            .expect("RunArray<Int32Type>");
4506        assert_eq!(ra.len(), 8);
4507        assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
4508        let vals = ra
4509            .values()
4510            .as_ref()
4511            .as_any()
4512            .downcast_ref::<Int32Array>()
4513            .expect("values Int32 (nullable)");
4514        assert_eq!(vals.len(), 4);
4515        assert!(vals.is_null(0));
4516        assert_eq!(vals.value(1), 7);
4517        assert!(vals.is_null(2));
4518        assert_eq!(vals.value(3), 5);
4519    }
4520
4521    #[cfg(feature = "avro_custom_types")]
4522    #[test]
4523    fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
4524        use arrow_array::RunArray;
4525        let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4526        let ree = Decoder::RunEndEncoded(
4527            8, /* bytes => Int64 run-ends */
4528            0,
4529            Box::new(inner_values),
4530        );
4531        let mut dec = Decoder::Nullable(
4532            Nullability::NullSecond,
4533            NullBufferBuilder::new(DEFAULT_CAPACITY),
4534            Box::new(ree),
4535            NullablePlan::FromSingle {
4536                promotion: Promotion::IntToDouble,
4537            },
4538        );
4539        for v in [1, 1, 2, 2, 2] {
4540            let bytes = encode_avro_int(v);
4541            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4542        }
4543        let arr = dec.flush(None).expect("flush");
4544        let ra = arr
4545            .as_any()
4546            .downcast_ref::<RunArray<Int64Type>>()
4547            .expect("RunArray<Int64Type>");
4548        assert_eq!(ra.len(), 5);
4549        assert_eq!(ra.run_ends().values(), &[2, 5]);
4550        let vals = ra
4551            .values()
4552            .as_ref()
4553            .as_any()
4554            .downcast_ref::<Float64Array>()
4555            .expect("values Float64");
4556        assert_eq!(vals.values(), &[1.0, 2.0]);
4557    }
4558
4559    #[cfg(feature = "avro_custom_types")]
4560    #[test]
4561    fn test_run_end_encoded_unsupported_run_end_width_errors() {
4562        use std::sync::Arc;
4563        let inner = avro_from_codec(Codec::Int32);
4564        let dt = AvroDataType::new(
4565            Codec::RunEndEncoded(Arc::new(inner), 3),
4566            Default::default(),
4567            None,
4568        );
4569        let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
4570        let msg = err.to_string();
4571        assert!(
4572            msg.contains("Unsupported run-end width")
4573                && msg.contains("16/32/64 bits or 2/4/8 bytes"),
4574            "unexpected error message: {msg}"
4575        );
4576    }
4577
4578    #[cfg(feature = "avro_custom_types")]
4579    #[test]
4580    fn test_run_end_encoded_empty_input_is_empty_runarray() {
4581        use arrow_array::RunArray;
4582        use std::sync::Arc;
4583        let inner = avro_from_codec(Codec::Utf8);
4584        let dt = AvroDataType::new(
4585            Codec::RunEndEncoded(Arc::new(inner), 4),
4586            Default::default(),
4587            None,
4588        );
4589        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4590        let arr = dec.flush(None).expect("flush");
4591        let ra = arr
4592            .as_any()
4593            .downcast_ref::<RunArray<Int32Type>>()
4594            .expect("RunArray<Int32Type>");
4595        assert_eq!(ra.len(), 0);
4596        assert_eq!(ra.run_ends().len(), 0);
4597        assert_eq!(ra.values().len(), 0);
4598    }
4599
4600    #[cfg(feature = "avro_custom_types")]
4601    #[test]
4602    fn test_run_end_encoded_strings_grouping_width32_bits() {
4603        use arrow_array::RunArray;
4604        use std::sync::Arc;
4605        let inner = avro_from_codec(Codec::Utf8);
4606        let dt = AvroDataType::new(
4607            Codec::RunEndEncoded(Arc::new(inner), 32),
4608            Default::default(),
4609            None,
4610        );
4611        let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4612        for s in ["a", "a", "bb", "bb", "bb", "a"] {
4613            let bytes = encode_avro_bytes(s.as_bytes());
4614            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4615        }
4616        let arr = dec.flush(None).expect("flush");
4617        let ra = arr
4618            .as_any()
4619            .downcast_ref::<RunArray<Int32Type>>()
4620            .expect("RunArray<Int32Type>");
4621        assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
4622        let vals = ra
4623            .values()
4624            .as_ref()
4625            .as_any()
4626            .downcast_ref::<StringArray>()
4627            .expect("values String");
4628        assert_eq!(vals.len(), 3);
4629        assert_eq!(vals.value(0), "a");
4630        assert_eq!(vals.value(1), "bb");
4631        assert_eq!(vals.value(2), "a");
4632    }
4633
4634    #[cfg(not(feature = "avro_custom_types"))]
4635    #[test]
4636    fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
4637        let dt = avro_from_codec(Codec::Int32);
4638        let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
4639        for v in [1, 2, 3] {
4640            let bytes = encode_avro_int(v);
4641            dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4642        }
4643        let arr = dec.flush(None).expect("flush");
4644        let a = arr
4645            .as_any()
4646            .downcast_ref::<Int32Array>()
4647            .expect("Int32Array");
4648        assert_eq!(a.values(), &[1, 2, 3]);
4649    }
4650}