arrow_avro/reader/
record.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::codec::{AvroDataType, Codec, Promotion, ResolutionInfo};
19use crate::reader::block::{Block, BlockDecoder};
20use crate::reader::cursor::AvroCursor;
21use crate::reader::header::Header;
22use crate::schema::*;
23use arrow_array::builder::{
24    ArrayBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder, Decimal64Builder,
25    IntervalMonthDayNanoBuilder, PrimitiveBuilder,
26};
27use arrow_array::types::*;
28use arrow_array::*;
29use arrow_buffer::*;
30use arrow_schema::{
31    ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit,
32    Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
33};
34#[cfg(feature = "small_decimals")]
35use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
36use std::cmp::Ordering;
37use std::collections::HashMap;
38use std::io::Read;
39use std::sync::Arc;
40use uuid::Uuid;
41
/// Initial capacity passed to the `Vec`s, offset builders, bit builders and decimal
/// builders that [`Decoder::try_new`] allocates for each column.
const DEFAULT_CAPACITY: usize = 1024;
43
/// Decodes one decimal value and appends it to a decimal builder.
///
/// Positional arguments:
/// 1. the size argument forwarded to `read_decimal_bytes_be`,
/// 2. the cursor to read from,
/// 3. the builder receiving the value,
/// 4. the byte width `N` of the target integer,
/// 5. the target integer type (must provide `from_be_bytes` for `[u8; N]`).
///
/// The expansion uses `?`, so it may only be invoked where an error from
/// `read_decimal_bytes_be` can be propagated.
macro_rules! decode_decimal {
    ($fixed_size:expr, $cursor:expr, $dest:expr, $WIDTH:expr, $Native:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $WIDTH }>($cursor, $fixed_size)?;
        let value = <$Native>::from_be_bytes(be_bytes);
        $dest.append_value(value);
    }};
}
51
/// Finishes a decimal builder and turns it into an [`ArrayRef`].
///
/// Only the value buffer of the finished builder is kept; the array is rebuilt with the
/// caller-supplied null buffer and then tagged with the given precision and scale
/// (a missing scale is treated as `0`).
///
/// Positional arguments: builder, precision (dereferenced in the expansion), optional
/// scale, null buffer, and the concrete decimal array type.
///
/// The expansion uses `?`: a precision/scale validation failure is reported as
/// `ArrowError::ParseError` from the enclosing function.
macro_rules! flush_decimal {
    ($dec_builder:expr, $prec:expr, $scl:expr, $validity:expr, $Array:ty) => {{
        let (_, values, _) = $dec_builder.finish().into_parts();
        let unchecked = <$Array>::new(values, $validity);
        let checked = unchecked
            .with_precision_and_scale(*$prec as u8, $scl.unwrap_or(0) as i8)
            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
        Arc::new(checked) as ArrayRef
    }};
}
62
/// Builder that collects the options needed to construct a [`RecordDecoder`].
#[derive(Debug)]
pub(crate) struct RecordDecoderBuilder<'a> {
    /// Avro data type the resulting decoder is built from.
    data_type: &'a AvroDataType,
    /// Value forwarded as the `use_utf8view` option of
    /// [`RecordDecoder::try_new_with_options`].
    use_utf8view: bool,
}
68
69impl<'a> RecordDecoderBuilder<'a> {
70    pub(crate) fn new(data_type: &'a AvroDataType) -> Self {
71        Self {
72            data_type,
73            use_utf8view: false,
74        }
75    }
76
77    pub(crate) fn with_utf8_view(mut self, use_utf8view: bool) -> Self {
78        self.use_utf8view = use_utf8view;
79        self
80    }
81
82    /// Builds the `RecordDecoder`.
83    pub(crate) fn build(self) -> Result<RecordDecoder, ArrowError> {
84        RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view)
85    }
86}
87
/// Decodes avro encoded data into [`RecordBatch`]
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema assembled from the record's fields; used for every flushed batch.
    schema: SchemaRef,
    /// One decoder per field of the record, in the same order as `schema`.
    fields: Vec<Decoder>,
    /// Option value supplied at construction time.
    // NOTE(review): stored but not read by any method visible in this part of the file.
    use_utf8view: bool,
    /// Present only when the record type carries record-level resolution metadata;
    /// selects the writer-order decoding path in `decode`.
    resolved: Option<ResolvedRuntime>,
}
96
/// Runtime helpers for decoding a top-level record whose type carries record-level
/// resolution metadata.
#[derive(Debug)]
struct ResolvedRuntime {
    /// writer field index -> reader field index (or None if writer-only)
    writer_to_reader: Arc<[Option<usize>]>,
    /// per-writer-field skipper (Some only when writer-only)
    skip_decoders: Vec<Option<Skipper>>,
}
104
105impl RecordDecoder {
106    /// Creates a new `RecordDecoderBuilder` for configuring a `RecordDecoder`.
107    pub(crate) fn new(data_type: &'_ AvroDataType) -> Self {
108        RecordDecoderBuilder::new(data_type).build().unwrap()
109    }
110
111    /// Create a new [`RecordDecoder`] from the provided [`AvroDataType`] with default options
112    pub(crate) fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
113        RecordDecoderBuilder::new(data_type)
114            .with_utf8_view(true)
115            .build()
116    }
117
118    /// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
119    ///
120    /// This method allows you to customize how the Avro data is decoded into Arrow arrays.
121    ///
122    /// # Arguments
123    /// * `data_type` - The Avro data type to decode.
124    /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types.
125    ///
126    /// # Errors
127    /// This function will return an error if the provided `data_type` is not a `Record`.
128    pub(crate) fn try_new_with_options(
129        data_type: &AvroDataType,
130        use_utf8view: bool,
131    ) -> Result<Self, ArrowError> {
132        match data_type.codec() {
133            Codec::Struct(reader_fields) => {
134                // Build Arrow schema fields and per-child decoders
135                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
136                let mut encodings = Vec::with_capacity(reader_fields.len());
137                for avro_field in reader_fields.iter() {
138                    arrow_fields.push(avro_field.field());
139                    encodings.push(Decoder::try_new(avro_field.data_type())?);
140                }
141                // If this record carries resolution metadata, prepare top-level runtime helpers
142                let resolved = match data_type.resolution.as_ref() {
143                    Some(ResolutionInfo::Record(rec)) => {
144                        let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
145                        Some(ResolvedRuntime {
146                            writer_to_reader: rec.writer_to_reader.clone(),
147                            skip_decoders,
148                        })
149                    }
150                    _ => None,
151                };
152                Ok(Self {
153                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
154                    fields: encodings,
155                    use_utf8view,
156                    resolved,
157                })
158            }
159            other => Err(ArrowError::ParseError(format!(
160                "Expected record got {other:?}"
161            ))),
162        }
163    }
164
165    /// Returns the decoder's `SchemaRef`
166    pub(crate) fn schema(&self) -> &SchemaRef {
167        &self.schema
168    }
169
170    /// Decode `count` records from `buf`
171    pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
172        let mut cursor = AvroCursor::new(buf);
173        match self.resolved.as_mut() {
174            Some(runtime) => {
175                // Top-level resolved record: read writer fields in writer order,
176                // project into reader fields, and skip writer-only fields
177                for _ in 0..count {
178                    decode_with_resolution(
179                        &mut cursor,
180                        &mut self.fields,
181                        &runtime.writer_to_reader,
182                        &mut runtime.skip_decoders,
183                    )?;
184                }
185            }
186            None => {
187                for _ in 0..count {
188                    for field in &mut self.fields {
189                        field.decode(&mut cursor)?;
190                    }
191                }
192            }
193        }
194        Ok(cursor.position())
195    }
196
197    /// Flush the decoded records into a [`RecordBatch`]
198    pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
199        let arrays = self
200            .fields
201            .iter_mut()
202            .map(|x| x.flush(None))
203            .collect::<Result<Vec<_>, _>>()?;
204        RecordBatch::try_new(self.schema.clone(), arrays)
205    }
206}
207
208fn decode_with_resolution(
209    buf: &mut AvroCursor<'_>,
210    encodings: &mut [Decoder],
211    writer_to_reader: &[Option<usize>],
212    skippers: &mut [Option<Skipper>],
213) -> Result<(), ArrowError> {
214    for (w_idx, (target, skipper_opt)) in writer_to_reader.iter().zip(skippers).enumerate() {
215        match (*target, skipper_opt.as_mut()) {
216            (Some(r_idx), _) => encodings[r_idx].decode(buf)?,
217            (None, Some(sk)) => sk.skip(buf)?,
218            (None, None) => {
219                return Err(ArrowError::SchemaError(format!(
220                    "No skipper available for writer-only field at index {w_idx}",
221                )));
222            }
223        }
224    }
225    Ok(())
226}
227
/// Per-column decoder: each variant owns the buffers that accumulate decoded values
/// until they are flushed into an Arrow array.
#[derive(Debug)]
enum Decoder {
    /// Number of values seen so far; flushed as a `NullArray` of that length.
    Null(usize),
    /// Boolean values, stored as bits.
    Boolean(BooleanBufferBuilder),
    /// Values read with `get_int`.
    Int32(Vec<i32>),
    /// Values read with `get_long`.
    Int64(Vec<i64>),
    /// Values read with `get_float`.
    Float32(Vec<f32>),
    /// Values read with `get_double`.
    Float64(Vec<f64>),
    /// Int-encoded values flushed as `Date32`.
    Date32(Vec<i32>),
    /// Int-encoded values flushed as `Time32` (milliseconds).
    TimeMillis(Vec<i32>),
    /// Long-encoded values flushed as `Time64` (microseconds).
    TimeMicros(Vec<i64>),
    /// Long-encoded millisecond timestamps; the flag selects the `"+00:00"` timezone on flush.
    TimestampMillis(bool, Vec<i64>),
    /// Long-encoded microsecond timestamps; the flag selects the `"+00:00"` timezone on flush.
    TimestampMicros(bool, Vec<i64>),
    /// Promotion: read as int, stored as `i64`.
    Int32ToInt64(Vec<i64>),
    /// Promotion: read as int, stored as `f32`.
    Int32ToFloat32(Vec<f32>),
    /// Promotion: read as int, stored as `f64`.
    Int32ToFloat64(Vec<f64>),
    /// Promotion: read as long, stored as `f32`.
    Int64ToFloat32(Vec<f32>),
    /// Promotion: read as long, stored as `f64`.
    Int64ToFloat64(Vec<f64>),
    /// Promotion: read as float, stored as `f64`.
    Float32ToFloat64(Vec<f64>),
    /// Promotion: length-prefixed bytes (offsets, data) flushed as a `StringArray`.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Promotion: length-prefixed bytes (offsets, data) flushed as a `BinaryArray`.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Length-prefixed bytes (offsets, data) flushed as a `BinaryArray`.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// List column: item field, list offsets, and the decoder for the items.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Record column: Arrow fields and one decoder per field, decoded in order.
    Record(Fields, Vec<Decoder>),
    /// Map column: entries field, key offsets, map offsets, key bytes, and the value decoder.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Fixed-size values: size in bytes and the concatenated value bytes.
    Fixed(i32, Vec<u8>),
    /// Enum column: decoded symbol indices and the symbol table.
    Enum(Vec<i32>, Arc<[String]>),
    /// Values read as 12 fixed bytes (months, days, milliseconds; each little-endian `u32`).
    Duration(IntervalMonthDayNanoBuilder),
    /// UUIDs parsed from their textual form; 16 bytes are stored per value.
    Uuid(Vec<u8>),
    /// Decimal column: precision, optional scale, optional size, and the builder.
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// Decimal column: precision, optional scale, optional size, and the builder.
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// Decimal column: precision, optional scale, optional size, and the builder.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// Decimal column: precision, optional scale, optional size, and the builder.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// Nullable wrapper: position of the null branch, validity bits, and the inner decoder.
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
    /// Enum column whose decoded indices are translated through `mapping`
    EnumResolved {
        /// Resolved indices, one per decoded value.
        indices: Vec<i32>,
        /// Symbol table cloned from the enum codec.
        symbols: Arc<[String]>,
        /// Lookup indexed by the decoded index; negative entries are treated as unmapped.
        mapping: Arc<[i32]>,
        /// Index used when the decoded index cannot be mapped; negative means "no default".
        default_index: i32,
    },
    /// Resolved record that needs writer->reader projection and skipping writer-only fields
    RecordResolved {
        /// Arrow fields of the record.
        fields: Fields,
        /// One decoder per field.
        encodings: Vec<Decoder>,
        /// Writer field index -> index into `encodings` (`None` if writer-only).
        writer_to_reader: Arc<[Option<usize>]>,
        /// Per-writer-field skipper, used for writer-only fields.
        skip_decoders: Vec<Option<Skipper>>,
    },
}
286
287impl Decoder {
288    fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
289        // Extract just the Promotion (if any) to simplify pattern matching
290        let promotion = match data_type.resolution.as_ref() {
291            Some(ResolutionInfo::Promotion(p)) => Some(p),
292            _ => None,
293        };
294        let decoder = match (data_type.codec(), promotion) {
295            (Codec::Int64, Some(Promotion::IntToLong)) => {
296                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
297            }
298            (Codec::Float32, Some(Promotion::IntToFloat)) => {
299                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
300            }
301            (Codec::Float64, Some(Promotion::IntToDouble)) => {
302                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
303            }
304            (Codec::Float32, Some(Promotion::LongToFloat)) => {
305                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
306            }
307            (Codec::Float64, Some(Promotion::LongToDouble)) => {
308                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
309            }
310            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
311                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
312            }
313            (Codec::Utf8, Some(Promotion::BytesToString))
314            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
315                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
316                Vec::with_capacity(DEFAULT_CAPACITY),
317            ),
318            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
319                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
320                Vec::with_capacity(DEFAULT_CAPACITY),
321            ),
322            (Codec::Null, _) => Self::Null(0),
323            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
324            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
325            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
326            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
327            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
328            (Codec::Binary, _) => Self::Binary(
329                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
330                Vec::with_capacity(DEFAULT_CAPACITY),
331            ),
332            (Codec::Utf8, _) => Self::String(
333                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
334                Vec::with_capacity(DEFAULT_CAPACITY),
335            ),
336            (Codec::Utf8View, _) => Self::StringView(
337                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
338                Vec::with_capacity(DEFAULT_CAPACITY),
339            ),
340            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
341            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
342            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
343            (Codec::TimestampMillis(is_utc), _) => {
344                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
345            }
346            (Codec::TimestampMicros(is_utc), _) => {
347                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
348            }
349            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
350            (Codec::Decimal(precision, scale, size), _) => {
351                let p = *precision;
352                let s = *scale;
353                let prec = p as u8;
354                let scl = s.unwrap_or(0) as i8;
355                #[cfg(feature = "small_decimals")]
356                {
357                    if p <= DECIMAL32_MAX_PRECISION as usize {
358                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
359                            .with_precision_and_scale(prec, scl)?;
360                        Self::Decimal32(p, s, *size, builder)
361                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
362                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
363                            .with_precision_and_scale(prec, scl)?;
364                        Self::Decimal64(p, s, *size, builder)
365                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
366                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
367                            .with_precision_and_scale(prec, scl)?;
368                        Self::Decimal128(p, s, *size, builder)
369                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
370                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
371                            .with_precision_and_scale(prec, scl)?;
372                        Self::Decimal256(p, s, *size, builder)
373                    } else {
374                        return Err(ArrowError::ParseError(format!(
375                            "Decimal precision {p} exceeds maximum supported"
376                        )));
377                    }
378                }
379                #[cfg(not(feature = "small_decimals"))]
380                {
381                    if p <= DECIMAL128_MAX_PRECISION as usize {
382                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
383                            .with_precision_and_scale(prec, scl)?;
384                        Self::Decimal128(p, s, *size, builder)
385                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
386                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
387                            .with_precision_and_scale(prec, scl)?;
388                        Self::Decimal256(p, s, *size, builder)
389                    } else {
390                        return Err(ArrowError::ParseError(format!(
391                            "Decimal precision {p} exceeds maximum supported"
392                        )));
393                    }
394                }
395            }
396            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
397            (Codec::List(item), _) => {
398                let decoder = Self::try_new(item)?;
399                Self::Array(
400                    Arc::new(item.field_with_name("item")),
401                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
402                    Box::new(decoder),
403                )
404            }
405            (Codec::Enum(symbols), _) => {
406                if let Some(ResolutionInfo::EnumMapping(mapping)) = data_type.resolution.as_ref() {
407                    Self::EnumResolved {
408                        indices: Vec::with_capacity(DEFAULT_CAPACITY),
409                        symbols: symbols.clone(),
410                        mapping: mapping.mapping.clone(),
411                        default_index: mapping.default_index,
412                    }
413                } else {
414                    Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone())
415                }
416            }
417            (Codec::Struct(fields), _) => {
418                let mut arrow_fields = Vec::with_capacity(fields.len());
419                let mut encodings = Vec::with_capacity(fields.len());
420                for avro_field in fields.iter() {
421                    let encoding = Self::try_new(avro_field.data_type())?;
422                    arrow_fields.push(avro_field.field());
423                    encodings.push(encoding);
424                }
425                if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
426                    let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
427                    Self::RecordResolved {
428                        fields: arrow_fields.into(),
429                        encodings,
430                        writer_to_reader: rec.writer_to_reader.clone(),
431                        skip_decoders,
432                    }
433                } else {
434                    Self::Record(arrow_fields.into(), encodings)
435                }
436            }
437            (Codec::Map(child), _) => {
438                let val_field = child.field_with_name("value");
439                let map_field = Arc::new(ArrowField::new(
440                    "entries",
441                    DataType::Struct(Fields::from(vec![
442                        ArrowField::new("key", DataType::Utf8, false),
443                        val_field,
444                    ])),
445                    false,
446                ));
447                let val_dec = Self::try_new(child)?;
448                Self::Map(
449                    map_field,
450                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
451                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
452                    Vec::with_capacity(DEFAULT_CAPACITY),
453                    Box::new(val_dec),
454                )
455            }
456            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
457        };
458        Ok(match data_type.nullability() {
459            Some(nullability) => Self::Nullable(
460                nullability,
461                NullBufferBuilder::new(DEFAULT_CAPACITY),
462                Box::new(decoder),
463            ),
464            None => decoder,
465        })
466    }
467
468    /// Append a null record
469    fn append_null(&mut self) {
470        match self {
471            Self::Null(count) => *count += 1,
472            Self::Boolean(b) => b.append(false),
473            Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
474            Self::Int64(v)
475            | Self::Int32ToInt64(v)
476            | Self::TimeMicros(v)
477            | Self::TimestampMillis(_, v)
478            | Self::TimestampMicros(_, v) => v.push(0),
479            Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
480            Self::Float64(v)
481            | Self::Int32ToFloat64(v)
482            | Self::Int64ToFloat64(v)
483            | Self::Float32ToFloat64(v) => v.push(0.),
484            Self::Binary(offsets, _)
485            | Self::String(offsets, _)
486            | Self::StringView(offsets, _)
487            | Self::BytesToString(offsets, _)
488            | Self::StringToBytes(offsets, _) => {
489                offsets.push_length(0);
490            }
491            Self::Uuid(v) => {
492                v.extend([0; 16]);
493            }
494            Self::Array(_, offsets, e) => {
495                offsets.push_length(0);
496            }
497            Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()),
498            Self::Map(_, _koff, moff, _, _) => {
499                moff.push_length(0);
500            }
501            Self::Fixed(sz, accum) => {
502                accum.extend(std::iter::repeat_n(0u8, *sz as usize));
503            }
504            Self::Decimal32(_, _, _, builder) => builder.append_value(0),
505            Self::Decimal64(_, _, _, builder) => builder.append_value(0),
506            Self::Decimal128(_, _, _, builder) => builder.append_value(0),
507            Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
508            Self::Enum(indices, _) => indices.push(0),
509            Self::EnumResolved { indices, .. } => indices.push(0),
510            Self::Duration(builder) => builder.append_null(),
511            Self::Nullable(_, null_buffer, inner) => {
512                null_buffer.append(false);
513                inner.append_null();
514            }
515            Self::RecordResolved { encodings, .. } => {
516                encodings.iter_mut().for_each(|e| e.append_null());
517            }
518        }
519    }
520
521    /// Decode a single record from `buf`
522    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
523        match self {
524            Self::Null(x) => *x += 1,
525            Self::Boolean(values) => values.append(buf.get_bool()?),
526            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
527                values.push(buf.get_int()?)
528            }
529            Self::Int64(values)
530            | Self::TimeMicros(values)
531            | Self::TimestampMillis(_, values)
532            | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
533            Self::Float32(values) => values.push(buf.get_float()?),
534            Self::Float64(values) => values.push(buf.get_double()?),
535            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
536            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
537            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
538            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
539            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
540            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
541            Self::StringToBytes(offsets, values)
542            | Self::BytesToString(offsets, values)
543            | Self::Binary(offsets, values)
544            | Self::String(offsets, values)
545            | Self::StringView(offsets, values) => {
546                let data = buf.get_bytes()?;
547                offsets.push_length(data.len());
548                values.extend_from_slice(data);
549            }
550            Self::Uuid(values) => {
551                let s_bytes = buf.get_bytes()?;
552                let s = std::str::from_utf8(s_bytes).map_err(|e| {
553                    ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
554                })?;
555                let uuid = Uuid::try_parse(s)
556                    .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
557                values.extend_from_slice(uuid.as_bytes());
558            }
559            Self::Array(_, off, encoding) => {
560                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
561                off.push_length(total_items);
562            }
563            Self::Record(_, encodings) => {
564                for encoding in encodings {
565                    encoding.decode(buf)?;
566                }
567            }
568            Self::Map(_, koff, moff, kdata, valdec) => {
569                let newly_added = read_blocks(buf, |cur| {
570                    let kb = cur.get_bytes()?;
571                    koff.push_length(kb.len());
572                    kdata.extend_from_slice(kb);
573                    valdec.decode(cur)
574                })?;
575                moff.push_length(newly_added);
576            }
577            Self::Fixed(sz, accum) => {
578                let fx = buf.get_fixed(*sz as usize)?;
579                accum.extend_from_slice(fx);
580            }
581            Self::Decimal32(_, _, size, builder) => {
582                decode_decimal!(size, buf, builder, 4, i32);
583            }
584            Self::Decimal64(_, _, size, builder) => {
585                decode_decimal!(size, buf, builder, 8, i64);
586            }
587            Self::Decimal128(_, _, size, builder) => {
588                decode_decimal!(size, buf, builder, 16, i128);
589            }
590            Self::Decimal256(_, _, size, builder) => {
591                decode_decimal!(size, buf, builder, 32, i256);
592            }
593            Self::Enum(indices, _) => {
594                indices.push(buf.get_int()?);
595            }
596            Self::EnumResolved {
597                indices,
598                mapping,
599                default_index,
600                ..
601            } => {
602                let raw = buf.get_int()?;
603                let resolved = usize::try_from(raw)
604                    .ok()
605                    .and_then(|idx| mapping.get(idx).copied())
606                    .filter(|&idx| idx >= 0)
607                    .unwrap_or(*default_index);
608                if resolved >= 0 {
609                    indices.push(resolved);
610                } else {
611                    return Err(ArrowError::ParseError(format!(
612                        "Enum symbol index {raw} not resolvable and no default provided",
613                    )));
614                }
615            }
616            Self::Duration(builder) => {
617                let b = buf.get_fixed(12)?;
618                let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
619                let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
620                let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
621                let nanos = (millis as i64) * 1_000_000;
622                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
623            }
624            Self::Nullable(order, nb, encoding) => {
625                let branch = buf.read_vlq()?;
626                let is_not_null = match *order {
627                    Nullability::NullFirst => branch != 0,
628                    Nullability::NullSecond => branch == 0,
629                };
630                if is_not_null {
631                    // It is important to decode before appending to null buffer in case of decode error
632                    encoding.decode(buf)?;
633                } else {
634                    encoding.append_null();
635                }
636                nb.append(is_not_null);
637            }
638            Self::RecordResolved {
639                encodings,
640                writer_to_reader,
641                skip_decoders,
642                ..
643            } => {
644                decode_with_resolution(buf, encodings, writer_to_reader, skip_decoders)?;
645            }
646        }
647        Ok(())
648    }
649
650    /// Flush decoded records to an [`ArrayRef`]
651    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
652        Ok(match self {
653            Self::Nullable(_, n, e) => e.flush(n.finish())?,
654            Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
655            Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
656            Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
657            Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
658            Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
659            Self::TimeMillis(values) => {
660                Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
661            }
662            Self::TimeMicros(values) => {
663                Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
664            }
665            Self::TimestampMillis(is_utc, values) => Arc::new(
666                flush_primitive::<TimestampMillisecondType>(values, nulls)
667                    .with_timezone_opt(is_utc.then(|| "+00:00")),
668            ),
669            Self::TimestampMicros(is_utc, values) => Arc::new(
670                flush_primitive::<TimestampMicrosecondType>(values, nulls)
671                    .with_timezone_opt(is_utc.then(|| "+00:00")),
672            ),
673            Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
674            Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
675            Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
676            Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
677                Arc::new(flush_primitive::<Float32Type>(values, nulls))
678            }
679            Self::Int32ToFloat64(values)
680            | Self::Int64ToFloat64(values)
681            | Self::Float32ToFloat64(values) => {
682                Arc::new(flush_primitive::<Float64Type>(values, nulls))
683            }
684            Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
685                let offsets = flush_offsets(offsets);
686                let values = flush_values(values).into();
687                Arc::new(BinaryArray::new(offsets, values, nulls))
688            }
689            Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
690                let offsets = flush_offsets(offsets);
691                let values = flush_values(values).into();
692                Arc::new(StringArray::new(offsets, values, nulls))
693            }
694            Self::StringView(offsets, values) => {
695                let offsets = flush_offsets(offsets);
696                let values = flush_values(values);
697                let array = StringArray::new(offsets, values.into(), nulls.clone());
698                let values: Vec<&str> = (0..array.len())
699                    .map(|i| {
700                        if array.is_valid(i) {
701                            array.value(i)
702                        } else {
703                            ""
704                        }
705                    })
706                    .collect();
707                Arc::new(StringViewArray::from(values))
708            }
709            Self::Array(field, offsets, values) => {
710                let values = values.flush(None)?;
711                let offsets = flush_offsets(offsets);
712                Arc::new(ListArray::new(field.clone(), offsets, values, nulls))
713            }
714            Self::Record(fields, encodings) => {
715                let arrays = encodings
716                    .iter_mut()
717                    .map(|x| x.flush(None))
718                    .collect::<Result<Vec<_>, _>>()?;
719                Arc::new(StructArray::new(fields.clone(), arrays, nulls))
720            }
721            Self::Map(map_field, k_off, m_off, kdata, valdec) => {
722                let moff = flush_offsets(m_off);
723                let koff = flush_offsets(k_off);
724                let kd = flush_values(kdata).into();
725                let val_arr = valdec.flush(None)?;
726                let key_arr = StringArray::new(koff, kd, None);
727                if key_arr.len() != val_arr.len() {
728                    return Err(ArrowError::InvalidArgumentError(format!(
729                        "Map keys length ({}) != map values length ({})",
730                        key_arr.len(),
731                        val_arr.len()
732                    )));
733                }
734                let final_len = moff.len() - 1;
735                if let Some(n) = &nulls {
736                    if n.len() != final_len {
737                        return Err(ArrowError::InvalidArgumentError(format!(
738                            "Map array null buffer length {} != final map length {final_len}",
739                            n.len()
740                        )));
741                    }
742                }
743                let entries_fields = match map_field.data_type() {
744                    DataType::Struct(fields) => fields.clone(),
745                    other => {
746                        return Err(ArrowError::InvalidArgumentError(format!(
747                            "Map entries field must be a Struct, got {other:?}"
748                        )))
749                    }
750                };
751                let entries_struct =
752                    StructArray::new(entries_fields, vec![Arc::new(key_arr), val_arr], None);
753                let map_arr = MapArray::new(map_field.clone(), moff, entries_struct, nulls, false);
754                Arc::new(map_arr)
755            }
756            Self::Fixed(sz, accum) => {
757                let b: Buffer = flush_values(accum).into();
758                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
759                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
760                Arc::new(arr)
761            }
762            Self::Uuid(values) => {
763                let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
764                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
765                Arc::new(arr)
766            }
767            Self::Decimal32(precision, scale, _, builder) => {
768                flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
769            }
770            Self::Decimal64(precision, scale, _, builder) => {
771                flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
772            }
773            Self::Decimal128(precision, scale, _, builder) => {
774                flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
775            }
776            Self::Decimal256(precision, scale, _, builder) => {
777                flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
778            }
779            Self::Enum(indices, symbols) => flush_dict(indices, symbols, nulls)?,
780            Self::EnumResolved {
781                indices, symbols, ..
782            } => flush_dict(indices, symbols, nulls)?,
783            Self::Duration(builder) => {
784                let (_, vals, _) = builder.finish().into_parts();
785                let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
786                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
787                Arc::new(vals)
788            }
789            Self::RecordResolved {
790                fields, encodings, ..
791            } => {
792                let arrays = encodings
793                    .iter_mut()
794                    .map(|x| x.flush(None))
795                    .collect::<Result<Vec<_>, _>>()?;
796                Arc::new(StructArray::new(fields.clone(), arrays, nulls))
797            }
798        })
799    }
800}
801
/// Strategy `process_blockwise` applies to blocks announced with a negative item count,
/// i.e. blocks whose count is followed by a long holding the block size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size, then still run the per-item callback once for every item.
    ProcessItems,
    /// Read the byte size and advance past the whole block payload in one step,
    /// without running the per-item callback.
    SkipBySize,
}
807
808#[inline]
809fn skip_blocks(
810    buf: &mut AvroCursor,
811    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
812) -> Result<usize, ArrowError> {
813    process_blockwise(
814        buf,
815        move |c| skip_item(c),
816        NegativeBlockBehavior::SkipBySize,
817    )
818}
819
820#[inline]
821fn flush_dict(
822    indices: &mut Vec<i32>,
823    symbols: &[String],
824    nulls: Option<NullBuffer>,
825) -> Result<ArrayRef, ArrowError> {
826    let keys = flush_primitive::<Int32Type>(indices, nulls);
827    let values = Arc::new(StringArray::from_iter_values(
828        symbols.iter().map(|s| s.as_str()),
829    ));
830    DictionaryArray::try_new(keys, values)
831        .map_err(|e| ArrowError::ParseError(e.to_string()))
832        .map(|arr| Arc::new(arr) as ArrayRef)
833}
834
835#[inline]
836fn read_blocks(
837    buf: &mut AvroCursor,
838    decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
839) -> Result<usize, ArrowError> {
840    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
841}
842
843#[inline]
844fn process_blockwise(
845    buf: &mut AvroCursor,
846    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
847    negative_behavior: NegativeBlockBehavior,
848) -> Result<usize, ArrowError> {
849    let mut total = 0usize;
850    loop {
851        // Read the block count
852        //  positive = that many items
853        //  negative = that many items + read block size
854        //  See: https://avro.apache.org/docs/1.11.1/specification/#maps
855        let block_count = buf.get_long()?;
856        match block_count.cmp(&0) {
857            Ordering::Equal => break,
858            Ordering::Less => {
859                let count = (-block_count) as usize;
860                // A negative count is followed by a long of the size in bytes
861                let size_in_bytes = buf.get_long()? as usize;
862                match negative_behavior {
863                    NegativeBlockBehavior::ProcessItems => {
864                        // Process items one-by-one after reading size
865                        for _ in 0..count {
866                            on_item(buf)?;
867                        }
868                    }
869                    NegativeBlockBehavior::SkipBySize => {
870                        // Skip the entire block payload at once
871                        let _ = buf.get_fixed(size_in_bytes)?;
872                    }
873                }
874                total += count;
875            }
876            Ordering::Greater => {
877                let count = block_count as usize;
878                for _ in 0..count {
879                    on_item(buf)?;
880                }
881                total += count;
882            }
883        }
884    }
885    Ok(total)
886}
887
888#[inline]
889fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
890    std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
891}
892
893#[inline]
894fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
895    std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
896}
897
898#[inline]
899fn flush_primitive<T: ArrowPrimitiveType>(
900    values: &mut Vec<T::Native>,
901    nulls: Option<NullBuffer>,
902) -> PrimitiveArray<T> {
903    PrimitiveArray::new(flush_values(values).into(), nulls)
904}
905
906#[inline]
907fn read_decimal_bytes_be<const N: usize>(
908    buf: &mut AvroCursor<'_>,
909    size: &Option<usize>,
910) -> Result<[u8; N], ArrowError> {
911    match size {
912        Some(n) if *n == N => {
913            let raw = buf.get_fixed(N)?;
914            let mut arr = [0u8; N];
915            arr.copy_from_slice(raw);
916            Ok(arr)
917        }
918        Some(n) => {
919            let raw = buf.get_fixed(*n)?;
920            sign_cast_to::<N>(raw)
921        }
922        None => {
923            let raw = buf.get_bytes()?;
924            sign_cast_to::<N>(raw)
925        }
926    }
927}
928
929/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement
930/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding:
931/// the payload is a big-endian two's-complement integer, and when narrowing it must
932/// be representable without changing sign or value.
933///
934/// If `raw.len() < N`, the value is sign-extended.
935/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
936/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
937#[inline]
938fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
939    let len = raw.len();
940    // Fast path: exact width, just copy
941    if len == N {
942        let mut out = [0u8; N];
943        out.copy_from_slice(raw);
944        return Ok(out);
945    }
946    // Determine sign byte from MSB of first byte (empty => positive)
947    let first = raw.first().copied().unwrap_or(0u8);
948    let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
949    // Pre-fill with sign byte to support sign extension
950    let mut out = [sign_byte; N];
951    if len > N {
952        // Validate truncation: all dropped leading bytes must equal sign_byte,
953        // and the MSB of the first kept byte must match the sign.
954        let extra = len - N;
955        // Any non-sign byte in the truncated prefix indicates overflow
956        if raw[..extra].iter().any(|&b| b != sign_byte) {
957            return Err(ArrowError::ParseError(format!(
958                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
959                len, N
960            )));
961        }
962        if N > 0 {
963            let first_kept = raw[extra];
964            let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
965            if sign_bit_mismatch {
966                return Err(ArrowError::ParseError(format!(
967                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
968                    len, N
969                )));
970            }
971        }
972        out.copy_from_slice(&raw[extra..]);
973        return Ok(out);
974    }
975    out[N - len..].copy_from_slice(raw);
976    Ok(out)
977}
978
/// Lightweight skipper for non‑projected writer fields
/// (fields present in the writer schema but omitted by the reader/projection);
/// per Avro 1.11.1 schema resolution these fields are ignored.
///
/// Each variant tells `Skipper::skip` how to advance the cursor past one encoded
/// value without materializing it.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
#[derive(Debug)]
enum Skipper {
    /// Consumes nothing from the cursor.
    Null,
    /// Skipped with one `get_bool` read.
    Boolean,
    /// Skipped with one `get_int` read.
    Int32,
    /// Skipped with one `get_long` read.
    Int64,
    /// Skipped with one `get_float` read.
    Float32,
    /// Skipped with one `get_double` read.
    Float64,
    /// Skipped with one `get_bytes` read.
    Bytes,
    /// Skipped with one `get_bytes` read.
    String,
    /// Skipped exactly like `Int32`. Note that `from_avro` maps `Codec::Date32` to
    /// `Int32` rather than to this variant.
    Date32,
    /// Skipped exactly like `Int32`. Note that `from_avro` maps `Codec::TimeMillis`
    /// to `Int32` rather than to this variant.
    TimeMillis,
    /// Skipped exactly like `Int64`.
    TimeMicros,
    /// Skipped exactly like `Int64`.
    TimestampMillis,
    /// Skipped exactly like `Int64`.
    TimestampMicros,
    /// Fixed-width value; the payload is the byte count passed to `get_fixed`.
    Fixed(usize),
    /// Decimal value: `Some(n)` is skipped as `n` fixed bytes, `None` with one
    /// `get_bytes` read.
    Decimal(Option<usize>),
    /// UUID value, skipped with one `get_bytes` read.
    UuidString,
    /// Enum value, skipped with one `get_int` read.
    Enum,
    /// Skipped as 12 fixed bytes.
    DurationFixed12,
    /// Block-encoded sequence; the boxed skipper is applied to each item.
    List(Box<Skipper>),
    /// Block-encoded map; each entry's key is skipped with `get_bytes` and its value
    /// with the boxed skipper.
    Map(Box<Skipper>),
    /// Composite value whose field skippers are run in order.
    Struct(Vec<Skipper>),
    /// Optional value: a branch index is read first and the boxed skipper only runs
    /// when that index selects the non-null branch (non-zero for `NullFirst`, zero
    /// for `NullSecond`).
    Nullable(Nullability, Box<Skipper>),
}
1009
1010impl Skipper {
1011    fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1012        let mut base = match dt.codec() {
1013            Codec::Null => Self::Null,
1014            Codec::Boolean => Self::Boolean,
1015            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1016            Codec::Int64 => Self::Int64,
1017            Codec::TimeMicros => Self::TimeMicros,
1018            Codec::TimestampMillis(_) => Self::TimestampMillis,
1019            Codec::TimestampMicros(_) => Self::TimestampMicros,
1020            Codec::Float32 => Self::Float32,
1021            Codec::Float64 => Self::Float64,
1022            Codec::Binary => Self::Bytes,
1023            Codec::Utf8 | Codec::Utf8View => Self::String,
1024            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
1025            Codec::Decimal(_, _, size) => Self::Decimal(*size),
1026            Codec::Uuid => Self::UuidString, // encoded as string
1027            Codec::Enum(_) => Self::Enum,
1028            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
1029            Codec::Struct(fields) => Self::Struct(
1030                fields
1031                    .iter()
1032                    .map(|f| Skipper::from_avro(f.data_type()))
1033                    .collect::<Result<_, _>>()?,
1034            ),
1035            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
1036            Codec::Interval => Self::DurationFixed12,
1037            _ => {
1038                return Err(ArrowError::NotYetImplemented(format!(
1039                    "Skipper not implemented for codec {:?}",
1040                    dt.codec()
1041                )));
1042            }
1043        };
1044        if let Some(n) = dt.nullability() {
1045            base = Self::Nullable(n, Box::new(base));
1046        }
1047        Ok(base)
1048    }
1049
1050    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1051        match self {
1052            Self::Null => Ok(()),
1053            Self::Boolean => {
1054                buf.get_bool()?;
1055                Ok(())
1056            }
1057            Self::Int32 | Self::Date32 | Self::TimeMillis => {
1058                buf.get_int()?;
1059                Ok(())
1060            }
1061            Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
1062                buf.get_long()?;
1063                Ok(())
1064            }
1065            Self::Float32 => {
1066                buf.get_float()?;
1067                Ok(())
1068            }
1069            Self::Float64 => {
1070                buf.get_double()?;
1071                Ok(())
1072            }
1073            Self::Bytes | Self::String | Self::UuidString => {
1074                buf.get_bytes()?;
1075                Ok(())
1076            }
1077            Self::Fixed(sz) => {
1078                buf.get_fixed(*sz)?;
1079                Ok(())
1080            }
1081            Self::Decimal(size) => {
1082                if let Some(s) = size {
1083                    buf.get_fixed(*s)
1084                } else {
1085                    buf.get_bytes()
1086                }?;
1087                Ok(())
1088            }
1089            Self::Enum => {
1090                buf.get_int()?;
1091                Ok(())
1092            }
1093            Self::DurationFixed12 => {
1094                buf.get_fixed(12)?;
1095                Ok(())
1096            }
1097            Self::List(item) => {
1098                skip_blocks(buf, |c| item.skip(c))?;
1099                Ok(())
1100            }
1101            Self::Map(value) => {
1102                skip_blocks(buf, |c| {
1103                    c.get_bytes()?; // key
1104                    value.skip(c)
1105                })?;
1106                Ok(())
1107            }
1108            Self::Struct(fields) => {
1109                for f in fields.iter_mut() {
1110                    f.skip(buf)?
1111                }
1112                Ok(())
1113            }
1114            Self::Nullable(order, inner) => {
1115                let branch = buf.read_vlq()?;
1116                let is_not_null = match *order {
1117                    Nullability::NullFirst => branch != 0,
1118                    Nullability::NullSecond => branch == 0,
1119                };
1120                if is_not_null {
1121                    inner.skip(buf)?;
1122                }
1123                Ok(())
1124            }
1125        }
1126    }
1127}
1128
1129#[inline]
1130fn build_skip_decoders(
1131    skip_fields: &[Option<AvroDataType>],
1132) -> Result<Vec<Option<Skipper>>, ArrowError> {
1133    skip_fields
1134        .iter()
1135        .map(|opt| opt.as_ref().map(Skipper::from_avro).transpose())
1136        .collect()
1137}
1138
1139#[cfg(test)]
1140mod tests {
1141    use super::*;
1142    use crate::codec::AvroField;
1143    use arrow_array::{
1144        cast::AsArray, Array, Decimal128Array, Decimal256Array, Decimal32Array, DictionaryArray,
1145        FixedSizeBinaryArray, IntervalMonthDayNanoArray, ListArray, MapArray, StringArray,
1146        StructArray,
1147    };
1148
1149    fn encode_avro_int(value: i32) -> Vec<u8> {
1150        let mut buf = Vec::new();
1151        let mut v = (value << 1) ^ (value >> 31);
1152        while v & !0x7F != 0 {
1153            buf.push(((v & 0x7F) | 0x80) as u8);
1154            v >>= 7;
1155        }
1156        buf.push(v as u8);
1157        buf
1158    }
1159
1160    fn encode_avro_long(value: i64) -> Vec<u8> {
1161        let mut buf = Vec::new();
1162        let mut v = (value << 1) ^ (value >> 63);
1163        while v & !0x7F != 0 {
1164            buf.push(((v & 0x7F) | 0x80) as u8);
1165            v >>= 7;
1166        }
1167        buf.push(v as u8);
1168        buf
1169    }
1170
1171    fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
1172        let mut buf = encode_avro_long(bytes.len() as i64);
1173        buf.extend_from_slice(bytes);
1174        buf
1175    }
1176
1177    fn avro_from_codec(codec: Codec) -> AvroDataType {
1178        AvroDataType::new(codec, Default::default(), None)
1179    }
1180
1181    fn decoder_for_promotion(
1182        writer: PrimitiveType,
1183        reader: PrimitiveType,
1184        use_utf8view: bool,
1185    ) -> Decoder {
1186        let ws = Schema::TypeName(TypeName::Primitive(writer));
1187        let rs = Schema::TypeName(TypeName::Primitive(reader));
1188        let field =
1189            AvroField::resolve_from_writer_and_reader(&ws, &rs, use_utf8view, false).unwrap();
1190        Decoder::try_new(field.data_type()).unwrap()
1191    }
1192
1193    #[test]
1194    fn test_schema_resolution_promotion_int_to_long() {
1195        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
1196        assert!(matches!(dec, Decoder::Int32ToInt64(_)));
1197        for v in [0, 1, -2, 123456] {
1198            let data = encode_avro_int(v);
1199            let mut cur = AvroCursor::new(&data);
1200            dec.decode(&mut cur).unwrap();
1201        }
1202        let arr = dec.flush(None).unwrap();
1203        let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1204        assert_eq!(a.value(0), 0);
1205        assert_eq!(a.value(1), 1);
1206        assert_eq!(a.value(2), -2);
1207        assert_eq!(a.value(3), 123456);
1208    }
1209
1210    #[test]
1211    fn test_schema_resolution_promotion_int_to_float() {
1212        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
1213        assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
1214        for v in [0, 42, -7] {
1215            let data = encode_avro_int(v);
1216            let mut cur = AvroCursor::new(&data);
1217            dec.decode(&mut cur).unwrap();
1218        }
1219        let arr = dec.flush(None).unwrap();
1220        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
1221        assert_eq!(a.value(0), 0.0);
1222        assert_eq!(a.value(1), 42.0);
1223        assert_eq!(a.value(2), -7.0);
1224    }
1225
1226    #[test]
1227    fn test_schema_resolution_promotion_int_to_double() {
1228        let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
1229        assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
1230        for v in [1, -1, 10_000] {
1231            let data = encode_avro_int(v);
1232            let mut cur = AvroCursor::new(&data);
1233            dec.decode(&mut cur).unwrap();
1234        }
1235        let arr = dec.flush(None).unwrap();
1236        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1237        assert_eq!(a.value(0), 1.0);
1238        assert_eq!(a.value(1), -1.0);
1239        assert_eq!(a.value(2), 10_000.0);
1240    }
1241
1242    #[test]
1243    fn test_schema_resolution_promotion_long_to_float() {
1244        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
1245        assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
1246        for v in [0_i64, 1_000_000_i64, -123_i64] {
1247            let data = encode_avro_long(v);
1248            let mut cur = AvroCursor::new(&data);
1249            dec.decode(&mut cur).unwrap();
1250        }
1251        let arr = dec.flush(None).unwrap();
1252        let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
1253        assert_eq!(a.value(0), 0.0);
1254        assert_eq!(a.value(1), 1_000_000.0);
1255        assert_eq!(a.value(2), -123.0);
1256    }
1257
1258    #[test]
1259    fn test_schema_resolution_promotion_long_to_double() {
1260        let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
1261        assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
1262        for v in [2_i64, -2_i64, 9_223_372_i64] {
1263            let data = encode_avro_long(v);
1264            let mut cur = AvroCursor::new(&data);
1265            dec.decode(&mut cur).unwrap();
1266        }
1267        let arr = dec.flush(None).unwrap();
1268        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1269        assert_eq!(a.value(0), 2.0);
1270        assert_eq!(a.value(1), -2.0);
1271        assert_eq!(a.value(2), 9_223_372.0);
1272    }
1273
1274    #[test]
1275    fn test_schema_resolution_promotion_float_to_double() {
1276        let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
1277        assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
1278        for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
1279            let data = v.to_le_bytes().to_vec();
1280            let mut cur = AvroCursor::new(&data);
1281            dec.decode(&mut cur).unwrap();
1282        }
1283        let arr = dec.flush(None).unwrap();
1284        let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1285        assert_eq!(a.value(0), 0.5_f64);
1286        assert_eq!(a.value(1), -3.25_f64);
1287        assert_eq!(a.value(2), 1.0e6_f64);
1288    }
1289
1290    #[test]
1291    fn test_schema_resolution_promotion_bytes_to_string_utf8() {
1292        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
1293        assert!(matches!(dec, Decoder::BytesToString(_, _)));
1294        for s in ["hello", "world", "héllo"] {
1295            let data = encode_avro_bytes(s.as_bytes());
1296            let mut cur = AvroCursor::new(&data);
1297            dec.decode(&mut cur).unwrap();
1298        }
1299        let arr = dec.flush(None).unwrap();
1300        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
1301        assert_eq!(a.value(0), "hello");
1302        assert_eq!(a.value(1), "world");
1303        assert_eq!(a.value(2), "héllo");
1304    }
1305
1306    #[test]
1307    fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
1308        let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
1309        assert!(matches!(dec, Decoder::BytesToString(_, _)));
1310        let data = encode_avro_bytes("abc".as_bytes());
1311        let mut cur = AvroCursor::new(&data);
1312        dec.decode(&mut cur).unwrap();
1313        let arr = dec.flush(None).unwrap();
1314        let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
1315        assert_eq!(a.value(0), "abc");
1316    }
1317
1318    #[test]
1319    fn test_schema_resolution_promotion_string_to_bytes() {
1320        let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
1321        assert!(matches!(dec, Decoder::StringToBytes(_, _)));
1322        for s in ["", "abc", "data"] {
1323            let data = encode_avro_bytes(s.as_bytes());
1324            let mut cur = AvroCursor::new(&data);
1325            dec.decode(&mut cur).unwrap();
1326        }
1327        let arr = dec.flush(None).unwrap();
1328        let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
1329        assert_eq!(a.value(0), b"");
1330        assert_eq!(a.value(1), b"abc");
1331        assert_eq!(a.value(2), "data".as_bytes());
1332    }
1333
1334    #[test]
1335    fn test_schema_resolution_no_promotion_passthrough_int() {
1336        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
1337        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
1338        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
1339        let mut dec = Decoder::try_new(field.data_type()).unwrap();
1340        assert!(matches!(dec, Decoder::Int32(_)));
1341        for v in [7, -9] {
1342            let data = encode_avro_int(v);
1343            let mut cur = AvroCursor::new(&data);
1344            dec.decode(&mut cur).unwrap();
1345        }
1346        let arr = dec.flush(None).unwrap();
1347        let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
1348        assert_eq!(a.value(0), 7);
1349        assert_eq!(a.value(1), -9);
1350    }
1351
1352    #[test]
1353    fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
1354        let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
1355        let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
1356        let res = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false);
1357        assert!(res.is_err(), "expected error for illegal promotion");
1358    }
1359
1360    #[test]
1361    fn test_map_decoding_one_entry() {
1362        let value_type = avro_from_codec(Codec::Utf8);
1363        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
1364        let mut decoder = Decoder::try_new(&map_type).unwrap();
1365        // Encode a single map with one entry: {"hello": "world"}
1366        let mut data = Vec::new();
1367        data.extend_from_slice(&encode_avro_long(1));
1368        data.extend_from_slice(&encode_avro_bytes(b"hello")); // key
1369        data.extend_from_slice(&encode_avro_bytes(b"world")); // value
1370        data.extend_from_slice(&encode_avro_long(0));
1371        let mut cursor = AvroCursor::new(&data);
1372        decoder.decode(&mut cursor).unwrap();
1373        let array = decoder.flush(None).unwrap();
1374        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
1375        assert_eq!(map_arr.len(), 1); // one map
1376        assert_eq!(map_arr.value_length(0), 1);
1377        let entries = map_arr.value(0);
1378        let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
1379        assert_eq!(struct_entries.len(), 1);
1380        let key_arr = struct_entries
1381            .column_by_name("key")
1382            .unwrap()
1383            .as_any()
1384            .downcast_ref::<StringArray>()
1385            .unwrap();
1386        let val_arr = struct_entries
1387            .column_by_name("value")
1388            .unwrap()
1389            .as_any()
1390            .downcast_ref::<StringArray>()
1391            .unwrap();
1392        assert_eq!(key_arr.value(0), "hello");
1393        assert_eq!(val_arr.value(0), "world");
1394    }
1395
1396    #[test]
1397    fn test_map_decoding_empty() {
1398        let value_type = avro_from_codec(Codec::Utf8);
1399        let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
1400        let mut decoder = Decoder::try_new(&map_type).unwrap();
1401        let data = encode_avro_long(0);
1402        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
1403        let array = decoder.flush(None).unwrap();
1404        let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
1405        assert_eq!(map_arr.len(), 1);
1406        assert_eq!(map_arr.value_length(0), 0);
1407    }
1408
1409    #[test]
1410    fn test_fixed_decoding() {
1411        let avro_type = avro_from_codec(Codec::Fixed(3));
1412        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
1413
1414        let data1 = [1u8, 2, 3];
1415        let mut cursor1 = AvroCursor::new(&data1);
1416        decoder
1417            .decode(&mut cursor1)
1418            .expect("Failed to decode data1");
1419        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
1420        let data2 = [4u8, 5, 6];
1421        let mut cursor2 = AvroCursor::new(&data2);
1422        decoder
1423            .decode(&mut cursor2)
1424            .expect("Failed to decode data2");
1425        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
1426        let array = decoder.flush(None).expect("Failed to flush decoder");
1427        assert_eq!(array.len(), 2, "Array should contain two items");
1428        let fixed_size_binary_array = array
1429            .as_any()
1430            .downcast_ref::<FixedSizeBinaryArray>()
1431            .expect("Failed to downcast to FixedSizeBinaryArray");
1432        assert_eq!(
1433            fixed_size_binary_array.value_length(),
1434            3,
1435            "Fixed size of binary values should be 3"
1436        );
1437        assert_eq!(
1438            fixed_size_binary_array.value(0),
1439            &[1, 2, 3],
1440            "First item mismatch"
1441        );
1442        assert_eq!(
1443            fixed_size_binary_array.value(1),
1444            &[4, 5, 6],
1445            "Second item mismatch"
1446        );
1447    }
1448
1449    #[test]
1450    fn test_fixed_decoding_empty() {
1451        let avro_type = avro_from_codec(Codec::Fixed(5));
1452        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
1453
1454        let array = decoder
1455            .flush(None)
1456            .expect("Failed to flush decoder for empty input");
1457
1458        assert_eq!(array.len(), 0, "Array should be empty");
1459        let fixed_size_binary_array = array
1460            .as_any()
1461            .downcast_ref::<FixedSizeBinaryArray>()
1462            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
1463
1464        assert_eq!(
1465            fixed_size_binary_array.value_length(),
1466            5,
1467            "Fixed size of binary values should be 5 as per type"
1468        );
1469    }
1470
1471    #[test]
1472    fn test_uuid_decoding() {
1473        let avro_type = avro_from_codec(Codec::Uuid);
1474        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
1475        let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
1476        let data = encode_avro_bytes(uuid_str.as_bytes());
1477        let mut cursor = AvroCursor::new(&data);
1478        decoder.decode(&mut cursor).expect("Failed to decode data");
1479        assert_eq!(
1480            cursor.position(),
1481            data.len(),
1482            "Cursor should advance by varint size + data size"
1483        );
1484        let array = decoder.flush(None).expect("Failed to flush decoder");
1485        let fixed_size_binary_array = array
1486            .as_any()
1487            .downcast_ref::<FixedSizeBinaryArray>()
1488            .expect("Array should be a FixedSizeBinaryArray");
1489        assert_eq!(fixed_size_binary_array.len(), 1);
1490        assert_eq!(fixed_size_binary_array.value_length(), 16);
1491        let expected_bytes = [
1492            0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
1493            0x6b, 0xf6,
1494        ];
1495        assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
1496    }
1497
1498    #[test]
1499    fn test_array_decoding() {
1500        let item_dt = avro_from_codec(Codec::Int32);
1501        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
1502        let mut decoder = Decoder::try_new(&list_dt).unwrap();
1503        let mut row1 = Vec::new();
1504        row1.extend_from_slice(&encode_avro_long(2));
1505        row1.extend_from_slice(&encode_avro_int(10));
1506        row1.extend_from_slice(&encode_avro_int(20));
1507        row1.extend_from_slice(&encode_avro_long(0));
1508        let row2 = encode_avro_long(0);
1509        let mut cursor = AvroCursor::new(&row1);
1510        decoder.decode(&mut cursor).unwrap();
1511        let mut cursor2 = AvroCursor::new(&row2);
1512        decoder.decode(&mut cursor2).unwrap();
1513        let array = decoder.flush(None).unwrap();
1514        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
1515        assert_eq!(list_arr.len(), 2);
1516        let offsets = list_arr.value_offsets();
1517        assert_eq!(offsets, &[0, 2, 2]);
1518        let values = list_arr.values();
1519        let int_arr = values.as_primitive::<Int32Type>();
1520        assert_eq!(int_arr.len(), 2);
1521        assert_eq!(int_arr.value(0), 10);
1522        assert_eq!(int_arr.value(1), 20);
1523    }
1524
1525    #[test]
1526    fn test_array_decoding_with_negative_block_count() {
1527        let item_dt = avro_from_codec(Codec::Int32);
1528        let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
1529        let mut decoder = Decoder::try_new(&list_dt).unwrap();
1530        let mut data = encode_avro_long(-3);
1531        data.extend_from_slice(&encode_avro_long(12));
1532        data.extend_from_slice(&encode_avro_int(1));
1533        data.extend_from_slice(&encode_avro_int(2));
1534        data.extend_from_slice(&encode_avro_int(3));
1535        data.extend_from_slice(&encode_avro_long(0));
1536        let mut cursor = AvroCursor::new(&data);
1537        decoder.decode(&mut cursor).unwrap();
1538        let array = decoder.flush(None).unwrap();
1539        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
1540        assert_eq!(list_arr.len(), 1);
1541        assert_eq!(list_arr.value_length(0), 3);
1542        let values = list_arr.values().as_primitive::<Int32Type>();
1543        assert_eq!(values.len(), 3);
1544        assert_eq!(values.value(0), 1);
1545        assert_eq!(values.value(1), 2);
1546        assert_eq!(values.value(2), 3);
1547    }
1548
1549    #[test]
1550    fn test_nested_array_decoding() {
1551        let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
1552        let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
1553        let mut decoder = Decoder::try_new(&nested_ty).unwrap();
1554        let mut buf = Vec::new();
1555        buf.extend(encode_avro_long(1));
1556        buf.extend(encode_avro_long(2));
1557        buf.extend(encode_avro_int(5));
1558        buf.extend(encode_avro_int(6));
1559        buf.extend(encode_avro_long(0));
1560        buf.extend(encode_avro_long(0));
1561        let mut cursor = AvroCursor::new(&buf);
1562        decoder.decode(&mut cursor).unwrap();
1563        let arr = decoder.flush(None).unwrap();
1564        let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
1565        assert_eq!(outer.len(), 1);
1566        assert_eq!(outer.value_length(0), 1);
1567        let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
1568        assert_eq!(inner.len(), 1);
1569        assert_eq!(inner.value_length(0), 2);
1570        let values = inner
1571            .values()
1572            .as_any()
1573            .downcast_ref::<Int32Array>()
1574            .unwrap();
1575        assert_eq!(values.values(), &[5, 6]);
1576    }
1577
1578    #[test]
1579    fn test_array_decoding_empty_array() {
1580        let value_type = avro_from_codec(Codec::Utf8);
1581        let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
1582        let mut decoder = Decoder::try_new(&map_type).unwrap();
1583        let data = encode_avro_long(0);
1584        decoder.decode(&mut AvroCursor::new(&data)).unwrap();
1585        let array = decoder.flush(None).unwrap();
1586        let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
1587        assert_eq!(list_arr.len(), 1);
1588        assert_eq!(list_arr.value_length(0), 0);
1589    }
1590
1591    #[test]
1592    fn test_decimal_decoding_fixed256() {
1593        let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
1594        let mut decoder = Decoder::try_new(&dt).unwrap();
1595        let row1 = [
1596            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1597            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1598            0x00, 0x00, 0x30, 0x39,
1599        ];
1600        let row2 = [
1601            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1602            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1603            0xFF, 0xFF, 0xFF, 0x85,
1604        ];
1605        let mut data = Vec::new();
1606        data.extend_from_slice(&row1);
1607        data.extend_from_slice(&row2);
1608        let mut cursor = AvroCursor::new(&data);
1609        decoder.decode(&mut cursor).unwrap();
1610        decoder.decode(&mut cursor).unwrap();
1611        let arr = decoder.flush(None).unwrap();
1612        let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
1613        assert_eq!(dec.len(), 2);
1614        assert_eq!(dec.value_as_string(0), "123.45");
1615        assert_eq!(dec.value_as_string(1), "-1.23");
1616    }
1617
1618    #[test]
1619    fn test_decimal_decoding_fixed128() {
1620        let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
1621        let mut decoder = Decoder::try_new(&dt).unwrap();
1622        let row1 = [
1623            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1624            0x30, 0x39,
1625        ];
1626        let row2 = [
1627            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1628            0xFF, 0x85,
1629        ];
1630        let mut data = Vec::new();
1631        data.extend_from_slice(&row1);
1632        data.extend_from_slice(&row2);
1633        let mut cursor = AvroCursor::new(&data);
1634        decoder.decode(&mut cursor).unwrap();
1635        decoder.decode(&mut cursor).unwrap();
1636        let arr = decoder.flush(None).unwrap();
1637        let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1638        assert_eq!(dec.len(), 2);
1639        assert_eq!(dec.value_as_string(0), "123.45");
1640        assert_eq!(dec.value_as_string(1), "-1.23");
1641    }
1642
1643    #[test]
1644    fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
1645        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
1646        let mut decoder = Decoder::try_new(&dt).unwrap();
1647        let row1 = [
1648            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1649            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1650            0x00, 0x00, 0x30, 0x39,
1651        ];
1652        let row2 = [
1653            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1654            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1655            0xFF, 0xFF, 0xFF, 0x85,
1656        ];
1657        let mut data = Vec::new();
1658        data.extend_from_slice(&row1);
1659        data.extend_from_slice(&row2);
1660        let mut cursor = AvroCursor::new(&data);
1661        decoder.decode(&mut cursor).unwrap();
1662        decoder.decode(&mut cursor).unwrap();
1663        let arr = decoder.flush(None).unwrap();
1664        #[cfg(feature = "small_decimals")]
1665        {
1666            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1667            assert_eq!(dec.len(), 2);
1668            assert_eq!(dec.value_as_string(0), "123.45");
1669            assert_eq!(dec.value_as_string(1), "-1.23");
1670        }
1671        #[cfg(not(feature = "small_decimals"))]
1672        {
1673            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1674            assert_eq!(dec.len(), 2);
1675            assert_eq!(dec.value_as_string(0), "123.45");
1676            assert_eq!(dec.value_as_string(1), "-1.23");
1677        }
1678    }
1679
1680    #[test]
1681    fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
1682        let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
1683        let mut decoder = Decoder::try_new(&dt).unwrap();
1684        let row1 = [
1685            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1686            0x30, 0x39,
1687        ];
1688        let row2 = [
1689            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1690            0xFF, 0x85,
1691        ];
1692        let mut data = Vec::new();
1693        data.extend_from_slice(&row1);
1694        data.extend_from_slice(&row2);
1695        let mut cursor = AvroCursor::new(&data);
1696        decoder.decode(&mut cursor).unwrap();
1697        decoder.decode(&mut cursor).unwrap();
1698
1699        let arr = decoder.flush(None).unwrap();
1700        #[cfg(feature = "small_decimals")]
1701        {
1702            let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1703            assert_eq!(dec.len(), 2);
1704            assert_eq!(dec.value_as_string(0), "123.45");
1705            assert_eq!(dec.value_as_string(1), "-1.23");
1706        }
1707        #[cfg(not(feature = "small_decimals"))]
1708        {
1709            let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1710            assert_eq!(dec.len(), 2);
1711            assert_eq!(dec.value_as_string(0), "123.45");
1712            assert_eq!(dec.value_as_string(1), "-1.23");
1713        }
1714    }
1715
1716    #[test]
1717    fn test_decimal_decoding_bytes_with_nulls() {
1718        let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
1719        let inner = Decoder::try_new(&dt).unwrap();
1720        let mut decoder = Decoder::Nullable(
1721            Nullability::NullSecond,
1722            NullBufferBuilder::new(DEFAULT_CAPACITY),
1723            Box::new(inner),
1724        );
1725        let mut data = Vec::new();
1726        data.extend_from_slice(&encode_avro_int(0));
1727        data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
1728        data.extend_from_slice(&encode_avro_int(1));
1729        data.extend_from_slice(&encode_avro_int(0));
1730        data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
1731        let mut cursor = AvroCursor::new(&data);
1732        decoder.decode(&mut cursor).unwrap();
1733        decoder.decode(&mut cursor).unwrap();
1734        decoder.decode(&mut cursor).unwrap();
1735        let arr = decoder.flush(None).unwrap();
1736        #[cfg(feature = "small_decimals")]
1737        {
1738            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1739            assert_eq!(dec_arr.len(), 3);
1740            assert!(dec_arr.is_valid(0));
1741            assert!(!dec_arr.is_valid(1));
1742            assert!(dec_arr.is_valid(2));
1743            assert_eq!(dec_arr.value_as_string(0), "123.4");
1744            assert_eq!(dec_arr.value_as_string(2), "-123.4");
1745        }
1746        #[cfg(not(feature = "small_decimals"))]
1747        {
1748            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1749            assert_eq!(dec_arr.len(), 3);
1750            assert!(dec_arr.is_valid(0));
1751            assert!(!dec_arr.is_valid(1));
1752            assert!(dec_arr.is_valid(2));
1753            assert_eq!(dec_arr.value_as_string(0), "123.4");
1754            assert_eq!(dec_arr.value_as_string(2), "-123.4");
1755        }
1756    }
1757
1758    #[test]
1759    fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
1760        let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
1761        let inner = Decoder::try_new(&dt).unwrap();
1762        let mut decoder = Decoder::Nullable(
1763            Nullability::NullSecond,
1764            NullBufferBuilder::new(DEFAULT_CAPACITY),
1765            Box::new(inner),
1766        );
1767        let row1 = [
1768            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
1769            0xE2, 0x40,
1770        ];
1771        let row3 = [
1772            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
1773            0x1D, 0xC0,
1774        ];
1775        let mut data = Vec::new();
1776        data.extend_from_slice(&encode_avro_int(0));
1777        data.extend_from_slice(&row1);
1778        data.extend_from_slice(&encode_avro_int(1));
1779        data.extend_from_slice(&encode_avro_int(0));
1780        data.extend_from_slice(&row3);
1781        let mut cursor = AvroCursor::new(&data);
1782        decoder.decode(&mut cursor).unwrap();
1783        decoder.decode(&mut cursor).unwrap();
1784        decoder.decode(&mut cursor).unwrap();
1785        let arr = decoder.flush(None).unwrap();
1786        #[cfg(feature = "small_decimals")]
1787        {
1788            let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1789            assert_eq!(dec_arr.len(), 3);
1790            assert!(dec_arr.is_valid(0));
1791            assert!(!dec_arr.is_valid(1));
1792            assert!(dec_arr.is_valid(2));
1793            assert_eq!(dec_arr.value_as_string(0), "1234.56");
1794            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
1795        }
1796        #[cfg(not(feature = "small_decimals"))]
1797        {
1798            let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1799            assert_eq!(dec_arr.len(), 3);
1800            assert!(dec_arr.is_valid(0));
1801            assert!(!dec_arr.is_valid(1));
1802            assert!(dec_arr.is_valid(2));
1803            assert_eq!(dec_arr.value_as_string(0), "1234.56");
1804            assert_eq!(dec_arr.value_as_string(2), "-1234.56");
1805        }
1806    }
1807
1808    #[test]
1809    fn test_enum_decoding() {
1810        let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
1811        let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
1812        let mut decoder = Decoder::try_new(&avro_type).unwrap();
1813        let mut data = Vec::new();
1814        data.extend_from_slice(&encode_avro_int(2));
1815        data.extend_from_slice(&encode_avro_int(0));
1816        data.extend_from_slice(&encode_avro_int(1));
1817        let mut cursor = AvroCursor::new(&data);
1818        decoder.decode(&mut cursor).unwrap();
1819        decoder.decode(&mut cursor).unwrap();
1820        decoder.decode(&mut cursor).unwrap();
1821        let array = decoder.flush(None).unwrap();
1822        let dict_array = array
1823            .as_any()
1824            .downcast_ref::<DictionaryArray<Int32Type>>()
1825            .unwrap();
1826        assert_eq!(dict_array.len(), 3);
1827        let values = dict_array
1828            .values()
1829            .as_any()
1830            .downcast_ref::<StringArray>()
1831            .unwrap();
1832        assert_eq!(values.value(0), "A");
1833        assert_eq!(values.value(1), "B");
1834        assert_eq!(values.value(2), "C");
1835        assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
1836    }
1837
1838    #[test]
1839    fn test_enum_decoding_with_nulls() {
1840        let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
1841        let enum_codec = Codec::Enum(symbols.clone());
1842        let avro_type =
1843            AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
1844        let mut decoder = Decoder::try_new(&avro_type).unwrap();
1845        let mut data = Vec::new();
1846        data.extend_from_slice(&encode_avro_long(1));
1847        data.extend_from_slice(&encode_avro_int(1));
1848        data.extend_from_slice(&encode_avro_long(0));
1849        data.extend_from_slice(&encode_avro_long(1));
1850        data.extend_from_slice(&encode_avro_int(0));
1851        let mut cursor = AvroCursor::new(&data);
1852        decoder.decode(&mut cursor).unwrap();
1853        decoder.decode(&mut cursor).unwrap();
1854        decoder.decode(&mut cursor).unwrap();
1855        let array = decoder.flush(None).unwrap();
1856        let dict_array = array
1857            .as_any()
1858            .downcast_ref::<DictionaryArray<Int32Type>>()
1859            .unwrap();
1860        assert_eq!(dict_array.len(), 3);
1861        assert!(dict_array.is_valid(0));
1862        assert!(dict_array.is_null(1));
1863        assert!(dict_array.is_valid(2));
1864        let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
1865        assert_eq!(dict_array.keys(), &expected_keys);
1866        let values = dict_array
1867            .values()
1868            .as_any()
1869            .downcast_ref::<StringArray>()
1870            .unwrap();
1871        assert_eq!(values.value(0), "X");
1872        assert_eq!(values.value(1), "Y");
1873    }
1874
1875    #[test]
1876    fn test_duration_decoding_with_nulls() {
1877        let duration_codec = Codec::Interval;
1878        let avro_type = AvroDataType::new(
1879            duration_codec,
1880            Default::default(),
1881            Some(Nullability::NullFirst),
1882        );
1883        let mut decoder = Decoder::try_new(&avro_type).unwrap();
1884        let mut data = Vec::new();
1885        // First value: 1 month, 2 days, 3 millis
1886        data.extend_from_slice(&encode_avro_long(1)); // not null
1887        let mut duration1 = Vec::new();
1888        duration1.extend_from_slice(&1u32.to_le_bytes());
1889        duration1.extend_from_slice(&2u32.to_le_bytes());
1890        duration1.extend_from_slice(&3u32.to_le_bytes());
1891        data.extend_from_slice(&duration1);
1892        // Second value: null
1893        data.extend_from_slice(&encode_avro_long(0)); // null
1894        data.extend_from_slice(&encode_avro_long(1)); // not null
1895        let mut duration2 = Vec::new();
1896        duration2.extend_from_slice(&4u32.to_le_bytes());
1897        duration2.extend_from_slice(&5u32.to_le_bytes());
1898        duration2.extend_from_slice(&6u32.to_le_bytes());
1899        data.extend_from_slice(&duration2);
1900        let mut cursor = AvroCursor::new(&data);
1901        decoder.decode(&mut cursor).unwrap();
1902        decoder.decode(&mut cursor).unwrap();
1903        decoder.decode(&mut cursor).unwrap();
1904        let array = decoder.flush(None).unwrap();
1905        let interval_array = array
1906            .as_any()
1907            .downcast_ref::<IntervalMonthDayNanoArray>()
1908            .unwrap();
1909        assert_eq!(interval_array.len(), 3);
1910        assert!(interval_array.is_valid(0));
1911        assert!(interval_array.is_null(1));
1912        assert!(interval_array.is_valid(2));
1913        let expected = IntervalMonthDayNanoArray::from(vec![
1914            Some(IntervalMonthDayNano {
1915                months: 1,
1916                days: 2,
1917                nanoseconds: 3_000_000,
1918            }),
1919            None,
1920            Some(IntervalMonthDayNano {
1921                months: 4,
1922                days: 5,
1923                nanoseconds: 6_000_000,
1924            }),
1925        ]);
1926        assert_eq!(interval_array, &expected);
1927    }
1928
1929    #[test]
1930    fn test_duration_decoding_empty() {
1931        let duration_codec = Codec::Interval;
1932        let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
1933        let mut decoder = Decoder::try_new(&avro_type).unwrap();
1934        let array = decoder.flush(None).unwrap();
1935        assert_eq!(array.len(), 0);
1936    }
1937
1938    #[test]
1939    fn test_nullable_decode_error_bitmap_corruption() {
1940        // Nullable Int32 with ['T','null'] encoding (NullSecond)
1941        let avro_type = AvroDataType::new(
1942            Codec::Int32,
1943            Default::default(),
1944            Some(Nullability::NullSecond),
1945        );
1946        let mut decoder = Decoder::try_new(&avro_type).unwrap();
1947
1948        // Row 1: union branch 1 (null)
1949        let mut row1 = Vec::new();
1950        row1.extend_from_slice(&encode_avro_int(1));
1951
1952        // Row 2: union branch 0 (non-null) but missing the int payload -> decode error
1953        let mut row2 = Vec::new();
1954        row2.extend_from_slice(&encode_avro_int(0)); // branch = 0 => non-null
1955
1956        // Row 3: union branch 0 (non-null) with correct int payload -> should succeed
1957        let mut row3 = Vec::new();
1958        row3.extend_from_slice(&encode_avro_int(0)); // branch
1959        row3.extend_from_slice(&encode_avro_int(42)); // actual value
1960
1961        decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
1962        assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); // decode error
1963        decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
1964
1965        let array = decoder.flush(None).unwrap();
1966
1967        // Should contain 2 elements: row1 (null) and row3 (42)
1968        assert_eq!(array.len(), 2);
1969        let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
1970        assert!(int_array.is_null(0)); // row1 is null
1971        assert_eq!(int_array.value(1), 42); // row3 value is 42
1972    }
1973
1974    #[test]
1975    fn test_enum_mapping_reordered_symbols() {
1976        let reader_symbols: Arc<[String]> =
1977            vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
1978        let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
1979        let default_index: i32 = -1;
1980        let mut dec = Decoder::EnumResolved {
1981            indices: Vec::with_capacity(DEFAULT_CAPACITY),
1982            symbols: reader_symbols.clone(),
1983            mapping,
1984            default_index,
1985        };
1986        let mut data = Vec::new();
1987        data.extend_from_slice(&encode_avro_int(0));
1988        data.extend_from_slice(&encode_avro_int(1));
1989        data.extend_from_slice(&encode_avro_int(2));
1990        let mut cur = AvroCursor::new(&data);
1991        dec.decode(&mut cur).unwrap();
1992        dec.decode(&mut cur).unwrap();
1993        dec.decode(&mut cur).unwrap();
1994        let arr = dec.flush(None).unwrap();
1995        let dict = arr
1996            .as_any()
1997            .downcast_ref::<DictionaryArray<Int32Type>>()
1998            .unwrap();
1999        let expected_keys = Int32Array::from(vec![2, 0, 1]);
2000        assert_eq!(dict.keys(), &expected_keys);
2001        let values = dict
2002            .values()
2003            .as_any()
2004            .downcast_ref::<StringArray>()
2005            .unwrap();
2006        assert_eq!(values.value(0), "B");
2007        assert_eq!(values.value(1), "C");
2008        assert_eq!(values.value(2), "A");
2009    }
2010
2011    #[test]
2012    fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
2013        let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
2014        let default_index: i32 = 1;
2015        let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
2016        let mut dec = Decoder::EnumResolved {
2017            indices: Vec::with_capacity(DEFAULT_CAPACITY),
2018            symbols: reader_symbols.clone(),
2019            mapping,
2020            default_index,
2021        };
2022        let mut data = Vec::new();
2023        data.extend_from_slice(&encode_avro_int(0));
2024        data.extend_from_slice(&encode_avro_int(1));
2025        data.extend_from_slice(&encode_avro_int(99));
2026        let mut cur = AvroCursor::new(&data);
2027        dec.decode(&mut cur).unwrap();
2028        dec.decode(&mut cur).unwrap();
2029        dec.decode(&mut cur).unwrap();
2030        let arr = dec.flush(None).unwrap();
2031        let dict = arr
2032            .as_any()
2033            .downcast_ref::<DictionaryArray<Int32Type>>()
2034            .unwrap();
2035        let expected_keys = Int32Array::from(vec![0, 1, 1]);
2036        assert_eq!(dict.keys(), &expected_keys);
2037        let values = dict
2038            .values()
2039            .as_any()
2040            .downcast_ref::<StringArray>()
2041            .unwrap();
2042        assert_eq!(values.value(0), "A");
2043        assert_eq!(values.value(1), "B");
2044    }
2045
2046    #[test]
2047    fn test_enum_mapping_unknown_symbol_without_default_errors() {
2048        let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
2049        let default_index: i32 = -1; // indicates no default at type-level
2050        let mapping: Arc<[i32]> = Arc::from(vec![-1]);
2051        let mut dec = Decoder::EnumResolved {
2052            indices: Vec::with_capacity(DEFAULT_CAPACITY),
2053            symbols: reader_symbols,
2054            mapping,
2055            default_index,
2056        };
2057        let data = encode_avro_int(0);
2058        let mut cur = AvroCursor::new(&data);
2059        let err = dec
2060            .decode(&mut cur)
2061            .expect_err("expected decode error for unresolved enum without default");
2062        let msg = err.to_string();
2063        assert!(
2064            msg.contains("not resolvable") && msg.contains("no default"),
2065            "unexpected error message: {msg}"
2066        );
2067    }
2068
2069    fn make_record_resolved_decoder(
2070        reader_fields: &[(&str, DataType, bool)],
2071        writer_to_reader: Vec<Option<usize>>,
2072        mut skip_decoders: Vec<Option<super::Skipper>>,
2073    ) -> Decoder {
2074        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
2075        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
2076        for (name, dt, nullable) in reader_fields {
2077            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
2078            let enc = match dt {
2079                DataType::Int32 => Decoder::Int32(Vec::new()),
2080                DataType::Int64 => Decoder::Int64(Vec::new()),
2081                DataType::Utf8 => {
2082                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
2083                }
2084                other => panic!("Unsupported test reader field type: {other:?}"),
2085            };
2086            encodings.push(enc);
2087        }
2088        let fields: Fields = field_refs.into();
2089        Decoder::RecordResolved {
2090            fields,
2091            encodings,
2092            writer_to_reader: Arc::from(writer_to_reader),
2093            skip_decoders,
2094        }
2095    }
2096
2097    #[test]
2098    fn test_skip_writer_trailing_field_int32() {
2099        let mut dec = make_record_resolved_decoder(
2100            &[("id", arrow_schema::DataType::Int32, false)],
2101            vec![Some(0), None],
2102            vec![None, Some(super::Skipper::Int32)],
2103        );
2104        let mut data = Vec::new();
2105        data.extend_from_slice(&encode_avro_int(7));
2106        data.extend_from_slice(&encode_avro_int(999));
2107        let mut cur = AvroCursor::new(&data);
2108        dec.decode(&mut cur).unwrap();
2109        assert_eq!(cur.position(), data.len());
2110        let arr = dec.flush(None).unwrap();
2111        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
2112        assert_eq!(struct_arr.len(), 1);
2113        let id = struct_arr
2114            .column_by_name("id")
2115            .unwrap()
2116            .as_any()
2117            .downcast_ref::<Int32Array>()
2118            .unwrap();
2119        assert_eq!(id.value(0), 7);
2120    }
2121
2122    #[test]
2123    fn test_skip_writer_middle_field_string() {
2124        let mut dec = make_record_resolved_decoder(
2125            &[
2126                ("id", DataType::Int32, false),
2127                ("score", DataType::Int64, false),
2128            ],
2129            vec![Some(0), None, Some(1)],
2130            vec![None, Some(Skipper::String), None],
2131        );
2132        let mut data = Vec::new();
2133        data.extend_from_slice(&encode_avro_int(42));
2134        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
2135        data.extend_from_slice(&encode_avro_long(1000));
2136        let mut cur = AvroCursor::new(&data);
2137        dec.decode(&mut cur).unwrap();
2138        assert_eq!(cur.position(), data.len());
2139        let arr = dec.flush(None).unwrap();
2140        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
2141        let id = s
2142            .column_by_name("id")
2143            .unwrap()
2144            .as_any()
2145            .downcast_ref::<Int32Array>()
2146            .unwrap();
2147        let score = s
2148            .column_by_name("score")
2149            .unwrap()
2150            .as_any()
2151            .downcast_ref::<Int64Array>()
2152            .unwrap();
2153        assert_eq!(id.value(0), 42);
2154        assert_eq!(score.value(0), 1000);
2155    }
2156
/// Decodes one record whose first writer field is an array that gets skipped,
/// with the array block written using a negative item count.
///
/// Per the Avro specification, a negative block count is followed by the
/// block's size in bytes, then the items; a zero count ends the array.
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    // Presumably: writer field 0 has no reader column and is skipped as an
    // int array, while writer field 1 feeds reader column 0 ("id").
    let skip_int_array = Skipper::List(Box::new(Skipper::Int32));
    let mut decoder = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(skip_int_array), None],
    );

    // The three array items 1, 2, 3, encoded back to back.
    let mut items = Vec::new();
    for item in 1..=3 {
        items.extend_from_slice(&encode_avro_int(item));
    }

    // Framing: count = -3, byte length of the items, items, end-of-array
    // marker, and finally the value of the "id" field.
    let block_count = encode_avro_long(-3);
    let block_bytes = encode_avro_long(items.len() as i64);
    let end_of_array = encode_avro_long(0);
    let id_value = encode_avro_int(5);

    let mut encoded = Vec::new();
    encoded.extend_from_slice(&block_count);
    encoded.extend_from_slice(&block_bytes);
    encoded.extend_from_slice(&items);
    encoded.extend_from_slice(&end_of_array);
    encoded.extend_from_slice(&id_value);

    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).unwrap();
    // The whole input, skipped array included, must have been consumed.
    assert_eq!(cursor.position(), encoded.len());

    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 5);
}
2188
/// Decodes one record whose first writer field is a map that gets skipped,
/// with the map block written using a negative entry count.
///
/// Per the Avro specification, a negative block count is followed by the
/// block's size in bytes, then the key/value pairs; a zero count ends the map.
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    // Presumably: writer field 0 has no reader column and is skipped as a
    // map of ints, while writer field 1 feeds reader column 0 ("id").
    let skip_int_map = Skipper::Map(Box::new(Skipper::Int32));
    let mut decoder = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(skip_int_map), None],
    );

    // Two map entries, each written as a key followed by an int value.
    let mut pairs = Vec::new();
    for (key, value) in [(b"k1", 10), (b"k2", 20)] {
        pairs.extend_from_slice(&encode_avro_bytes(key));
        pairs.extend_from_slice(&encode_avro_int(value));
    }

    // Framing: count = -2, byte length of the entries, entries, end-of-map
    // marker, and finally the value of the "id" field.
    let block_count = encode_avro_long(-2);
    let block_bytes = encode_avro_long(pairs.len() as i64);
    let end_of_map = encode_avro_long(0);
    let id_value = encode_avro_int(123);

    let mut encoded = Vec::new();
    encoded.extend_from_slice(&block_count);
    encoded.extend_from_slice(&block_bytes);
    encoded.extend_from_slice(&pairs);
    encoded.extend_from_slice(&end_of_map);
    encoded.extend_from_slice(&id_value);

    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).unwrap();
    // The whole input, skipped map included, must have been consumed.
    assert_eq!(cursor.position(), encoded.len());

    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 123);
}
2221
/// Decodes two records whose first writer field is a skipped nullable int,
/// exercising both union branches.
///
/// Per the Avro specification, a union value is written as a branch index
/// followed by the value of that branch.
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    // Presumably: writer field 0 has no reader column and is skipped as a
    // null-first nullable int, while writer field 1 feeds reader column 0.
    let skip_nullable_int = Skipper::Nullable(Nullability::NullFirst, Box::new(Skipper::Int32));
    let mut decoder = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(skip_nullable_int), None],
    );

    // First record: branch 0 with no payload for the skipped field, then id = 5.
    let null_branch = encode_avro_long(0);
    let first_id = encode_avro_int(5);
    let mut first_row = Vec::new();
    first_row.extend_from_slice(&null_branch);
    first_row.extend_from_slice(&first_id);

    // Second record: branch 1 carrying the int 123 to skip, then id = 7.
    let value_branch = encode_avro_long(1);
    let skipped_value = encode_avro_int(123);
    let second_id = encode_avro_int(7);
    let mut second_row = Vec::new();
    second_row.extend_from_slice(&value_branch);
    second_row.extend_from_slice(&skipped_value);
    second_row.extend_from_slice(&second_id);

    let mut first_cursor = AvroCursor::new(&first_row);
    let mut second_cursor = AvroCursor::new(&second_row);
    decoder.decode(&mut first_cursor).unwrap();
    decoder.decode(&mut second_cursor).unwrap();
    // Each record must be consumed in full, whichever branch was taken.
    assert_eq!(first_cursor.position(), first_row.len());
    assert_eq!(second_cursor.position(), second_row.len());

    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 2);
    assert_eq!(ids.value(0), 5);
    assert_eq!(ids.value(1), 7);
}
2260}