Skip to main content

arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec for Mapping Avro and Arrow types.
19
20use crate::schema::{
21    AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22    AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23    PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27    IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40/// Contains information about how to resolve differences between a writer's and a reader's schema.
41#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43    /// Indicates that the writer's type should be promoted to the reader's type.
44    Promotion(Promotion),
45    /// Indicates that a default value should be used for a field.
46    DefaultValue(AvroLiteral),
47    /// Provides mapping information for resolving enums.
48    EnumMapping(EnumMapping),
49    /// Provides resolution information for record fields.
50    Record(ResolvedRecord),
51    /// Provides mapping and shape info for resolving unions.
52    Union(ResolvedUnion),
53}
54
55/// Represents a literal Avro value.
56///
57/// This is used to represent default values in an Avro schema.
58#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60    /// Represents a null value.
61    Null,
62    /// Represents a boolean value.
63    Boolean(bool),
64    /// Represents an integer value.
65    Int(i32),
66    /// Represents a long value.
67    Long(i64),
68    /// Represents a float value.
69    Float(f32),
70    /// Represents a double value.
71    Double(f64),
72    /// Represents a bytes value.
73    Bytes(Vec<u8>),
74    /// Represents a string value.
75    String(String),
76    /// Represents an enum symbol.
77    Enum(String),
78    /// Represents a JSON array default for an Avro array, containing element literals.
79    Array(Vec<AvroLiteral>),
80    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
81    Map(IndexMap<String, AvroLiteral>),
82}
83
84/// Contains the necessary information to resolve a writer's record against a reader's record schema.
85#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87    /// Maps a writer's field index to the corresponding reader's field index.
88    /// `None` if the writer's field is not present in the reader's schema.
89    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
90    /// A list of indices in the reader's schema for fields that have a default value.
91    pub(crate) default_fields: Arc<[usize]>,
92    /// For fields present in the writer's schema but not the reader's, this stores their data type.
93    /// This is needed to correctly skip over these fields during deserialization.
94    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
95}
96
97/// Defines the type of promotion to be applied during schema resolution.
98///
99/// Schema resolution may require promoting a writer's data type to a reader's data type.
100/// For example, an `int` can be promoted to a `long`, `float`, or `double`.
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102pub(crate) enum Promotion {
103    /// Direct read with no data type promotion.
104    Direct,
105    /// Promotes an `int` to a `long`.
106    IntToLong,
107    /// Promotes an `int` to a `float`.
108    IntToFloat,
109    /// Promotes an `int` to a `double`.
110    IntToDouble,
111    /// Promotes a `long` to a `float`.
112    LongToFloat,
113    /// Promotes a `long` to a `double`.
114    LongToDouble,
115    /// Promotes a `float` to a `double`.
116    FloatToDouble,
117    /// Promotes a `string` to `bytes`.
118    StringToBytes,
119    /// Promotes `bytes` to a `string`.
120    BytesToString,
121}
122
123impl Display for Promotion {
124    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
125        match self {
126            Self::Direct => write!(formatter, "Direct"),
127            Self::IntToLong => write!(formatter, "Int->Long"),
128            Self::IntToFloat => write!(formatter, "Int->Float"),
129            Self::IntToDouble => write!(formatter, "Int->Double"),
130            Self::LongToFloat => write!(formatter, "Long->Float"),
131            Self::LongToDouble => write!(formatter, "Long->Double"),
132            Self::FloatToDouble => write!(formatter, "Float->Double"),
133            Self::StringToBytes => write!(formatter, "String->Bytes"),
134            Self::BytesToString => write!(formatter, "Bytes->String"),
135        }
136    }
137}
138
139/// Information required to resolve a writer union against a reader union (or single type).
140#[derive(Debug, Clone, PartialEq)]
141pub(crate) struct ResolvedUnion {
142    /// For each writer branch index, the reader branch index and how to read it.
143    /// `None` means the writer branch doesn't resolve against the reader.
144    pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
145    /// Whether the writer schema at this site is a union
146    pub(crate) writer_is_union: bool,
147    /// Whether the reader schema at this site is a union
148    pub(crate) reader_is_union: bool,
149}
150
151/// Holds the mapping information for resolving Avro enums.
152///
153/// When resolving schemas, the writer's enum symbols must be mapped to the reader's symbols.
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub(crate) struct EnumMapping {
156    /// A mapping from the writer's symbol index to the reader's symbol index.
157    pub(crate) mapping: Arc<[i32]>,
158    /// The index to use for a writer's symbol that is not present in the reader's enum
159    /// and a default value is specified in the reader's schema.
160    pub(crate) default_index: i32,
161}
162
163#[cfg(feature = "canonical_extension_types")]
164fn with_extension_type(codec: &Codec, field: Field) -> Field {
165    match codec {
166        Codec::Uuid => field.with_extension_type(arrow_schema::extension::Uuid),
167        _ => field,
168    }
169}
170
171/// An Avro datatype mapped to the arrow data model
172#[derive(Debug, Clone, PartialEq)]
173pub(crate) struct AvroDataType {
174    nullability: Option<Nullability>,
175    metadata: HashMap<String, String>,
176    codec: Codec,
177    pub(crate) resolution: Option<ResolutionInfo>,
178}
179
180impl AvroDataType {
181    /// Create a new [`AvroDataType`] with the given parts.
182    pub(crate) fn new(
183        codec: Codec,
184        metadata: HashMap<String, String>,
185        nullability: Option<Nullability>,
186    ) -> Self {
187        AvroDataType {
188            codec,
189            metadata,
190            nullability,
191            resolution: None,
192        }
193    }
194
195    #[inline]
196    fn new_with_resolution(
197        codec: Codec,
198        metadata: HashMap<String, String>,
199        nullability: Option<Nullability>,
200        resolution: Option<ResolutionInfo>,
201    ) -> Self {
202        Self {
203            codec,
204            metadata,
205            nullability,
206            resolution,
207        }
208    }
209
210    /// Returns an arrow [`Field`] with the given name
211    pub(crate) fn field_with_name(&self, name: &str) -> Field {
212        let mut nullable = self.nullability.is_some();
213        if !nullable {
214            if let Codec::Union(children, _, _) = self.codec() {
215                // If any encoded branch is `null`, mark field as nullable
216                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
217                    nullable = true;
218                }
219            }
220        }
221        let data_type = self.codec.data_type();
222        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
223        #[cfg(feature = "canonical_extension_types")]
224        return with_extension_type(&self.codec, field);
225        #[cfg(not(feature = "canonical_extension_types"))]
226        field
227    }
228
229    /// Returns a reference to the codec used by this data type
230    ///
231    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
232    /// This is useful when we need to inspect or use the specific encoding of a field.
233    pub(crate) fn codec(&self) -> &Codec {
234        &self.codec
235    }
236
237    /// Returns the nullability status of this data type
238    ///
239    /// In Avro, nullability is represented through unions with null types.
240    /// The returned value indicates how nulls are encoded in the Avro format:
241    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
242    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
243    /// - `None` - The type is not nullable
244    pub(crate) fn nullability(&self) -> Option<Nullability> {
245        self.nullability
246    }
247
248    #[inline]
249    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
250        fn expect_string<'v>(
251            default_json: &'v Value,
252            data_type: &str,
253        ) -> Result<&'v str, ArrowError> {
254            match default_json {
255                Value::String(s) => Ok(s.as_str()),
256                _ => Err(ArrowError::SchemaError(format!(
257                    "Default value must be a JSON string for {data_type}"
258                ))),
259            }
260        }
261
262        fn parse_bytes_default(
263            default_json: &Value,
264            expected_len: Option<usize>,
265        ) -> Result<Vec<u8>, ArrowError> {
266            let s = expect_string(default_json, "bytes/fixed logical types")?;
267            let mut out = Vec::with_capacity(s.len());
268            for ch in s.chars() {
269                let cp = ch as u32;
270                if cp > 0xFF {
271                    return Err(ArrowError::SchemaError(format!(
272                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
273                    )));
274                }
275                out.push(cp as u8);
276            }
277            if let Some(len) = expected_len {
278                if out.len() != len {
279                    return Err(ArrowError::SchemaError(format!(
280                        "Default length {} does not match expected fixed size {len}",
281                        out.len(),
282                    )));
283                }
284            }
285            Ok(out)
286        }
287
288        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
289            match default_json {
290                Value::Number(n) => n.as_i64().ok_or_else(|| {
291                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
292                }),
293                _ => Err(ArrowError::SchemaError(format!(
294                    "Default {data_type} must be a JSON integer"
295                ))),
296            }
297        }
298
299        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
300            match default_json {
301                Value::Number(n) => n.as_f64().ok_or_else(|| {
302                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
303                }),
304                _ => Err(ArrowError::SchemaError(format!(
305                    "Default {data_type} must be a JSON number"
306                ))),
307            }
308        }
309
310        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
311        if default_json.is_null() {
312            return match self.codec() {
313                Codec::Null => Ok(AvroLiteral::Null),
314                Codec::Union(encodings, _, _) if !encodings.is_empty()
315                    && matches!(encodings[0].codec(), Codec::Null) =>
316                    {
317                        Ok(AvroLiteral::Null)
318                    }
319                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
320                _ => Err(ArrowError::SchemaError(
321                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
322                        .to_string(),
323                )),
324            };
325        }
326        let lit = match self.codec() {
327            Codec::Null => {
328                return Err(ArrowError::SchemaError(
329                    "Default for `null` type must be JSON null".to_string(),
330                ));
331            }
332            Codec::Boolean => match default_json {
333                Value::Bool(b) => AvroLiteral::Boolean(*b),
334                _ => {
335                    return Err(ArrowError::SchemaError(
336                        "Boolean default must be a JSON boolean".to_string(),
337                    ));
338                }
339            },
340            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
341                let i = parse_json_i64(default_json, "int")?;
342                if i < i32::MIN as i64 || i > i32::MAX as i64 {
343                    return Err(ArrowError::SchemaError(format!(
344                        "Default int {i} out of i32 range"
345                    )));
346                }
347                AvroLiteral::Int(i as i32)
348            }
349            Codec::Int64
350            | Codec::TimeMicros
351            | Codec::TimestampMillis(_)
352            | Codec::TimestampMicros(_)
353            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
354            #[cfg(feature = "avro_custom_types")]
355            Codec::DurationNanos
356            | Codec::DurationMicros
357            | Codec::DurationMillis
358            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
359            Codec::Float32 => {
360                let f = parse_json_f64(default_json, "float")?;
361                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
362                    return Err(ArrowError::SchemaError(format!(
363                        "Default float {f} out of f32 range or not finite"
364                    )));
365                }
366                AvroLiteral::Float(f as f32)
367            }
368            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
369            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
370                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
371            }
372            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
373            Codec::Fixed(sz) => {
374                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
375            }
376            Codec::Decimal(_, _, fixed_size) => {
377                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
378            }
379            Codec::Enum(symbols) => {
380                let s = expect_string(default_json, "enum")?;
381                if symbols.iter().any(|sym| sym == s) {
382                    AvroLiteral::Enum(s.to_string())
383                } else {
384                    return Err(ArrowError::SchemaError(format!(
385                        "Default enum symbol {s:?} not found in reader enum symbols"
386                    )));
387                }
388            }
389            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
390            Codec::List(item_dt) => match default_json {
391                Value::Array(items) => AvroLiteral::Array(
392                    items
393                        .iter()
394                        .map(|v| item_dt.parse_default_literal(v))
395                        .collect::<Result<_, _>>()?,
396                ),
397                _ => {
398                    return Err(ArrowError::SchemaError(
399                        "Default value must be a JSON array for Avro array type".to_string(),
400                    ));
401                }
402            },
403            Codec::Map(val_dt) => match default_json {
404                Value::Object(map) => {
405                    let mut out = IndexMap::with_capacity(map.len());
406                    for (k, v) in map {
407                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
408                    }
409                    AvroLiteral::Map(out)
410                }
411                _ => {
412                    return Err(ArrowError::SchemaError(
413                        "Default value must be a JSON object for Avro map type".to_string(),
414                    ));
415                }
416            },
417            Codec::Struct(fields) => match default_json {
418                Value::Object(obj) => {
419                    let mut out: IndexMap<String, AvroLiteral> =
420                        IndexMap::with_capacity(fields.len());
421                    for f in fields.as_ref() {
422                        let name = f.name().to_string();
423                        if let Some(sub) = obj.get(&name) {
424                            out.insert(name, f.data_type().parse_default_literal(sub)?);
425                        } else {
426                            // Cache metadata lookup once
427                            let stored_default =
428                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
429                            if stored_default.is_none()
430                                && f.data_type().nullability() == Some(Nullability::default())
431                            {
432                                out.insert(name, AvroLiteral::Null);
433                            } else if let Some(default_json) = stored_default {
434                                let v: Value =
435                                    serde_json::from_str(default_json).map_err(|e| {
436                                        ArrowError::SchemaError(format!(
437                                            "Failed to parse stored subfield default JSON for '{}': {e}",
438                                            f.name(),
439                                        ))
440                                    })?;
441                                out.insert(name, f.data_type().parse_default_literal(&v)?);
442                            } else {
443                                return Err(ArrowError::SchemaError(format!(
444                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
445                                    f.name(),
446                                    f.data_type().codec()
447                                )));
448                            }
449                        }
450                    }
451                    AvroLiteral::Map(out)
452                }
453                _ => {
454                    return Err(ArrowError::SchemaError(
455                        "Default value for record/struct must be a JSON object".to_string(),
456                    ));
457                }
458            },
459            Codec::Union(encodings, _, _) => {
460                let Some(default_encoding) = encodings.first() else {
461                    return Err(ArrowError::SchemaError(
462                        "Union with no branches cannot have a default".to_string(),
463                    ));
464                };
465                default_encoding.parse_default_literal(default_json)?
466            }
467            #[cfg(feature = "avro_custom_types")]
468            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
469        };
470        Ok(lit)
471    }
472
473    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
474        let json_text = serde_json::to_string(default_json).map_err(|e| {
475            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
476        })?;
477        self.metadata
478            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
479        Ok(())
480    }
481
482    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
483        let lit = self.parse_default_literal(default_json)?;
484        self.store_default(default_json)?;
485        Ok(lit)
486    }
487}
488
489/// A named [`AvroDataType`]
490#[derive(Debug, Clone, PartialEq)]
491pub(crate) struct AvroField {
492    name: String,
493    data_type: AvroDataType,
494}
495
496impl AvroField {
497    /// Returns the arrow [`Field`]
498    pub(crate) fn field(&self) -> Field {
499        self.data_type.field_with_name(&self.name)
500    }
501
502    /// Returns the [`AvroDataType`]
503    pub(crate) fn data_type(&self) -> &AvroDataType {
504        &self.data_type
505    }
506
507    /// Returns a new [`AvroField`] with Utf8View support enabled
508    ///
509    /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to
510    /// enable potential performance optimizations in string-heavy workloads by using
511    /// Arrow's StringViewArray data structure.
512    ///
513    /// Returns a new `AvroField` with the same structure, but with string types
514    /// converted to use `Utf8View` instead of `Utf8`.
515    pub(crate) fn with_utf8view(&self) -> Self {
516        let mut field = self.clone();
517        if let Codec::Utf8 = field.data_type.codec {
518            field.data_type.codec = Codec::Utf8View;
519        }
520        field
521    }
522
523    /// Returns the name of this Avro field
524    ///
525    /// This is the field name as defined in the Avro schema.
526    /// It's used to identify fields within a record structure.
527    pub(crate) fn name(&self) -> &str {
528        &self.name
529    }
530}
531
532impl<'a> TryFrom<&Schema<'a>> for AvroField {
533    type Error = ArrowError;
534
535    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
536        match schema {
537            Schema::Complex(ComplexType::Record(r)) => {
538                let mut resolver = Maker::new(false, false);
539                let data_type = resolver.make_data_type(schema, None, None)?;
540                Ok(AvroField {
541                    data_type,
542                    name: r.name.to_string(),
543                })
544            }
545            _ => Err(ArrowError::ParseError(format!(
546                "Expected record got {schema:?}"
547            ))),
548        }
549    }
550}
551
552/// Builder for an [`AvroField`]
553#[derive(Debug)]
554pub(crate) struct AvroFieldBuilder<'a> {
555    writer_schema: &'a Schema<'a>,
556    reader_schema: Option<&'a Schema<'a>>,
557    use_utf8view: bool,
558    strict_mode: bool,
559}
560
561impl<'a> AvroFieldBuilder<'a> {
562    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
563    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
564        Self {
565            writer_schema,
566            reader_schema: None,
567            use_utf8view: false,
568            strict_mode: false,
569        }
570    }
571
572    /// Sets the reader schema for schema resolution.
573    ///
574    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
575    /// that can handle differences between the writer's and reader's schemas.
576    #[inline]
577    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
578        self.reader_schema = Some(reader_schema);
579        self
580    }
581
582    /// Enable or disable Utf8View support
583    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
584        self.use_utf8view = use_utf8view;
585        self
586    }
587
588    /// Enable or disable strict mode.
589    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
590        self.strict_mode = strict_mode;
591        self
592    }
593
594    /// Build an [`AvroField`] from the builder
595    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
596        match self.writer_schema {
597            Schema::Complex(ComplexType::Record(r)) => {
598                let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
599                let data_type =
600                    resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
601                Ok(AvroField {
602                    name: r.name.to_string(),
603                    data_type,
604                })
605            }
606            _ => Err(ArrowError::ParseError(format!(
607                "Expected a Record schema to build an AvroField, but got {:?}",
608                self.writer_schema
609            ))),
610        }
611    }
612}
613
614/// An Avro encoding
615///
616/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
617#[derive(Debug, Clone, PartialEq)]
618pub(crate) enum Codec {
619    /// Represents Avro null type, maps to Arrow's Null data type
620    Null,
621    /// Represents Avro boolean type, maps to Arrow's Boolean data type
622    Boolean,
623    /// Represents Avro int type, maps to Arrow's Int32 data type
624    Int32,
625    /// Represents Avro long type, maps to Arrow's Int64 data type
626    Int64,
627    /// Represents Avro float type, maps to Arrow's Float32 data type
628    Float32,
629    /// Represents Avro double type, maps to Arrow's Float64 data type
630    Float64,
631    /// Represents Avro bytes type, maps to Arrow's Binary data type
632    Binary,
633    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
634    Utf8,
635    /// String data represented as UTF-8 encoded bytes with an optimized view representation,
636    /// corresponding to Arrow's StringViewArray which provides better performance for string operations
637    ///
638    /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`.
639    Utf8View,
640    /// Represents Avro date logical type, maps to Arrow's Date32 data type
641    Date32,
642    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
643    TimeMillis,
644    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
645    TimeMicros,
646    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
647    ///
648    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type
649    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
650    TimestampMillis(bool),
651    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
652    ///
653    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
654    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
655    TimestampMicros(bool),
656    /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
657    ///
658    /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
659    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
660    TimestampNanos(bool),
661    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
662    /// The i32 parameter indicates the fixed binary size
663    Fixed(i32),
664    /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
665    ///
666    /// The fields are `(precision, scale, fixed_size)`.
667    /// - `precision` (`usize`): Total number of digits.
668    /// - `scale` (`Option<usize>`): Number of fractional digits.
669    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
670    Decimal(usize, Option<usize>, Option<usize>),
671    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
672    Uuid,
673    /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
674    ///
675    /// The enclosed value contains the enum's symbols.
676    Enum(Arc<[String]>),
677    /// Represents Avro array type, maps to Arrow's List data type
678    List(Arc<AvroDataType>),
679    /// Represents Avro record type, maps to Arrow's Struct data type
680    Struct(Arc<[AvroField]>),
681    /// Represents Avro map type, maps to Arrow's Map data type
682    Map(Arc<AvroDataType>),
683    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
684    Interval,
685    /// Represents Avro union type, maps to Arrow's Union data type
686    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
687    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
688    #[cfg(feature = "avro_custom_types")]
689    DurationNanos,
690    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
691    #[cfg(feature = "avro_custom_types")]
692    DurationMicros,
693    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
694    #[cfg(feature = "avro_custom_types")]
695    DurationMillis,
696    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
697    #[cfg(feature = "avro_custom_types")]
698    DurationSeconds,
699    #[cfg(feature = "avro_custom_types")]
700    RunEndEncoded(Arc<AvroDataType>, u8),
701}
702
703impl Codec {
704    fn data_type(&self) -> DataType {
705        match self {
706            Self::Null => DataType::Null,
707            Self::Boolean => DataType::Boolean,
708            Self::Int32 => DataType::Int32,
709            Self::Int64 => DataType::Int64,
710            Self::Float32 => DataType::Float32,
711            Self::Float64 => DataType::Float64,
712            Self::Binary => DataType::Binary,
713            Self::Utf8 => DataType::Utf8,
714            Self::Utf8View => DataType::Utf8View,
715            Self::Date32 => DataType::Date32,
716            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
717            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
718            Self::TimestampMillis(is_utc) => {
719                DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
720            }
721            Self::TimestampMicros(is_utc) => {
722                DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
723            }
724            Self::TimestampNanos(is_utc) => {
725                DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
726            }
727            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
728            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
729            Self::Decimal(precision, scale, _size) => {
730                let p = *precision as u8;
731                let s = scale.unwrap_or(0) as i8;
732                #[cfg(feature = "small_decimals")]
733                {
734                    if *precision <= DECIMAL32_MAX_PRECISION as usize {
735                        DataType::Decimal32(p, s)
736                    } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
737                        DataType::Decimal64(p, s)
738                    } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
739                        DataType::Decimal128(p, s)
740                    } else {
741                        DataType::Decimal256(p, s)
742                    }
743                }
744                #[cfg(not(feature = "small_decimals"))]
745                {
746                    if *precision <= DECIMAL128_MAX_PRECISION as usize {
747                        DataType::Decimal128(p, s)
748                    } else {
749                        DataType::Decimal256(p, s)
750                    }
751                }
752            }
753            Self::Uuid => DataType::FixedSizeBinary(16),
754            Self::Enum(_) => {
755                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
756            }
757            Self::List(f) => {
758                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
759            }
760            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
761            Self::Map(value_type) => {
762                let val_field = value_type.field_with_name("value");
763                DataType::Map(
764                    Arc::new(Field::new(
765                        "entries",
766                        DataType::Struct(Fields::from(vec![
767                            Field::new("key", DataType::Utf8, false),
768                            val_field,
769                        ])),
770                        false,
771                    )),
772                    false,
773                )
774            }
775            Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
776            #[cfg(feature = "avro_custom_types")]
777            Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
778            #[cfg(feature = "avro_custom_types")]
779            Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
780            #[cfg(feature = "avro_custom_types")]
781            Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
782            #[cfg(feature = "avro_custom_types")]
783            Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
784            #[cfg(feature = "avro_custom_types")]
785            Self::RunEndEncoded(values, bits) => {
786                let run_ends_dt = match *bits {
787                    16 => DataType::Int16,
788                    32 => DataType::Int32,
789                    64 => DataType::Int64,
790                    _ => unreachable!(),
791                };
792                DataType::RunEndEncoded(
793                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
794                    Arc::new(Field::new("values", values.codec().data_type(), true)),
795                )
796            }
797        }
798    }
799
800    /// Converts a string codec to use Utf8View if requested
801    ///
802    /// The conversion only happens if both:
803    /// 1. `use_utf8view` is true
804    /// 2. The codec is currently `Utf8`
805    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
806        if use_utf8view && matches!(self, Self::Utf8) {
807            Self::Utf8View
808        } else {
809            self
810        }
811    }
812
813    #[inline]
814    fn union_field_name(&self) -> String {
815        UnionFieldKind::from(self).as_ref().to_owned()
816    }
817}
818
819impl From<PrimitiveType> for Codec {
820    fn from(value: PrimitiveType) -> Self {
821        match value {
822            PrimitiveType::Null => Self::Null,
823            PrimitiveType::Boolean => Self::Boolean,
824            PrimitiveType::Int => Self::Int32,
825            PrimitiveType::Long => Self::Int64,
826            PrimitiveType::Float => Self::Float32,
827            PrimitiveType::Double => Self::Float64,
828            PrimitiveType::Bytes => Self::Binary,
829            PrimitiveType::String => Self::Utf8,
830        }
831    }
832}
833
834/// Compute the exact maximum base‑10 precision that fits in `n` bytes for Avro
835/// `fixed` decimals stored as two's‑complement unscaled integers (big‑endian).
836///
837/// Per Avro spec (Decimal logical type), for a fixed length `n`:
838/// max precision = ⌊log₁₀(2^(8n − 1) − 1)⌋.
839///
840/// This function returns `None` if `n` is 0 or greater than 32 (Arrow supports
841/// Decimal256, which is 32 bytes and has max precision 76).
842const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
843    // Precomputed exact table for n = 1..=32
844    // 1:2, 2:4, 3:6, 4:9, 5:11, 6:14, 7:16, 8:18, 9:21, 10:23, 11:26, 12:28,
845    // 13:31, 14:33, 15:35, 16:38, 17:40, 18:43, 19:45, 20:47, 21:50, 22:52,
846    // 23:55, 24:57, 25:59, 26:62, 27:64, 28:67, 29:69, 30:71, 31:74, 32:76
847    const MAX_P: [usize; 32] = [
848        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
849        59, 62, 64, 67, 69, 71, 74, 76,
850    ];
851    match n {
852        1..=32 => Some(MAX_P[n - 1]),
853        _ => None,
854    }
855}
856
857fn parse_decimal_attributes(
858    attributes: &Attributes,
859    fallback_size: Option<usize>,
860    precision_required: bool,
861) -> Result<(usize, usize, Option<usize>), ArrowError> {
862    let precision = attributes
863        .additional
864        .get("precision")
865        .and_then(|v| v.as_u64())
866        .or(if precision_required { None } else { Some(10) })
867        .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
868        as usize;
869    let scale = attributes
870        .additional
871        .get("scale")
872        .and_then(|v| v.as_u64())
873        .unwrap_or(0) as usize;
874    let size = attributes
875        .additional
876        .get("size")
877        .and_then(|v| v.as_u64())
878        .map(|s| s as usize)
879        .or(fallback_size);
880    if precision == 0 {
881        return Err(ArrowError::ParseError(
882            "Decimal requires precision > 0".to_string(),
883        ));
884    }
885    if scale > precision {
886        return Err(ArrowError::ParseError(format!(
887            "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
888        )));
889    }
890    if precision > DECIMAL256_MAX_PRECISION as usize {
891        return Err(ArrowError::ParseError(format!(
892            "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
893            DECIMAL256_MAX_PRECISION
894        )));
895    }
896    if let Some(sz) = size {
897        let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
898            ArrowError::ParseError(format!(
899                "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
900            ))
901        })?;
902        if precision > max_p {
903            return Err(ArrowError::ParseError(format!(
904                "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
905            )));
906        }
907    }
908    Ok((precision, scale, size))
909}
910
911#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
912#[strum(serialize_all = "snake_case")]
913enum UnionFieldKind {
914    Null,
915    Boolean,
916    Int,
917    Long,
918    Float,
919    Double,
920    Bytes,
921    String,
922    Date,
923    TimeMillis,
924    TimeMicros,
925    TimestampMillisUtc,
926    TimestampMillisLocal,
927    TimestampMicrosUtc,
928    TimestampMicrosLocal,
929    TimestampNanosUtc,
930    TimestampNanosLocal,
931    Duration,
932    Fixed,
933    Decimal,
934    Enum,
935    Array,
936    Record,
937    Map,
938    Uuid,
939    Union,
940}
941
942impl From<&Codec> for UnionFieldKind {
943    fn from(c: &Codec) -> Self {
944        match c {
945            Codec::Null => Self::Null,
946            Codec::Boolean => Self::Boolean,
947            Codec::Int32 => Self::Int,
948            Codec::Int64 => Self::Long,
949            Codec::Float32 => Self::Float,
950            Codec::Float64 => Self::Double,
951            Codec::Binary => Self::Bytes,
952            Codec::Utf8 | Codec::Utf8View => Self::String,
953            Codec::Date32 => Self::Date,
954            Codec::TimeMillis => Self::TimeMillis,
955            Codec::TimeMicros => Self::TimeMicros,
956            Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
957            Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
958            Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
959            Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
960            Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
961            Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
962            Codec::Interval => Self::Duration,
963            Codec::Fixed(_) => Self::Fixed,
964            Codec::Decimal(..) => Self::Decimal,
965            Codec::Enum(_) => Self::Enum,
966            Codec::List(_) => Self::Array,
967            Codec::Struct(_) => Self::Record,
968            Codec::Map(_) => Self::Map,
969            Codec::Uuid => Self::Uuid,
970            Codec::Union(..) => Self::Union,
971            #[cfg(feature = "avro_custom_types")]
972            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
973            #[cfg(feature = "avro_custom_types")]
974            Codec::DurationNanos
975            | Codec::DurationMicros
976            | Codec::DurationMillis
977            | Codec::DurationSeconds => Self::Duration,
978        }
979    }
980}
981
982fn union_branch_name(dt: &AvroDataType) -> String {
983    if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
984        if name.contains(".") {
985            // Full name
986            return name.to_string();
987        }
988        if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
989            return format!("{ns}.{name}");
990        }
991        return name.to_string();
992    }
993    dt.codec.union_field_name()
994}
995
996fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
997    let arrow_fields: Vec<Field> = encodings
998        .iter()
999        .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1000        .collect();
1001    let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1002    UnionFields::try_new(type_ids, arrow_fields)
1003}
1004
1005/// Resolves Avro type names to [`AvroDataType`]
1006///
1007/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1008#[derive(Debug, Default)]
1009struct Resolver<'a> {
1010    map: HashMap<(&'a str, &'a str), AvroDataType>,
1011}
1012
1013impl<'a> Resolver<'a> {
1014    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1015        self.map.insert((namespace.unwrap_or(""), name), schema);
1016    }
1017
1018    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1019        let (namespace, name) = name
1020            .rsplit_once('.')
1021            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1022        self.map
1023            .get(&(namespace, name))
1024            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1025            .cloned()
1026    }
1027}
1028
1029fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1030    let mut out = HashSet::with_capacity(1 + aliases.len());
1031    let (full, _) = make_full_name(name, ns, None);
1032    out.insert(full);
1033    for a in aliases {
1034        let (fa, _) = make_full_name(a, None, ns);
1035        out.insert(fa);
1036    }
1037    out
1038}
1039
1040fn names_match(
1041    writer_name: &str,
1042    writer_namespace: Option<&str>,
1043    writer_aliases: &[&str],
1044    reader_name: &str,
1045    reader_namespace: Option<&str>,
1046    reader_aliases: &[&str],
1047) -> bool {
1048    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1049    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1050    // If the canonical full names match, or any alias matches cross-wise.
1051    !writer_set.is_disjoint(&reader_set)
1052}
1053
1054fn ensure_names_match(
1055    data_type: &str,
1056    writer_name: &str,
1057    writer_namespace: Option<&str>,
1058    writer_aliases: &[&str],
1059    reader_name: &str,
1060    reader_namespace: Option<&str>,
1061    reader_aliases: &[&str],
1062) -> Result<(), ArrowError> {
1063    if names_match(
1064        writer_name,
1065        writer_namespace,
1066        writer_aliases,
1067        reader_name,
1068        reader_namespace,
1069        reader_aliases,
1070    ) {
1071        Ok(())
1072    } else {
1073        Err(ArrowError::ParseError(format!(
1074            "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1075        )))
1076    }
1077}
1078
1079fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1080    match schema {
1081        Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1082        Schema::Type(Type {
1083            r#type: TypeName::Primitive(primitive),
1084            ..
1085        }) => Some(*primitive),
1086        _ => None,
1087    }
1088}
1089
1090fn nullable_union_variants<'x, 'y>(
1091    variant: &'y [Schema<'x>],
1092) -> Option<(Nullability, &'y Schema<'x>)> {
1093    if variant.len() != 2 {
1094        return None;
1095    }
1096    let is_null = |schema: &Schema<'x>| {
1097        matches!(
1098            schema,
1099            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1100        )
1101    };
1102    match (is_null(&variant[0]), is_null(&variant[1])) {
1103        (true, false) => Some((Nullability::NullFirst, &variant[1])),
1104        (false, true) => Some((Nullability::NullSecond, &variant[0])),
1105        _ => None,
1106    }
1107}
1108
1109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1110enum UnionBranchKey {
1111    Named(String),
1112    Primitive(PrimitiveType),
1113    Array,
1114    Map,
1115}
1116
1117fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1118    let (name, namespace) = match s {
1119        Schema::TypeName(TypeName::Primitive(p))
1120        | Schema::Type(Type {
1121            r#type: TypeName::Primitive(p),
1122            ..
1123        }) => return Some(UnionBranchKey::Primitive(*p)),
1124        Schema::TypeName(TypeName::Ref(name))
1125        | Schema::Type(Type {
1126            r#type: TypeName::Ref(name),
1127            ..
1128        }) => (name, None),
1129        Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1130        Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1131        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1132        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1133        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1134        Schema::Union(_) => return None,
1135    };
1136    let (full, _) = make_full_name(name, namespace, enclosing_ns);
1137    Some(UnionBranchKey::Named(full))
1138}
1139
1140fn union_first_duplicate<'a>(
1141    branches: &'a [Schema<'a>],
1142    enclosing_ns: Option<&'a str>,
1143) -> Option<String> {
1144    let mut seen = HashSet::with_capacity(branches.len());
1145    for schema in branches {
1146        if let Some(key) = branch_key_of(schema, enclosing_ns) {
1147            if !seen.insert(key.clone()) {
1148                let msg = match key {
1149                    UnionBranchKey::Named(full) => format!("named type {full}"),
1150                    UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1151                    UnionBranchKey::Array => "array".to_string(),
1152                    UnionBranchKey::Map => "map".to_string(),
1153                };
1154                return Some(msg);
1155            }
1156        }
1157    }
1158    None
1159}
1160
1161/// Resolves Avro type names to [`AvroDataType`]
1162///
1163/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1164struct Maker<'a> {
1165    resolver: Resolver<'a>,
1166    use_utf8view: bool,
1167    strict_mode: bool,
1168}
1169
1170impl<'a> Maker<'a> {
1171    fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1172        Self {
1173            resolver: Default::default(),
1174            use_utf8view,
1175            strict_mode,
1176        }
1177    }
1178
1179    #[cfg(feature = "avro_custom_types")]
1180    #[inline]
1181    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1182        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1183            let mut inner = (*values).clone();
1184            inner.nullability = Some(nb);
1185            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1186        }
1187    }
1188
1189    fn make_data_type<'s>(
1190        &mut self,
1191        writer_schema: &'s Schema<'a>,
1192        reader_schema: Option<&'s Schema<'a>>,
1193        namespace: Option<&'a str>,
1194    ) -> Result<AvroDataType, ArrowError> {
1195        match reader_schema {
1196            Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1197            None => self.parse_type(writer_schema, namespace),
1198        }
1199    }
1200
1201    /// Parses a [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`
1202    ///
1203    /// `name`: is the name used to refer to `schema` in its parent
1204    /// `namespace`: an optional qualifier used as part of a type hierarchy
1205    /// If the data type is a string, convert to use Utf8View if requested
1206    ///
1207    /// This function is used during the schema conversion process to determine whether
1208    /// string data should be represented as StringArray (default) or StringViewArray.
1209    ///
1210    /// `use_utf8view`: if true, use Utf8View instead of Utf8 for string types
1211    ///
1212    /// See [`Resolver`] for more information
1213    fn parse_type<'s>(
1214        &mut self,
1215        schema: &'s Schema<'a>,
1216        namespace: Option<&'a str>,
1217    ) -> Result<AvroDataType, ArrowError> {
1218        match schema {
1219            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1220                Codec::from(*p).with_utf8view(self.use_utf8view),
1221                Default::default(),
1222                None,
1223            )),
1224            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1225            Schema::Union(f) => {
1226                let null = f
1227                    .iter()
1228                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1229                match (f.len() == 2, null) {
1230                    (true, Some(0)) => {
1231                        let mut field = self.parse_type(&f[1], namespace)?;
1232                        field.nullability = Some(Nullability::NullFirst);
1233                        #[cfg(feature = "avro_custom_types")]
1234                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1235                        return Ok(field);
1236                    }
1237                    (true, Some(1)) => {
1238                        if self.strict_mode {
1239                            return Err(ArrowError::SchemaError(
1240                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1241                                    .to_string(),
1242                            ));
1243                        }
1244                        let mut field = self.parse_type(&f[0], namespace)?;
1245                        field.nullability = Some(Nullability::NullSecond);
1246                        #[cfg(feature = "avro_custom_types")]
1247                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1248                        return Ok(field);
1249                    }
1250                    _ => {}
1251                }
1252                // Validate: unions may not immediately contain unions
1253                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1254                    return Err(ArrowError::SchemaError(
1255                        "Avro unions may not immediately contain other unions".to_string(),
1256                    ));
1257                }
1258                // Validate: duplicates (named by full name; non-named by kind)
1259                if let Some(dup) = union_first_duplicate(f, namespace) {
1260                    return Err(ArrowError::SchemaError(format!(
1261                        "Avro union contains duplicate branch type: {dup}"
1262                    )));
1263                }
1264                // Parse all branches
1265                let children: Vec<AvroDataType> = f
1266                    .iter()
1267                    .map(|s| self.parse_type(s, namespace))
1268                    .collect::<Result<_, _>>()?;
1269                // Build Arrow layout once here
1270                let union_fields = build_union_fields(&children)?;
1271                Ok(AvroDataType::new(
1272                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1273                    Default::default(),
1274                    None,
1275                ))
1276            }
1277            Schema::Complex(c) => match c {
1278                ComplexType::Record(r) => {
1279                    let namespace = r.namespace.or(namespace);
1280                    let mut metadata = r.attributes.field_metadata();
1281                    let fields = r
1282                        .fields
1283                        .iter()
1284                        .map(|field| {
1285                            Ok(AvroField {
1286                                name: field.name.to_string(),
1287                                data_type: self.parse_type(&field.r#type, namespace)?,
1288                            })
1289                        })
1290                        .collect::<Result<_, ArrowError>>()?;
1291                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1292                    if let Some(ns) = namespace {
1293                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1294                    }
1295                    let field = AvroDataType {
1296                        nullability: None,
1297                        codec: Codec::Struct(fields),
1298                        metadata,
1299                        resolution: None,
1300                    };
1301                    self.resolver.register(r.name, namespace, field.clone());
1302                    Ok(field)
1303                }
1304                ComplexType::Array(a) => {
1305                    let field = self.parse_type(a.items.as_ref(), namespace)?;
1306                    Ok(AvroDataType {
1307                        nullability: None,
1308                        metadata: a.attributes.field_metadata(),
1309                        codec: Codec::List(Arc::new(field)),
1310                        resolution: None,
1311                    })
1312                }
1313                ComplexType::Fixed(f) => {
1314                    let size = f.size.try_into().map_err(|e| {
1315                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1316                    })?;
1317                    let namespace = f.namespace.or(namespace);
1318                    let mut metadata = f.attributes.field_metadata();
1319                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1320                    if let Some(ns) = namespace {
1321                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1322                    }
1323                    let field = match f.attributes.logical_type {
1324                        Some("decimal") => {
1325                            let (precision, scale, _) =
1326                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1327                            AvroDataType {
1328                                nullability: None,
1329                                metadata,
1330                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1331                                resolution: None,
1332                            }
1333                        }
1334                        Some("duration") => {
1335                            if size != 12 {
1336                                return Err(ArrowError::ParseError(format!(
1337                                    "Invalid fixed size for Duration: {size}, must be 12"
1338                                )));
1339                            };
1340                            AvroDataType {
1341                                nullability: None,
1342                                metadata,
1343                                codec: Codec::Interval,
1344                                resolution: None,
1345                            }
1346                        }
1347                        _ => AvroDataType {
1348                            nullability: None,
1349                            metadata,
1350                            codec: Codec::Fixed(size),
1351                            resolution: None,
1352                        },
1353                    };
1354                    self.resolver.register(f.name, namespace, field.clone());
1355                    Ok(field)
1356                }
1357                ComplexType::Enum(e) => {
1358                    let namespace = e.namespace.or(namespace);
1359                    let symbols = e
1360                        .symbols
1361                        .iter()
1362                        .map(|s| s.to_string())
1363                        .collect::<Arc<[String]>>();
1364                    let mut metadata = e.attributes.field_metadata();
1365                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1366                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1367                    })?;
1368                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1369                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1370                    if let Some(ns) = namespace {
1371                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1372                    }
1373                    let field = AvroDataType {
1374                        nullability: None,
1375                        metadata,
1376                        codec: Codec::Enum(symbols),
1377                        resolution: None,
1378                    };
1379                    self.resolver.register(e.name, namespace, field.clone());
1380                    Ok(field)
1381                }
1382                ComplexType::Map(m) => {
1383                    let val = self.parse_type(&m.values, namespace)?;
1384                    Ok(AvroDataType {
1385                        nullability: None,
1386                        metadata: m.attributes.field_metadata(),
1387                        codec: Codec::Map(Arc::new(val)),
1388                        resolution: None,
1389                    })
1390                }
1391            },
1392            Schema::Type(t) => {
1393                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1394                // https://avro.apache.org/docs/1.11.1/specification/#logical-types
1395                match (t.attributes.logical_type, &mut field.codec) {
1396                    (Some("decimal"), c @ Codec::Binary) => {
1397                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1398                        *c = Codec::Decimal(prec, Some(sc), None);
1399                    }
1400                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1401                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1402                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1403                    (Some("timestamp-millis"), c @ Codec::Int64) => {
1404                        *c = Codec::TimestampMillis(true)
1405                    }
1406                    (Some("timestamp-micros"), c @ Codec::Int64) => {
1407                        *c = Codec::TimestampMicros(true)
1408                    }
1409                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1410                        *c = Codec::TimestampMillis(false)
1411                    }
1412                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1413                        *c = Codec::TimestampMicros(false)
1414                    }
1415                    (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
1416                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1417                        *c = Codec::TimestampNanos(false)
1418                    }
1419                    (Some("uuid"), c @ Codec::Utf8) => {
1420                        // Map Avro string+logicalType=uuid into the UUID Codec,
1421                        // and preserve the logicalType in Arrow field metadata
1422                        // so writers can round-trip it correctly.
1423                        *c = Codec::Uuid;
1424                        field.metadata.insert("logicalType".into(), "uuid".into());
1425                    }
1426                    #[cfg(feature = "avro_custom_types")]
1427                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1428                    #[cfg(feature = "avro_custom_types")]
1429                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1430                    #[cfg(feature = "avro_custom_types")]
1431                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1432                    #[cfg(feature = "avro_custom_types")]
1433                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1434                        *c = Codec::DurationSeconds
1435                    }
1436                    #[cfg(feature = "avro_custom_types")]
1437                    (Some("arrow.run-end-encoded"), _) => {
1438                        let bits_u8: u8 = t
1439                            .attributes
1440                            .additional
1441                            .get("arrow.runEndIndexBits")
1442                            .and_then(|v| v.as_u64())
1443                            .and_then(|n| u8::try_from(n).ok())
1444                            .ok_or_else(|| ArrowError::ParseError(
1445                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1446                                    .to_string(),
1447                            ))?;
1448                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1449                            return Err(ArrowError::ParseError(format!(
1450                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1451                            )));
1452                        }
1453                        // Wrap the parsed underlying site as REE
1454                        let values_site = field.clone();
1455                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1456                    }
1457                    (Some(logical), _) => {
1458                        // Insert unrecognized logical type into metadata map
1459                        field.metadata.insert("logicalType".into(), logical.into());
1460                    }
1461                    (None, _) => {}
1462                }
1463                if matches!(field.codec, Codec::Int64) {
1464                    if let Some(unit) = t
1465                        .attributes
1466                        .additional
1467                        .get("arrowTimeUnit")
1468                        .and_then(|v| v.as_str())
1469                    {
1470                        if unit == "nanosecond" {
1471                            field.codec = Codec::TimestampNanos(false);
1472                        }
1473                    }
1474                }
1475                if !t.attributes.additional.is_empty() {
1476                    for (k, v) in &t.attributes.additional {
1477                        field.metadata.insert(k.to_string(), v.to_string());
1478                    }
1479                }
1480                Ok(field)
1481            }
1482        }
1483    }
1484
1485    fn resolve_type<'s>(
1486        &mut self,
1487        writer_schema: &'s Schema<'a>,
1488        reader_schema: &'s Schema<'a>,
1489        namespace: Option<&'a str>,
1490    ) -> Result<AvroDataType, ArrowError> {
1491        if let (Some(write_primitive), Some(read_primitive)) =
1492            (primitive_of(writer_schema), primitive_of(reader_schema))
1493        {
1494            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1495        }
1496        match (writer_schema, reader_schema) {
1497            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1498                let writer_variants = writer_variants.as_slice();
1499                let reader_variants = reader_variants.as_slice();
1500                match (
1501                    nullable_union_variants(writer_variants),
1502                    nullable_union_variants(reader_variants),
1503                ) {
1504                    (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
1505                        let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?;
1506                        dt.nullability = Some(w_nb);
1507                        #[cfg(feature = "avro_custom_types")]
1508                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
1509                        Ok(dt)
1510                    }
1511                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1512                }
1513            }
1514            (Schema::Union(writer_variants), reader_non_union) => {
1515                let writer_to_reader: Vec<Option<(usize, Promotion)>> = writer_variants
1516                    .iter()
1517                    .map(|writer| {
1518                        self.resolve_type(writer, reader_non_union, namespace)
1519                            .ok()
1520                            .map(|tmp| (0usize, Self::coercion_from(&tmp)))
1521                    })
1522                    .collect();
1523                let mut dt = self.parse_type(reader_non_union, namespace)?;
1524                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1525                    writer_to_reader: Arc::from(writer_to_reader),
1526                    writer_is_union: true,
1527                    reader_is_union: false,
1528                }));
1529                Ok(dt)
1530            }
1531            (writer_non_union, Schema::Union(reader_variants)) => {
1532                if let Some((nullability, non_null_branch)) =
1533                    nullable_union_variants(reader_variants)
1534                {
1535                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
1536                    let non_null_idx = match nullability {
1537                        Nullability::NullFirst => 1,
1538                        Nullability::NullSecond => 0,
1539                    };
1540                    #[cfg(feature = "avro_custom_types")]
1541                    Self::propagate_nullability_into_ree(&mut dt, nullability);
1542                    dt.nullability = Some(nullability);
1543                    let promotion = Self::coercion_from(&dt);
1544                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1545                        writer_to_reader: Arc::from(vec![Some((non_null_idx, promotion))]),
1546                        writer_is_union: false,
1547                        reader_is_union: true,
1548                    }));
1549                    Ok(dt)
1550                } else {
1551                    let mut best_match: Option<(usize, AvroDataType, Promotion)> = None;
1552                    for (i, variant) in reader_variants.iter().enumerate() {
1553                        if let Ok(resolved_dt) =
1554                            self.resolve_type(writer_non_union, variant, namespace)
1555                        {
1556                            let promotion = Self::coercion_from(&resolved_dt);
1557                            if promotion == Promotion::Direct {
1558                                best_match = Some((i, resolved_dt, promotion));
1559                                break;
1560                            } else if best_match.is_none() {
1561                                best_match = Some((i, resolved_dt, promotion));
1562                            }
1563                        }
1564                    }
1565                    let Some((match_idx, match_dt, promotion)) = best_match else {
1566                        return Err(ArrowError::SchemaError(
1567                            "Writer schema does not match any reader union branch".to_string(),
1568                        ));
1569                    };
1570                    let mut children = Vec::with_capacity(reader_variants.len());
1571                    let mut match_dt = Some(match_dt);
1572                    for (i, variant) in reader_variants.iter().enumerate() {
1573                        if i == match_idx {
1574                            if let Some(mut dt) = match_dt.take() {
1575                                if matches!(dt.resolution, Some(ResolutionInfo::Promotion(_))) {
1576                                    dt.resolution = None;
1577                                }
1578                                children.push(dt);
1579                            }
1580                        } else {
1581                            children.push(self.parse_type(variant, namespace)?);
1582                        }
1583                    }
1584                    let union_fields = build_union_fields(&children)?;
1585                    let mut dt = AvroDataType::new(
1586                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
1587                        Default::default(),
1588                        None,
1589                    );
1590                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1591                        writer_to_reader: Arc::from(vec![Some((match_idx, promotion))]),
1592                        writer_is_union: false,
1593                        reader_is_union: true,
1594                    }));
1595                    Ok(dt)
1596                }
1597            }
1598            (
1599                Schema::Complex(ComplexType::Array(writer_array)),
1600                Schema::Complex(ComplexType::Array(reader_array)),
1601            ) => self.resolve_array(writer_array, reader_array, namespace),
1602            (
1603                Schema::Complex(ComplexType::Map(writer_map)),
1604                Schema::Complex(ComplexType::Map(reader_map)),
1605            ) => self.resolve_map(writer_map, reader_map, namespace),
1606            (
1607                Schema::Complex(ComplexType::Fixed(writer_fixed)),
1608                Schema::Complex(ComplexType::Fixed(reader_fixed)),
1609            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1610            (
1611                Schema::Complex(ComplexType::Record(writer_record)),
1612                Schema::Complex(ComplexType::Record(reader_record)),
1613            ) => self.resolve_records(writer_record, reader_record, namespace),
1614            (
1615                Schema::Complex(ComplexType::Enum(writer_enum)),
1616                Schema::Complex(ComplexType::Enum(reader_enum)),
1617            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1618            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1619            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1620            _ => Err(ArrowError::NotYetImplemented(
1621                "Other resolutions not yet implemented".to_string(),
1622            )),
1623        }
1624    }
1625
1626    #[inline]
1627    fn coercion_from(dt: &AvroDataType) -> Promotion {
1628        match dt.resolution.as_ref() {
1629            Some(ResolutionInfo::Promotion(promotion)) => *promotion,
1630            _ => Promotion::Direct,
1631        }
1632    }
1633
1634    fn find_best_promotion(
1635        &mut self,
1636        writer: &Schema<'a>,
1637        reader_variants: &[Schema<'a>],
1638        namespace: Option<&'a str>,
1639    ) -> Option<(usize, Promotion)> {
1640        let mut first_promotion: Option<(usize, Promotion)> = None;
1641        for (reader_index, reader) in reader_variants.iter().enumerate() {
1642            if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
1643                let promotion = Self::coercion_from(&tmp);
1644                if promotion == Promotion::Direct {
1645                    // An exact match is best, return immediately.
1646                    return Some((reader_index, promotion));
1647                } else if first_promotion.is_none() {
1648                    // Store the first valid promotion but keep searching for a direct match.
1649                    first_promotion = Some((reader_index, promotion));
1650                }
1651            }
1652        }
1653        first_promotion
1654    }
1655
1656    fn resolve_unions<'s>(
1657        &mut self,
1658        writer_variants: &'s [Schema<'a>],
1659        reader_variants: &'s [Schema<'a>],
1660        namespace: Option<&'a str>,
1661    ) -> Result<AvroDataType, ArrowError> {
1662        let reader_encodings: Vec<AvroDataType> = reader_variants
1663            .iter()
1664            .map(|reader_schema| self.parse_type(reader_schema, namespace))
1665            .collect::<Result<_, _>>()?;
1666        let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
1667            Vec::with_capacity(writer_variants.len());
1668        for writer in writer_variants {
1669            writer_to_reader.push(self.find_best_promotion(writer, reader_variants, namespace));
1670        }
1671        let union_fields = build_union_fields(&reader_encodings)?;
1672        let mut dt = AvroDataType::new(
1673            Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1674            Default::default(),
1675            None,
1676        );
1677        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1678            writer_to_reader: Arc::from(writer_to_reader),
1679            writer_is_union: true,
1680            reader_is_union: true,
1681        }));
1682        Ok(dt)
1683    }
1684
1685    fn resolve_array(
1686        &mut self,
1687        writer_array: &Array<'a>,
1688        reader_array: &Array<'a>,
1689        namespace: Option<&'a str>,
1690    ) -> Result<AvroDataType, ArrowError> {
1691        Ok(AvroDataType {
1692            nullability: None,
1693            metadata: reader_array.attributes.field_metadata(),
1694            codec: Codec::List(Arc::new(self.make_data_type(
1695                writer_array.items.as_ref(),
1696                Some(reader_array.items.as_ref()),
1697                namespace,
1698            )?)),
1699            resolution: None,
1700        })
1701    }
1702
1703    fn resolve_map(
1704        &mut self,
1705        writer_map: &Map<'a>,
1706        reader_map: &Map<'a>,
1707        namespace: Option<&'a str>,
1708    ) -> Result<AvroDataType, ArrowError> {
1709        Ok(AvroDataType {
1710            nullability: None,
1711            metadata: reader_map.attributes.field_metadata(),
1712            codec: Codec::Map(Arc::new(self.make_data_type(
1713                &writer_map.values,
1714                Some(&reader_map.values),
1715                namespace,
1716            )?)),
1717            resolution: None,
1718        })
1719    }
1720
1721    fn resolve_fixed<'s>(
1722        &mut self,
1723        writer_fixed: &Fixed<'a>,
1724        reader_fixed: &Fixed<'a>,
1725        reader_schema: &'s Schema<'a>,
1726        namespace: Option<&'a str>,
1727    ) -> Result<AvroDataType, ArrowError> {
1728        ensure_names_match(
1729            "Fixed",
1730            writer_fixed.name,
1731            writer_fixed.namespace,
1732            &writer_fixed.aliases,
1733            reader_fixed.name,
1734            reader_fixed.namespace,
1735            &reader_fixed.aliases,
1736        )?;
1737        if writer_fixed.size != reader_fixed.size {
1738            return Err(ArrowError::SchemaError(format!(
1739                "Fixed size mismatch for {}: writer={}, reader={}",
1740                reader_fixed.name, writer_fixed.size, reader_fixed.size
1741            )));
1742        }
1743        self.parse_type(reader_schema, namespace)
1744    }
1745
1746    fn resolve_primitives(
1747        &mut self,
1748        write_primitive: PrimitiveType,
1749        read_primitive: PrimitiveType,
1750        reader_schema: &Schema<'a>,
1751    ) -> Result<AvroDataType, ArrowError> {
1752        if write_primitive == read_primitive {
1753            return self.parse_type(reader_schema, None);
1754        }
1755        let promotion = match (write_primitive, read_primitive) {
1756            (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
1757            (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
1758            (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
1759            (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
1760            (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
1761            (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
1762            (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
1763            (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
1764            _ => {
1765                return Err(ArrowError::ParseError(format!(
1766                    "Illegal promotion {write_primitive:?} to {read_primitive:?}"
1767                )));
1768            }
1769        };
1770        let mut datatype = self.parse_type(reader_schema, None)?;
1771        datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
1772        Ok(datatype)
1773    }
1774
1775    // Resolve writer vs. reader enum schemas according to Avro 1.11.1.
1776    //
1777    // # How enums resolve (writer to reader)
1778    // Per “Schema Resolution”:
1779    // * The two schemas must refer to the same (unqualified) enum name (or match
1780    //   via alias rewriting).
1781    // * If the writer’s symbol is not present in the reader’s enum and the reader
1782    //   enum has a `default`, that `default` symbol must be used; otherwise,
1783    //   error.
1784    //   https://avro.apache.org/docs/1.11.1/specification/#schema-resolution
1785    // * Avro “Aliases” are applied from the reader side to rewrite the writer’s
1786    //   names during resolution. For robustness across ecosystems, we also accept
1787    //   symmetry here (see note below).
1788    //   https://avro.apache.org/docs/1.11.1/specification/#aliases
1789    //
1790    // # Rationale for this code path
1791    // 1. Do the work once at schema‑resolution time. Avro serializes an enum as a
1792    //    writer‑side position. Mapping positions on the hot decoder path is expensive
1793    //    if done with string lookups. This method builds a `writer_index to reader_index`
1794    //    vector once, so decoding just does an O(1) table lookup.
1795    // 2. Adopt the reader’s symbol set and order. We return an Arrow
1796    //    `Dictionary(Int32, Utf8)` whose dictionary values are the reader enum
1797    //    symbols. This makes downstream semantics match the reader schema, including
1798    //    Avro’s sort order rule that orders enums by symbol position in the schema.
1799    //    https://avro.apache.org/docs/1.11.1/specification/#sort-order
1800    // 3. Honor Avro’s `default` for enums. Avro 1.9+ allows a type‑level default
1801    //    on the enum. When the writer emits a symbol unknown to the reader, we map it
1802    //    to the reader’s validated `default` symbol if present; otherwise we signal an
1803    //    error at decoding time.
1804    //    https://avro.apache.org/docs/1.11.1/specification/#enums
1805    //
1806    // # Implementation notes
1807    // * We first check that enum names match or are*alias‑equivalent. The Avro
1808    //   spec describes alias rewriting using reader aliases; this implementation
1809    //   additionally treats writer aliases as acceptable for name matching to be
1810    //   resilient with schemas produced by different tooling.
1811    // * We build `EnumMapping`:
1812    //   - `mapping[i]` = reader index of the writer symbol at writer index `i`.
1813    //   - If the writer symbol is absent and the reader has a default, we store the
1814    //     reader index of that default.
1815    //   - Otherwise we store `-1` as a sentinel meaning unresolvable; the decoder
1816    //     must treat encountering such a value as an error, per the spec.
1817    // * We persist the reader symbol list in field metadata under
1818    //   `AVRO_ENUM_SYMBOLS_METADATA_KEY`, so consumers can inspect the dictionary
1819    //   without needing the original Avro schema.
1820    // * The Arrow representation is `Dictionary(Int32, Utf8)`, which aligns with
1821    //   Avro’s integer index encoding for enums.
1822    //
1823    // # Examples
1824    // * Writer `["A","B","C"]`, Reader `["A","B"]`, Reader default `"A"`
1825    //     `mapping = [0, 1, 0]`, `default_index = 0`.
1826    // * Writer `["A","B"]`, Reader `["B","A"]` (no default)
1827    //     `mapping = [1, 0]`, `default_index = -1`.
1828    // * Writer `["A","B","C"]`, Reader `["A","B"]` (no default)
1829    //     `mapping = [0, 1, -1]` (decode must error on `"C"`).
1830    fn resolve_enums(
1831        &mut self,
1832        writer_enum: &Enum<'a>,
1833        reader_enum: &Enum<'a>,
1834        reader_schema: &Schema<'a>,
1835        namespace: Option<&'a str>,
1836    ) -> Result<AvroDataType, ArrowError> {
1837        ensure_names_match(
1838            "Enum",
1839            writer_enum.name,
1840            writer_enum.namespace,
1841            &writer_enum.aliases,
1842            reader_enum.name,
1843            reader_enum.namespace,
1844            &reader_enum.aliases,
1845        )?;
1846        if writer_enum.symbols == reader_enum.symbols {
1847            return self.parse_type(reader_schema, namespace);
1848        }
1849        let reader_index: HashMap<&str, i32> = reader_enum
1850            .symbols
1851            .iter()
1852            .enumerate()
1853            .map(|(index, &symbol)| (symbol, index as i32))
1854            .collect();
1855        let default_index: i32 = match reader_enum.default {
1856            Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
1857                ArrowError::SchemaError(format!(
1858                    "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
1859                    reader_enum.name,
1860                ))
1861            })?,
1862            None => -1,
1863        };
1864        let mapping: Vec<i32> = writer_enum
1865            .symbols
1866            .iter()
1867            .map(|&write_symbol| {
1868                reader_index
1869                    .get(write_symbol)
1870                    .copied()
1871                    .unwrap_or(default_index)
1872            })
1873            .collect();
1874        if self.strict_mode && mapping.iter().any(|&m| m < 0) {
1875            return Err(ArrowError::SchemaError(format!(
1876                "Reader enum '{}' does not cover all writer symbols and no default is provided",
1877                reader_enum.name
1878            )));
1879        }
1880        let mut dt = self.parse_type(reader_schema, namespace)?;
1881        dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
1882            mapping: Arc::from(mapping),
1883            default_index,
1884        }));
1885        let reader_ns = reader_enum.namespace.or(namespace);
1886        self.resolver
1887            .register(reader_enum.name, reader_ns, dt.clone());
1888        Ok(dt)
1889    }
1890
1891    #[inline]
1892    fn build_writer_lookup(
1893        writer_record: &Record<'a>,
1894    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
1895        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
1896        for (idx, wf) in writer_record.fields.iter().enumerate() {
1897            // Avro field names are unique; last-in wins are acceptable and match previous behavior.
1898            map.insert(wf.name, idx);
1899        }
1900        // Track ambiguous writer aliases (alias used by multiple writer fields)
1901        let mut ambiguous: HashSet<&str> = HashSet::new();
1902        for (idx, wf) in writer_record.fields.iter().enumerate() {
1903            for &alias in &wf.aliases {
1904                match map.entry(alias) {
1905                    Entry::Occupied(e) if *e.get() != idx => {
1906                        ambiguous.insert(alias);
1907                    }
1908                    Entry::Vacant(e) => {
1909                        e.insert(idx);
1910                    }
1911                    _ => {}
1912                }
1913            }
1914        }
1915        (map, ambiguous)
1916    }
1917
1918    fn resolve_records(
1919        &mut self,
1920        writer_record: &Record<'a>,
1921        reader_record: &Record<'a>,
1922        namespace: Option<&'a str>,
1923    ) -> Result<AvroDataType, ArrowError> {
1924        ensure_names_match(
1925            "Record",
1926            writer_record.name,
1927            writer_record.namespace,
1928            &writer_record.aliases,
1929            reader_record.name,
1930            reader_record.namespace,
1931            &reader_record.aliases,
1932        )?;
1933        let writer_ns = writer_record.namespace.or(namespace);
1934        let reader_ns = reader_record.namespace.or(namespace);
1935        let reader_md = reader_record.attributes.field_metadata();
1936        // Build writer lookup and ambiguous alias set.
1937        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
1938        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
1939        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
1940        // Capture default field indices during the main loop (one pass).
1941        let mut default_fields: Vec<usize> = Vec::new();
1942        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
1943            // Direct name match, then reader aliases (a writer alias map is pre-populated).
1944            let mut match_idx = writer_lookup.get(r_field.name).copied();
1945            let mut matched_via_alias: Option<&str> = None;
1946            if match_idx.is_none() {
1947                for &alias in &r_field.aliases {
1948                    if let Some(i) = writer_lookup.get(alias).copied() {
1949                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
1950                            return Err(ArrowError::SchemaError(format!(
1951                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
1952                                r_field.name
1953                            )));
1954                        }
1955                        match_idx = Some(i);
1956                        matched_via_alias = Some(alias);
1957                        break;
1958                    }
1959                }
1960            }
1961            if let Some(wi) = match_idx {
1962                if writer_to_reader[wi].is_none() {
1963                    let w_schema = &writer_record.fields[wi].r#type;
1964                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
1965                    writer_to_reader[wi] = Some(reader_idx);
1966                    reader_fields.push(AvroField {
1967                        name: r_field.name.to_owned(),
1968                        data_type: dt,
1969                    });
1970                    continue;
1971                } else if self.strict_mode {
1972                    // Writer field already mapped and strict_mode => error
1973                    let existing_reader = writer_to_reader[wi].unwrap();
1974                    let via = matched_via_alias
1975                        .map(|a| format!("alias '{a}'"))
1976                        .unwrap_or_else(|| "name match".to_string());
1977                    return Err(ArrowError::SchemaError(format!(
1978                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
1979                        writer_record.fields[wi].name
1980                    )));
1981                }
1982                // Non-strict and already mapped -> fall through to defaulting logic
1983            }
1984            // No match (or conflicted in non-strict mode): attach default per Avro spec.
1985            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
1986            if let Some(default_json) = r_field.default.as_ref() {
1987                dt.resolution = Some(ResolutionInfo::DefaultValue(
1988                    dt.parse_and_store_default(default_json)?,
1989                ));
1990                default_fields.push(reader_idx);
1991            } else if dt.nullability() == Some(Nullability::NullFirst) {
1992                // The only valid implicit default for a union is the first branch (null-first case).
1993                dt.resolution = Some(ResolutionInfo::DefaultValue(
1994                    dt.parse_and_store_default(&Value::Null)?,
1995                ));
1996                default_fields.push(reader_idx);
1997            } else {
1998                return Err(ArrowError::SchemaError(format!(
1999                    "Reader field '{}' not present in writer schema must have a default value",
2000                    r_field.name
2001                )));
2002            }
2003            reader_fields.push(AvroField {
2004                name: r_field.name.to_owned(),
2005                data_type: dt,
2006            });
2007        }
2008        // Build skip_fields in writer order; pre-size and push.
2009        let mut skip_fields: Vec<Option<AvroDataType>> =
2010            Vec::with_capacity(writer_record.fields.len());
2011        for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
2012            if writer_to_reader[writer_index].is_some() {
2013                skip_fields.push(None);
2014            } else {
2015                skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
2016            }
2017        }
2018        let resolved = AvroDataType::new_with_resolution(
2019            Codec::Struct(Arc::from(reader_fields)),
2020            reader_md,
2021            None,
2022            Some(ResolutionInfo::Record(ResolvedRecord {
2023                writer_to_reader: Arc::from(writer_to_reader),
2024                default_fields: Arc::from(default_fields),
2025                skip_fields: Arc::from(skip_fields),
2026            })),
2027        );
2028        // Register a resolved record by reader name+namespace for potential named type refs.
2029        self.resolver
2030            .register(reader_record.name, reader_ns, resolved.clone());
2031        Ok(resolved)
2032    }
2033}
2034
2035#[cfg(test)]
2036mod tests {
2037    use super::*;
2038    use crate::schema::{
2039        AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2040        Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2041    };
2042    use indexmap::IndexMap;
2043    use serde_json::{self, Value};
2044
2045    fn create_schema_with_logical_type(
2046        primitive_type: PrimitiveType,
2047        logical_type: &'static str,
2048    ) -> Schema<'static> {
2049        let attributes = Attributes {
2050            logical_type: Some(logical_type),
2051            additional: Default::default(),
2052        };
2053
2054        Schema::Type(Type {
2055            r#type: TypeName::Primitive(primitive_type),
2056            attributes,
2057        })
2058    }
2059
2060    fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2061        let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2062        let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2063        let mut maker = Maker::new(false, false);
2064        maker
2065            .make_data_type(&writer_schema, Some(&reader_schema), None)
2066            .expect("promotion should resolve")
2067    }
2068
2069    fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2070        Schema::TypeName(TypeName::Primitive(pt))
2071    }
2072    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2073        Schema::Union(branches)
2074    }
2075
2076    #[test]
2077    fn test_date_logical_type() {
2078        let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2079
2080        let mut maker = Maker::new(false, false);
2081        let result = maker.make_data_type(&schema, None, None).unwrap();
2082
2083        assert!(matches!(result.codec, Codec::Date32));
2084    }
2085
2086    #[test]
2087    fn test_time_millis_logical_type() {
2088        let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2089
2090        let mut maker = Maker::new(false, false);
2091        let result = maker.make_data_type(&schema, None, None).unwrap();
2092
2093        assert!(matches!(result.codec, Codec::TimeMillis));
2094    }
2095
2096    #[test]
2097    fn test_time_micros_logical_type() {
2098        let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2099
2100        let mut maker = Maker::new(false, false);
2101        let result = maker.make_data_type(&schema, None, None).unwrap();
2102
2103        assert!(matches!(result.codec, Codec::TimeMicros));
2104    }
2105
2106    #[test]
2107    fn test_timestamp_millis_logical_type() {
2108        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2109
2110        let mut maker = Maker::new(false, false);
2111        let result = maker.make_data_type(&schema, None, None).unwrap();
2112
2113        assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2114    }
2115
2116    #[test]
2117    fn test_timestamp_micros_logical_type() {
2118        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2119
2120        let mut maker = Maker::new(false, false);
2121        let result = maker.make_data_type(&schema, None, None).unwrap();
2122
2123        assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2124    }
2125
2126    #[test]
2127    fn test_local_timestamp_millis_logical_type() {
2128        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2129
2130        let mut maker = Maker::new(false, false);
2131        let result = maker.make_data_type(&schema, None, None).unwrap();
2132
2133        assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2134    }
2135
2136    #[test]
2137    fn test_local_timestamp_micros_logical_type() {
2138        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2139
2140        let mut maker = Maker::new(false, false);
2141        let result = maker.make_data_type(&schema, None, None).unwrap();
2142
2143        assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2144    }
2145
2146    #[test]
2147    fn test_uuid_type() {
2148        let mut codec = Codec::Fixed(16);
2149        if let c @ Codec::Fixed(16) = &mut codec {
2150            *c = Codec::Uuid;
2151        }
2152        assert!(matches!(codec, Codec::Uuid));
2153    }
2154
2155    #[test]
2156    fn test_duration_logical_type() {
2157        let mut codec = Codec::Fixed(12);
2158
2159        if let c @ Codec::Fixed(12) = &mut codec {
2160            *c = Codec::Interval;
2161        }
2162
2163        assert!(matches!(codec, Codec::Interval));
2164    }
2165
2166    #[test]
2167    fn test_decimal_logical_type_not_implemented() {
2168        let codec = Codec::Fixed(16);
2169
2170        let process_decimal = || -> Result<(), ArrowError> {
2171            if let Codec::Fixed(_) = codec {
2172                return Err(ArrowError::NotYetImplemented(
2173                    "Decimals are not currently supported".to_string(),
2174                ));
2175            }
2176            Ok(())
2177        };
2178
2179        let result = process_decimal();
2180
2181        assert!(result.is_err());
2182        if let Err(ArrowError::NotYetImplemented(msg)) = result {
2183            assert!(msg.contains("Decimals are not currently supported"));
2184        } else {
2185            panic!("Expected NotYetImplemented error");
2186        }
2187    }
2188    #[test]
2189    fn test_unknown_logical_type_added_to_metadata() {
2190        let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2191
2192        let mut maker = Maker::new(false, false);
2193        let result = maker.make_data_type(&schema, None, None).unwrap();
2194
2195        assert_eq!(
2196            result.metadata.get("logicalType"),
2197            Some(&"custom-type".to_string())
2198        );
2199    }
2200
2201    #[test]
2202    fn test_string_with_utf8view_enabled() {
2203        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2204
2205        let mut maker = Maker::new(true, false);
2206        let result = maker.make_data_type(&schema, None, None).unwrap();
2207
2208        assert!(matches!(result.codec, Codec::Utf8View));
2209    }
2210
2211    #[test]
2212    fn test_string_without_utf8view_enabled() {
2213        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2214
2215        let mut maker = Maker::new(false, false);
2216        let result = maker.make_data_type(&schema, None, None).unwrap();
2217
2218        assert!(matches!(result.codec, Codec::Utf8));
2219    }
2220
2221    #[test]
2222    fn test_record_with_string_and_utf8view_enabled() {
2223        let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2224
2225        let avro_field = crate::schema::Field {
2226            name: "string_field",
2227            r#type: field_schema,
2228            default: None,
2229            doc: None,
2230            aliases: vec![],
2231        };
2232
2233        let record = Record {
2234            name: "test_record",
2235            namespace: None,
2236            aliases: vec![],
2237            doc: None,
2238            fields: vec![avro_field],
2239            attributes: Attributes::default(),
2240        };
2241
2242        let schema = Schema::Complex(ComplexType::Record(record));
2243
2244        let mut maker = Maker::new(true, false);
2245        let result = maker.make_data_type(&schema, None, None).unwrap();
2246
2247        if let Codec::Struct(fields) = &result.codec {
2248            let first_field_codec = &fields[0].data_type().codec;
2249            assert!(matches!(first_field_codec, Codec::Utf8View));
2250        } else {
2251            panic!("Expected Struct codec");
2252        }
2253    }
2254
2255    #[test]
2256    fn test_union_with_strict_mode() {
2257        let schema = Schema::Union(vec![
2258            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2259            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2260        ]);
2261
2262        let mut maker = Maker::new(false, true);
2263        let result = maker.make_data_type(&schema, None, None);
2264
2265        assert!(result.is_err());
2266        match result {
2267            Err(ArrowError::SchemaError(msg)) => {
2268                assert!(msg.contains(
2269                    "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2270                ));
2271            }
2272            _ => panic!("Expected SchemaError"),
2273        }
2274    }
2275
2276    #[test]
2277    fn test_resolve_int_to_float_promotion() {
2278        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2279        assert!(matches!(result.codec, Codec::Float32));
2280        assert_eq!(
2281            result.resolution,
2282            Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2283        );
2284    }
2285
2286    #[test]
2287    fn test_resolve_int_to_double_promotion() {
2288        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2289        assert!(matches!(result.codec, Codec::Float64));
2290        assert_eq!(
2291            result.resolution,
2292            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2293        );
2294    }
2295
2296    #[test]
2297    fn test_resolve_long_to_float_promotion() {
2298        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2299        assert!(matches!(result.codec, Codec::Float32));
2300        assert_eq!(
2301            result.resolution,
2302            Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2303        );
2304    }
2305
2306    #[test]
2307    fn test_resolve_long_to_double_promotion() {
2308        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2309        assert!(matches!(result.codec, Codec::Float64));
2310        assert_eq!(
2311            result.resolution,
2312            Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2313        );
2314    }
2315
2316    #[test]
2317    fn test_resolve_float_to_double_promotion() {
2318        let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2319        assert!(matches!(result.codec, Codec::Float64));
2320        assert_eq!(
2321            result.resolution,
2322            Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2323        );
2324    }
2325
2326    #[test]
2327    fn test_resolve_string_to_bytes_promotion() {
2328        let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2329        assert!(matches!(result.codec, Codec::Binary));
2330        assert_eq!(
2331            result.resolution,
2332            Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2333        );
2334    }
2335
2336    #[test]
2337    fn test_resolve_bytes_to_string_promotion() {
2338        let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2339        assert!(matches!(result.codec, Codec::Utf8));
2340        assert_eq!(
2341            result.resolution,
2342            Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2343        );
2344    }
2345
2346    #[test]
2347    fn test_resolve_illegal_promotion_double_to_float_errors() {
2348        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2349        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2350        let mut maker = Maker::new(false, false);
2351        let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2352        assert!(result.is_err());
2353        match result {
2354            Err(ArrowError::ParseError(msg)) => {
2355                assert!(msg.contains("Illegal promotion"));
2356            }
2357            _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2358        }
2359    }
2360
2361    #[test]
2362    fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2363        let writer = Schema::Union(vec![
2364            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2365            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2366        ]);
2367        let reader = Schema::Union(vec![
2368            Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2369            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2370        ]);
2371        let mut maker = Maker::new(false, false);
2372        let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2373        assert!(matches!(result.codec, Codec::Float64));
2374        assert_eq!(
2375            result.resolution,
2376            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2377        );
2378        assert_eq!(result.nullability, Some(Nullability::NullFirst));
2379    }
2380
2381    #[test]
2382    fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2383        let writer = mk_union(vec![
2384            mk_primitive(PrimitiveType::String),
2385            mk_primitive(PrimitiveType::Long),
2386        ]);
2387        let reader = mk_primitive(PrimitiveType::Bytes);
2388        let mut maker = Maker::new(false, false);
2389        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2390        assert!(matches!(dt.codec(), Codec::Binary));
2391        let resolved = match dt.resolution {
2392            Some(ResolutionInfo::Union(u)) => u,
2393            other => panic!("expected union resolution info, got {other:?}"),
2394        };
2395        assert!(resolved.writer_is_union && !resolved.reader_is_union);
2396        assert_eq!(
2397            resolved.writer_to_reader.as_ref(),
2398            &[Some((0, Promotion::StringToBytes)), None]
2399        );
2400    }
2401
2402    #[test]
2403    fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2404        let writer = mk_primitive(PrimitiveType::Long);
2405        let reader = mk_union(vec![
2406            mk_primitive(PrimitiveType::Long),
2407            mk_primitive(PrimitiveType::Double),
2408        ]);
2409        let mut maker = Maker::new(false, false);
2410        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2411        let resolved = match dt.resolution {
2412            Some(ResolutionInfo::Union(u)) => u,
2413            other => panic!("expected union resolution info, got {other:?}"),
2414        };
2415        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2416        assert_eq!(
2417            resolved.writer_to_reader.as_ref(),
2418            &[Some((0, Promotion::Direct))]
2419        );
2420    }
2421
2422    #[test]
2423    fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2424        let writer = mk_primitive(PrimitiveType::Int);
2425        let reader = mk_union(vec![
2426            mk_primitive(PrimitiveType::Null),
2427            mk_primitive(PrimitiveType::Long),
2428            mk_primitive(PrimitiveType::String),
2429        ]);
2430        let mut maker = Maker::new(false, false);
2431        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2432        let resolved = match dt.resolution {
2433            Some(ResolutionInfo::Union(u)) => u,
2434            other => panic!("expected union resolution info, got {other:?}"),
2435        };
2436        assert_eq!(
2437            resolved.writer_to_reader.as_ref(),
2438            &[Some((1, Promotion::IntToLong))]
2439        );
2440    }
2441
2442    #[test]
2443    fn test_resolve_both_nullable_unions_direct_match() {
2444        let writer = mk_union(vec![
2445            mk_primitive(PrimitiveType::Null),
2446            mk_primitive(PrimitiveType::String),
2447        ]);
2448        let reader = mk_union(vec![
2449            mk_primitive(PrimitiveType::String),
2450            mk_primitive(PrimitiveType::Null),
2451        ]);
2452        let mut maker = Maker::new(false, false);
2453        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2454        assert!(matches!(dt.codec(), Codec::Utf8));
2455        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2456        assert!(dt.resolution.is_none());
2457    }
2458
2459    #[test]
2460    fn test_resolve_both_nullable_unions_with_promotion() {
2461        let writer = mk_union(vec![
2462            mk_primitive(PrimitiveType::Null),
2463            mk_primitive(PrimitiveType::Int),
2464        ]);
2465        let reader = mk_union(vec![
2466            mk_primitive(PrimitiveType::Double),
2467            mk_primitive(PrimitiveType::Null),
2468        ]);
2469        let mut maker = Maker::new(false, false);
2470        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2471        assert!(matches!(dt.codec(), Codec::Float64));
2472        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2473        assert_eq!(
2474            dt.resolution,
2475            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2476        );
2477    }
2478
2479    #[test]
2480    fn test_resolve_type_promotion() {
2481        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2482        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2483        let mut maker = Maker::new(false, false);
2484        let result = maker
2485            .make_data_type(&writer_schema, Some(&reader_schema), None)
2486            .unwrap();
2487        assert!(matches!(result.codec, Codec::Int64));
2488        assert_eq!(
2489            result.resolution,
2490            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2491        );
2492    }
2493
2494    #[test]
2495    fn test_nested_record_type_reuse_without_namespace() {
2496        let schema_str = r#"
2497        {
2498          "type": "record",
2499          "name": "Record",
2500          "fields": [
2501            {
2502              "name": "nested",
2503              "type": {
2504                "type": "record",
2505                "name": "Nested",
2506                "fields": [
2507                  { "name": "nested_int", "type": "int" }
2508                ]
2509              }
2510            },
2511            { "name": "nestedRecord", "type": "Nested" },
2512            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
2513            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
2514          ]
2515        }
2516        "#;
2517
2518        let schema: Schema = serde_json::from_str(schema_str).unwrap();
2519
2520        let mut maker = Maker::new(false, false);
2521        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2522
2523        if let Codec::Struct(fields) = avro_data_type.codec() {
2524            assert_eq!(fields.len(), 4);
2525
2526            // nested
2527            assert_eq!(fields[0].name(), "nested");
2528            let nested_data_type = fields[0].data_type();
2529            if let Codec::Struct(nested_fields) = nested_data_type.codec() {
2530                assert_eq!(nested_fields.len(), 1);
2531                assert_eq!(nested_fields[0].name(), "nested_int");
2532                assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
2533            } else {
2534                panic!(
2535                    "'nested' field is not a struct but {:?}",
2536                    nested_data_type.codec()
2537                );
2538            }
2539
2540            // nestedRecord
2541            assert_eq!(fields[1].name(), "nestedRecord");
2542            let nested_record_data_type = fields[1].data_type();
2543            assert_eq!(
2544                nested_record_data_type.codec().data_type(),
2545                nested_data_type.codec().data_type()
2546            );
2547
2548            // nestedArray
2549            assert_eq!(fields[2].name(), "nestedArray");
2550            if let Codec::List(item_type) = fields[2].data_type().codec() {
2551                assert_eq!(
2552                    item_type.codec().data_type(),
2553                    nested_data_type.codec().data_type()
2554                );
2555            } else {
2556                panic!("'nestedArray' field is not a list");
2557            }
2558
2559            // nestedMap
2560            assert_eq!(fields[3].name(), "nestedMap");
2561            if let Codec::Map(value_type) = fields[3].data_type().codec() {
2562                assert_eq!(
2563                    value_type.codec().data_type(),
2564                    nested_data_type.codec().data_type()
2565                );
2566            } else {
2567                panic!("'nestedMap' field is not a map");
2568            }
2569        } else {
2570            panic!("Top-level schema is not a struct");
2571        }
2572    }
2573
2574    #[test]
2575    fn test_nested_enum_type_reuse_with_namespace() {
2576        let schema_str = r#"
2577        {
2578          "type": "record",
2579          "name": "Record",
2580          "namespace": "record_ns",
2581          "fields": [
2582            {
2583              "name": "status",
2584              "type": {
2585                "type": "enum",
2586                "name": "Status",
2587                "namespace": "enum_ns",
2588                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
2589              }
2590            },
2591            { "name": "backupStatus", "type": "enum_ns.Status" },
2592            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
2593            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
2594          ]
2595        }
2596        "#;
2597
2598        let schema: Schema = serde_json::from_str(schema_str).unwrap();
2599
2600        let mut maker = Maker::new(false, false);
2601        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2602
2603        if let Codec::Struct(fields) = avro_data_type.codec() {
2604            assert_eq!(fields.len(), 4);
2605
2606            // status
2607            assert_eq!(fields[0].name(), "status");
2608            let status_data_type = fields[0].data_type();
2609            if let Codec::Enum(symbols) = status_data_type.codec() {
2610                assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
2611            } else {
2612                panic!(
2613                    "'status' field is not an enum but {:?}",
2614                    status_data_type.codec()
2615                );
2616            }
2617
2618            // backupStatus
2619            assert_eq!(fields[1].name(), "backupStatus");
2620            let backup_status_data_type = fields[1].data_type();
2621            assert_eq!(
2622                backup_status_data_type.codec().data_type(),
2623                status_data_type.codec().data_type()
2624            );
2625
2626            // statusHistory
2627            assert_eq!(fields[2].name(), "statusHistory");
2628            if let Codec::List(item_type) = fields[2].data_type().codec() {
2629                assert_eq!(
2630                    item_type.codec().data_type(),
2631                    status_data_type.codec().data_type()
2632                );
2633            } else {
2634                panic!("'statusHistory' field is not a list");
2635            }
2636
2637            // statusMap
2638            assert_eq!(fields[3].name(), "statusMap");
2639            if let Codec::Map(value_type) = fields[3].data_type().codec() {
2640                assert_eq!(
2641                    value_type.codec().data_type(),
2642                    status_data_type.codec().data_type()
2643                );
2644            } else {
2645                panic!("'statusMap' field is not a map");
2646            }
2647        } else {
2648            panic!("Top-level schema is not a struct");
2649        }
2650    }
2651
2652    #[test]
2653    fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
2654        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2655        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2656        let mut maker = Maker::new(false, false);
2657        let data_type = maker
2658            .make_data_type(&writer_schema, Some(&reader_schema), None)
2659            .expect("resolution should succeed");
2660        let field = AvroField {
2661            name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
2662            data_type,
2663        };
2664        assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
2665        assert!(matches!(field.data_type().codec(), Codec::Utf8));
2666    }
2667
2668    fn json_string(s: &str) -> Value {
2669        Value::String(s.to_string())
2670    }
2671
2672    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
2673        let stored = dt
2674            .metadata
2675            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
2676            .cloned()
2677            .unwrap_or_default();
2678        let expected = serde_json::to_string(default_json).unwrap();
2679        assert_eq!(stored, expected, "stored default metadata should match");
2680    }
2681
2682    #[test]
2683    fn test_validate_and_store_default_null_and_nullability_rules() {
2684        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
2685        let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
2686        assert_eq!(lit, AvroLiteral::Null);
2687        assert_default_stored(&dt_null, &Value::Null);
2688        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
2689        let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
2690        assert!(
2691            err.to_string()
2692                .contains("JSON null default is only valid for `null` type"),
2693            "unexpected error: {err}"
2694        );
2695        let mut dt_int_nf =
2696            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
2697        let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
2698        assert_eq!(lit2, AvroLiteral::Null);
2699        assert_default_stored(&dt_int_nf, &Value::Null);
2700        let mut dt_int_ns =
2701            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
2702        let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
2703        assert!(
2704            err2.to_string()
2705                .contains("JSON null default is only valid for `null` type"),
2706            "unexpected error: {err2}"
2707        );
2708    }
2709
2710    #[test]
2711    fn test_validate_and_store_default_primitives_and_temporal() {
2712        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
2713        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
2714        assert_eq!(lit, AvroLiteral::Boolean(true));
2715        assert_default_stored(&dt_bool, &Value::Bool(true));
2716        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
2717        let lit = dt_i32
2718            .parse_and_store_default(&serde_json::json!(123))
2719            .unwrap();
2720        assert_eq!(lit, AvroLiteral::Int(123));
2721        assert_default_stored(&dt_i32, &serde_json::json!(123));
2722        let err = dt_i32
2723            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
2724            .unwrap_err();
2725        assert!(format!("{err}").contains("out of i32 range"));
2726        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
2727        let lit = dt_i64
2728            .parse_and_store_default(&serde_json::json!(1234567890))
2729            .unwrap();
2730        assert_eq!(lit, AvroLiteral::Long(1234567890));
2731        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
2732        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
2733        let lit = dt_f32
2734            .parse_and_store_default(&serde_json::json!(1.25))
2735            .unwrap();
2736        assert_eq!(lit, AvroLiteral::Float(1.25));
2737        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
2738        let err = dt_f32
2739            .parse_and_store_default(&serde_json::json!(1e39))
2740            .unwrap_err();
2741        assert!(format!("{err}").contains("out of f32 range"));
2742        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
2743        let lit = dt_f64
2744            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
2745            .unwrap();
2746        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
2747        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
2748        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
2749        let l = dt_str
2750            .parse_and_store_default(&json_string("hello"))
2751            .unwrap();
2752        assert_eq!(l, AvroLiteral::String("hello".into()));
2753        assert_default_stored(&dt_str, &json_string("hello"));
2754        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
2755        let l = dt_strv
2756            .parse_and_store_default(&json_string("view"))
2757            .unwrap();
2758        assert_eq!(l, AvroLiteral::String("view".into()));
2759        assert_default_stored(&dt_strv, &json_string("view"));
2760        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
2761        let l = dt_uuid
2762            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
2763            .unwrap();
2764        assert_eq!(
2765            l,
2766            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
2767        );
2768        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
2769        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
2770        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
2771        let err = dt_bin
2772            .parse_and_store_default(&json_string("€")) // U+20AC
2773            .unwrap_err();
2774        assert!(format!("{err}").contains("Invalid codepoint"));
2775        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
2776        let ld = dt_date
2777            .parse_and_store_default(&serde_json::json!(1))
2778            .unwrap();
2779        assert_eq!(ld, AvroLiteral::Int(1));
2780        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
2781        let lt = dt_tmill
2782            .parse_and_store_default(&serde_json::json!(86_400_000))
2783            .unwrap();
2784        assert_eq!(lt, AvroLiteral::Int(86_400_000));
2785        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
2786        let ltm = dt_tmicros
2787            .parse_and_store_default(&serde_json::json!(1_000_000))
2788            .unwrap();
2789        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
2790        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
2791        let l1 = dt_ts_milli
2792            .parse_and_store_default(&serde_json::json!(123))
2793            .unwrap();
2794        assert_eq!(l1, AvroLiteral::Long(123));
2795        let mut dt_ts_micro =
2796            AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
2797        let l2 = dt_ts_micro
2798            .parse_and_store_default(&serde_json::json!(456))
2799            .unwrap();
2800        assert_eq!(l2, AvroLiteral::Long(456));
2801    }
2802
2803    #[test]
2804    fn test_validate_and_store_default_fixed_decimal_interval() {
2805        let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
2806        let l = dt_fixed
2807            .parse_and_store_default(&json_string("WXYZ"))
2808            .unwrap();
2809        assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
2810        let err = dt_fixed
2811            .parse_and_store_default(&json_string("TOO LONG"))
2812            .unwrap_err();
2813        assert!(err.to_string().contains("Default length"));
2814        let mut dt_dec_fixed =
2815            AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
2816        let l = dt_dec_fixed
2817            .parse_and_store_default(&json_string("abc"))
2818            .unwrap();
2819        assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
2820        let err = dt_dec_fixed
2821            .parse_and_store_default(&json_string("toolong"))
2822            .unwrap_err();
2823        assert!(err.to_string().contains("Default length"));
2824        let mut dt_dec_bytes =
2825            AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
2826        let l = dt_dec_bytes
2827            .parse_and_store_default(&json_string("freeform"))
2828            .unwrap();
2829        assert_eq!(
2830            l,
2831            AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
2832        );
2833        let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
2834        let l = dt_interval
2835            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
2836            .unwrap();
2837        assert_eq!(
2838            l,
2839            AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
2840        );
2841        let err = dt_interval
2842            .parse_and_store_default(&json_string("short"))
2843            .unwrap_err();
2844        assert!(err.to_string().contains("Default length"));
2845    }
2846
2847    #[test]
2848    fn test_validate_and_store_default_enum_list_map_struct() {
2849        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
2850            .into_iter()
2851            .collect();
2852        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
2853        let l = dt_enum
2854            .parse_and_store_default(&json_string("GREEN"))
2855            .unwrap();
2856        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
2857        let err = dt_enum
2858            .parse_and_store_default(&json_string("YELLOW"))
2859            .unwrap_err();
2860        assert!(err.to_string().contains("Default enum symbol"));
2861        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
2862        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
2863        let val = serde_json::json!([1, 2, 3]);
2864        let l = dt_list.parse_and_store_default(&val).unwrap();
2865        assert_eq!(
2866            l,
2867            AvroLiteral::Array(vec![
2868                AvroLiteral::Long(1),
2869                AvroLiteral::Long(2),
2870                AvroLiteral::Long(3)
2871            ])
2872        );
2873        let err = dt_list
2874            .parse_and_store_default(&serde_json::json!({"not":"array"}))
2875            .unwrap_err();
2876        assert!(err.to_string().contains("JSON array"));
2877        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
2878        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
2879        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
2880        let l = dt_map.parse_and_store_default(&mv).unwrap();
2881        let mut expected = IndexMap::new();
2882        expected.insert("x".into(), AvroLiteral::Double(1.5));
2883        expected.insert("y".into(), AvroLiteral::Double(2.5));
2884        assert_eq!(l, AvroLiteral::Map(expected));
2885        // Not object -> error
2886        let err = dt_map
2887            .parse_and_store_default(&serde_json::json!(123))
2888            .unwrap_err();
2889        assert!(err.to_string().contains("JSON object"));
2890        let mut field_a = AvroField {
2891            name: "a".into(),
2892            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
2893        };
2894        let field_b = AvroField {
2895            name: "b".into(),
2896            data_type: AvroDataType::new(
2897                Codec::Int64,
2898                HashMap::new(),
2899                Some(Nullability::NullFirst),
2900            ),
2901        };
2902        let mut c_md = HashMap::new();
2903        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
2904        let field_c = AvroField {
2905            name: "c".into(),
2906            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
2907        };
2908        field_a.data_type.metadata.insert("doc".into(), "na".into());
2909        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
2910        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
2911        let default_obj = serde_json::json!({"a": 7});
2912        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
2913        let mut expected = IndexMap::new();
2914        expected.insert("a".into(), AvroLiteral::Int(7));
2915        expected.insert("b".into(), AvroLiteral::Null);
2916        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
2917        assert_eq!(l, AvroLiteral::Map(expected));
2918        assert_default_stored(&dt_struct, &default_obj);
2919        let req_field = AvroField {
2920            name: "req".into(),
2921            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
2922        };
2923        let mut dt_bad = AvroDataType::new(
2924            Codec::Struct(Arc::from(vec![req_field])),
2925            HashMap::new(),
2926            None,
2927        );
2928        let err = dt_bad
2929            .parse_and_store_default(&serde_json::json!({}))
2930            .unwrap_err();
2931        assert!(
2932            err.to_string().contains("missing required subfield 'req'"),
2933            "unexpected error: {err}"
2934        );
2935        let err = dt_struct
2936            .parse_and_store_default(&serde_json::json!(10))
2937            .unwrap_err();
2938        err.to_string().contains("must be a JSON object");
2939    }
2940
2941    #[test]
2942    fn test_resolve_array_promotion_and_reader_metadata() {
2943        let mut w_add: HashMap<&str, Value> = HashMap::new();
2944        w_add.insert("who", json_string("writer"));
2945        let mut r_add: HashMap<&str, Value> = HashMap::new();
2946        r_add.insert("who", json_string("reader"));
2947        let writer_schema = Schema::Complex(ComplexType::Array(Array {
2948            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2949            attributes: Attributes {
2950                logical_type: None,
2951                additional: w_add,
2952            },
2953        }));
2954        let reader_schema = Schema::Complex(ComplexType::Array(Array {
2955            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
2956            attributes: Attributes {
2957                logical_type: None,
2958                additional: r_add,
2959            },
2960        }));
2961        let mut maker = Maker::new(false, false);
2962        let dt = maker
2963            .make_data_type(&writer_schema, Some(&reader_schema), None)
2964            .unwrap();
2965        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
2966        if let Codec::List(inner) = dt.codec() {
2967            assert!(matches!(inner.codec(), Codec::Int64));
2968            assert_eq!(
2969                inner.resolution,
2970                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2971            );
2972        } else {
2973            panic!("expected list codec");
2974        }
2975    }
2976
2977    #[test]
2978    fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
2979        let writer_schema = Schema::Complex(ComplexType::Array(Array {
2980            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2981            attributes: Attributes::default(),
2982        }));
2983        let reader_schema = Schema::Complex(ComplexType::Array(Array {
2984            items: Box::new(mk_union(vec![
2985                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2986                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2987            ])),
2988            attributes: Attributes::default(),
2989        }));
2990        let mut maker = Maker::new(false, false);
2991        let dt = maker
2992            .make_data_type(&writer_schema, Some(&reader_schema), None)
2993            .unwrap();
2994        if let Codec::List(inner) = dt.codec() {
2995            assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
2996            assert!(matches!(inner.codec(), Codec::Int32));
2997            match inner.resolution.as_ref() {
2998                Some(ResolutionInfo::Union(info)) => {
2999                    assert!(!info.writer_is_union, "writer should be non-union");
3000                    assert!(info.reader_is_union, "reader should be union");
3001                    assert_eq!(
3002                        info.writer_to_reader.as_ref(),
3003                        &[Some((1, Promotion::Direct))]
3004                    );
3005                }
3006                other => panic!("expected Union resolution, got {other:?}"),
3007            }
3008        } else {
3009            panic!("expected List codec");
3010        }
3011    }
3012
3013    #[test]
3014    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3015        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3016            name: "MD5",
3017            namespace: None,
3018            aliases: vec!["Hash16"],
3019            size: 16,
3020            attributes: Attributes::default(),
3021        }));
3022        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3023            name: "Hash16",
3024            namespace: None,
3025            aliases: vec![],
3026            size: 16,
3027            attributes: Attributes::default(),
3028        }));
3029        let mut maker = Maker::new(false, false);
3030        let dt = maker
3031            .make_data_type(&writer_schema, Some(&reader_schema), None)
3032            .unwrap();
3033        assert!(matches!(dt.codec(), Codec::Fixed(16)));
3034    }
3035
3036    #[test]
3037    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3038        let writer = Schema::Complex(ComplexType::Record(Record {
3039            name: "R",
3040            namespace: None,
3041            doc: None,
3042            aliases: vec![],
3043            fields: vec![
3044                crate::schema::Field {
3045                    name: "a",
3046                    doc: None,
3047                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3048                    default: None,
3049                    aliases: vec![],
3050                },
3051                crate::schema::Field {
3052                    name: "skipme",
3053                    doc: None,
3054                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3055                    default: None,
3056                    aliases: vec![],
3057                },
3058                crate::schema::Field {
3059                    name: "b",
3060                    doc: None,
3061                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3062                    default: None,
3063                    aliases: vec![],
3064                },
3065            ],
3066            attributes: Attributes::default(),
3067        }));
3068        let reader = Schema::Complex(ComplexType::Record(Record {
3069            name: "R",
3070            namespace: None,
3071            doc: None,
3072            aliases: vec![],
3073            fields: vec![
3074                crate::schema::Field {
3075                    name: "b",
3076                    doc: None,
3077                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3078                    default: None,
3079                    aliases: vec![],
3080                },
3081                crate::schema::Field {
3082                    name: "a",
3083                    doc: None,
3084                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3085                    default: None,
3086                    aliases: vec![],
3087                },
3088                crate::schema::Field {
3089                    name: "name",
3090                    doc: None,
3091                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3092                    default: Some(json_string("anon")),
3093                    aliases: vec![],
3094                },
3095                crate::schema::Field {
3096                    name: "opt",
3097                    doc: None,
3098                    r#type: Schema::Union(vec![
3099                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3100                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3101                    ]),
3102                    default: None, // should default to null because NullFirst
3103                    aliases: vec![],
3104                },
3105            ],
3106            attributes: Attributes::default(),
3107        }));
3108        let mut maker = Maker::new(false, false);
3109        let dt = maker
3110            .make_data_type(&writer, Some(&reader), None)
3111            .expect("record resolution");
3112        let fields = match dt.codec() {
3113            Codec::Struct(f) => f,
3114            other => panic!("expected struct, got {other:?}"),
3115        };
3116        assert_eq!(fields.len(), 4);
3117        assert_eq!(fields[0].name(), "b");
3118        assert_eq!(fields[1].name(), "a");
3119        assert_eq!(fields[2].name(), "name");
3120        assert_eq!(fields[3].name(), "opt");
3121        assert!(matches!(
3122            fields[1].data_type().resolution,
3123            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3124        ));
3125        let rec = match dt.resolution {
3126            Some(ResolutionInfo::Record(ref r)) => r.clone(),
3127            other => panic!("expected record resolution, got {other:?}"),
3128        };
3129        assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
3130        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3131        assert!(rec.skip_fields[0].is_none());
3132        assert!(rec.skip_fields[2].is_none());
3133        let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
3134        assert!(matches!(skip1.codec(), Codec::Utf8));
3135        let name_md = &fields[2].data_type().metadata;
3136        assert_eq!(
3137            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3138            Some(&"\"anon\"".to_string())
3139        );
3140        let opt_md = &fields[3].data_type().metadata;
3141        assert_eq!(
3142            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3143            Some(&"null".to_string())
3144        );
3145    }
3146
3147    #[test]
3148    fn test_named_type_alias_resolution_record_cross_namespace() {
3149        let writer_record = Record {
3150            name: "PersonV2",
3151            namespace: Some("com.example.v2"),
3152            doc: None,
3153            aliases: vec!["com.example.Person"],
3154            fields: vec![
3155                AvroFieldSchema {
3156                    name: "name",
3157                    doc: None,
3158                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3159                    default: None,
3160                    aliases: vec![],
3161                },
3162                AvroFieldSchema {
3163                    name: "age",
3164                    doc: None,
3165                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3166                    default: None,
3167                    aliases: vec![],
3168                },
3169            ],
3170            attributes: Attributes::default(),
3171        };
3172        let reader_record = Record {
3173            name: "Person",
3174            namespace: Some("com.example"),
3175            doc: None,
3176            aliases: vec![],
3177            fields: writer_record.fields.clone(),
3178            attributes: Attributes::default(),
3179        };
3180        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3181        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3182        let mut maker = Maker::new(false, false);
3183        let result = maker
3184            .make_data_type(&writer_schema, Some(&reader_schema), None)
3185            .expect("record alias resolution should succeed");
3186        match result.codec {
3187            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3188            other => panic!("expected struct, got {other:?}"),
3189        }
3190    }
3191
3192    #[test]
3193    fn test_named_type_alias_resolution_enum_cross_namespace() {
3194        let writer_enum = Enum {
3195            name: "ColorV2",
3196            namespace: Some("org.example.v2"),
3197            doc: None,
3198            aliases: vec!["org.example.Color"],
3199            symbols: vec!["RED", "GREEN", "BLUE"],
3200            default: None,
3201            attributes: Attributes::default(),
3202        };
3203        let reader_enum = Enum {
3204            name: "Color",
3205            namespace: Some("org.example"),
3206            doc: None,
3207            aliases: vec![],
3208            symbols: vec!["RED", "GREEN", "BLUE"],
3209            default: None,
3210            attributes: Attributes::default(),
3211        };
3212        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3213        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3214        let mut maker = Maker::new(false, false);
3215        maker
3216            .make_data_type(&writer_schema, Some(&reader_schema), None)
3217            .expect("enum alias resolution should succeed");
3218    }
3219
3220    #[test]
3221    fn test_named_type_alias_resolution_fixed_cross_namespace() {
3222        let writer_fixed = Fixed {
3223            name: "Fx10V2",
3224            namespace: Some("ns.v2"),
3225            aliases: vec!["ns.Fx10"],
3226            size: 10,
3227            attributes: Attributes::default(),
3228        };
3229        let reader_fixed = Fixed {
3230            name: "Fx10",
3231            namespace: Some("ns"),
3232            aliases: vec![],
3233            size: 10,
3234            attributes: Attributes::default(),
3235        };
3236        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3237        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3238        let mut maker = Maker::new(false, false);
3239        maker
3240            .make_data_type(&writer_schema, Some(&reader_schema), None)
3241            .expect("fixed alias resolution should succeed");
3242    }
3243}