arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec for Mapping Avro and Arrow types.
19
20use crate::schema::{
21    AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22    AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23    PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27    IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
/// Contains information about how to resolve differences between a writer's and a reader's schema.
///
/// A value of this type is stored in the `resolution` field of [`AvroDataType`]; each variant
/// carries the data needed for one kind of writer-to-reader adaptation.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolutionInfo {
    /// Indicates that the writer's type should be promoted to the reader's type.
    Promotion(Promotion),
    /// Indicates that a default value should be used for a field.
    DefaultValue(AvroLiteral),
    /// Provides mapping information for resolving enums.
    EnumMapping(EnumMapping),
    /// Provides resolution information for record fields.
    Record(ResolvedRecord),
    /// Provides mapping and shape info for resolving unions.
    Union(ResolvedUnion),
}
54
/// Represents a literal Avro value.
///
/// This is used to represent default values in an Avro schema. Values of this type are
/// produced by `AvroDataType::parse_default_literal` from the JSON form of a default.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum AvroLiteral {
    /// Represents a null value.
    Null,
    /// Represents a boolean value.
    Boolean(bool),
    /// Represents an integer value.
    Int(i32),
    /// Represents a long value.
    Long(i64),
    /// Represents a float value.
    Float(f32),
    /// Represents a double value.
    Double(f64),
    /// Represents a bytes value.
    ///
    /// Also used for the defaults of fixed, decimal and interval codecs.
    Bytes(Vec<u8>),
    /// Represents a string value.
    String(String),
    /// Represents an enum symbol.
    Enum(String),
    /// Represents a JSON array default for an Avro array, containing element literals.
    Array(Vec<AvroLiteral>),
    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
    ///
    /// Insertion order of the keys is preserved by the underlying [`IndexMap`].
    Map(IndexMap<String, AvroLiteral>),
}
83
/// Contains the necessary information to resolve a writer's record against a reader's record schema.
///
/// All three collections are shared, immutable slices (`Arc<[..]>`), so cloning a
/// `ResolvedRecord` does not copy their contents.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedRecord {
    /// Maps a writer's field index to the corresponding reader's field index.
    /// `None` if the writer's field is not present in the reader's schema.
    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
    /// A list of indices in the reader's schema for fields that have a default value.
    pub(crate) default_fields: Arc<[usize]>,
    /// For fields present in the writer's schema but not the reader's, this stores their data type.
    /// This is needed to correctly skip over these fields during deserialization.
    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
}
96
/// Defines the type of promotion to be applied during schema resolution.
///
/// Schema resolution may require promoting a writer's data type to a reader's data type.
/// For example, an `int` can be promoted to a `long`, `float`, or `double`.
///
/// The `Display` implementation renders each variant as `Source->Target`
/// (for example `Int->Long`), and [`Promotion::Direct`] as `Direct`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// Direct read with no data type promotion.
    Direct,
    /// Promotes an `int` to a `long`.
    IntToLong,
    /// Promotes an `int` to a `float`.
    IntToFloat,
    /// Promotes an `int` to a `double`.
    IntToDouble,
    /// Promotes a `long` to a `float`.
    LongToFloat,
    /// Promotes a `long` to a `double`.
    LongToDouble,
    /// Promotes a `float` to a `double`.
    FloatToDouble,
    /// Promotes a `string` to `bytes`.
    StringToBytes,
    /// Promotes `bytes` to a `string`.
    BytesToString,
}
122
123impl Display for Promotion {
124    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
125        match self {
126            Self::Direct => write!(formatter, "Direct"),
127            Self::IntToLong => write!(formatter, "Int->Long"),
128            Self::IntToFloat => write!(formatter, "Int->Float"),
129            Self::IntToDouble => write!(formatter, "Int->Double"),
130            Self::LongToFloat => write!(formatter, "Long->Float"),
131            Self::LongToDouble => write!(formatter, "Long->Double"),
132            Self::FloatToDouble => write!(formatter, "Float->Double"),
133            Self::StringToBytes => write!(formatter, "String->Bytes"),
134            Self::BytesToString => write!(formatter, "Bytes->String"),
135        }
136    }
137}
138
/// Information required to resolve a writer union against a reader union (or single type).
///
/// The two boolean flags record which side(s) of the resolution site are actually unions.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedUnion {
    /// For each writer branch index, the reader branch index and how to read it.
    /// `None` means the writer branch doesn't resolve against the reader.
    pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
    /// Whether the writer schema at this site is a union.
    pub(crate) writer_is_union: bool,
    /// Whether the reader schema at this site is a union.
    pub(crate) reader_is_union: bool,
}
150
/// Holds the mapping information for resolving Avro enums.
///
/// When resolving schemas, the writer's enum symbols must be mapped to the reader's symbols.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// A mapping from the writer's symbol index to the reader's symbol index.
    pub(crate) mapping: Arc<[i32]>,
    /// The reader index to use for a writer symbol that is not present in the reader's enum,
    /// when the reader's schema specifies a default value.
    // NOTE(review): the value stored when the reader declares no default is not visible
    // in this part of the file — confirm against the code that builds `EnumMapping`.
    pub(crate) default_index: i32,
}
162
/// Tags `field` with the canonical Arrow extension type that corresponds to `codec`.
///
/// Only [`Codec::Uuid`] has an extension type here; a field for any other codec is
/// returned unchanged.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
170
/// An Avro datatype mapped to the arrow data model
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroDataType {
    /// How nulls are encoded for this type, or `None` when the type is not nullable.
    nullability: Option<Nullability>,
    /// String metadata copied onto the Arrow [`Field`] built by `field_with_name`.
    /// A stored default value is kept here as JSON text under
    /// `AVRO_FIELD_DEFAULT_METADATA_KEY`.
    metadata: HashMap<String, String>,
    /// The encoding of this type; it determines the Arrow [`DataType`] of the field.
    codec: Codec,
    /// Schema-resolution details for this type; `None` when built through `new`.
    pub(crate) resolution: Option<ResolutionInfo>,
}
179
180impl AvroDataType {
181    /// Create a new [`AvroDataType`] with the given parts.
182    pub(crate) fn new(
183        codec: Codec,
184        metadata: HashMap<String, String>,
185        nullability: Option<Nullability>,
186    ) -> Self {
187        AvroDataType {
188            codec,
189            metadata,
190            nullability,
191            resolution: None,
192        }
193    }
194
195    #[inline]
196    fn new_with_resolution(
197        codec: Codec,
198        metadata: HashMap<String, String>,
199        nullability: Option<Nullability>,
200        resolution: Option<ResolutionInfo>,
201    ) -> Self {
202        Self {
203            codec,
204            metadata,
205            nullability,
206            resolution,
207        }
208    }
209
210    /// Returns an arrow [`Field`] with the given name
211    pub(crate) fn field_with_name(&self, name: &str) -> Field {
212        let mut nullable = self.nullability.is_some();
213        if !nullable {
214            if let Codec::Union(children, _, _) = self.codec() {
215                // If any encoded branch is `null`, mark field as nullable
216                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
217                    nullable = true;
218                }
219            }
220        }
221        let data_type = self.codec.data_type();
222        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
223        #[cfg(feature = "canonical_extension_types")]
224        return with_extension_type(&self.codec, field);
225        #[cfg(not(feature = "canonical_extension_types"))]
226        field
227    }
228
229    /// Returns a reference to the codec used by this data type
230    ///
231    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
232    /// This is useful when we need to inspect or use the specific encoding of a field.
233    pub(crate) fn codec(&self) -> &Codec {
234        &self.codec
235    }
236
237    /// Returns the nullability status of this data type
238    ///
239    /// In Avro, nullability is represented through unions with null types.
240    /// The returned value indicates how nulls are encoded in the Avro format:
241    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
242    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
243    /// - `None` - The type is not nullable
244    pub(crate) fn nullability(&self) -> Option<Nullability> {
245        self.nullability
246    }
247
    /// Validates a JSON default value against this data type and converts it to an
    /// [`AvroLiteral`].
    ///
    /// JSON `null` is handled first and is accepted only for the `null` codec, for a union
    /// whose first branch is `null`, or for a type whose nullability is
    /// `Nullability::NullFirst`. Every other JSON value is checked against the codec:
    /// numbers are range-checked, bytes-like defaults are decoded from a JSON string,
    /// arrays/maps/records recurse into their element, value and field types, and a union
    /// default is parsed against the union's first branch.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::SchemaError` when the JSON value has the wrong shape or is out
    /// of range for the codec, when an enum symbol is unknown, or when a record default
    /// omits a subfield that has neither a stored default nor default nullability.
    #[inline]
    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        // Returns the string payload of `default_json`, or a schema error that names
        // `data_type` when the value is not a JSON string.
        fn expect_string<'v>(
            default_json: &'v Value,
            data_type: &str,
        ) -> Result<&'v str, ArrowError> {
            match default_json {
                Value::String(s) => Ok(s.as_str()),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default value must be a JSON string for {data_type}"
                ))),
            }
        }

        // Decodes a bytes/fixed default: each character of the JSON string becomes one
        // byte, so any code point above 0xFF is rejected. When `expected_len` is given,
        // the decoded length must match it exactly.
        fn parse_bytes_default(
            default_json: &Value,
            expected_len: Option<usize>,
        ) -> Result<Vec<u8>, ArrowError> {
            let s = expect_string(default_json, "bytes/fixed logical types")?;
            // `s.len()` is the UTF-8 byte length, an upper bound on the number of chars.
            let mut out = Vec::with_capacity(s.len());
            for ch in s.chars() {
                let cp = ch as u32;
                if cp > 0xFF {
                    return Err(ArrowError::SchemaError(format!(
                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
                    )));
                }
                out.push(cp as u8);
            }
            if let Some(len) = expected_len {
                if out.len() != len {
                    return Err(ArrowError::SchemaError(format!(
                        "Default length {} does not match expected fixed size {len}",
                        out.len(),
                    )));
                }
            }
            Ok(out)
        }

        // Extracts an `i64` from a JSON number; non-numbers and numbers that `as_i64`
        // cannot represent produce a schema error.
        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_i64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON integer"
                ))),
            }
        }

        // Extracts an `f64` from a JSON number; non-numbers produce a schema error.
        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_f64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON number"
                ))),
            }
        }

        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
        if default_json.is_null() {
            return match self.codec() {
                Codec::Null => Ok(AvroLiteral::Null),
                Codec::Union(encodings, _, _) if !encodings.is_empty()
                    && matches!(encodings[0].codec(), Codec::Null) =>
                    {
                        Ok(AvroLiteral::Null)
                    }
                // Nullable non-union types are accepted when null is the first variant.
                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
                _ => Err(ArrowError::SchemaError(
                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
                        .to_string(),
                )),
            };
        }
        // From here on `default_json` is known to be non-null.
        let lit = match self.codec() {
            Codec::Null => {
                return Err(ArrowError::SchemaError(
                    "Default for `null` type must be JSON null".to_string(),
                ));
            }
            Codec::Boolean => match default_json {
                Value::Bool(b) => AvroLiteral::Boolean(*b),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Boolean default must be a JSON boolean".to_string(),
                    ));
                }
            },
            // 32-bit codecs: parse as i64 first, then range-check before narrowing.
            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            Codec::Int64
            | Codec::TimeMicros
            | Codec::TimestampMillis(_)
            | Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            Codec::Float32 => {
                let f = parse_json_f64(default_json, "float")?;
                // Reject values that are not finite or do not fit the f32 range.
                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default float {f} out of f32 range or not finite"
                    )));
                }
                AvroLiteral::Float(f as f32)
            }
            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
            }
            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
            Codec::Fixed(sz) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
            }
            // The length is enforced only when the decimal carries a fixed size.
            Codec::Decimal(_, _, fixed_size) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
            }
            Codec::Enum(symbols) => {
                let s = expect_string(default_json, "enum")?;
                if symbols.iter().any(|sym| sym == s) {
                    AvroLiteral::Enum(s.to_string())
                } else {
                    return Err(ArrowError::SchemaError(format!(
                        "Default enum symbol {s:?} not found in reader enum symbols"
                    )));
                }
            }
            // Interval defaults must decode to exactly 12 bytes.
            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
            Codec::List(item_dt) => match default_json {
                // Each element is validated against the list's item type.
                Value::Array(items) => AvroLiteral::Array(
                    items
                        .iter()
                        .map(|v| item_dt.parse_default_literal(v))
                        .collect::<Result<_, _>>()?,
                ),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON array for Avro array type".to_string(),
                    ));
                }
            },
            Codec::Map(val_dt) => match default_json {
                Value::Object(map) => {
                    let mut out = IndexMap::with_capacity(map.len());
                    // Each value is validated against the map's value type.
                    for (k, v) in map {
                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON object for Avro map type".to_string(),
                    ));
                }
            },
            Codec::Struct(fields) => match default_json {
                Value::Object(obj) => {
                    let mut out: IndexMap<String, AvroLiteral> =
                        IndexMap::with_capacity(fields.len());
                    // Walk the record's declared fields; extra keys in `obj` are not read.
                    for f in fields.as_ref() {
                        let name = f.name().to_string();
                        if let Some(sub) = obj.get(&name) {
                            out.insert(name, f.data_type().parse_default_literal(sub)?);
                        } else {
                            // The subfield is absent from the JSON object: look up its own
                            // stored default once and reuse the result below.
                            let stored_default =
                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
                            if stored_default.is_none()
                                && f.data_type().nullability() == Some(Nullability::default())
                            {
                                // No stored default, but default nullability: use null.
                                out.insert(name, AvroLiteral::Null);
                            } else if let Some(default_json) = stored_default {
                                // Re-parse the stored JSON text and validate it again.
                                let v: Value =
                                    serde_json::from_str(default_json).map_err(|e| {
                                        ArrowError::SchemaError(format!(
                                            "Failed to parse stored subfield default JSON for '{}': {e}",
                                            f.name(),
                                        ))
                                    })?;
                                out.insert(name, f.data_type().parse_default_literal(&v)?);
                            } else {
                                return Err(ArrowError::SchemaError(format!(
                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
                                    f.name(),
                                    f.data_type().codec()
                                )));
                            }
                        }
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value for record/struct must be a JSON object".to_string(),
                    ));
                }
            },
            Codec::Union(encodings, _, _) => {
                // A union default is parsed against the first branch only.
                let Some(default_encoding) = encodings.first() else {
                    return Err(ArrowError::SchemaError(
                        "Union with no branches cannot have a default".to_string(),
                    ));
                };
                default_encoding.parse_default_literal(default_json)?
            }
            // Run-end encoded types delegate to the type of their values.
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
        };
        Ok(lit)
    }
471
472    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
473        let json_text = serde_json::to_string(default_json).map_err(|e| {
474            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
475        })?;
476        self.metadata
477            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
478        Ok(())
479    }
480
481    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
482        let lit = self.parse_default_literal(default_json)?;
483        self.store_default(default_json)?;
484        Ok(lit)
485    }
486}
487
/// A named [`AvroDataType`]
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroField {
    /// The field name; used as the name of the Arrow [`Field`] returned by `field`.
    name: String,
    /// The data type of the field's values.
    data_type: AvroDataType,
}
494
495impl AvroField {
496    /// Returns the arrow [`Field`]
497    pub(crate) fn field(&self) -> Field {
498        self.data_type.field_with_name(&self.name)
499    }
500
501    /// Returns the [`AvroDataType`]
502    pub(crate) fn data_type(&self) -> &AvroDataType {
503        &self.data_type
504    }
505
506    /// Returns a new [`AvroField`] with Utf8View support enabled
507    ///
508    /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to
509    /// enable potential performance optimizations in string-heavy workloads by using
510    /// Arrow's StringViewArray data structure.
511    ///
512    /// Returns a new `AvroField` with the same structure, but with string types
513    /// converted to use `Utf8View` instead of `Utf8`.
514    pub(crate) fn with_utf8view(&self) -> Self {
515        let mut field = self.clone();
516        if let Codec::Utf8 = field.data_type.codec {
517            field.data_type.codec = Codec::Utf8View;
518        }
519        field
520    }
521
522    /// Returns the name of this Avro field
523    ///
524    /// This is the field name as defined in the Avro schema.
525    /// It's used to identify fields within a record structure.
526    pub(crate) fn name(&self) -> &str {
527        &self.name
528    }
529}
530
531impl<'a> TryFrom<&Schema<'a>> for AvroField {
532    type Error = ArrowError;
533
534    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
535        match schema {
536            Schema::Complex(ComplexType::Record(r)) => {
537                let mut resolver = Maker::new(false, false);
538                let data_type = resolver.make_data_type(schema, None, None)?;
539                Ok(AvroField {
540                    data_type,
541                    name: r.name.to_string(),
542                })
543            }
544            _ => Err(ArrowError::ParseError(format!(
545                "Expected record got {schema:?}"
546            ))),
547        }
548    }
549}
550
/// Builder for an [`AvroField`]
#[derive(Debug)]
pub(crate) struct AvroFieldBuilder<'a> {
    /// The writer schema; `build` succeeds only when this is a record schema.
    writer_schema: &'a Schema<'a>,
    /// Optional reader schema to resolve the writer schema against.
    reader_schema: Option<&'a Schema<'a>>,
    /// Whether Utf8View support is enabled; forwarded to `Maker::new` by `build`.
    use_utf8view: bool,
    /// Whether strict mode is enabled; forwarded to `Maker::new` by `build`.
    // NOTE(review): what strict mode enforces is decided by `Maker`, which is not
    // visible in this part of the file.
    strict_mode: bool,
}
559
560impl<'a> AvroFieldBuilder<'a> {
561    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
562    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
563        Self {
564            writer_schema,
565            reader_schema: None,
566            use_utf8view: false,
567            strict_mode: false,
568        }
569    }
570
571    /// Sets the reader schema for schema resolution.
572    ///
573    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
574    /// that can handle differences between the writer's and reader's schemas.
575    #[inline]
576    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
577        self.reader_schema = Some(reader_schema);
578        self
579    }
580
581    /// Enable or disable Utf8View support
582    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
583        self.use_utf8view = use_utf8view;
584        self
585    }
586
587    /// Enable or disable strict mode.
588    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
589        self.strict_mode = strict_mode;
590        self
591    }
592
593    /// Build an [`AvroField`] from the builder
594    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
595        match self.writer_schema {
596            Schema::Complex(ComplexType::Record(r)) => {
597                let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
598                let data_type =
599                    resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
600                Ok(AvroField {
601                    name: r.name.to_string(),
602                    data_type,
603                })
604            }
605            _ => Err(ArrowError::ParseError(format!(
606                "Expected a Record schema to build an AvroField, but got {:?}",
607                self.writer_schema
608            ))),
609        }
610    }
611}
612
/// An Avro encoding
///
/// Each variant describes one Avro (or custom logical) type together with the Arrow data
/// type it maps to.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Codec {
    /// Represents Avro null type, maps to Arrow's Null data type
    Null,
    /// Represents Avro boolean type, maps to Arrow's Boolean data type
    Boolean,
    /// Represents Avro int type, maps to Arrow's Int32 data type
    Int32,
    /// Represents Avro long type, maps to Arrow's Int64 data type
    Int64,
    /// Represents Avro float type, maps to Arrow's Float32 data type
    Float32,
    /// Represents Avro double type, maps to Arrow's Float64 data type
    Float64,
    /// Represents Avro bytes type, maps to Arrow's Binary data type
    Binary,
    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
    Utf8,
    /// String data represented as UTF-8 encoded bytes with an optimized view representation,
    /// corresponding to Arrow's StringViewArray which provides better performance for string operations
    ///
    /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`.
    Utf8View,
    /// Represents Avro date logical type, maps to Arrow's Date32 data type
    Date32,
    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
    TimeMillis,
    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
    TimeMicros,
    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
    ///
    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type
    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
    TimestampMillis(bool),
    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
    ///
    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
    TimestampMicros(bool),
    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
    /// The i32 parameter indicates the fixed binary size
    Fixed(i32),
    /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
    ///
    /// The fields are `(precision, scale, fixed_size)`.
    /// - `precision` (`usize`): Total number of digits.
    /// - `scale` (`Option<usize>`): Number of fractional digits.
    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
    Decimal(usize, Option<usize>, Option<usize>),
    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
    Uuid,
    /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
    ///
    /// The enclosed value contains the enum's symbols.
    Enum(Arc<[String]>),
    /// Represents Avro array type, maps to Arrow's List data type
    ///
    /// The enclosed value is the data type of the list items.
    List(Arc<AvroDataType>),
    /// Represents Avro record type, maps to Arrow's Struct data type
    ///
    /// The enclosed value contains the record's fields.
    Struct(Arc<[AvroField]>),
    /// Represents Avro map type, maps to Arrow's Map data type
    ///
    /// The enclosed value is the data type of the map values; keys are strings.
    Map(Arc<AvroDataType>),
    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
    Interval,
    /// Represents Avro union type, maps to Arrow's Union data type
    ///
    /// The fields are the data types of the union branches, followed by the Arrow
    /// `UnionFields` and `UnionMode` used to build the Arrow union type.
    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
    #[cfg(feature = "avro_custom_types")]
    DurationNanos,
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
    #[cfg(feature = "avro_custom_types")]
    DurationMicros,
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
    #[cfg(feature = "avro_custom_types")]
    DurationMillis,
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
    #[cfg(feature = "avro_custom_types")]
    DurationSeconds,
    /// Represents a run-end encoded custom type.
    ///
    /// The first field is the data type of the encoded values.
    // NOTE(review): the `u8` is presumably the bit width of the run-end index type —
    // confirm against the code that constructs and consumes this variant.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Arc<AvroDataType>, u8),
}
696
697impl Codec {
698    fn data_type(&self) -> DataType {
699        match self {
700            Self::Null => DataType::Null,
701            Self::Boolean => DataType::Boolean,
702            Self::Int32 => DataType::Int32,
703            Self::Int64 => DataType::Int64,
704            Self::Float32 => DataType::Float32,
705            Self::Float64 => DataType::Float64,
706            Self::Binary => DataType::Binary,
707            Self::Utf8 => DataType::Utf8,
708            Self::Utf8View => DataType::Utf8View,
709            Self::Date32 => DataType::Date32,
710            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
711            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
712            Self::TimestampMillis(is_utc) => {
713                DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
714            }
715            Self::TimestampMicros(is_utc) => {
716                DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
717            }
718            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
719            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
720            Self::Decimal(precision, scale, _size) => {
721                let p = *precision as u8;
722                let s = scale.unwrap_or(0) as i8;
723                #[cfg(feature = "small_decimals")]
724                {
725                    if *precision <= DECIMAL32_MAX_PRECISION as usize {
726                        DataType::Decimal32(p, s)
727                    } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
728                        DataType::Decimal64(p, s)
729                    } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
730                        DataType::Decimal128(p, s)
731                    } else {
732                        DataType::Decimal256(p, s)
733                    }
734                }
735                #[cfg(not(feature = "small_decimals"))]
736                {
737                    if *precision <= DECIMAL128_MAX_PRECISION as usize {
738                        DataType::Decimal128(p, s)
739                    } else {
740                        DataType::Decimal256(p, s)
741                    }
742                }
743            }
744            Self::Uuid => DataType::FixedSizeBinary(16),
745            Self::Enum(_) => {
746                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
747            }
748            Self::List(f) => {
749                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
750            }
751            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
752            Self::Map(value_type) => {
753                let val_field = value_type.field_with_name("value");
754                DataType::Map(
755                    Arc::new(Field::new(
756                        "entries",
757                        DataType::Struct(Fields::from(vec![
758                            Field::new("key", DataType::Utf8, false),
759                            val_field,
760                        ])),
761                        false,
762                    )),
763                    false,
764                )
765            }
766            Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
767            #[cfg(feature = "avro_custom_types")]
768            Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
769            #[cfg(feature = "avro_custom_types")]
770            Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
771            #[cfg(feature = "avro_custom_types")]
772            Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
773            #[cfg(feature = "avro_custom_types")]
774            Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
775            #[cfg(feature = "avro_custom_types")]
776            Self::RunEndEncoded(values, bits) => {
777                let run_ends_dt = match *bits {
778                    16 => DataType::Int16,
779                    32 => DataType::Int32,
780                    64 => DataType::Int64,
781                    _ => unreachable!(),
782                };
783                DataType::RunEndEncoded(
784                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
785                    Arc::new(Field::new("values", values.codec().data_type(), true)),
786                )
787            }
788        }
789    }
790
791    /// Converts a string codec to use Utf8View if requested
792    ///
793    /// The conversion only happens if both:
794    /// 1. `use_utf8view` is true
795    /// 2. The codec is currently `Utf8`
796    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
797        if use_utf8view && matches!(self, Self::Utf8) {
798            Self::Utf8View
799        } else {
800            self
801        }
802    }
803
804    #[inline]
805    fn union_field_name(&self) -> String {
806        UnionFieldKind::from(self).as_ref().to_owned()
807    }
808}
809
810impl From<PrimitiveType> for Codec {
811    fn from(value: PrimitiveType) -> Self {
812        match value {
813            PrimitiveType::Null => Self::Null,
814            PrimitiveType::Boolean => Self::Boolean,
815            PrimitiveType::Int => Self::Int32,
816            PrimitiveType::Long => Self::Int64,
817            PrimitiveType::Float => Self::Float32,
818            PrimitiveType::Double => Self::Float64,
819            PrimitiveType::Bytes => Self::Binary,
820            PrimitiveType::String => Self::Utf8,
821        }
822    }
823}
824
/// Largest base-10 precision representable by an Avro `fixed` decimal of `n` bytes.
///
/// The unscaled value is a big-endian two's-complement integer, so per the Avro
/// Decimal logical type the limit for a width of `n` bytes is
/// ⌊log₁₀(2^(8n − 1) − 1)⌋.
///
/// Returns `None` when `n` is 0 or exceeds 32 bytes, the width of Arrow's
/// Decimal256 (maximum precision 76).
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    // Exact values of the formula above; entry `i` holds the limit for `i + 1` bytes.
    const MAX_PRECISION_BY_WIDTH: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, // 1..=8 bytes
        21, 23, 26, 28, 31, 33, 35, 38, // 9..=16 bytes
        40, 43, 45, 47, 50, 52, 55, 57, // 17..=24 bytes
        59, 62, 64, 67, 69, 71, 74, 76, // 25..=32 bytes
    ];
    if n == 0 || n > MAX_PRECISION_BY_WIDTH.len() {
        return None;
    }
    Some(MAX_PRECISION_BY_WIDTH[n - 1])
}
847
848fn parse_decimal_attributes(
849    attributes: &Attributes,
850    fallback_size: Option<usize>,
851    precision_required: bool,
852) -> Result<(usize, usize, Option<usize>), ArrowError> {
853    let precision = attributes
854        .additional
855        .get("precision")
856        .and_then(|v| v.as_u64())
857        .or(if precision_required { None } else { Some(10) })
858        .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
859        as usize;
860    let scale = attributes
861        .additional
862        .get("scale")
863        .and_then(|v| v.as_u64())
864        .unwrap_or(0) as usize;
865    let size = attributes
866        .additional
867        .get("size")
868        .and_then(|v| v.as_u64())
869        .map(|s| s as usize)
870        .or(fallback_size);
871    if precision == 0 {
872        return Err(ArrowError::ParseError(
873            "Decimal requires precision > 0".to_string(),
874        ));
875    }
876    if scale > precision {
877        return Err(ArrowError::ParseError(format!(
878            "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
879        )));
880    }
881    if precision > DECIMAL256_MAX_PRECISION as usize {
882        return Err(ArrowError::ParseError(format!(
883            "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
884            DECIMAL256_MAX_PRECISION
885        )));
886    }
887    if let Some(sz) = size {
888        let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
889            ArrowError::ParseError(format!(
890                "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
891            ))
892        })?;
893        if precision > max_p {
894            return Err(ArrowError::ParseError(format!(
895                "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
896            )));
897        }
898    }
899    Ok((precision, scale, size))
900}
901
/// Kind of an Avro union branch, used to name branches that carry no Avro name of their own.
///
/// `AsRefStr` combined with `serialize_all = "snake_case"` renders each variant in snake
/// case (for example `TimestampMillisUtc` as `"timestamp_millis_utc"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Date,
    TimeMillis,
    TimeMicros,
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
930
931impl From<&Codec> for UnionFieldKind {
932    fn from(c: &Codec) -> Self {
933        match c {
934            Codec::Null => Self::Null,
935            Codec::Boolean => Self::Boolean,
936            Codec::Int32 => Self::Int,
937            Codec::Int64 => Self::Long,
938            Codec::Float32 => Self::Float,
939            Codec::Float64 => Self::Double,
940            Codec::Binary => Self::Bytes,
941            Codec::Utf8 | Codec::Utf8View => Self::String,
942            Codec::Date32 => Self::Date,
943            Codec::TimeMillis => Self::TimeMillis,
944            Codec::TimeMicros => Self::TimeMicros,
945            Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
946            Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
947            Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
948            Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
949            Codec::Interval => Self::Duration,
950            Codec::Fixed(_) => Self::Fixed,
951            Codec::Decimal(..) => Self::Decimal,
952            Codec::Enum(_) => Self::Enum,
953            Codec::List(_) => Self::Array,
954            Codec::Struct(_) => Self::Record,
955            Codec::Map(_) => Self::Map,
956            Codec::Uuid => Self::Uuid,
957            Codec::Union(..) => Self::Union,
958            #[cfg(feature = "avro_custom_types")]
959            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
960            #[cfg(feature = "avro_custom_types")]
961            Codec::DurationNanos
962            | Codec::DurationMicros
963            | Codec::DurationMillis
964            | Codec::DurationSeconds => Self::Duration,
965        }
966    }
967}
968
969fn union_branch_name(dt: &AvroDataType) -> String {
970    if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
971        if name.contains(".") {
972            // Full name
973            return name.to_string();
974        }
975        if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
976            return format!("{ns}.{name}");
977        }
978        return name.to_string();
979    }
980    dt.codec.union_field_name()
981}
982
983fn build_union_fields(encodings: &[AvroDataType]) -> UnionFields {
984    let arrow_fields: Vec<Field> = encodings
985        .iter()
986        .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
987        .collect();
988    let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
989    UnionFields::new(type_ids, arrow_fields)
990}
991
/// Resolves Avro type names to [`AvroDataType`]
///
/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Default)]
struct Resolver<'a> {
    /// Registered types keyed by `(namespace, name)`; a missing namespace is stored as `""`.
    map: HashMap<(&'a str, &'a str), AvroDataType>,
}
999
1000impl<'a> Resolver<'a> {
1001    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1002        self.map.insert((namespace.unwrap_or(""), name), schema);
1003    }
1004
1005    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1006        let (namespace, name) = name
1007            .rsplit_once('.')
1008            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1009        self.map
1010            .get(&(namespace, name))
1011            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1012            .cloned()
1013    }
1014}
1015
1016fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1017    let mut out = HashSet::with_capacity(1 + aliases.len());
1018    let (full, _) = make_full_name(name, ns, None);
1019    out.insert(full);
1020    for a in aliases {
1021        let (fa, _) = make_full_name(a, None, ns);
1022        out.insert(fa);
1023    }
1024    out
1025}
1026
1027fn names_match(
1028    writer_name: &str,
1029    writer_namespace: Option<&str>,
1030    writer_aliases: &[&str],
1031    reader_name: &str,
1032    reader_namespace: Option<&str>,
1033    reader_aliases: &[&str],
1034) -> bool {
1035    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1036    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1037    // If the canonical full names match, or any alias matches cross-wise.
1038    !writer_set.is_disjoint(&reader_set)
1039}
1040
1041fn ensure_names_match(
1042    data_type: &str,
1043    writer_name: &str,
1044    writer_namespace: Option<&str>,
1045    writer_aliases: &[&str],
1046    reader_name: &str,
1047    reader_namespace: Option<&str>,
1048    reader_aliases: &[&str],
1049) -> Result<(), ArrowError> {
1050    if names_match(
1051        writer_name,
1052        writer_namespace,
1053        writer_aliases,
1054        reader_name,
1055        reader_namespace,
1056        reader_aliases,
1057    ) {
1058        Ok(())
1059    } else {
1060        Err(ArrowError::ParseError(format!(
1061            "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1062        )))
1063    }
1064}
1065
1066fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1067    match schema {
1068        Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1069        Schema::Type(Type {
1070            r#type: TypeName::Primitive(primitive),
1071            ..
1072        }) => Some(*primitive),
1073        _ => None,
1074    }
1075}
1076
1077fn nullable_union_variants<'x, 'y>(
1078    variant: &'y [Schema<'x>],
1079) -> Option<(Nullability, &'y Schema<'x>)> {
1080    if variant.len() != 2 {
1081        return None;
1082    }
1083    let is_null = |schema: &Schema<'x>| {
1084        matches!(
1085            schema,
1086            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1087        )
1088    };
1089    match (is_null(&variant[0]), is_null(&variant[1])) {
1090        (true, false) => Some((Nullability::NullFirst, &variant[1])),
1091        (false, true) => Some((Nullability::NullSecond, &variant[0])),
1092        _ => None,
1093    }
1094}
1095
/// Identity of a union branch for duplicate detection.
///
/// Named types are identified by their full name, primitives by their primitive type,
/// and arrays and maps by kind alone.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    Named(String),
    Primitive(PrimitiveType),
    Array,
    Map,
}
1103
1104fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1105    let (name, namespace) = match s {
1106        Schema::TypeName(TypeName::Primitive(p))
1107        | Schema::Type(Type {
1108            r#type: TypeName::Primitive(p),
1109            ..
1110        }) => return Some(UnionBranchKey::Primitive(*p)),
1111        Schema::TypeName(TypeName::Ref(name))
1112        | Schema::Type(Type {
1113            r#type: TypeName::Ref(name),
1114            ..
1115        }) => (name, None),
1116        Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1117        Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1118        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1119        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1120        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1121        Schema::Union(_) => return None,
1122    };
1123    let (full, _) = make_full_name(name, namespace, enclosing_ns);
1124    Some(UnionBranchKey::Named(full))
1125}
1126
1127fn union_first_duplicate<'a>(
1128    branches: &'a [Schema<'a>],
1129    enclosing_ns: Option<&'a str>,
1130) -> Option<String> {
1131    let mut seen = HashSet::with_capacity(branches.len());
1132    for schema in branches {
1133        if let Some(key) = branch_key_of(schema, enclosing_ns) {
1134            if !seen.insert(key.clone()) {
1135                let msg = match key {
1136                    UnionBranchKey::Named(full) => format!("named type {full}"),
1137                    UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1138                    UnionBranchKey::Array => "array".to_string(),
1139                    UnionBranchKey::Map => "map".to_string(),
1140                };
1141                return Some(msg);
1142            }
1143        }
1144    }
1145    None
1146}
1147
/// Builds [`AvroDataType`]s from Avro schemas, registering named types as they are parsed
/// so that later references to them can be resolved.
///
/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
struct Maker<'a> {
    /// Named types registered so far, consulted when a schema refers to a type by name.
    resolver: Resolver<'a>,
    /// When true, string types are produced as `Utf8View` instead of `Utf8`.
    use_utf8view: bool,
    /// When true, two-branch unions of the form `['T','null']` are rejected.
    strict_mode: bool,
}
1156
1157impl<'a> Maker<'a> {
1158    fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1159        Self {
1160            resolver: Default::default(),
1161            use_utf8view,
1162            strict_mode,
1163        }
1164    }
1165
1166    #[cfg(feature = "avro_custom_types")]
1167    #[inline]
1168    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1169        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1170            let mut inner = (*values).clone();
1171            inner.nullability = Some(nb);
1172            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1173        }
1174    }
1175
1176    fn make_data_type<'s>(
1177        &mut self,
1178        writer_schema: &'s Schema<'a>,
1179        reader_schema: Option<&'s Schema<'a>>,
1180        namespace: Option<&'a str>,
1181    ) -> Result<AvroDataType, ArrowError> {
1182        match reader_schema {
1183            Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1184            None => self.parse_type(writer_schema, namespace),
1185        }
1186    }
1187
1188    /// Parses a [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`
1189    ///
1190    /// `name`: is the name used to refer to `schema` in its parent
1191    /// `namespace`: an optional qualifier used as part of a type hierarchy
1192    /// If the data type is a string, convert to use Utf8View if requested
1193    ///
1194    /// This function is used during the schema conversion process to determine whether
1195    /// string data should be represented as StringArray (default) or StringViewArray.
1196    ///
1197    /// `use_utf8view`: if true, use Utf8View instead of Utf8 for string types
1198    ///
1199    /// See [`Resolver`] for more information
1200    fn parse_type<'s>(
1201        &mut self,
1202        schema: &'s Schema<'a>,
1203        namespace: Option<&'a str>,
1204    ) -> Result<AvroDataType, ArrowError> {
1205        match schema {
1206            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1207                Codec::from(*p).with_utf8view(self.use_utf8view),
1208                Default::default(),
1209                None,
1210            )),
1211            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1212            Schema::Union(f) => {
1213                let null = f
1214                    .iter()
1215                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1216                match (f.len() == 2, null) {
1217                    (true, Some(0)) => {
1218                        let mut field = self.parse_type(&f[1], namespace)?;
1219                        field.nullability = Some(Nullability::NullFirst);
1220                        #[cfg(feature = "avro_custom_types")]
1221                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1222                        return Ok(field);
1223                    }
1224                    (true, Some(1)) => {
1225                        if self.strict_mode {
1226                            return Err(ArrowError::SchemaError(
1227                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1228                                    .to_string(),
1229                            ));
1230                        }
1231                        let mut field = self.parse_type(&f[0], namespace)?;
1232                        field.nullability = Some(Nullability::NullSecond);
1233                        #[cfg(feature = "avro_custom_types")]
1234                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1235                        return Ok(field);
1236                    }
1237                    _ => {}
1238                }
1239                // Validate: unions may not immediately contain unions
1240                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1241                    return Err(ArrowError::SchemaError(
1242                        "Avro unions may not immediately contain other unions".to_string(),
1243                    ));
1244                }
1245                // Validate: duplicates (named by full name; non-named by kind)
1246                if let Some(dup) = union_first_duplicate(f, namespace) {
1247                    return Err(ArrowError::SchemaError(format!(
1248                        "Avro union contains duplicate branch type: {dup}"
1249                    )));
1250                }
1251                // Parse all branches
1252                let children: Vec<AvroDataType> = f
1253                    .iter()
1254                    .map(|s| self.parse_type(s, namespace))
1255                    .collect::<Result<_, _>>()?;
1256                // Build Arrow layout once here
1257                let union_fields = build_union_fields(&children);
1258                Ok(AvroDataType::new(
1259                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1260                    Default::default(),
1261                    None,
1262                ))
1263            }
1264            Schema::Complex(c) => match c {
1265                ComplexType::Record(r) => {
1266                    let namespace = r.namespace.or(namespace);
1267                    let mut metadata = r.attributes.field_metadata();
1268                    let fields = r
1269                        .fields
1270                        .iter()
1271                        .map(|field| {
1272                            Ok(AvroField {
1273                                name: field.name.to_string(),
1274                                data_type: self.parse_type(&field.r#type, namespace)?,
1275                            })
1276                        })
1277                        .collect::<Result<_, ArrowError>>()?;
1278                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1279                    if let Some(ns) = namespace {
1280                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1281                    }
1282                    let field = AvroDataType {
1283                        nullability: None,
1284                        codec: Codec::Struct(fields),
1285                        metadata,
1286                        resolution: None,
1287                    };
1288                    self.resolver.register(r.name, namespace, field.clone());
1289                    Ok(field)
1290                }
1291                ComplexType::Array(a) => {
1292                    let field = self.parse_type(a.items.as_ref(), namespace)?;
1293                    Ok(AvroDataType {
1294                        nullability: None,
1295                        metadata: a.attributes.field_metadata(),
1296                        codec: Codec::List(Arc::new(field)),
1297                        resolution: None,
1298                    })
1299                }
1300                ComplexType::Fixed(f) => {
1301                    let size = f.size.try_into().map_err(|e| {
1302                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1303                    })?;
1304                    let namespace = f.namespace.or(namespace);
1305                    let mut metadata = f.attributes.field_metadata();
1306                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1307                    if let Some(ns) = namespace {
1308                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1309                    }
1310                    let field = match f.attributes.logical_type {
1311                        Some("decimal") => {
1312                            let (precision, scale, _) =
1313                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1314                            AvroDataType {
1315                                nullability: None,
1316                                metadata,
1317                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1318                                resolution: None,
1319                            }
1320                        }
1321                        Some("duration") => {
1322                            if size != 12 {
1323                                return Err(ArrowError::ParseError(format!(
1324                                    "Invalid fixed size for Duration: {size}, must be 12"
1325                                )));
1326                            };
1327                            AvroDataType {
1328                                nullability: None,
1329                                metadata,
1330                                codec: Codec::Interval,
1331                                resolution: None,
1332                            }
1333                        }
1334                        _ => AvroDataType {
1335                            nullability: None,
1336                            metadata,
1337                            codec: Codec::Fixed(size),
1338                            resolution: None,
1339                        },
1340                    };
1341                    self.resolver.register(f.name, namespace, field.clone());
1342                    Ok(field)
1343                }
1344                ComplexType::Enum(e) => {
1345                    let namespace = e.namespace.or(namespace);
1346                    let symbols = e
1347                        .symbols
1348                        .iter()
1349                        .map(|s| s.to_string())
1350                        .collect::<Arc<[String]>>();
1351                    let mut metadata = e.attributes.field_metadata();
1352                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1353                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1354                    })?;
1355                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1356                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1357                    if let Some(ns) = namespace {
1358                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1359                    }
1360                    let field = AvroDataType {
1361                        nullability: None,
1362                        metadata,
1363                        codec: Codec::Enum(symbols),
1364                        resolution: None,
1365                    };
1366                    self.resolver.register(e.name, namespace, field.clone());
1367                    Ok(field)
1368                }
1369                ComplexType::Map(m) => {
1370                    let val = self.parse_type(&m.values, namespace)?;
1371                    Ok(AvroDataType {
1372                        nullability: None,
1373                        metadata: m.attributes.field_metadata(),
1374                        codec: Codec::Map(Arc::new(val)),
1375                        resolution: None,
1376                    })
1377                }
1378            },
1379            Schema::Type(t) => {
1380                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1381                // https://avro.apache.org/docs/1.11.1/specification/#logical-types
1382                match (t.attributes.logical_type, &mut field.codec) {
1383                    (Some("decimal"), c @ Codec::Binary) => {
1384                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1385                        *c = Codec::Decimal(prec, Some(sc), None);
1386                    }
1387                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1388                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1389                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1390                    (Some("timestamp-millis"), c @ Codec::Int64) => {
1391                        *c = Codec::TimestampMillis(true)
1392                    }
1393                    (Some("timestamp-micros"), c @ Codec::Int64) => {
1394                        *c = Codec::TimestampMicros(true)
1395                    }
1396                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1397                        *c = Codec::TimestampMillis(false)
1398                    }
1399                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1400                        *c = Codec::TimestampMicros(false)
1401                    }
1402                    (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
1403                    #[cfg(feature = "avro_custom_types")]
1404                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1405                    #[cfg(feature = "avro_custom_types")]
1406                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1407                    #[cfg(feature = "avro_custom_types")]
1408                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1409                    #[cfg(feature = "avro_custom_types")]
1410                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1411                        *c = Codec::DurationSeconds
1412                    }
1413                    #[cfg(feature = "avro_custom_types")]
1414                    (Some("arrow.run-end-encoded"), _) => {
1415                        let bits_u8: u8 = t
1416                            .attributes
1417                            .additional
1418                            .get("arrow.runEndIndexBits")
1419                            .and_then(|v| v.as_u64())
1420                            .and_then(|n| u8::try_from(n).ok())
1421                            .ok_or_else(|| ArrowError::ParseError(
1422                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1423                                    .to_string(),
1424                            ))?;
1425                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1426                            return Err(ArrowError::ParseError(format!(
1427                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1428                            )));
1429                        }
1430                        // Wrap the parsed underlying site as REE
1431                        let values_site = field.clone();
1432                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1433                    }
1434                    (Some(logical), _) => {
1435                        // Insert unrecognized logical type into metadata map
1436                        field.metadata.insert("logicalType".into(), logical.into());
1437                    }
1438                    (None, _) => {}
1439                }
1440                if !t.attributes.additional.is_empty() {
1441                    for (k, v) in &t.attributes.additional {
1442                        field.metadata.insert(k.to_string(), v.to_string());
1443                    }
1444                }
1445                Ok(field)
1446            }
1447        }
1448    }
1449
1450    fn resolve_type<'s>(
1451        &mut self,
1452        writer_schema: &'s Schema<'a>,
1453        reader_schema: &'s Schema<'a>,
1454        namespace: Option<&'a str>,
1455    ) -> Result<AvroDataType, ArrowError> {
1456        if let (Some(write_primitive), Some(read_primitive)) =
1457            (primitive_of(writer_schema), primitive_of(reader_schema))
1458        {
1459            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1460        }
1461        match (writer_schema, reader_schema) {
1462            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1463                let writer_variants = writer_variants.as_slice();
1464                let reader_variants = reader_variants.as_slice();
1465                match (
1466                    nullable_union_variants(writer_variants),
1467                    nullable_union_variants(reader_variants),
1468                ) {
1469                    (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
1470                        let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?;
1471                        dt.nullability = Some(w_nb);
1472                        #[cfg(feature = "avro_custom_types")]
1473                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
1474                        Ok(dt)
1475                    }
1476                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1477                }
1478            }
1479            (Schema::Union(writer_variants), reader_non_union) => {
1480                let writer_to_reader: Vec<Option<(usize, Promotion)>> = writer_variants
1481                    .iter()
1482                    .map(|writer| {
1483                        self.resolve_type(writer, reader_non_union, namespace)
1484                            .ok()
1485                            .map(|tmp| (0usize, Self::coercion_from(&tmp)))
1486                    })
1487                    .collect();
1488                let mut dt = self.parse_type(reader_non_union, namespace)?;
1489                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1490                    writer_to_reader: Arc::from(writer_to_reader),
1491                    writer_is_union: true,
1492                    reader_is_union: false,
1493                }));
1494                Ok(dt)
1495            }
1496            (writer_non_union, Schema::Union(reader_variants)) => {
1497                let promo = self.find_best_promotion(
1498                    writer_non_union,
1499                    reader_variants.as_slice(),
1500                    namespace,
1501                );
1502                let Some((reader_index, promotion)) = promo else {
1503                    return Err(ArrowError::SchemaError(
1504                        "Writer schema does not match any reader union branch".to_string(),
1505                    ));
1506                };
1507                let mut dt = self.parse_type(reader_schema, namespace)?;
1508                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1509                    writer_to_reader: Arc::from(vec![Some((reader_index, promotion))]),
1510                    writer_is_union: false,
1511                    reader_is_union: true,
1512                }));
1513                Ok(dt)
1514            }
1515            (
1516                Schema::Complex(ComplexType::Array(writer_array)),
1517                Schema::Complex(ComplexType::Array(reader_array)),
1518            ) => self.resolve_array(writer_array, reader_array, namespace),
1519            (
1520                Schema::Complex(ComplexType::Map(writer_map)),
1521                Schema::Complex(ComplexType::Map(reader_map)),
1522            ) => self.resolve_map(writer_map, reader_map, namespace),
1523            (
1524                Schema::Complex(ComplexType::Fixed(writer_fixed)),
1525                Schema::Complex(ComplexType::Fixed(reader_fixed)),
1526            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1527            (
1528                Schema::Complex(ComplexType::Record(writer_record)),
1529                Schema::Complex(ComplexType::Record(reader_record)),
1530            ) => self.resolve_records(writer_record, reader_record, namespace),
1531            (
1532                Schema::Complex(ComplexType::Enum(writer_enum)),
1533                Schema::Complex(ComplexType::Enum(reader_enum)),
1534            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1535            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1536            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1537            _ => Err(ArrowError::NotYetImplemented(
1538                "Other resolutions not yet implemented".to_string(),
1539            )),
1540        }
1541    }
1542
1543    #[inline]
1544    fn coercion_from(dt: &AvroDataType) -> Promotion {
1545        match dt.resolution.as_ref() {
1546            Some(ResolutionInfo::Promotion(promotion)) => *promotion,
1547            _ => Promotion::Direct,
1548        }
1549    }
1550
1551    fn find_best_promotion(
1552        &mut self,
1553        writer: &Schema<'a>,
1554        reader_variants: &[Schema<'a>],
1555        namespace: Option<&'a str>,
1556    ) -> Option<(usize, Promotion)> {
1557        let mut first_promotion: Option<(usize, Promotion)> = None;
1558        for (reader_index, reader) in reader_variants.iter().enumerate() {
1559            if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
1560                let promotion = Self::coercion_from(&tmp);
1561                if promotion == Promotion::Direct {
1562                    // An exact match is best, return immediately.
1563                    return Some((reader_index, promotion));
1564                } else if first_promotion.is_none() {
1565                    // Store the first valid promotion but keep searching for a direct match.
1566                    first_promotion = Some((reader_index, promotion));
1567                }
1568            }
1569        }
1570        first_promotion
1571    }
1572
1573    fn resolve_unions<'s>(
1574        &mut self,
1575        writer_variants: &'s [Schema<'a>],
1576        reader_variants: &'s [Schema<'a>],
1577        namespace: Option<&'a str>,
1578    ) -> Result<AvroDataType, ArrowError> {
1579        let reader_encodings: Vec<AvroDataType> = reader_variants
1580            .iter()
1581            .map(|reader_schema| self.parse_type(reader_schema, namespace))
1582            .collect::<Result<_, _>>()?;
1583        let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
1584            Vec::with_capacity(writer_variants.len());
1585        for writer in writer_variants {
1586            writer_to_reader.push(self.find_best_promotion(writer, reader_variants, namespace));
1587        }
1588        let union_fields = build_union_fields(&reader_encodings);
1589        let mut dt = AvroDataType::new(
1590            Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1591            Default::default(),
1592            None,
1593        );
1594        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1595            writer_to_reader: Arc::from(writer_to_reader),
1596            writer_is_union: true,
1597            reader_is_union: true,
1598        }));
1599        Ok(dt)
1600    }
1601
1602    fn resolve_array(
1603        &mut self,
1604        writer_array: &Array<'a>,
1605        reader_array: &Array<'a>,
1606        namespace: Option<&'a str>,
1607    ) -> Result<AvroDataType, ArrowError> {
1608        Ok(AvroDataType {
1609            nullability: None,
1610            metadata: reader_array.attributes.field_metadata(),
1611            codec: Codec::List(Arc::new(self.make_data_type(
1612                writer_array.items.as_ref(),
1613                Some(reader_array.items.as_ref()),
1614                namespace,
1615            )?)),
1616            resolution: None,
1617        })
1618    }
1619
1620    fn resolve_map(
1621        &mut self,
1622        writer_map: &Map<'a>,
1623        reader_map: &Map<'a>,
1624        namespace: Option<&'a str>,
1625    ) -> Result<AvroDataType, ArrowError> {
1626        Ok(AvroDataType {
1627            nullability: None,
1628            metadata: reader_map.attributes.field_metadata(),
1629            codec: Codec::Map(Arc::new(self.make_data_type(
1630                &writer_map.values,
1631                Some(&reader_map.values),
1632                namespace,
1633            )?)),
1634            resolution: None,
1635        })
1636    }
1637
1638    fn resolve_fixed<'s>(
1639        &mut self,
1640        writer_fixed: &Fixed<'a>,
1641        reader_fixed: &Fixed<'a>,
1642        reader_schema: &'s Schema<'a>,
1643        namespace: Option<&'a str>,
1644    ) -> Result<AvroDataType, ArrowError> {
1645        ensure_names_match(
1646            "Fixed",
1647            writer_fixed.name,
1648            writer_fixed.namespace,
1649            &writer_fixed.aliases,
1650            reader_fixed.name,
1651            reader_fixed.namespace,
1652            &reader_fixed.aliases,
1653        )?;
1654        if writer_fixed.size != reader_fixed.size {
1655            return Err(ArrowError::SchemaError(format!(
1656                "Fixed size mismatch for {}: writer={}, reader={}",
1657                reader_fixed.name, writer_fixed.size, reader_fixed.size
1658            )));
1659        }
1660        self.parse_type(reader_schema, namespace)
1661    }
1662
1663    fn resolve_primitives(
1664        &mut self,
1665        write_primitive: PrimitiveType,
1666        read_primitive: PrimitiveType,
1667        reader_schema: &Schema<'a>,
1668    ) -> Result<AvroDataType, ArrowError> {
1669        if write_primitive == read_primitive {
1670            return self.parse_type(reader_schema, None);
1671        }
1672        let promotion = match (write_primitive, read_primitive) {
1673            (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
1674            (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
1675            (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
1676            (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
1677            (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
1678            (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
1679            (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
1680            (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
1681            _ => {
1682                return Err(ArrowError::ParseError(format!(
1683                    "Illegal promotion {write_primitive:?} to {read_primitive:?}"
1684                )));
1685            }
1686        };
1687        let mut datatype = self.parse_type(reader_schema, None)?;
1688        datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
1689        Ok(datatype)
1690    }
1691
1692    // Resolve writer vs. reader enum schemas according to Avro 1.11.1.
1693    //
1694    // # How enums resolve (writer to reader)
1695    // Per “Schema Resolution”:
1696    // * The two schemas must refer to the same (unqualified) enum name (or match
1697    //   via alias rewriting).
1698    // * If the writer’s symbol is not present in the reader’s enum and the reader
1699    //   enum has a `default`, that `default` symbol must be used; otherwise,
1700    //   error.
1701    //   https://avro.apache.org/docs/1.11.1/specification/#schema-resolution
1702    // * Avro “Aliases” are applied from the reader side to rewrite the writer’s
1703    //   names during resolution. For robustness across ecosystems, we also accept
1704    //   symmetry here (see note below).
1705    //   https://avro.apache.org/docs/1.11.1/specification/#aliases
1706    //
1707    // # Rationale for this code path
1708    // 1. Do the work once at schema‑resolution time. Avro serializes an enum as a
1709    //    writer‑side position. Mapping positions on the hot decoder path is expensive
1710    //    if done with string lookups. This method builds a `writer_index to reader_index`
1711    //    vector once, so decoding just does an O(1) table lookup.
1712    // 2. Adopt the reader’s symbol set and order. We return an Arrow
1713    //    `Dictionary(Int32, Utf8)` whose dictionary values are the reader enum
1714    //    symbols. This makes downstream semantics match the reader schema, including
1715    //    Avro’s sort order rule that orders enums by symbol position in the schema.
1716    //    https://avro.apache.org/docs/1.11.1/specification/#sort-order
1717    // 3. Honor Avro’s `default` for enums. Avro 1.9+ allows a type‑level default
1718    //    on the enum. When the writer emits a symbol unknown to the reader, we map it
1719    //    to the reader’s validated `default` symbol if present; otherwise we signal an
1720    //    error at decoding time.
1721    //    https://avro.apache.org/docs/1.11.1/specification/#enums
1722    //
1723    // # Implementation notes
    // * We first check that enum names match or are alias‑equivalent. The Avro
1725    //   spec describes alias rewriting using reader aliases; this implementation
1726    //   additionally treats writer aliases as acceptable for name matching to be
1727    //   resilient with schemas produced by different tooling.
1728    // * We build `EnumMapping`:
1729    //   - `mapping[i]` = reader index of the writer symbol at writer index `i`.
1730    //   - If the writer symbol is absent and the reader has a default, we store the
1731    //     reader index of that default.
1732    //   - Otherwise we store `-1` as a sentinel meaning unresolvable; the decoder
1733    //     must treat encountering such a value as an error, per the spec.
1734    // * We persist the reader symbol list in field metadata under
1735    //   `AVRO_ENUM_SYMBOLS_METADATA_KEY`, so consumers can inspect the dictionary
1736    //   without needing the original Avro schema.
1737    // * The Arrow representation is `Dictionary(Int32, Utf8)`, which aligns with
1738    //   Avro’s integer index encoding for enums.
1739    //
1740    // # Examples
1741    // * Writer `["A","B","C"]`, Reader `["A","B"]`, Reader default `"A"`
1742    //     `mapping = [0, 1, 0]`, `default_index = 0`.
1743    // * Writer `["A","B"]`, Reader `["B","A"]` (no default)
1744    //     `mapping = [1, 0]`, `default_index = -1`.
1745    // * Writer `["A","B","C"]`, Reader `["A","B"]` (no default)
1746    //     `mapping = [0, 1, -1]` (decode must error on `"C"`).
1747    fn resolve_enums(
1748        &mut self,
1749        writer_enum: &Enum<'a>,
1750        reader_enum: &Enum<'a>,
1751        reader_schema: &Schema<'a>,
1752        namespace: Option<&'a str>,
1753    ) -> Result<AvroDataType, ArrowError> {
1754        ensure_names_match(
1755            "Enum",
1756            writer_enum.name,
1757            writer_enum.namespace,
1758            &writer_enum.aliases,
1759            reader_enum.name,
1760            reader_enum.namespace,
1761            &reader_enum.aliases,
1762        )?;
1763        if writer_enum.symbols == reader_enum.symbols {
1764            return self.parse_type(reader_schema, namespace);
1765        }
1766        let reader_index: HashMap<&str, i32> = reader_enum
1767            .symbols
1768            .iter()
1769            .enumerate()
1770            .map(|(index, &symbol)| (symbol, index as i32))
1771            .collect();
1772        let default_index: i32 = match reader_enum.default {
1773            Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
1774                ArrowError::SchemaError(format!(
1775                    "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
1776                    reader_enum.name,
1777                ))
1778            })?,
1779            None => -1,
1780        };
1781        let mapping: Vec<i32> = writer_enum
1782            .symbols
1783            .iter()
1784            .map(|&write_symbol| {
1785                reader_index
1786                    .get(write_symbol)
1787                    .copied()
1788                    .unwrap_or(default_index)
1789            })
1790            .collect();
1791        if self.strict_mode && mapping.iter().any(|&m| m < 0) {
1792            return Err(ArrowError::SchemaError(format!(
1793                "Reader enum '{}' does not cover all writer symbols and no default is provided",
1794                reader_enum.name
1795            )));
1796        }
1797        let mut dt = self.parse_type(reader_schema, namespace)?;
1798        dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
1799            mapping: Arc::from(mapping),
1800            default_index,
1801        }));
1802        let reader_ns = reader_enum.namespace.or(namespace);
1803        self.resolver
1804            .register(reader_enum.name, reader_ns, dt.clone());
1805        Ok(dt)
1806    }
1807
1808    #[inline]
1809    fn build_writer_lookup(
1810        writer_record: &Record<'a>,
1811    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
1812        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
1813        for (idx, wf) in writer_record.fields.iter().enumerate() {
1814            // Avro field names are unique; last-in wins are acceptable and match previous behavior.
1815            map.insert(wf.name, idx);
1816        }
1817        // Track ambiguous writer aliases (alias used by multiple writer fields)
1818        let mut ambiguous: HashSet<&str> = HashSet::new();
1819        for (idx, wf) in writer_record.fields.iter().enumerate() {
1820            for &alias in &wf.aliases {
1821                match map.entry(alias) {
1822                    Entry::Occupied(e) if *e.get() != idx => {
1823                        ambiguous.insert(alias);
1824                    }
1825                    Entry::Vacant(e) => {
1826                        e.insert(idx);
1827                    }
1828                    _ => {}
1829                }
1830            }
1831        }
1832        (map, ambiguous)
1833    }
1834
1835    fn resolve_records(
1836        &mut self,
1837        writer_record: &Record<'a>,
1838        reader_record: &Record<'a>,
1839        namespace: Option<&'a str>,
1840    ) -> Result<AvroDataType, ArrowError> {
1841        ensure_names_match(
1842            "Record",
1843            writer_record.name,
1844            writer_record.namespace,
1845            &writer_record.aliases,
1846            reader_record.name,
1847            reader_record.namespace,
1848            &reader_record.aliases,
1849        )?;
1850        let writer_ns = writer_record.namespace.or(namespace);
1851        let reader_ns = reader_record.namespace.or(namespace);
1852        let reader_md = reader_record.attributes.field_metadata();
1853        // Build writer lookup and ambiguous alias set.
1854        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
1855        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
1856        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
1857        // Capture default field indices during the main loop (one pass).
1858        let mut default_fields: Vec<usize> = Vec::new();
1859        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
1860            // Direct name match, then reader aliases (a writer alias map is pre-populated).
1861            let mut match_idx = writer_lookup.get(r_field.name).copied();
1862            let mut matched_via_alias: Option<&str> = None;
1863            if match_idx.is_none() {
1864                for &alias in &r_field.aliases {
1865                    if let Some(i) = writer_lookup.get(alias).copied() {
1866                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
1867                            return Err(ArrowError::SchemaError(format!(
1868                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
1869                                r_field.name
1870                            )));
1871                        }
1872                        match_idx = Some(i);
1873                        matched_via_alias = Some(alias);
1874                        break;
1875                    }
1876                }
1877            }
1878            if let Some(wi) = match_idx {
1879                if writer_to_reader[wi].is_none() {
1880                    let w_schema = &writer_record.fields[wi].r#type;
1881                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
1882                    writer_to_reader[wi] = Some(reader_idx);
1883                    reader_fields.push(AvroField {
1884                        name: r_field.name.to_owned(),
1885                        data_type: dt,
1886                    });
1887                    continue;
1888                } else if self.strict_mode {
1889                    // Writer field already mapped and strict_mode => error
1890                    let existing_reader = writer_to_reader[wi].unwrap();
1891                    let via = matched_via_alias
1892                        .map(|a| format!("alias '{a}'"))
1893                        .unwrap_or_else(|| "name match".to_string());
1894                    return Err(ArrowError::SchemaError(format!(
1895                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
1896                        writer_record.fields[wi].name
1897                    )));
1898                }
1899                // Non-strict and already mapped -> fall through to defaulting logic
1900            }
1901            // No match (or conflicted in non-strict mode): attach default per Avro spec.
1902            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
1903            if let Some(default_json) = r_field.default.as_ref() {
1904                dt.resolution = Some(ResolutionInfo::DefaultValue(
1905                    dt.parse_and_store_default(default_json)?,
1906                ));
1907                default_fields.push(reader_idx);
1908            } else if dt.nullability() == Some(Nullability::NullFirst) {
1909                // The only valid implicit default for a union is the first branch (null-first case).
1910                dt.resolution = Some(ResolutionInfo::DefaultValue(
1911                    dt.parse_and_store_default(&Value::Null)?,
1912                ));
1913                default_fields.push(reader_idx);
1914            } else {
1915                return Err(ArrowError::SchemaError(format!(
1916                    "Reader field '{}' not present in writer schema must have a default value",
1917                    r_field.name
1918                )));
1919            }
1920            reader_fields.push(AvroField {
1921                name: r_field.name.to_owned(),
1922                data_type: dt,
1923            });
1924        }
1925        // Build skip_fields in writer order; pre-size and push.
1926        let mut skip_fields: Vec<Option<AvroDataType>> =
1927            Vec::with_capacity(writer_record.fields.len());
1928        for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
1929            if writer_to_reader[writer_index].is_some() {
1930                skip_fields.push(None);
1931            } else {
1932                skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
1933            }
1934        }
1935        let resolved = AvroDataType::new_with_resolution(
1936            Codec::Struct(Arc::from(reader_fields)),
1937            reader_md,
1938            None,
1939            Some(ResolutionInfo::Record(ResolvedRecord {
1940                writer_to_reader: Arc::from(writer_to_reader),
1941                default_fields: Arc::from(default_fields),
1942                skip_fields: Arc::from(skip_fields),
1943            })),
1944        );
1945        // Register a resolved record by reader name+namespace for potential named type refs.
1946        self.resolver
1947            .register(reader_record.name, reader_ns, resolved.clone());
1948        Ok(resolved)
1949    }
1950}
1951
1952#[cfg(test)]
1953mod tests {
1954    use super::*;
1955    use crate::schema::{
1956        AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
1957        Fixed, PrimitiveType, Record, Schema, Type, TypeName,
1958    };
1959    use indexmap::IndexMap;
1960    use serde_json::{self, Value};
1961
1962    fn create_schema_with_logical_type(
1963        primitive_type: PrimitiveType,
1964        logical_type: &'static str,
1965    ) -> Schema<'static> {
1966        let attributes = Attributes {
1967            logical_type: Some(logical_type),
1968            additional: Default::default(),
1969        };
1970
1971        Schema::Type(Type {
1972            r#type: TypeName::Primitive(primitive_type),
1973            attributes,
1974        })
1975    }
1976
1977    fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
1978        let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
1979        let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
1980        let mut maker = Maker::new(false, false);
1981        maker
1982            .make_data_type(&writer_schema, Some(&reader_schema), None)
1983            .expect("promotion should resolve")
1984    }
1985
1986    fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
1987        Schema::TypeName(TypeName::Primitive(pt))
1988    }
1989    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
1990        Schema::Union(branches)
1991    }
1992
1993    #[test]
1994    fn test_date_logical_type() {
1995        let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
1996
1997        let mut maker = Maker::new(false, false);
1998        let result = maker.make_data_type(&schema, None, None).unwrap();
1999
2000        assert!(matches!(result.codec, Codec::Date32));
2001    }
2002
2003    #[test]
2004    fn test_time_millis_logical_type() {
2005        let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2006
2007        let mut maker = Maker::new(false, false);
2008        let result = maker.make_data_type(&schema, None, None).unwrap();
2009
2010        assert!(matches!(result.codec, Codec::TimeMillis));
2011    }
2012
2013    #[test]
2014    fn test_time_micros_logical_type() {
2015        let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2016
2017        let mut maker = Maker::new(false, false);
2018        let result = maker.make_data_type(&schema, None, None).unwrap();
2019
2020        assert!(matches!(result.codec, Codec::TimeMicros));
2021    }
2022
2023    #[test]
2024    fn test_timestamp_millis_logical_type() {
2025        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2026
2027        let mut maker = Maker::new(false, false);
2028        let result = maker.make_data_type(&schema, None, None).unwrap();
2029
2030        assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2031    }
2032
2033    #[test]
2034    fn test_timestamp_micros_logical_type() {
2035        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2036
2037        let mut maker = Maker::new(false, false);
2038        let result = maker.make_data_type(&schema, None, None).unwrap();
2039
2040        assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2041    }
2042
2043    #[test]
2044    fn test_local_timestamp_millis_logical_type() {
2045        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2046
2047        let mut maker = Maker::new(false, false);
2048        let result = maker.make_data_type(&schema, None, None).unwrap();
2049
2050        assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2051    }
2052
2053    #[test]
2054    fn test_local_timestamp_micros_logical_type() {
2055        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2056
2057        let mut maker = Maker::new(false, false);
2058        let result = maker.make_data_type(&schema, None, None).unwrap();
2059
2060        assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2061    }
2062
2063    #[test]
2064    fn test_uuid_type() {
2065        let mut codec = Codec::Fixed(16);
2066        if let c @ Codec::Fixed(16) = &mut codec {
2067            *c = Codec::Uuid;
2068        }
2069        assert!(matches!(codec, Codec::Uuid));
2070    }
2071
2072    #[test]
2073    fn test_duration_logical_type() {
2074        let mut codec = Codec::Fixed(12);
2075
2076        if let c @ Codec::Fixed(12) = &mut codec {
2077            *c = Codec::Interval;
2078        }
2079
2080        assert!(matches!(codec, Codec::Interval));
2081    }
2082
2083    #[test]
2084    fn test_decimal_logical_type_not_implemented() {
2085        let codec = Codec::Fixed(16);
2086
2087        let process_decimal = || -> Result<(), ArrowError> {
2088            if let Codec::Fixed(_) = codec {
2089                return Err(ArrowError::NotYetImplemented(
2090                    "Decimals are not currently supported".to_string(),
2091                ));
2092            }
2093            Ok(())
2094        };
2095
2096        let result = process_decimal();
2097
2098        assert!(result.is_err());
2099        if let Err(ArrowError::NotYetImplemented(msg)) = result {
2100            assert!(msg.contains("Decimals are not currently supported"));
2101        } else {
2102            panic!("Expected NotYetImplemented error");
2103        }
2104    }
2105    #[test]
2106    fn test_unknown_logical_type_added_to_metadata() {
2107        let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2108
2109        let mut maker = Maker::new(false, false);
2110        let result = maker.make_data_type(&schema, None, None).unwrap();
2111
2112        assert_eq!(
2113            result.metadata.get("logicalType"),
2114            Some(&"custom-type".to_string())
2115        );
2116    }
2117
2118    #[test]
2119    fn test_string_with_utf8view_enabled() {
2120        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2121
2122        let mut maker = Maker::new(true, false);
2123        let result = maker.make_data_type(&schema, None, None).unwrap();
2124
2125        assert!(matches!(result.codec, Codec::Utf8View));
2126    }
2127
2128    #[test]
2129    fn test_string_without_utf8view_enabled() {
2130        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2131
2132        let mut maker = Maker::new(false, false);
2133        let result = maker.make_data_type(&schema, None, None).unwrap();
2134
2135        assert!(matches!(result.codec, Codec::Utf8));
2136    }
2137
2138    #[test]
2139    fn test_record_with_string_and_utf8view_enabled() {
2140        let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2141
2142        let avro_field = crate::schema::Field {
2143            name: "string_field",
2144            r#type: field_schema,
2145            default: None,
2146            doc: None,
2147            aliases: vec![],
2148        };
2149
2150        let record = Record {
2151            name: "test_record",
2152            namespace: None,
2153            aliases: vec![],
2154            doc: None,
2155            fields: vec![avro_field],
2156            attributes: Attributes::default(),
2157        };
2158
2159        let schema = Schema::Complex(ComplexType::Record(record));
2160
2161        let mut maker = Maker::new(true, false);
2162        let result = maker.make_data_type(&schema, None, None).unwrap();
2163
2164        if let Codec::Struct(fields) = &result.codec {
2165            let first_field_codec = &fields[0].data_type().codec;
2166            assert!(matches!(first_field_codec, Codec::Utf8View));
2167        } else {
2168            panic!("Expected Struct codec");
2169        }
2170    }
2171
2172    #[test]
2173    fn test_union_with_strict_mode() {
2174        let schema = Schema::Union(vec![
2175            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2176            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2177        ]);
2178
2179        let mut maker = Maker::new(false, true);
2180        let result = maker.make_data_type(&schema, None, None);
2181
2182        assert!(result.is_err());
2183        match result {
2184            Err(ArrowError::SchemaError(msg)) => {
2185                assert!(msg.contains(
2186                    "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2187                ));
2188            }
2189            _ => panic!("Expected SchemaError"),
2190        }
2191    }
2192
2193    #[test]
2194    fn test_resolve_int_to_float_promotion() {
2195        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2196        assert!(matches!(result.codec, Codec::Float32));
2197        assert_eq!(
2198            result.resolution,
2199            Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2200        );
2201    }
2202
2203    #[test]
2204    fn test_resolve_int_to_double_promotion() {
2205        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2206        assert!(matches!(result.codec, Codec::Float64));
2207        assert_eq!(
2208            result.resolution,
2209            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2210        );
2211    }
2212
2213    #[test]
2214    fn test_resolve_long_to_float_promotion() {
2215        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2216        assert!(matches!(result.codec, Codec::Float32));
2217        assert_eq!(
2218            result.resolution,
2219            Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2220        );
2221    }
2222
2223    #[test]
2224    fn test_resolve_long_to_double_promotion() {
2225        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2226        assert!(matches!(result.codec, Codec::Float64));
2227        assert_eq!(
2228            result.resolution,
2229            Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2230        );
2231    }
2232
2233    #[test]
2234    fn test_resolve_float_to_double_promotion() {
2235        let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2236        assert!(matches!(result.codec, Codec::Float64));
2237        assert_eq!(
2238            result.resolution,
2239            Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2240        );
2241    }
2242
2243    #[test]
2244    fn test_resolve_string_to_bytes_promotion() {
2245        let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2246        assert!(matches!(result.codec, Codec::Binary));
2247        assert_eq!(
2248            result.resolution,
2249            Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2250        );
2251    }
2252
2253    #[test]
2254    fn test_resolve_bytes_to_string_promotion() {
2255        let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2256        assert!(matches!(result.codec, Codec::Utf8));
2257        assert_eq!(
2258            result.resolution,
2259            Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2260        );
2261    }
2262
2263    #[test]
2264    fn test_resolve_illegal_promotion_double_to_float_errors() {
2265        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2266        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2267        let mut maker = Maker::new(false, false);
2268        let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2269        assert!(result.is_err());
2270        match result {
2271            Err(ArrowError::ParseError(msg)) => {
2272                assert!(msg.contains("Illegal promotion"));
2273            }
2274            _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2275        }
2276    }
2277
2278    #[test]
2279    fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2280        let writer = Schema::Union(vec![
2281            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2282            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2283        ]);
2284        let reader = Schema::Union(vec![
2285            Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2286            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2287        ]);
2288        let mut maker = Maker::new(false, false);
2289        let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2290        assert!(matches!(result.codec, Codec::Float64));
2291        assert_eq!(
2292            result.resolution,
2293            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2294        );
2295        assert_eq!(result.nullability, Some(Nullability::NullFirst));
2296    }
2297
2298    #[test]
2299    fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2300        let writer = mk_union(vec![
2301            mk_primitive(PrimitiveType::String),
2302            mk_primitive(PrimitiveType::Long),
2303        ]);
2304        let reader = mk_primitive(PrimitiveType::Bytes);
2305        let mut maker = Maker::new(false, false);
2306        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2307        assert!(matches!(dt.codec(), Codec::Binary));
2308        let resolved = match dt.resolution {
2309            Some(ResolutionInfo::Union(u)) => u,
2310            other => panic!("expected union resolution info, got {other:?}"),
2311        };
2312        assert!(resolved.writer_is_union && !resolved.reader_is_union);
2313        assert_eq!(
2314            resolved.writer_to_reader.as_ref(),
2315            &[Some((0, Promotion::StringToBytes)), None]
2316        );
2317    }
2318
2319    #[test]
2320    fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2321        let writer = mk_primitive(PrimitiveType::Long);
2322        let reader = mk_union(vec![
2323            mk_primitive(PrimitiveType::Long),
2324            mk_primitive(PrimitiveType::Double),
2325        ]);
2326        let mut maker = Maker::new(false, false);
2327        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2328        let resolved = match dt.resolution {
2329            Some(ResolutionInfo::Union(u)) => u,
2330            other => panic!("expected union resolution info, got {other:?}"),
2331        };
2332        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2333        assert_eq!(
2334            resolved.writer_to_reader.as_ref(),
2335            &[Some((0, Promotion::Direct))]
2336        );
2337    }
2338
2339    #[test]
2340    fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2341        let writer = mk_primitive(PrimitiveType::Int);
2342        let reader = mk_union(vec![
2343            mk_primitive(PrimitiveType::Null),
2344            mk_primitive(PrimitiveType::Long),
2345            mk_primitive(PrimitiveType::String),
2346        ]);
2347        let mut maker = Maker::new(false, false);
2348        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2349        let resolved = match dt.resolution {
2350            Some(ResolutionInfo::Union(u)) => u,
2351            other => panic!("expected union resolution info, got {other:?}"),
2352        };
2353        assert_eq!(
2354            resolved.writer_to_reader.as_ref(),
2355            &[Some((1, Promotion::IntToLong))]
2356        );
2357    }
2358
2359    #[test]
2360    fn test_resolve_both_nullable_unions_direct_match() {
2361        let writer = mk_union(vec![
2362            mk_primitive(PrimitiveType::Null),
2363            mk_primitive(PrimitiveType::String),
2364        ]);
2365        let reader = mk_union(vec![
2366            mk_primitive(PrimitiveType::String),
2367            mk_primitive(PrimitiveType::Null),
2368        ]);
2369        let mut maker = Maker::new(false, false);
2370        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2371        assert!(matches!(dt.codec(), Codec::Utf8));
2372        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2373        assert!(dt.resolution.is_none());
2374    }
2375
2376    #[test]
2377    fn test_resolve_both_nullable_unions_with_promotion() {
2378        let writer = mk_union(vec![
2379            mk_primitive(PrimitiveType::Null),
2380            mk_primitive(PrimitiveType::Int),
2381        ]);
2382        let reader = mk_union(vec![
2383            mk_primitive(PrimitiveType::Double),
2384            mk_primitive(PrimitiveType::Null),
2385        ]);
2386        let mut maker = Maker::new(false, false);
2387        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2388        assert!(matches!(dt.codec(), Codec::Float64));
2389        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2390        assert_eq!(
2391            dt.resolution,
2392            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2393        );
2394    }
2395
2396    #[test]
2397    fn test_resolve_type_promotion() {
2398        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2399        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2400        let mut maker = Maker::new(false, false);
2401        let result = maker
2402            .make_data_type(&writer_schema, Some(&reader_schema), None)
2403            .unwrap();
2404        assert!(matches!(result.codec, Codec::Int64));
2405        assert_eq!(
2406            result.resolution,
2407            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2408        );
2409    }
2410
2411    #[test]
2412    fn test_nested_record_type_reuse_without_namespace() {
2413        let schema_str = r#"
2414        {
2415          "type": "record",
2416          "name": "Record",
2417          "fields": [
2418            {
2419              "name": "nested",
2420              "type": {
2421                "type": "record",
2422                "name": "Nested",
2423                "fields": [
2424                  { "name": "nested_int", "type": "int" }
2425                ]
2426              }
2427            },
2428            { "name": "nestedRecord", "type": "Nested" },
2429            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
2430            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
2431          ]
2432        }
2433        "#;
2434
2435        let schema: Schema = serde_json::from_str(schema_str).unwrap();
2436
2437        let mut maker = Maker::new(false, false);
2438        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2439
2440        if let Codec::Struct(fields) = avro_data_type.codec() {
2441            assert_eq!(fields.len(), 4);
2442
2443            // nested
2444            assert_eq!(fields[0].name(), "nested");
2445            let nested_data_type = fields[0].data_type();
2446            if let Codec::Struct(nested_fields) = nested_data_type.codec() {
2447                assert_eq!(nested_fields.len(), 1);
2448                assert_eq!(nested_fields[0].name(), "nested_int");
2449                assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
2450            } else {
2451                panic!(
2452                    "'nested' field is not a struct but {:?}",
2453                    nested_data_type.codec()
2454                );
2455            }
2456
2457            // nestedRecord
2458            assert_eq!(fields[1].name(), "nestedRecord");
2459            let nested_record_data_type = fields[1].data_type();
2460            assert_eq!(
2461                nested_record_data_type.codec().data_type(),
2462                nested_data_type.codec().data_type()
2463            );
2464
2465            // nestedArray
2466            assert_eq!(fields[2].name(), "nestedArray");
2467            if let Codec::List(item_type) = fields[2].data_type().codec() {
2468                assert_eq!(
2469                    item_type.codec().data_type(),
2470                    nested_data_type.codec().data_type()
2471                );
2472            } else {
2473                panic!("'nestedArray' field is not a list");
2474            }
2475
2476            // nestedMap
2477            assert_eq!(fields[3].name(), "nestedMap");
2478            if let Codec::Map(value_type) = fields[3].data_type().codec() {
2479                assert_eq!(
2480                    value_type.codec().data_type(),
2481                    nested_data_type.codec().data_type()
2482                );
2483            } else {
2484                panic!("'nestedMap' field is not a map");
2485            }
2486        } else {
2487            panic!("Top-level schema is not a struct");
2488        }
2489    }
2490
2491    #[test]
2492    fn test_nested_enum_type_reuse_with_namespace() {
2493        let schema_str = r#"
2494        {
2495          "type": "record",
2496          "name": "Record",
2497          "namespace": "record_ns",
2498          "fields": [
2499            {
2500              "name": "status",
2501              "type": {
2502                "type": "enum",
2503                "name": "Status",
2504                "namespace": "enum_ns",
2505                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
2506              }
2507            },
2508            { "name": "backupStatus", "type": "enum_ns.Status" },
2509            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
2510            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
2511          ]
2512        }
2513        "#;
2514
2515        let schema: Schema = serde_json::from_str(schema_str).unwrap();
2516
2517        let mut maker = Maker::new(false, false);
2518        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2519
2520        if let Codec::Struct(fields) = avro_data_type.codec() {
2521            assert_eq!(fields.len(), 4);
2522
2523            // status
2524            assert_eq!(fields[0].name(), "status");
2525            let status_data_type = fields[0].data_type();
2526            if let Codec::Enum(symbols) = status_data_type.codec() {
2527                assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
2528            } else {
2529                panic!(
2530                    "'status' field is not an enum but {:?}",
2531                    status_data_type.codec()
2532                );
2533            }
2534
2535            // backupStatus
2536            assert_eq!(fields[1].name(), "backupStatus");
2537            let backup_status_data_type = fields[1].data_type();
2538            assert_eq!(
2539                backup_status_data_type.codec().data_type(),
2540                status_data_type.codec().data_type()
2541            );
2542
2543            // statusHistory
2544            assert_eq!(fields[2].name(), "statusHistory");
2545            if let Codec::List(item_type) = fields[2].data_type().codec() {
2546                assert_eq!(
2547                    item_type.codec().data_type(),
2548                    status_data_type.codec().data_type()
2549                );
2550            } else {
2551                panic!("'statusHistory' field is not a list");
2552            }
2553
2554            // statusMap
2555            assert_eq!(fields[3].name(), "statusMap");
2556            if let Codec::Map(value_type) = fields[3].data_type().codec() {
2557                assert_eq!(
2558                    value_type.codec().data_type(),
2559                    status_data_type.codec().data_type()
2560                );
2561            } else {
2562                panic!("'statusMap' field is not a map");
2563            }
2564        } else {
2565            panic!("Top-level schema is not a struct");
2566        }
2567    }
2568
2569    #[test]
2570    fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
2571        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2572        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2573        let mut maker = Maker::new(false, false);
2574        let data_type = maker
2575            .make_data_type(&writer_schema, Some(&reader_schema), None)
2576            .expect("resolution should succeed");
2577        let field = AvroField {
2578            name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
2579            data_type,
2580        };
2581        assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
2582        assert!(matches!(field.data_type().codec(), Codec::Utf8));
2583    }
2584
2585    fn json_string(s: &str) -> Value {
2586        Value::String(s.to_string())
2587    }
2588
2589    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
2590        let stored = dt
2591            .metadata
2592            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
2593            .cloned()
2594            .unwrap_or_default();
2595        let expected = serde_json::to_string(default_json).unwrap();
2596        assert_eq!(stored, expected, "stored default metadata should match");
2597    }
2598
2599    #[test]
2600    fn test_validate_and_store_default_null_and_nullability_rules() {
2601        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
2602        let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
2603        assert_eq!(lit, AvroLiteral::Null);
2604        assert_default_stored(&dt_null, &Value::Null);
2605        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
2606        let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
2607        assert!(
2608            err.to_string()
2609                .contains("JSON null default is only valid for `null` type"),
2610            "unexpected error: {err}"
2611        );
2612        let mut dt_int_nf =
2613            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
2614        let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
2615        assert_eq!(lit2, AvroLiteral::Null);
2616        assert_default_stored(&dt_int_nf, &Value::Null);
2617        let mut dt_int_ns =
2618            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
2619        let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
2620        assert!(
2621            err2.to_string()
2622                .contains("JSON null default is only valid for `null` type"),
2623            "unexpected error: {err2}"
2624        );
2625    }
2626
    /// Exercises `parse_and_store_default` for primitive, string-like, binary and
    /// temporal codecs, checking the literal returned for each and, for several
    /// of them, the JSON stored in the data type's metadata.
    #[test]
    fn test_validate_and_store_default_primitives_and_temporal() {
        // Boolean.
        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
        assert_eq!(lit, AvroLiteral::Boolean(true));
        assert_default_stored(&dt_bool, &Value::Bool(true));
        // Int32: an in-range value is accepted; one past `i32::MAX` is rejected.
        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
        let lit = dt_i32
            .parse_and_store_default(&serde_json::json!(123))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Int(123));
        assert_default_stored(&dt_i32, &serde_json::json!(123));
        let err = dt_i32
            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
            .unwrap_err();
        assert!(format!("{err}").contains("out of i32 range"));
        // Int64.
        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
        let lit = dt_i64
            .parse_and_store_default(&serde_json::json!(1234567890))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Long(1234567890));
        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
        // Float32: 1.25 is accepted; 1e39 is expected to fail as out of f32 range.
        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
        let lit = dt_f32
            .parse_and_store_default(&serde_json::json!(1.25))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Float(1.25));
        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
        let err = dt_f32
            .parse_and_store_default(&serde_json::json!(1e39))
            .unwrap_err();
        assert!(format!("{err}").contains("out of f32 range"));
        // Float64.
        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
        let lit = dt_f64
            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
        // Utf8 and Utf8View both yield a `String` literal.
        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
        let l = dt_str
            .parse_and_store_default(&json_string("hello"))
            .unwrap();
        assert_eq!(l, AvroLiteral::String("hello".into()));
        assert_default_stored(&dt_str, &json_string("hello"));
        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
        let l = dt_strv
            .parse_and_store_default(&json_string("view"))
            .unwrap();
        assert_eq!(l, AvroLiteral::String("view".into()));
        assert_default_stored(&dt_strv, &json_string("view"));
        // Uuid defaults are returned as a `String` literal.
        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
        let l = dt_uuid
            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
            .unwrap();
        assert_eq!(
            l,
            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
        );
        // Binary: "ABC" becomes bytes 65, 66, 67; "€" is rejected with
        // "Invalid codepoint" (presumably because it does not fit in one byte).
        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
        let err = dt_bin
            .parse_and_store_default(&json_string("€")) // U+20AC
            .unwrap_err();
        assert!(format!("{err}").contains("Invalid codepoint"));
        // Date32 and TimeMillis yield `Int` literals.
        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
        let ld = dt_date
            .parse_and_store_default(&serde_json::json!(1))
            .unwrap();
        assert_eq!(ld, AvroLiteral::Int(1));
        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
        let lt = dt_tmill
            .parse_and_store_default(&serde_json::json!(86_400_000))
            .unwrap();
        assert_eq!(lt, AvroLiteral::Int(86_400_000));
        // TimeMicros and both timestamp codecs yield `Long` literals.
        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
        let ltm = dt_tmicros
            .parse_and_store_default(&serde_json::json!(1_000_000))
            .unwrap();
        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
        let l1 = dt_ts_milli
            .parse_and_store_default(&serde_json::json!(123))
            .unwrap();
        assert_eq!(l1, AvroLiteral::Long(123));
        let mut dt_ts_micro =
            AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
        let l2 = dt_ts_micro
            .parse_and_store_default(&serde_json::json!(456))
            .unwrap();
        assert_eq!(l2, AvroLiteral::Long(456));
    }
2719
    /// Exercises `parse_and_store_default` for fixed, decimal and interval
    /// codecs: string defaults become `Bytes` literals, and defaults of the wrong
    /// length are rejected with a "Default length" error.
    #[test]
    fn test_validate_and_store_default_fixed_decimal_interval() {
        // Fixed(4): a 4-character default is accepted, an 8-character one is rejected.
        let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
        let l = dt_fixed
            .parse_and_store_default(&json_string("WXYZ"))
            .unwrap();
        assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
        let err = dt_fixed
            .parse_and_store_default(&json_string("TOO LONG"))
            .unwrap_err();
        assert!(err.to_string().contains("Default length"));
        // Decimal whose third argument is `Some(3)` (presumably the fixed size —
        // confirm against `Codec::Decimal`): 3 characters pass, 7 are rejected.
        let mut dt_dec_fixed =
            AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
        let l = dt_dec_fixed
            .parse_and_store_default(&json_string("abc"))
            .unwrap();
        assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
        let err = dt_dec_fixed
            .parse_and_store_default(&json_string("toolong"))
            .unwrap_err();
        assert!(err.to_string().contains("Default length"));
        // Decimal whose third argument is `None`: an 8-character default is accepted.
        let mut dt_dec_bytes =
            AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
        let l = dt_dec_bytes
            .parse_and_store_default(&json_string("freeform"))
            .unwrap();
        assert_eq!(
            l,
            AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
        );
        // Interval: a 12-character default is accepted, a 5-character one is rejected.
        let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
        let l = dt_interval
            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
            .unwrap();
        assert_eq!(
            l,
            AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
        );
        let err = dt_interval
            .parse_and_store_default(&json_string("short"))
            .unwrap_err();
        assert!(err.to_string().contains("Default length"));
    }
2763
2764    #[test]
2765    fn test_validate_and_store_default_enum_list_map_struct() {
2766        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
2767            .into_iter()
2768            .collect();
2769        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
2770        let l = dt_enum
2771            .parse_and_store_default(&json_string("GREEN"))
2772            .unwrap();
2773        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
2774        let err = dt_enum
2775            .parse_and_store_default(&json_string("YELLOW"))
2776            .unwrap_err();
2777        assert!(err.to_string().contains("Default enum symbol"));
2778        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
2779        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
2780        let val = serde_json::json!([1, 2, 3]);
2781        let l = dt_list.parse_and_store_default(&val).unwrap();
2782        assert_eq!(
2783            l,
2784            AvroLiteral::Array(vec![
2785                AvroLiteral::Long(1),
2786                AvroLiteral::Long(2),
2787                AvroLiteral::Long(3)
2788            ])
2789        );
2790        let err = dt_list
2791            .parse_and_store_default(&serde_json::json!({"not":"array"}))
2792            .unwrap_err();
2793        assert!(err.to_string().contains("JSON array"));
2794        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
2795        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
2796        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
2797        let l = dt_map.parse_and_store_default(&mv).unwrap();
2798        let mut expected = IndexMap::new();
2799        expected.insert("x".into(), AvroLiteral::Double(1.5));
2800        expected.insert("y".into(), AvroLiteral::Double(2.5));
2801        assert_eq!(l, AvroLiteral::Map(expected));
2802        // Not object -> error
2803        let err = dt_map
2804            .parse_and_store_default(&serde_json::json!(123))
2805            .unwrap_err();
2806        assert!(err.to_string().contains("JSON object"));
2807        let mut field_a = AvroField {
2808            name: "a".into(),
2809            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
2810        };
2811        let field_b = AvroField {
2812            name: "b".into(),
2813            data_type: AvroDataType::new(
2814                Codec::Int64,
2815                HashMap::new(),
2816                Some(Nullability::NullFirst),
2817            ),
2818        };
2819        let mut c_md = HashMap::new();
2820        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
2821        let field_c = AvroField {
2822            name: "c".into(),
2823            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
2824        };
2825        field_a.data_type.metadata.insert("doc".into(), "na".into());
2826        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
2827        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
2828        let default_obj = serde_json::json!({"a": 7});
2829        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
2830        let mut expected = IndexMap::new();
2831        expected.insert("a".into(), AvroLiteral::Int(7));
2832        expected.insert("b".into(), AvroLiteral::Null);
2833        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
2834        assert_eq!(l, AvroLiteral::Map(expected));
2835        assert_default_stored(&dt_struct, &default_obj);
2836        let req_field = AvroField {
2837            name: "req".into(),
2838            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
2839        };
2840        let mut dt_bad = AvroDataType::new(
2841            Codec::Struct(Arc::from(vec![req_field])),
2842            HashMap::new(),
2843            None,
2844        );
2845        let err = dt_bad
2846            .parse_and_store_default(&serde_json::json!({}))
2847            .unwrap_err();
2848        assert!(
2849            err.to_string().contains("missing required subfield 'req'"),
2850            "unexpected error: {err}"
2851        );
2852        let err = dt_struct
2853            .parse_and_store_default(&serde_json::json!(10))
2854            .unwrap_err();
2855        err.to_string().contains("must be a JSON object");
2856    }
2857
2858    #[test]
2859    fn test_resolve_array_promotion_and_reader_metadata() {
2860        let mut w_add: HashMap<&str, Value> = HashMap::new();
2861        w_add.insert("who", json_string("writer"));
2862        let mut r_add: HashMap<&str, Value> = HashMap::new();
2863        r_add.insert("who", json_string("reader"));
2864        let writer_schema = Schema::Complex(ComplexType::Array(Array {
2865            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2866            attributes: Attributes {
2867                logical_type: None,
2868                additional: w_add,
2869            },
2870        }));
2871        let reader_schema = Schema::Complex(ComplexType::Array(Array {
2872            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
2873            attributes: Attributes {
2874                logical_type: None,
2875                additional: r_add,
2876            },
2877        }));
2878        let mut maker = Maker::new(false, false);
2879        let dt = maker
2880            .make_data_type(&writer_schema, Some(&reader_schema), None)
2881            .unwrap();
2882        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
2883        if let Codec::List(inner) = dt.codec() {
2884            assert!(matches!(inner.codec(), Codec::Int64));
2885            assert_eq!(
2886                inner.resolution,
2887                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2888            );
2889        } else {
2890            panic!("expected list codec");
2891        }
2892    }
2893
2894    #[test]
2895    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
2896        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
2897            name: "MD5",
2898            namespace: None,
2899            aliases: vec!["Hash16"],
2900            size: 16,
2901            attributes: Attributes::default(),
2902        }));
2903        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
2904            name: "Hash16",
2905            namespace: None,
2906            aliases: vec![],
2907            size: 16,
2908            attributes: Attributes::default(),
2909        }));
2910        let mut maker = Maker::new(false, false);
2911        let dt = maker
2912            .make_data_type(&writer_schema, Some(&reader_schema), None)
2913            .unwrap();
2914        assert!(matches!(dt.codec(), Codec::Fixed(16)));
2915    }
2916
2917    #[test]
2918    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
2919        let writer = Schema::Complex(ComplexType::Record(Record {
2920            name: "R",
2921            namespace: None,
2922            doc: None,
2923            aliases: vec![],
2924            fields: vec![
2925                crate::schema::Field {
2926                    name: "a",
2927                    doc: None,
2928                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2929                    default: None,
2930                    aliases: vec![],
2931                },
2932                crate::schema::Field {
2933                    name: "skipme",
2934                    doc: None,
2935                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2936                    default: None,
2937                    aliases: vec![],
2938                },
2939                crate::schema::Field {
2940                    name: "b",
2941                    doc: None,
2942                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2943                    default: None,
2944                    aliases: vec![],
2945                },
2946            ],
2947            attributes: Attributes::default(),
2948        }));
2949        let reader = Schema::Complex(ComplexType::Record(Record {
2950            name: "R",
2951            namespace: None,
2952            doc: None,
2953            aliases: vec![],
2954            fields: vec![
2955                crate::schema::Field {
2956                    name: "b",
2957                    doc: None,
2958                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2959                    default: None,
2960                    aliases: vec![],
2961                },
2962                crate::schema::Field {
2963                    name: "a",
2964                    doc: None,
2965                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2966                    default: None,
2967                    aliases: vec![],
2968                },
2969                crate::schema::Field {
2970                    name: "name",
2971                    doc: None,
2972                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2973                    default: Some(json_string("anon")),
2974                    aliases: vec![],
2975                },
2976                crate::schema::Field {
2977                    name: "opt",
2978                    doc: None,
2979                    r#type: Schema::Union(vec![
2980                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2981                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2982                    ]),
2983                    default: None, // should default to null because NullFirst
2984                    aliases: vec![],
2985                },
2986            ],
2987            attributes: Attributes::default(),
2988        }));
2989        let mut maker = Maker::new(false, false);
2990        let dt = maker
2991            .make_data_type(&writer, Some(&reader), None)
2992            .expect("record resolution");
2993        let fields = match dt.codec() {
2994            Codec::Struct(f) => f,
2995            other => panic!("expected struct, got {other:?}"),
2996        };
2997        assert_eq!(fields.len(), 4);
2998        assert_eq!(fields[0].name(), "b");
2999        assert_eq!(fields[1].name(), "a");
3000        assert_eq!(fields[2].name(), "name");
3001        assert_eq!(fields[3].name(), "opt");
3002        assert!(matches!(
3003            fields[1].data_type().resolution,
3004            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3005        ));
3006        let rec = match dt.resolution {
3007            Some(ResolutionInfo::Record(ref r)) => r.clone(),
3008            other => panic!("expected record resolution, got {other:?}"),
3009        };
3010        assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
3011        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3012        assert!(rec.skip_fields[0].is_none());
3013        assert!(rec.skip_fields[2].is_none());
3014        let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
3015        assert!(matches!(skip1.codec(), Codec::Utf8));
3016        let name_md = &fields[2].data_type().metadata;
3017        assert_eq!(
3018            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3019            Some(&"\"anon\"".to_string())
3020        );
3021        let opt_md = &fields[3].data_type().metadata;
3022        assert_eq!(
3023            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3024            Some(&"null".to_string())
3025        );
3026    }
3027
3028    #[test]
3029    fn test_named_type_alias_resolution_record_cross_namespace() {
3030        let writer_record = Record {
3031            name: "PersonV2",
3032            namespace: Some("com.example.v2"),
3033            doc: None,
3034            aliases: vec!["com.example.Person"],
3035            fields: vec![
3036                AvroFieldSchema {
3037                    name: "name",
3038                    doc: None,
3039                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3040                    default: None,
3041                    aliases: vec![],
3042                },
3043                AvroFieldSchema {
3044                    name: "age",
3045                    doc: None,
3046                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3047                    default: None,
3048                    aliases: vec![],
3049                },
3050            ],
3051            attributes: Attributes::default(),
3052        };
3053        let reader_record = Record {
3054            name: "Person",
3055            namespace: Some("com.example"),
3056            doc: None,
3057            aliases: vec![],
3058            fields: writer_record.fields.clone(),
3059            attributes: Attributes::default(),
3060        };
3061        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3062        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3063        let mut maker = Maker::new(false, false);
3064        let result = maker
3065            .make_data_type(&writer_schema, Some(&reader_schema), None)
3066            .expect("record alias resolution should succeed");
3067        match result.codec {
3068            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3069            other => panic!("expected struct, got {other:?}"),
3070        }
3071    }
3072
3073    #[test]
3074    fn test_named_type_alias_resolution_enum_cross_namespace() {
3075        let writer_enum = Enum {
3076            name: "ColorV2",
3077            namespace: Some("org.example.v2"),
3078            doc: None,
3079            aliases: vec!["org.example.Color"],
3080            symbols: vec!["RED", "GREEN", "BLUE"],
3081            default: None,
3082            attributes: Attributes::default(),
3083        };
3084        let reader_enum = Enum {
3085            name: "Color",
3086            namespace: Some("org.example"),
3087            doc: None,
3088            aliases: vec![],
3089            symbols: vec!["RED", "GREEN", "BLUE"],
3090            default: None,
3091            attributes: Attributes::default(),
3092        };
3093        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3094        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3095        let mut maker = Maker::new(false, false);
3096        maker
3097            .make_data_type(&writer_schema, Some(&reader_schema), None)
3098            .expect("enum alias resolution should succeed");
3099    }
3100
3101    #[test]
3102    fn test_named_type_alias_resolution_fixed_cross_namespace() {
3103        let writer_fixed = Fixed {
3104            name: "Fx10V2",
3105            namespace: Some("ns.v2"),
3106            aliases: vec!["ns.Fx10"],
3107            size: 10,
3108            attributes: Attributes::default(),
3109        };
3110        let reader_fixed = Fixed {
3111            name: "Fx10",
3112            namespace: Some("ns"),
3113            aliases: vec![],
3114            size: 10,
3115            attributes: Attributes::default(),
3116        };
3117        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3118        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3119        let mut maker = Maker::new(false, false);
3120        maker
3121            .make_data_type(&writer_schema, Some(&reader_schema), None)
3122            .expect("fixed alias resolution should succeed");
3123    }
3124}