arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec for Mapping Avro and Arrow types.
19
20use crate::schema::{
21    AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22    AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23    PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27    IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40/// Contains information about how to resolve differences between a writer's and a reader's schema.
41#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43    /// Indicates that the writer's type should be promoted to the reader's type.
44    Promotion(Promotion),
45    /// Indicates that a default value should be used for a field.
46    DefaultValue(AvroLiteral),
47    /// Provides mapping information for resolving enums.
48    EnumMapping(EnumMapping),
49    /// Provides resolution information for record fields.
50    Record(ResolvedRecord),
51    /// Provides mapping and shape info for resolving unions.
52    Union(ResolvedUnion),
53}
54
55/// Represents a literal Avro value.
56///
57/// This is used to represent default values in an Avro schema.
58#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60    /// Represents a null value.
61    Null,
62    /// Represents a boolean value.
63    Boolean(bool),
64    /// Represents an integer value.
65    Int(i32),
66    /// Represents a long value.
67    Long(i64),
68    /// Represents a float value.
69    Float(f32),
70    /// Represents a double value.
71    Double(f64),
72    /// Represents a bytes value.
73    Bytes(Vec<u8>),
74    /// Represents a string value.
75    String(String),
76    /// Represents an enum symbol.
77    Enum(String),
78    /// Represents a JSON array default for an Avro array, containing element literals.
79    Array(Vec<AvroLiteral>),
80    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
81    Map(IndexMap<String, AvroLiteral>),
82}
83
84/// Contains the necessary information to resolve a writer's record against a reader's record schema.
85#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87    /// Maps a writer's field index to the corresponding reader's field index.
88    /// `None` if the writer's field is not present in the reader's schema.
89    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
90    /// A list of indices in the reader's schema for fields that have a default value.
91    pub(crate) default_fields: Arc<[usize]>,
92    /// For fields present in the writer's schema but not the reader's, this stores their data type.
93    /// This is needed to correctly skip over these fields during deserialization.
94    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
95}
96
/// The kind of type promotion applied while resolving schemas.
///
/// Resolution may need to widen the writer's data type into the reader's data type;
/// an `int`, for instance, can become a `long`, `float`, or `double`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No promotion: the value is read directly.
    Direct,
    /// `int` is promoted to `long`.
    IntToLong,
    /// `int` is promoted to `float`.
    IntToFloat,
    /// `int` is promoted to `double`.
    IntToDouble,
    /// `long` is promoted to `float`.
    LongToFloat,
    /// `long` is promoted to `double`.
    LongToDouble,
    /// `float` is promoted to `double`.
    FloatToDouble,
    /// `string` is promoted to `bytes`.
    StringToBytes,
    /// `bytes` is promoted to `string`.
    BytesToString,
}
122
123impl Display for Promotion {
124    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
125        match self {
126            Self::Direct => write!(formatter, "Direct"),
127            Self::IntToLong => write!(formatter, "Int->Long"),
128            Self::IntToFloat => write!(formatter, "Int->Float"),
129            Self::IntToDouble => write!(formatter, "Int->Double"),
130            Self::LongToFloat => write!(formatter, "Long->Float"),
131            Self::LongToDouble => write!(formatter, "Long->Double"),
132            Self::FloatToDouble => write!(formatter, "Float->Double"),
133            Self::StringToBytes => write!(formatter, "String->Bytes"),
134            Self::BytesToString => write!(formatter, "Bytes->String"),
135        }
136    }
137}
138
139/// Information required to resolve a writer union against a reader union (or single type).
140#[derive(Debug, Clone, PartialEq)]
141pub(crate) struct ResolvedUnion {
142    /// For each writer branch index, the reader branch index and how to read it.
143    /// `None` means the writer branch doesn't resolve against the reader.
144    pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
145    /// Whether the writer schema at this site is a union
146    pub(crate) writer_is_union: bool,
147    /// Whether the reader schema at this site is a union
148    pub(crate) reader_is_union: bool,
149}
150
/// Mapping data used to resolve Avro enums.
///
/// During schema resolution each of the writer's enum symbols has to be translated
/// to one of the reader's symbols.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Indexed by the writer's symbol index; each entry is the reader's symbol index.
    pub(crate) mapping: Arc<[i32]>,
    /// Index used for a writer symbol that the reader's enum lacks, when the
    /// reader's schema specifies a default value.
    pub(crate) default_index: i32,
}
162
/// Attaches the canonical Arrow extension type that corresponds to `codec`, if any.
///
/// Only [`Codec::Uuid`] is given an extension type; every other codec returns
/// `field` unchanged.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
170
171/// An Avro datatype mapped to the arrow data model
172#[derive(Debug, Clone, PartialEq)]
173pub(crate) struct AvroDataType {
174    nullability: Option<Nullability>,
175    metadata: HashMap<String, String>,
176    codec: Codec,
177    pub(crate) resolution: Option<ResolutionInfo>,
178}
179
180impl AvroDataType {
181    /// Create a new [`AvroDataType`] with the given parts.
182    pub(crate) fn new(
183        codec: Codec,
184        metadata: HashMap<String, String>,
185        nullability: Option<Nullability>,
186    ) -> Self {
187        AvroDataType {
188            codec,
189            metadata,
190            nullability,
191            resolution: None,
192        }
193    }
194
195    #[inline]
196    fn new_with_resolution(
197        codec: Codec,
198        metadata: HashMap<String, String>,
199        nullability: Option<Nullability>,
200        resolution: Option<ResolutionInfo>,
201    ) -> Self {
202        Self {
203            codec,
204            metadata,
205            nullability,
206            resolution,
207        }
208    }
209
210    /// Returns an arrow [`Field`] with the given name
211    pub(crate) fn field_with_name(&self, name: &str) -> Field {
212        let mut nullable = self.nullability.is_some();
213        if !nullable {
214            if let Codec::Union(children, _, _) = self.codec() {
215                // If any encoded branch is `null`, mark field as nullable
216                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
217                    nullable = true;
218                }
219            }
220        }
221        let data_type = self.codec.data_type();
222        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
223        #[cfg(feature = "canonical_extension_types")]
224        return with_extension_type(&self.codec, field);
225        #[cfg(not(feature = "canonical_extension_types"))]
226        field
227    }
228
229    /// Returns a reference to the codec used by this data type
230    ///
231    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
232    /// This is useful when we need to inspect or use the specific encoding of a field.
233    pub(crate) fn codec(&self) -> &Codec {
234        &self.codec
235    }
236
237    /// Returns the nullability status of this data type
238    ///
239    /// In Avro, nullability is represented through unions with null types.
240    /// The returned value indicates how nulls are encoded in the Avro format:
241    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
242    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
243    /// - `None` - The type is not nullable
244    pub(crate) fn nullability(&self) -> Option<Nullability> {
245        self.nullability
246    }
247
248    #[inline]
249    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
250        fn expect_string<'v>(
251            default_json: &'v Value,
252            data_type: &str,
253        ) -> Result<&'v str, ArrowError> {
254            match default_json {
255                Value::String(s) => Ok(s.as_str()),
256                _ => Err(ArrowError::SchemaError(format!(
257                    "Default value must be a JSON string for {data_type}"
258                ))),
259            }
260        }
261
262        fn parse_bytes_default(
263            default_json: &Value,
264            expected_len: Option<usize>,
265        ) -> Result<Vec<u8>, ArrowError> {
266            let s = expect_string(default_json, "bytes/fixed logical types")?;
267            let mut out = Vec::with_capacity(s.len());
268            for ch in s.chars() {
269                let cp = ch as u32;
270                if cp > 0xFF {
271                    return Err(ArrowError::SchemaError(format!(
272                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
273                    )));
274                }
275                out.push(cp as u8);
276            }
277            if let Some(len) = expected_len {
278                if out.len() != len {
279                    return Err(ArrowError::SchemaError(format!(
280                        "Default length {} does not match expected fixed size {len}",
281                        out.len(),
282                    )));
283                }
284            }
285            Ok(out)
286        }
287
288        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
289            match default_json {
290                Value::Number(n) => n.as_i64().ok_or_else(|| {
291                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
292                }),
293                _ => Err(ArrowError::SchemaError(format!(
294                    "Default {data_type} must be a JSON integer"
295                ))),
296            }
297        }
298
299        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
300            match default_json {
301                Value::Number(n) => n.as_f64().ok_or_else(|| {
302                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
303                }),
304                _ => Err(ArrowError::SchemaError(format!(
305                    "Default {data_type} must be a JSON number"
306                ))),
307            }
308        }
309
310        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
311        if default_json.is_null() {
312            return match self.codec() {
313                Codec::Null => Ok(AvroLiteral::Null),
314                Codec::Union(encodings, _, _) if !encodings.is_empty()
315                    && matches!(encodings[0].codec(), Codec::Null) =>
316                    {
317                        Ok(AvroLiteral::Null)
318                    }
319                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
320                _ => Err(ArrowError::SchemaError(
321                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
322                        .to_string(),
323                )),
324            };
325        }
326        let lit = match self.codec() {
327            Codec::Null => {
328                return Err(ArrowError::SchemaError(
329                    "Default for `null` type must be JSON null".to_string(),
330                ));
331            }
332            Codec::Boolean => match default_json {
333                Value::Bool(b) => AvroLiteral::Boolean(*b),
334                _ => {
335                    return Err(ArrowError::SchemaError(
336                        "Boolean default must be a JSON boolean".to_string(),
337                    ));
338                }
339            },
340            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
341                let i = parse_json_i64(default_json, "int")?;
342                if i < i32::MIN as i64 || i > i32::MAX as i64 {
343                    return Err(ArrowError::SchemaError(format!(
344                        "Default int {i} out of i32 range"
345                    )));
346                }
347                AvroLiteral::Int(i as i32)
348            }
349            Codec::Int64
350            | Codec::TimeMicros
351            | Codec::TimestampMillis(_)
352            | Codec::TimestampMicros(_)
353            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
354            #[cfg(feature = "avro_custom_types")]
355            Codec::DurationNanos
356            | Codec::DurationMicros
357            | Codec::DurationMillis
358            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
359            Codec::Float32 => {
360                let f = parse_json_f64(default_json, "float")?;
361                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
362                    return Err(ArrowError::SchemaError(format!(
363                        "Default float {f} out of f32 range or not finite"
364                    )));
365                }
366                AvroLiteral::Float(f as f32)
367            }
368            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
369            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
370                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
371            }
372            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
373            Codec::Fixed(sz) => {
374                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
375            }
376            Codec::Decimal(_, _, fixed_size) => {
377                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
378            }
379            Codec::Enum(symbols) => {
380                let s = expect_string(default_json, "enum")?;
381                if symbols.iter().any(|sym| sym == s) {
382                    AvroLiteral::Enum(s.to_string())
383                } else {
384                    return Err(ArrowError::SchemaError(format!(
385                        "Default enum symbol {s:?} not found in reader enum symbols"
386                    )));
387                }
388            }
389            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
390            Codec::List(item_dt) => match default_json {
391                Value::Array(items) => AvroLiteral::Array(
392                    items
393                        .iter()
394                        .map(|v| item_dt.parse_default_literal(v))
395                        .collect::<Result<_, _>>()?,
396                ),
397                _ => {
398                    return Err(ArrowError::SchemaError(
399                        "Default value must be a JSON array for Avro array type".to_string(),
400                    ));
401                }
402            },
403            Codec::Map(val_dt) => match default_json {
404                Value::Object(map) => {
405                    let mut out = IndexMap::with_capacity(map.len());
406                    for (k, v) in map {
407                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
408                    }
409                    AvroLiteral::Map(out)
410                }
411                _ => {
412                    return Err(ArrowError::SchemaError(
413                        "Default value must be a JSON object for Avro map type".to_string(),
414                    ));
415                }
416            },
417            Codec::Struct(fields) => match default_json {
418                Value::Object(obj) => {
419                    let mut out: IndexMap<String, AvroLiteral> =
420                        IndexMap::with_capacity(fields.len());
421                    for f in fields.as_ref() {
422                        let name = f.name().to_string();
423                        if let Some(sub) = obj.get(&name) {
424                            out.insert(name, f.data_type().parse_default_literal(sub)?);
425                        } else {
426                            // Cache metadata lookup once
427                            let stored_default =
428                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
429                            if stored_default.is_none()
430                                && f.data_type().nullability() == Some(Nullability::default())
431                            {
432                                out.insert(name, AvroLiteral::Null);
433                            } else if let Some(default_json) = stored_default {
434                                let v: Value =
435                                    serde_json::from_str(default_json).map_err(|e| {
436                                        ArrowError::SchemaError(format!(
437                                            "Failed to parse stored subfield default JSON for '{}': {e}",
438                                            f.name(),
439                                        ))
440                                    })?;
441                                out.insert(name, f.data_type().parse_default_literal(&v)?);
442                            } else {
443                                return Err(ArrowError::SchemaError(format!(
444                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
445                                    f.name(),
446                                    f.data_type().codec()
447                                )));
448                            }
449                        }
450                    }
451                    AvroLiteral::Map(out)
452                }
453                _ => {
454                    return Err(ArrowError::SchemaError(
455                        "Default value for record/struct must be a JSON object".to_string(),
456                    ));
457                }
458            },
459            Codec::Union(encodings, _, _) => {
460                let Some(default_encoding) = encodings.first() else {
461                    return Err(ArrowError::SchemaError(
462                        "Union with no branches cannot have a default".to_string(),
463                    ));
464                };
465                default_encoding.parse_default_literal(default_json)?
466            }
467            #[cfg(feature = "avro_custom_types")]
468            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
469        };
470        Ok(lit)
471    }
472
473    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
474        let json_text = serde_json::to_string(default_json).map_err(|e| {
475            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
476        })?;
477        self.metadata
478            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
479        Ok(())
480    }
481
482    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
483        let lit = self.parse_default_literal(default_json)?;
484        self.store_default(default_json)?;
485        Ok(lit)
486    }
487}
488
489/// A named [`AvroDataType`]
490#[derive(Debug, Clone, PartialEq)]
491pub(crate) struct AvroField {
492    name: String,
493    data_type: AvroDataType,
494}
495
496impl AvroField {
497    /// Returns the arrow [`Field`]
498    pub(crate) fn field(&self) -> Field {
499        self.data_type.field_with_name(&self.name)
500    }
501
502    /// Returns the [`AvroDataType`]
503    pub(crate) fn data_type(&self) -> &AvroDataType {
504        &self.data_type
505    }
506
507    /// Returns a new [`AvroField`] with Utf8View support enabled
508    ///
509    /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to
510    /// enable potential performance optimizations in string-heavy workloads by using
511    /// Arrow's StringViewArray data structure.
512    ///
513    /// Returns a new `AvroField` with the same structure, but with string types
514    /// converted to use `Utf8View` instead of `Utf8`.
515    pub(crate) fn with_utf8view(&self) -> Self {
516        let mut field = self.clone();
517        if let Codec::Utf8 = field.data_type.codec {
518            field.data_type.codec = Codec::Utf8View;
519        }
520        field
521    }
522
523    /// Returns the name of this Avro field
524    ///
525    /// This is the field name as defined in the Avro schema.
526    /// It's used to identify fields within a record structure.
527    pub(crate) fn name(&self) -> &str {
528        &self.name
529    }
530}
531
532impl<'a> TryFrom<&Schema<'a>> for AvroField {
533    type Error = ArrowError;
534
535    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
536        match schema {
537            Schema::Complex(ComplexType::Record(r)) => {
538                let mut resolver = Maker::new(false, false);
539                let data_type = resolver.make_data_type(schema, None, None)?;
540                Ok(AvroField {
541                    data_type,
542                    name: r.name.to_string(),
543                })
544            }
545            _ => Err(ArrowError::ParseError(format!(
546                "Expected record got {schema:?}"
547            ))),
548        }
549    }
550}
551
552/// Builder for an [`AvroField`]
553#[derive(Debug)]
554pub(crate) struct AvroFieldBuilder<'a> {
555    writer_schema: &'a Schema<'a>,
556    reader_schema: Option<&'a Schema<'a>>,
557    use_utf8view: bool,
558    strict_mode: bool,
559}
560
561impl<'a> AvroFieldBuilder<'a> {
562    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
563    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
564        Self {
565            writer_schema,
566            reader_schema: None,
567            use_utf8view: false,
568            strict_mode: false,
569        }
570    }
571
572    /// Sets the reader schema for schema resolution.
573    ///
574    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
575    /// that can handle differences between the writer's and reader's schemas.
576    #[inline]
577    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
578        self.reader_schema = Some(reader_schema);
579        self
580    }
581
582    /// Enable or disable Utf8View support
583    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
584        self.use_utf8view = use_utf8view;
585        self
586    }
587
588    /// Enable or disable strict mode.
589    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
590        self.strict_mode = strict_mode;
591        self
592    }
593
594    /// Build an [`AvroField`] from the builder
595    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
596        match self.writer_schema {
597            Schema::Complex(ComplexType::Record(r)) => {
598                let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
599                let data_type =
600                    resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
601                Ok(AvroField {
602                    name: r.name.to_string(),
603                    data_type,
604                })
605            }
606            _ => Err(ArrowError::ParseError(format!(
607                "Expected a Record schema to build an AvroField, but got {:?}",
608                self.writer_schema
609            ))),
610        }
611    }
612}
613
614/// An Avro encoding
615///
616/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
617#[derive(Debug, Clone, PartialEq)]
618pub(crate) enum Codec {
619    /// Represents Avro null type, maps to Arrow's Null data type
620    Null,
621    /// Represents Avro boolean type, maps to Arrow's Boolean data type
622    Boolean,
623    /// Represents Avro int type, maps to Arrow's Int32 data type
624    Int32,
625    /// Represents Avro long type, maps to Arrow's Int64 data type
626    Int64,
627    /// Represents Avro float type, maps to Arrow's Float32 data type
628    Float32,
629    /// Represents Avro double type, maps to Arrow's Float64 data type
630    Float64,
631    /// Represents Avro bytes type, maps to Arrow's Binary data type
632    Binary,
633    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
634    Utf8,
635    /// String data represented as UTF-8 encoded bytes with an optimized view representation,
636    /// corresponding to Arrow's StringViewArray which provides better performance for string operations
637    ///
638    /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`.
639    Utf8View,
640    /// Represents Avro date logical type, maps to Arrow's Date32 data type
641    Date32,
642    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
643    TimeMillis,
644    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
645    TimeMicros,
646    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
647    ///
648    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type
649    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
650    TimestampMillis(bool),
651    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
652    ///
653    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
654    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
655    TimestampMicros(bool),
656    /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
657    ///
658    /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
659    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
660    TimestampNanos(bool),
661    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
662    /// The i32 parameter indicates the fixed binary size
663    Fixed(i32),
664    /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
665    ///
666    /// The fields are `(precision, scale, fixed_size)`.
667    /// - `precision` (`usize`): Total number of digits.
668    /// - `scale` (`Option<usize>`): Number of fractional digits.
669    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
670    Decimal(usize, Option<usize>, Option<usize>),
671    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
672    Uuid,
673    /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
674    ///
675    /// The enclosed value contains the enum's symbols.
676    Enum(Arc<[String]>),
677    /// Represents Avro array type, maps to Arrow's List data type
678    List(Arc<AvroDataType>),
679    /// Represents Avro record type, maps to Arrow's Struct data type
680    Struct(Arc<[AvroField]>),
681    /// Represents Avro map type, maps to Arrow's Map data type
682    Map(Arc<AvroDataType>),
683    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
684    Interval,
685    /// Represents Avro union type, maps to Arrow's Union data type
686    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
687    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
688    #[cfg(feature = "avro_custom_types")]
689    DurationNanos,
690    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
691    #[cfg(feature = "avro_custom_types")]
692    DurationMicros,
693    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
694    #[cfg(feature = "avro_custom_types")]
695    DurationMillis,
696    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
697    #[cfg(feature = "avro_custom_types")]
698    DurationSeconds,
699    #[cfg(feature = "avro_custom_types")]
700    RunEndEncoded(Arc<AvroDataType>, u8),
701}
702
703impl Codec {
704    fn data_type(&self) -> DataType {
705        match self {
706            Self::Null => DataType::Null,
707            Self::Boolean => DataType::Boolean,
708            Self::Int32 => DataType::Int32,
709            Self::Int64 => DataType::Int64,
710            Self::Float32 => DataType::Float32,
711            Self::Float64 => DataType::Float64,
712            Self::Binary => DataType::Binary,
713            Self::Utf8 => DataType::Utf8,
714            Self::Utf8View => DataType::Utf8View,
715            Self::Date32 => DataType::Date32,
716            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
717            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
718            Self::TimestampMillis(is_utc) => {
719                DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
720            }
721            Self::TimestampMicros(is_utc) => {
722                DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
723            }
724            Self::TimestampNanos(is_utc) => {
725                DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
726            }
727            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
728            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
729            Self::Decimal(precision, scale, _size) => {
730                let p = *precision as u8;
731                let s = scale.unwrap_or(0) as i8;
732                #[cfg(feature = "small_decimals")]
733                {
734                    if *precision <= DECIMAL32_MAX_PRECISION as usize {
735                        DataType::Decimal32(p, s)
736                    } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
737                        DataType::Decimal64(p, s)
738                    } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
739                        DataType::Decimal128(p, s)
740                    } else {
741                        DataType::Decimal256(p, s)
742                    }
743                }
744                #[cfg(not(feature = "small_decimals"))]
745                {
746                    if *precision <= DECIMAL128_MAX_PRECISION as usize {
747                        DataType::Decimal128(p, s)
748                    } else {
749                        DataType::Decimal256(p, s)
750                    }
751                }
752            }
753            Self::Uuid => DataType::FixedSizeBinary(16),
754            Self::Enum(_) => {
755                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
756            }
757            Self::List(f) => {
758                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
759            }
760            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
761            Self::Map(value_type) => {
762                let val_field = value_type.field_with_name("value");
763                DataType::Map(
764                    Arc::new(Field::new(
765                        "entries",
766                        DataType::Struct(Fields::from(vec![
767                            Field::new("key", DataType::Utf8, false),
768                            val_field,
769                        ])),
770                        false,
771                    )),
772                    false,
773                )
774            }
775            Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
776            #[cfg(feature = "avro_custom_types")]
777            Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
778            #[cfg(feature = "avro_custom_types")]
779            Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
780            #[cfg(feature = "avro_custom_types")]
781            Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
782            #[cfg(feature = "avro_custom_types")]
783            Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
784            #[cfg(feature = "avro_custom_types")]
785            Self::RunEndEncoded(values, bits) => {
786                let run_ends_dt = match *bits {
787                    16 => DataType::Int16,
788                    32 => DataType::Int32,
789                    64 => DataType::Int64,
790                    _ => unreachable!(),
791                };
792                DataType::RunEndEncoded(
793                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
794                    Arc::new(Field::new("values", values.codec().data_type(), true)),
795                )
796            }
797        }
798    }
799
800    /// Converts a string codec to use Utf8View if requested
801    ///
802    /// The conversion only happens if both:
803    /// 1. `use_utf8view` is true
804    /// 2. The codec is currently `Utf8`
805    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
806        if use_utf8view && matches!(self, Self::Utf8) {
807            Self::Utf8View
808        } else {
809            self
810        }
811    }
812
813    #[inline]
814    fn union_field_name(&self) -> String {
815        UnionFieldKind::from(self).as_ref().to_owned()
816    }
817}
818
819impl From<PrimitiveType> for Codec {
820    fn from(value: PrimitiveType) -> Self {
821        match value {
822            PrimitiveType::Null => Self::Null,
823            PrimitiveType::Boolean => Self::Boolean,
824            PrimitiveType::Int => Self::Int32,
825            PrimitiveType::Long => Self::Int64,
826            PrimitiveType::Float => Self::Float32,
827            PrimitiveType::Double => Self::Float64,
828            PrimitiveType::Bytes => Self::Binary,
829            PrimitiveType::String => Self::Utf8,
830        }
831    }
832}
833
/// Returns the largest base-10 precision an Avro `fixed` decimal of `n` bytes can hold.
///
/// The bytes store a big-endian two's-complement unscaled integer, so per the Avro
/// Decimal logical type the limit is ⌊log₁₀(2^(8n − 1) − 1)⌋.
///
/// Returns `None` when `n` is 0 or larger than 32; the table stops at 32 bytes,
/// whose maximum precision is 76.
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    // Exact values of the formula above; the entry for width `n` lives at index `n - 1`.
    const DIGITS_BY_WIDTH: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, // 1..=8 bytes
        21, 23, 26, 28, 31, 33, 35, 38, // 9..=16 bytes
        40, 43, 45, 47, 50, 52, 55, 57, // 17..=24 bytes
        59, 62, 64, 67, 69, 71, 74, 76, // 25..=32 bytes
    ];
    if n == 0 || n > DIGITS_BY_WIDTH.len() {
        return None;
    }
    Some(DIGITS_BY_WIDTH[n - 1])
}
856
857fn parse_decimal_attributes(
858    attributes: &Attributes,
859    fallback_size: Option<usize>,
860    precision_required: bool,
861) -> Result<(usize, usize, Option<usize>), ArrowError> {
862    let precision = attributes
863        .additional
864        .get("precision")
865        .and_then(|v| v.as_u64())
866        .or(if precision_required { None } else { Some(10) })
867        .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
868        as usize;
869    let scale = attributes
870        .additional
871        .get("scale")
872        .and_then(|v| v.as_u64())
873        .unwrap_or(0) as usize;
874    let size = attributes
875        .additional
876        .get("size")
877        .and_then(|v| v.as_u64())
878        .map(|s| s as usize)
879        .or(fallback_size);
880    if precision == 0 {
881        return Err(ArrowError::ParseError(
882            "Decimal requires precision > 0".to_string(),
883        ));
884    }
885    if scale > precision {
886        return Err(ArrowError::ParseError(format!(
887            "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
888        )));
889    }
890    if precision > DECIMAL256_MAX_PRECISION as usize {
891        return Err(ArrowError::ParseError(format!(
892            "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
893            DECIMAL256_MAX_PRECISION
894        )));
895    }
896    if let Some(sz) = size {
897        let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
898            ArrowError::ParseError(format!(
899                "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
900            ))
901        })?;
902        if precision > max_p {
903            return Err(ArrowError::ParseError(format!(
904                "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
905            )));
906        }
907    }
908    Ok((precision, scale, size))
909}
910
/// Coarse classification of a [`Codec`], used to name union branches.
///
/// `Codec::union_field_name` converts a codec into this enum and takes its
/// `AsRef<str>` value (derived by strum with `serialize_all = "snake_case"`).
/// `union_branch_name` falls back to that string when a branch carries no
/// Avro name in its metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    // Used for both `Codec::Utf8` and `Codec::Utf8View`.
    String,
    Date,
    TimeMillis,
    TimeMicros,
    // Timestamp kinds are split by unit and by the codec's UTC flag.
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    TimestampNanosUtc,
    TimestampNanosLocal,
    // Used for `Codec::Interval` and, with `avro_custom_types`, the `Codec::Duration*` variants.
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
941
942impl From<&Codec> for UnionFieldKind {
943    fn from(c: &Codec) -> Self {
944        match c {
945            Codec::Null => Self::Null,
946            Codec::Boolean => Self::Boolean,
947            Codec::Int32 => Self::Int,
948            Codec::Int64 => Self::Long,
949            Codec::Float32 => Self::Float,
950            Codec::Float64 => Self::Double,
951            Codec::Binary => Self::Bytes,
952            Codec::Utf8 | Codec::Utf8View => Self::String,
953            Codec::Date32 => Self::Date,
954            Codec::TimeMillis => Self::TimeMillis,
955            Codec::TimeMicros => Self::TimeMicros,
956            Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
957            Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
958            Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
959            Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
960            Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
961            Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
962            Codec::Interval => Self::Duration,
963            Codec::Fixed(_) => Self::Fixed,
964            Codec::Decimal(..) => Self::Decimal,
965            Codec::Enum(_) => Self::Enum,
966            Codec::List(_) => Self::Array,
967            Codec::Struct(_) => Self::Record,
968            Codec::Map(_) => Self::Map,
969            Codec::Uuid => Self::Uuid,
970            Codec::Union(..) => Self::Union,
971            #[cfg(feature = "avro_custom_types")]
972            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
973            #[cfg(feature = "avro_custom_types")]
974            Codec::DurationNanos
975            | Codec::DurationMicros
976            | Codec::DurationMillis
977            | Codec::DurationSeconds => Self::Duration,
978        }
979    }
980}
981
982fn union_branch_name(dt: &AvroDataType) -> String {
983    if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
984        if name.contains(".") {
985            // Full name
986            return name.to_string();
987        }
988        if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
989            return format!("{ns}.{name}");
990        }
991        return name.to_string();
992    }
993    dt.codec.union_field_name()
994}
995
996fn build_union_fields(encodings: &[AvroDataType]) -> UnionFields {
997    let arrow_fields: Vec<Field> = encodings
998        .iter()
999        .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1000        .collect();
1001    let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1002    UnionFields::new(type_ids, arrow_fields)
1003}
1004
/// Resolves Avro type names to [`AvroDataType`]
///
/// Types are stored by `register` and looked up by `resolve`.
///
/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Default)]
struct Resolver<'a> {
    /// Registered types keyed by `(namespace, name)`; `register` stores a missing
    /// namespace as the empty string.
    map: HashMap<(&'a str, &'a str), AvroDataType>,
}
1012
1013impl<'a> Resolver<'a> {
1014    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1015        self.map.insert((namespace.unwrap_or(""), name), schema);
1016    }
1017
1018    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1019        let (namespace, name) = name
1020            .rsplit_once('.')
1021            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1022        self.map
1023            .get(&(namespace, name))
1024            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1025            .cloned()
1026    }
1027}
1028
1029fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1030    let mut out = HashSet::with_capacity(1 + aliases.len());
1031    let (full, _) = make_full_name(name, ns, None);
1032    out.insert(full);
1033    for a in aliases {
1034        let (fa, _) = make_full_name(a, None, ns);
1035        out.insert(fa);
1036    }
1037    out
1038}
1039
1040fn names_match(
1041    writer_name: &str,
1042    writer_namespace: Option<&str>,
1043    writer_aliases: &[&str],
1044    reader_name: &str,
1045    reader_namespace: Option<&str>,
1046    reader_aliases: &[&str],
1047) -> bool {
1048    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1049    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1050    // If the canonical full names match, or any alias matches cross-wise.
1051    !writer_set.is_disjoint(&reader_set)
1052}
1053
1054fn ensure_names_match(
1055    data_type: &str,
1056    writer_name: &str,
1057    writer_namespace: Option<&str>,
1058    writer_aliases: &[&str],
1059    reader_name: &str,
1060    reader_namespace: Option<&str>,
1061    reader_aliases: &[&str],
1062) -> Result<(), ArrowError> {
1063    if names_match(
1064        writer_name,
1065        writer_namespace,
1066        writer_aliases,
1067        reader_name,
1068        reader_namespace,
1069        reader_aliases,
1070    ) {
1071        Ok(())
1072    } else {
1073        Err(ArrowError::ParseError(format!(
1074            "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1075        )))
1076    }
1077}
1078
1079fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1080    match schema {
1081        Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1082        Schema::Type(Type {
1083            r#type: TypeName::Primitive(primitive),
1084            ..
1085        }) => Some(*primitive),
1086        _ => None,
1087    }
1088}
1089
1090fn nullable_union_variants<'x, 'y>(
1091    variant: &'y [Schema<'x>],
1092) -> Option<(Nullability, &'y Schema<'x>)> {
1093    if variant.len() != 2 {
1094        return None;
1095    }
1096    let is_null = |schema: &Schema<'x>| {
1097        matches!(
1098            schema,
1099            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1100        )
1101    };
1102    match (is_null(&variant[0]), is_null(&variant[1])) {
1103        (true, false) => Some((Nullability::NullFirst, &variant[1])),
1104        (false, true) => Some((Nullability::NullSecond, &variant[0])),
1105        _ => None,
1106    }
1107}
1108
/// Identity of a union branch, used by `union_first_duplicate` to detect
/// branches that appear more than once in the same union.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    /// A named type or a reference to one, identified by its full name.
    Named(String),
    /// A primitive type; attributes on the schema are not part of the key.
    Primitive(PrimitiveType),
    /// Any array, regardless of item type.
    Array,
    /// Any map, regardless of value type.
    Map,
}
1116
1117fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1118    let (name, namespace) = match s {
1119        Schema::TypeName(TypeName::Primitive(p))
1120        | Schema::Type(Type {
1121            r#type: TypeName::Primitive(p),
1122            ..
1123        }) => return Some(UnionBranchKey::Primitive(*p)),
1124        Schema::TypeName(TypeName::Ref(name))
1125        | Schema::Type(Type {
1126            r#type: TypeName::Ref(name),
1127            ..
1128        }) => (name, None),
1129        Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1130        Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1131        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1132        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1133        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1134        Schema::Union(_) => return None,
1135    };
1136    let (full, _) = make_full_name(name, namespace, enclosing_ns);
1137    Some(UnionBranchKey::Named(full))
1138}
1139
1140fn union_first_duplicate<'a>(
1141    branches: &'a [Schema<'a>],
1142    enclosing_ns: Option<&'a str>,
1143) -> Option<String> {
1144    let mut seen = HashSet::with_capacity(branches.len());
1145    for schema in branches {
1146        if let Some(key) = branch_key_of(schema, enclosing_ns) {
1147            if !seen.insert(key.clone()) {
1148                let msg = match key {
1149                    UnionBranchKey::Named(full) => format!("named type {full}"),
1150                    UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1151                    UnionBranchKey::Array => "array".to_string(),
1152                    UnionBranchKey::Map => "map".to_string(),
1153                };
1154                return Some(msg);
1155            }
1156        }
1157    }
1158    None
1159}
1160
/// Builds [`AvroDataType`]s from Avro [`Schema`]s.
///
/// Named types encountered while parsing are registered in the embedded
/// [`Resolver`] so that later references to them can be resolved.
///
/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
struct Maker<'a> {
    /// Named types registered so far.
    resolver: Resolver<'a>,
    /// When true, primitive strings are parsed as `Utf8View` instead of `Utf8`.
    use_utf8view: bool,
    /// When true, two-branch unions with `null` as the second branch are rejected.
    strict_mode: bool,
}
1169
1170impl<'a> Maker<'a> {
1171    fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1172        Self {
1173            resolver: Default::default(),
1174            use_utf8view,
1175            strict_mode,
1176        }
1177    }
1178
1179    #[cfg(feature = "avro_custom_types")]
1180    #[inline]
1181    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1182        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1183            let mut inner = (*values).clone();
1184            inner.nullability = Some(nb);
1185            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1186        }
1187    }
1188
1189    fn make_data_type<'s>(
1190        &mut self,
1191        writer_schema: &'s Schema<'a>,
1192        reader_schema: Option<&'s Schema<'a>>,
1193        namespace: Option<&'a str>,
1194    ) -> Result<AvroDataType, ArrowError> {
1195        match reader_schema {
1196            Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1197            None => self.parse_type(writer_schema, namespace),
1198        }
1199    }
1200
    /// Parses an [`AvroDataType`] from the provided `schema`, without a reader schema.
    ///
    /// `schema`: the Avro schema to convert
    /// `namespace`: the enclosing namespace, used to resolve references and as the
    /// default namespace of named types that do not declare their own
    ///
    /// Primitive strings are produced as `Utf8View` instead of `Utf8` when
    /// `self.use_utf8view` is set.
    ///
    /// Records, enums and fixed types are registered with the resolver after they
    /// have been built, so references that appear later can be resolved.
    ///
    /// Two-branch unions containing `null` are not turned into Arrow unions; the
    /// non-null branch is returned with its nullability set instead.
    ///
    /// # Errors
    ///
    /// Returns an error when a reference cannot be resolved, when a union is
    /// invalid (nested union, duplicate branch, or `['T','null']` under
    /// `strict_mode`), or when logical-type attributes are invalid.
    ///
    /// See [`Resolver`] for more information
    fn parse_type<'s>(
        &mut self,
        schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        match schema {
            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
                Codec::from(*p).with_utf8view(self.use_utf8view),
                Default::default(),
                None,
            )),
            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
            Schema::Union(f) => {
                // Position of the first bare `null` branch, if any.
                let null = f
                    .iter()
                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
                // Two-branch unions with a null branch become a nullable version of the
                // other branch rather than an Arrow union.
                match (f.len() == 2, null) {
                    (true, Some(0)) => {
                        let mut field = self.parse_type(&f[1], namespace)?;
                        field.nullability = Some(Nullability::NullFirst);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
                        return Ok(field);
                    }
                    (true, Some(1)) => {
                        if self.strict_mode {
                            return Err(ArrowError::SchemaError(
                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
                                    .to_string(),
                            ));
                        }
                        let mut field = self.parse_type(&f[0], namespace)?;
                        field.nullability = Some(Nullability::NullSecond);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
                        return Ok(field);
                    }
                    _ => {}
                }
                // Validate: unions may not immediately contain unions
                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
                    return Err(ArrowError::SchemaError(
                        "Avro unions may not immediately contain other unions".to_string(),
                    ));
                }
                // Validate: duplicates (named by full name; non-named by kind)
                if let Some(dup) = union_first_duplicate(f, namespace) {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro union contains duplicate branch type: {dup}"
                    )));
                }
                // Parse all branches
                let children: Vec<AvroDataType> = f
                    .iter()
                    .map(|s| self.parse_type(s, namespace))
                    .collect::<Result<_, _>>()?;
                // Build Arrow layout once here
                let union_fields = build_union_fields(&children);
                Ok(AvroDataType::new(
                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
                    Default::default(),
                    None,
                ))
            }
            Schema::Complex(c) => match c {
                ComplexType::Record(r) => {
                    // The record's own namespace, when present, replaces the enclosing
                    // one for the record itself and for everything nested inside it.
                    let namespace = r.namespace.or(namespace);
                    let mut metadata = r.attributes.field_metadata();
                    let fields = r
                        .fields
                        .iter()
                        .map(|field| {
                            Ok(AvroField {
                                name: field.name.to_string(),
                                data_type: self.parse_type(&field.r#type, namespace)?,
                            })
                        })
                        .collect::<Result<_, ArrowError>>()?;
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        codec: Codec::Struct(fields),
                        metadata,
                        resolution: None,
                    };
                    // Registration happens after the fields are parsed, so the record
                    // is not yet resolvable while its own fields are being parsed.
                    self.resolver.register(r.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Array(a) => {
                    let field = self.parse_type(a.items.as_ref(), namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: a.attributes.field_metadata(),
                        codec: Codec::List(Arc::new(field)),
                        resolution: None,
                    })
                }
                ComplexType::Fixed(f) => {
                    let size = f.size.try_into().map_err(|e| {
                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
                    })?;
                    let namespace = f.namespace.or(namespace);
                    let mut metadata = f.attributes.field_metadata();
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    // The logical type decides which codec the fixed bytes decode to.
                    let field = match f.attributes.logical_type {
                        Some("decimal") => {
                            // Precision is mandatory here and the fixed size is the
                            // fallback byte width used to validate it.
                            let (precision, scale, _) =
                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
                                resolution: None,
                            }
                        }
                        Some("duration") => {
                            if size != 12 {
                                return Err(ArrowError::ParseError(format!(
                                    "Invalid fixed size for Duration: {size}, must be 12"
                                )));
                            };
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Interval,
                                resolution: None,
                            }
                        }
                        _ => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::Fixed(size),
                            resolution: None,
                        },
                    };
                    self.resolver.register(f.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Enum(e) => {
                    let namespace = e.namespace.or(namespace);
                    let symbols = e
                        .symbols
                        .iter()
                        .map(|s| s.to_string())
                        .collect::<Arc<[String]>>();
                    let mut metadata = e.attributes.field_metadata();
                    // The symbols are also kept in metadata, serialized as JSON.
                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
                    })?;
                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        metadata,
                        codec: Codec::Enum(symbols),
                        resolution: None,
                    };
                    self.resolver.register(e.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Map(m) => {
                    let val = self.parse_type(&m.values, namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: m.attributes.field_metadata(),
                        codec: Codec::Map(Arc::new(val)),
                        resolution: None,
                    })
                }
            },
            Schema::Type(t) => {
                // Parse the underlying type first, then refine its codec from the
                // logical type and other attributes.
                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
                // https://avro.apache.org/docs/1.11.1/specification/#logical-types
                match (t.attributes.logical_type, &mut field.codec) {
                    (Some("decimal"), c @ Codec::Binary) => {
                        // Precision is optional for bytes-backed decimals.
                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
                        *c = Codec::Decimal(prec, Some(sc), None);
                    }
                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
                    (Some("timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(true)
                    }
                    (Some("timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(true)
                    }
                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(false)
                    }
                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(false)
                    }
                    (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
                        *c = Codec::TimestampNanos(false)
                    }
                    // NOTE(review): with `use_utf8view` set, the inner parse yields
                    // `Codec::Utf8View`, so this arm appears not to match and the uuid
                    // falls through to the unrecognized-logical-type arm — confirm intended.
                    (Some("uuid"), c @ Codec::Utf8) => {
                        // Map Avro string+logicalType=uuid into the UUID Codec,
                        // and preserve the logicalType in Arrow field metadata
                        // so writers can round-trip it correctly.
                        *c = Codec::Uuid;
                        field.metadata.insert("logicalType".into(), "uuid".into());
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
                        *c = Codec::DurationSeconds
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.run-end-encoded"), _) => {
                        // The run-end index width must be present and fit in a `u8`;
                        // only 16, 32 and 64 are accepted below.
                        let bits_u8: u8 = t
                            .attributes
                            .additional
                            .get("arrow.runEndIndexBits")
                            .and_then(|v| v.as_u64())
                            .and_then(|n| u8::try_from(n).ok())
                            .ok_or_else(|| ArrowError::ParseError(
                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
                                    .to_string(),
                            ))?;
                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
                            return Err(ArrowError::ParseError(format!(
                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
                            )));
                        }
                        // Wrap the parsed underlying site as REE
                        let values_site = field.clone();
                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
                    }
                    (Some(logical), _) => {
                        // Insert unrecognized logical type into metadata map
                        field.metadata.insert("logicalType".into(), logical.into());
                    }
                    (None, _) => {}
                }
                // A long that is still a plain Int64 after the logical-type mapping is
                // treated as a local nanosecond timestamp when `arrowTimeUnit` says so.
                if matches!(field.codec, Codec::Int64) {
                    if let Some(unit) = t
                        .attributes
                        .additional
                        .get("arrowTimeUnit")
                        .and_then(|v| v.as_str())
                    {
                        if unit == "nanosecond" {
                            field.codec = Codec::TimestampNanos(false);
                        }
                    }
                }
                // Copy every additional attribute into the field metadata, using each
                // value's `to_string()` form.
                if !t.attributes.additional.is_empty() {
                    for (k, v) in &t.attributes.additional {
                        field.metadata.insert(k.to_string(), v.to_string());
                    }
                }
                Ok(field)
            }
        }
    }
1484
1485    fn resolve_type<'s>(
1486        &mut self,
1487        writer_schema: &'s Schema<'a>,
1488        reader_schema: &'s Schema<'a>,
1489        namespace: Option<&'a str>,
1490    ) -> Result<AvroDataType, ArrowError> {
1491        if let (Some(write_primitive), Some(read_primitive)) =
1492            (primitive_of(writer_schema), primitive_of(reader_schema))
1493        {
1494            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1495        }
1496        match (writer_schema, reader_schema) {
1497            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1498                let writer_variants = writer_variants.as_slice();
1499                let reader_variants = reader_variants.as_slice();
1500                match (
1501                    nullable_union_variants(writer_variants),
1502                    nullable_union_variants(reader_variants),
1503                ) {
1504                    (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
1505                        let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?;
1506                        dt.nullability = Some(w_nb);
1507                        #[cfg(feature = "avro_custom_types")]
1508                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
1509                        Ok(dt)
1510                    }
1511                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1512                }
1513            }
1514            (Schema::Union(writer_variants), reader_non_union) => {
1515                let writer_to_reader: Vec<Option<(usize, Promotion)>> = writer_variants
1516                    .iter()
1517                    .map(|writer| {
1518                        self.resolve_type(writer, reader_non_union, namespace)
1519                            .ok()
1520                            .map(|tmp| (0usize, Self::coercion_from(&tmp)))
1521                    })
1522                    .collect();
1523                let mut dt = self.parse_type(reader_non_union, namespace)?;
1524                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1525                    writer_to_reader: Arc::from(writer_to_reader),
1526                    writer_is_union: true,
1527                    reader_is_union: false,
1528                }));
1529                Ok(dt)
1530            }
1531            (writer_non_union, Schema::Union(reader_variants)) => {
1532                let promo = self.find_best_promotion(
1533                    writer_non_union,
1534                    reader_variants.as_slice(),
1535                    namespace,
1536                );
1537                let Some((reader_index, promotion)) = promo else {
1538                    return Err(ArrowError::SchemaError(
1539                        "Writer schema does not match any reader union branch".to_string(),
1540                    ));
1541                };
1542                let mut dt = self.parse_type(reader_schema, namespace)?;
1543                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1544                    writer_to_reader: Arc::from(vec![Some((reader_index, promotion))]),
1545                    writer_is_union: false,
1546                    reader_is_union: true,
1547                }));
1548                Ok(dt)
1549            }
1550            (
1551                Schema::Complex(ComplexType::Array(writer_array)),
1552                Schema::Complex(ComplexType::Array(reader_array)),
1553            ) => self.resolve_array(writer_array, reader_array, namespace),
1554            (
1555                Schema::Complex(ComplexType::Map(writer_map)),
1556                Schema::Complex(ComplexType::Map(reader_map)),
1557            ) => self.resolve_map(writer_map, reader_map, namespace),
1558            (
1559                Schema::Complex(ComplexType::Fixed(writer_fixed)),
1560                Schema::Complex(ComplexType::Fixed(reader_fixed)),
1561            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1562            (
1563                Schema::Complex(ComplexType::Record(writer_record)),
1564                Schema::Complex(ComplexType::Record(reader_record)),
1565            ) => self.resolve_records(writer_record, reader_record, namespace),
1566            (
1567                Schema::Complex(ComplexType::Enum(writer_enum)),
1568                Schema::Complex(ComplexType::Enum(reader_enum)),
1569            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1570            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1571            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1572            _ => Err(ArrowError::NotYetImplemented(
1573                "Other resolutions not yet implemented".to_string(),
1574            )),
1575        }
1576    }
1577
1578    #[inline]
1579    fn coercion_from(dt: &AvroDataType) -> Promotion {
1580        match dt.resolution.as_ref() {
1581            Some(ResolutionInfo::Promotion(promotion)) => *promotion,
1582            _ => Promotion::Direct,
1583        }
1584    }
1585
1586    fn find_best_promotion(
1587        &mut self,
1588        writer: &Schema<'a>,
1589        reader_variants: &[Schema<'a>],
1590        namespace: Option<&'a str>,
1591    ) -> Option<(usize, Promotion)> {
1592        let mut first_promotion: Option<(usize, Promotion)> = None;
1593        for (reader_index, reader) in reader_variants.iter().enumerate() {
1594            if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
1595                let promotion = Self::coercion_from(&tmp);
1596                if promotion == Promotion::Direct {
1597                    // An exact match is best, return immediately.
1598                    return Some((reader_index, promotion));
1599                } else if first_promotion.is_none() {
1600                    // Store the first valid promotion but keep searching for a direct match.
1601                    first_promotion = Some((reader_index, promotion));
1602                }
1603            }
1604        }
1605        first_promotion
1606    }
1607
1608    fn resolve_unions<'s>(
1609        &mut self,
1610        writer_variants: &'s [Schema<'a>],
1611        reader_variants: &'s [Schema<'a>],
1612        namespace: Option<&'a str>,
1613    ) -> Result<AvroDataType, ArrowError> {
1614        let reader_encodings: Vec<AvroDataType> = reader_variants
1615            .iter()
1616            .map(|reader_schema| self.parse_type(reader_schema, namespace))
1617            .collect::<Result<_, _>>()?;
1618        let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
1619            Vec::with_capacity(writer_variants.len());
1620        for writer in writer_variants {
1621            writer_to_reader.push(self.find_best_promotion(writer, reader_variants, namespace));
1622        }
1623        let union_fields = build_union_fields(&reader_encodings);
1624        let mut dt = AvroDataType::new(
1625            Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1626            Default::default(),
1627            None,
1628        );
1629        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1630            writer_to_reader: Arc::from(writer_to_reader),
1631            writer_is_union: true,
1632            reader_is_union: true,
1633        }));
1634        Ok(dt)
1635    }
1636
1637    fn resolve_array(
1638        &mut self,
1639        writer_array: &Array<'a>,
1640        reader_array: &Array<'a>,
1641        namespace: Option<&'a str>,
1642    ) -> Result<AvroDataType, ArrowError> {
1643        Ok(AvroDataType {
1644            nullability: None,
1645            metadata: reader_array.attributes.field_metadata(),
1646            codec: Codec::List(Arc::new(self.make_data_type(
1647                writer_array.items.as_ref(),
1648                Some(reader_array.items.as_ref()),
1649                namespace,
1650            )?)),
1651            resolution: None,
1652        })
1653    }
1654
1655    fn resolve_map(
1656        &mut self,
1657        writer_map: &Map<'a>,
1658        reader_map: &Map<'a>,
1659        namespace: Option<&'a str>,
1660    ) -> Result<AvroDataType, ArrowError> {
1661        Ok(AvroDataType {
1662            nullability: None,
1663            metadata: reader_map.attributes.field_metadata(),
1664            codec: Codec::Map(Arc::new(self.make_data_type(
1665                &writer_map.values,
1666                Some(&reader_map.values),
1667                namespace,
1668            )?)),
1669            resolution: None,
1670        })
1671    }
1672
1673    fn resolve_fixed<'s>(
1674        &mut self,
1675        writer_fixed: &Fixed<'a>,
1676        reader_fixed: &Fixed<'a>,
1677        reader_schema: &'s Schema<'a>,
1678        namespace: Option<&'a str>,
1679    ) -> Result<AvroDataType, ArrowError> {
1680        ensure_names_match(
1681            "Fixed",
1682            writer_fixed.name,
1683            writer_fixed.namespace,
1684            &writer_fixed.aliases,
1685            reader_fixed.name,
1686            reader_fixed.namespace,
1687            &reader_fixed.aliases,
1688        )?;
1689        if writer_fixed.size != reader_fixed.size {
1690            return Err(ArrowError::SchemaError(format!(
1691                "Fixed size mismatch for {}: writer={}, reader={}",
1692                reader_fixed.name, writer_fixed.size, reader_fixed.size
1693            )));
1694        }
1695        self.parse_type(reader_schema, namespace)
1696    }
1697
1698    fn resolve_primitives(
1699        &mut self,
1700        write_primitive: PrimitiveType,
1701        read_primitive: PrimitiveType,
1702        reader_schema: &Schema<'a>,
1703    ) -> Result<AvroDataType, ArrowError> {
1704        if write_primitive == read_primitive {
1705            return self.parse_type(reader_schema, None);
1706        }
1707        let promotion = match (write_primitive, read_primitive) {
1708            (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
1709            (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
1710            (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
1711            (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
1712            (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
1713            (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
1714            (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
1715            (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
1716            _ => {
1717                return Err(ArrowError::ParseError(format!(
1718                    "Illegal promotion {write_primitive:?} to {read_primitive:?}"
1719                )));
1720            }
1721        };
1722        let mut datatype = self.parse_type(reader_schema, None)?;
1723        datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
1724        Ok(datatype)
1725    }
1726
1727    // Resolve writer vs. reader enum schemas according to Avro 1.11.1.
1728    //
1729    // # How enums resolve (writer to reader)
1730    // Per “Schema Resolution”:
1731    // * The two schemas must refer to the same (unqualified) enum name (or match
1732    //   via alias rewriting).
1733    // * If the writer’s symbol is not present in the reader’s enum and the reader
1734    //   enum has a `default`, that `default` symbol must be used; otherwise,
1735    //   error.
1736    //   https://avro.apache.org/docs/1.11.1/specification/#schema-resolution
1737    // * Avro “Aliases” are applied from the reader side to rewrite the writer’s
1738    //   names during resolution. For robustness across ecosystems, we also accept
1739    //   symmetry here (see note below).
1740    //   https://avro.apache.org/docs/1.11.1/specification/#aliases
1741    //
1742    // # Rationale for this code path
1743    // 1. Do the work once at schema‑resolution time. Avro serializes an enum as a
1744    //    writer‑side position. Mapping positions on the hot decoder path is expensive
1745    //    if done with string lookups. This method builds a `writer_index to reader_index`
1746    //    vector once, so decoding just does an O(1) table lookup.
1747    // 2. Adopt the reader’s symbol set and order. We return an Arrow
1748    //    `Dictionary(Int32, Utf8)` whose dictionary values are the reader enum
1749    //    symbols. This makes downstream semantics match the reader schema, including
1750    //    Avro’s sort order rule that orders enums by symbol position in the schema.
1751    //    https://avro.apache.org/docs/1.11.1/specification/#sort-order
1752    // 3. Honor Avro’s `default` for enums. Avro 1.9+ allows a type‑level default
1753    //    on the enum. When the writer emits a symbol unknown to the reader, we map it
1754    //    to the reader’s validated `default` symbol if present; otherwise we signal an
1755    //    error at decoding time.
1756    //    https://avro.apache.org/docs/1.11.1/specification/#enums
1757    //
1758    // # Implementation notes
    // * We first check that enum names match or are alias‑equivalent. The Avro
1760    //   spec describes alias rewriting using reader aliases; this implementation
1761    //   additionally treats writer aliases as acceptable for name matching to be
1762    //   resilient with schemas produced by different tooling.
1763    // * We build `EnumMapping`:
1764    //   - `mapping[i]` = reader index of the writer symbol at writer index `i`.
1765    //   - If the writer symbol is absent and the reader has a default, we store the
1766    //     reader index of that default.
1767    //   - Otherwise we store `-1` as a sentinel meaning unresolvable; the decoder
1768    //     must treat encountering such a value as an error, per the spec.
1769    // * We persist the reader symbol list in field metadata under
1770    //   `AVRO_ENUM_SYMBOLS_METADATA_KEY`, so consumers can inspect the dictionary
1771    //   without needing the original Avro schema.
1772    // * The Arrow representation is `Dictionary(Int32, Utf8)`, which aligns with
1773    //   Avro’s integer index encoding for enums.
1774    //
1775    // # Examples
1776    // * Writer `["A","B","C"]`, Reader `["A","B"]`, Reader default `"A"`
1777    //     `mapping = [0, 1, 0]`, `default_index = 0`.
1778    // * Writer `["A","B"]`, Reader `["B","A"]` (no default)
1779    //     `mapping = [1, 0]`, `default_index = -1`.
1780    // * Writer `["A","B","C"]`, Reader `["A","B"]` (no default)
1781    //     `mapping = [0, 1, -1]` (decode must error on `"C"`).
1782    fn resolve_enums(
1783        &mut self,
1784        writer_enum: &Enum<'a>,
1785        reader_enum: &Enum<'a>,
1786        reader_schema: &Schema<'a>,
1787        namespace: Option<&'a str>,
1788    ) -> Result<AvroDataType, ArrowError> {
1789        ensure_names_match(
1790            "Enum",
1791            writer_enum.name,
1792            writer_enum.namespace,
1793            &writer_enum.aliases,
1794            reader_enum.name,
1795            reader_enum.namespace,
1796            &reader_enum.aliases,
1797        )?;
1798        if writer_enum.symbols == reader_enum.symbols {
1799            return self.parse_type(reader_schema, namespace);
1800        }
1801        let reader_index: HashMap<&str, i32> = reader_enum
1802            .symbols
1803            .iter()
1804            .enumerate()
1805            .map(|(index, &symbol)| (symbol, index as i32))
1806            .collect();
1807        let default_index: i32 = match reader_enum.default {
1808            Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
1809                ArrowError::SchemaError(format!(
1810                    "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
1811                    reader_enum.name,
1812                ))
1813            })?,
1814            None => -1,
1815        };
1816        let mapping: Vec<i32> = writer_enum
1817            .symbols
1818            .iter()
1819            .map(|&write_symbol| {
1820                reader_index
1821                    .get(write_symbol)
1822                    .copied()
1823                    .unwrap_or(default_index)
1824            })
1825            .collect();
1826        if self.strict_mode && mapping.iter().any(|&m| m < 0) {
1827            return Err(ArrowError::SchemaError(format!(
1828                "Reader enum '{}' does not cover all writer symbols and no default is provided",
1829                reader_enum.name
1830            )));
1831        }
1832        let mut dt = self.parse_type(reader_schema, namespace)?;
1833        dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
1834            mapping: Arc::from(mapping),
1835            default_index,
1836        }));
1837        let reader_ns = reader_enum.namespace.or(namespace);
1838        self.resolver
1839            .register(reader_enum.name, reader_ns, dt.clone());
1840        Ok(dt)
1841    }
1842
1843    #[inline]
1844    fn build_writer_lookup(
1845        writer_record: &Record<'a>,
1846    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
1847        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
1848        for (idx, wf) in writer_record.fields.iter().enumerate() {
1849            // Avro field names are unique; last-in wins are acceptable and match previous behavior.
1850            map.insert(wf.name, idx);
1851        }
1852        // Track ambiguous writer aliases (alias used by multiple writer fields)
1853        let mut ambiguous: HashSet<&str> = HashSet::new();
1854        for (idx, wf) in writer_record.fields.iter().enumerate() {
1855            for &alias in &wf.aliases {
1856                match map.entry(alias) {
1857                    Entry::Occupied(e) if *e.get() != idx => {
1858                        ambiguous.insert(alias);
1859                    }
1860                    Entry::Vacant(e) => {
1861                        e.insert(idx);
1862                    }
1863                    _ => {}
1864                }
1865            }
1866        }
1867        (map, ambiguous)
1868    }
1869
1870    fn resolve_records(
1871        &mut self,
1872        writer_record: &Record<'a>,
1873        reader_record: &Record<'a>,
1874        namespace: Option<&'a str>,
1875    ) -> Result<AvroDataType, ArrowError> {
1876        ensure_names_match(
1877            "Record",
1878            writer_record.name,
1879            writer_record.namespace,
1880            &writer_record.aliases,
1881            reader_record.name,
1882            reader_record.namespace,
1883            &reader_record.aliases,
1884        )?;
1885        let writer_ns = writer_record.namespace.or(namespace);
1886        let reader_ns = reader_record.namespace.or(namespace);
1887        let reader_md = reader_record.attributes.field_metadata();
1888        // Build writer lookup and ambiguous alias set.
1889        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
1890        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
1891        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
1892        // Capture default field indices during the main loop (one pass).
1893        let mut default_fields: Vec<usize> = Vec::new();
1894        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
1895            // Direct name match, then reader aliases (a writer alias map is pre-populated).
1896            let mut match_idx = writer_lookup.get(r_field.name).copied();
1897            let mut matched_via_alias: Option<&str> = None;
1898            if match_idx.is_none() {
1899                for &alias in &r_field.aliases {
1900                    if let Some(i) = writer_lookup.get(alias).copied() {
1901                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
1902                            return Err(ArrowError::SchemaError(format!(
1903                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
1904                                r_field.name
1905                            )));
1906                        }
1907                        match_idx = Some(i);
1908                        matched_via_alias = Some(alias);
1909                        break;
1910                    }
1911                }
1912            }
1913            if let Some(wi) = match_idx {
1914                if writer_to_reader[wi].is_none() {
1915                    let w_schema = &writer_record.fields[wi].r#type;
1916                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
1917                    writer_to_reader[wi] = Some(reader_idx);
1918                    reader_fields.push(AvroField {
1919                        name: r_field.name.to_owned(),
1920                        data_type: dt,
1921                    });
1922                    continue;
1923                } else if self.strict_mode {
1924                    // Writer field already mapped and strict_mode => error
1925                    let existing_reader = writer_to_reader[wi].unwrap();
1926                    let via = matched_via_alias
1927                        .map(|a| format!("alias '{a}'"))
1928                        .unwrap_or_else(|| "name match".to_string());
1929                    return Err(ArrowError::SchemaError(format!(
1930                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
1931                        writer_record.fields[wi].name
1932                    )));
1933                }
1934                // Non-strict and already mapped -> fall through to defaulting logic
1935            }
1936            // No match (or conflicted in non-strict mode): attach default per Avro spec.
1937            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
1938            if let Some(default_json) = r_field.default.as_ref() {
1939                dt.resolution = Some(ResolutionInfo::DefaultValue(
1940                    dt.parse_and_store_default(default_json)?,
1941                ));
1942                default_fields.push(reader_idx);
1943            } else if dt.nullability() == Some(Nullability::NullFirst) {
1944                // The only valid implicit default for a union is the first branch (null-first case).
1945                dt.resolution = Some(ResolutionInfo::DefaultValue(
1946                    dt.parse_and_store_default(&Value::Null)?,
1947                ));
1948                default_fields.push(reader_idx);
1949            } else {
1950                return Err(ArrowError::SchemaError(format!(
1951                    "Reader field '{}' not present in writer schema must have a default value",
1952                    r_field.name
1953                )));
1954            }
1955            reader_fields.push(AvroField {
1956                name: r_field.name.to_owned(),
1957                data_type: dt,
1958            });
1959        }
1960        // Build skip_fields in writer order; pre-size and push.
1961        let mut skip_fields: Vec<Option<AvroDataType>> =
1962            Vec::with_capacity(writer_record.fields.len());
1963        for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
1964            if writer_to_reader[writer_index].is_some() {
1965                skip_fields.push(None);
1966            } else {
1967                skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
1968            }
1969        }
1970        let resolved = AvroDataType::new_with_resolution(
1971            Codec::Struct(Arc::from(reader_fields)),
1972            reader_md,
1973            None,
1974            Some(ResolutionInfo::Record(ResolvedRecord {
1975                writer_to_reader: Arc::from(writer_to_reader),
1976                default_fields: Arc::from(default_fields),
1977                skip_fields: Arc::from(skip_fields),
1978            })),
1979        );
1980        // Register a resolved record by reader name+namespace for potential named type refs.
1981        self.resolver
1982            .register(reader_record.name, reader_ns, resolved.clone());
1983        Ok(resolved)
1984    }
1985}
1986
1987#[cfg(test)]
1988mod tests {
1989    use super::*;
1990    use crate::schema::{
1991        AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
1992        Fixed, PrimitiveType, Record, Schema, Type, TypeName,
1993    };
1994    use indexmap::IndexMap;
1995    use serde_json::{self, Value};
1996
1997    fn create_schema_with_logical_type(
1998        primitive_type: PrimitiveType,
1999        logical_type: &'static str,
2000    ) -> Schema<'static> {
2001        let attributes = Attributes {
2002            logical_type: Some(logical_type),
2003            additional: Default::default(),
2004        };
2005
2006        Schema::Type(Type {
2007            r#type: TypeName::Primitive(primitive_type),
2008            attributes,
2009        })
2010    }
2011
2012    fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2013        let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2014        let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2015        let mut maker = Maker::new(false, false);
2016        maker
2017            .make_data_type(&writer_schema, Some(&reader_schema), None)
2018            .expect("promotion should resolve")
2019    }
2020
2021    fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2022        Schema::TypeName(TypeName::Primitive(pt))
2023    }
2024    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2025        Schema::Union(branches)
2026    }
2027
2028    #[test]
2029    fn test_date_logical_type() {
2030        let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2031
2032        let mut maker = Maker::new(false, false);
2033        let result = maker.make_data_type(&schema, None, None).unwrap();
2034
2035        assert!(matches!(result.codec, Codec::Date32));
2036    }
2037
2038    #[test]
2039    fn test_time_millis_logical_type() {
2040        let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2041
2042        let mut maker = Maker::new(false, false);
2043        let result = maker.make_data_type(&schema, None, None).unwrap();
2044
2045        assert!(matches!(result.codec, Codec::TimeMillis));
2046    }
2047
2048    #[test]
2049    fn test_time_micros_logical_type() {
2050        let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2051
2052        let mut maker = Maker::new(false, false);
2053        let result = maker.make_data_type(&schema, None, None).unwrap();
2054
2055        assert!(matches!(result.codec, Codec::TimeMicros));
2056    }
2057
2058    #[test]
2059    fn test_timestamp_millis_logical_type() {
2060        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2061
2062        let mut maker = Maker::new(false, false);
2063        let result = maker.make_data_type(&schema, None, None).unwrap();
2064
2065        assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2066    }
2067
2068    #[test]
2069    fn test_timestamp_micros_logical_type() {
2070        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2071
2072        let mut maker = Maker::new(false, false);
2073        let result = maker.make_data_type(&schema, None, None).unwrap();
2074
2075        assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2076    }
2077
2078    #[test]
2079    fn test_local_timestamp_millis_logical_type() {
2080        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2081
2082        let mut maker = Maker::new(false, false);
2083        let result = maker.make_data_type(&schema, None, None).unwrap();
2084
2085        assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2086    }
2087
2088    #[test]
2089    fn test_local_timestamp_micros_logical_type() {
2090        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2091
2092        let mut maker = Maker::new(false, false);
2093        let result = maker.make_data_type(&schema, None, None).unwrap();
2094
2095        assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2096    }
2097
2098    #[test]
2099    fn test_uuid_type() {
2100        let mut codec = Codec::Fixed(16);
2101        if let c @ Codec::Fixed(16) = &mut codec {
2102            *c = Codec::Uuid;
2103        }
2104        assert!(matches!(codec, Codec::Uuid));
2105    }
2106
2107    #[test]
2108    fn test_duration_logical_type() {
2109        let mut codec = Codec::Fixed(12);
2110
2111        if let c @ Codec::Fixed(12) = &mut codec {
2112            *c = Codec::Interval;
2113        }
2114
2115        assert!(matches!(codec, Codec::Interval));
2116    }
2117
2118    #[test]
2119    fn test_decimal_logical_type_not_implemented() {
2120        let codec = Codec::Fixed(16);
2121
2122        let process_decimal = || -> Result<(), ArrowError> {
2123            if let Codec::Fixed(_) = codec {
2124                return Err(ArrowError::NotYetImplemented(
2125                    "Decimals are not currently supported".to_string(),
2126                ));
2127            }
2128            Ok(())
2129        };
2130
2131        let result = process_decimal();
2132
2133        assert!(result.is_err());
2134        if let Err(ArrowError::NotYetImplemented(msg)) = result {
2135            assert!(msg.contains("Decimals are not currently supported"));
2136        } else {
2137            panic!("Expected NotYetImplemented error");
2138        }
2139    }
2140    #[test]
2141    fn test_unknown_logical_type_added_to_metadata() {
2142        let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2143
2144        let mut maker = Maker::new(false, false);
2145        let result = maker.make_data_type(&schema, None, None).unwrap();
2146
2147        assert_eq!(
2148            result.metadata.get("logicalType"),
2149            Some(&"custom-type".to_string())
2150        );
2151    }
2152
2153    #[test]
2154    fn test_string_with_utf8view_enabled() {
2155        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2156
2157        let mut maker = Maker::new(true, false);
2158        let result = maker.make_data_type(&schema, None, None).unwrap();
2159
2160        assert!(matches!(result.codec, Codec::Utf8View));
2161    }
2162
2163    #[test]
2164    fn test_string_without_utf8view_enabled() {
2165        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2166
2167        let mut maker = Maker::new(false, false);
2168        let result = maker.make_data_type(&schema, None, None).unwrap();
2169
2170        assert!(matches!(result.codec, Codec::Utf8));
2171    }
2172
2173    #[test]
2174    fn test_record_with_string_and_utf8view_enabled() {
2175        let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2176
2177        let avro_field = crate::schema::Field {
2178            name: "string_field",
2179            r#type: field_schema,
2180            default: None,
2181            doc: None,
2182            aliases: vec![],
2183        };
2184
2185        let record = Record {
2186            name: "test_record",
2187            namespace: None,
2188            aliases: vec![],
2189            doc: None,
2190            fields: vec![avro_field],
2191            attributes: Attributes::default(),
2192        };
2193
2194        let schema = Schema::Complex(ComplexType::Record(record));
2195
2196        let mut maker = Maker::new(true, false);
2197        let result = maker.make_data_type(&schema, None, None).unwrap();
2198
2199        if let Codec::Struct(fields) = &result.codec {
2200            let first_field_codec = &fields[0].data_type().codec;
2201            assert!(matches!(first_field_codec, Codec::Utf8View));
2202        } else {
2203            panic!("Expected Struct codec");
2204        }
2205    }
2206
2207    #[test]
2208    fn test_union_with_strict_mode() {
2209        let schema = Schema::Union(vec![
2210            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2211            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2212        ]);
2213
2214        let mut maker = Maker::new(false, true);
2215        let result = maker.make_data_type(&schema, None, None);
2216
2217        assert!(result.is_err());
2218        match result {
2219            Err(ArrowError::SchemaError(msg)) => {
2220                assert!(msg.contains(
2221                    "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2222                ));
2223            }
2224            _ => panic!("Expected SchemaError"),
2225        }
2226    }
2227
2228    #[test]
2229    fn test_resolve_int_to_float_promotion() {
2230        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2231        assert!(matches!(result.codec, Codec::Float32));
2232        assert_eq!(
2233            result.resolution,
2234            Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2235        );
2236    }
2237
2238    #[test]
2239    fn test_resolve_int_to_double_promotion() {
2240        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2241        assert!(matches!(result.codec, Codec::Float64));
2242        assert_eq!(
2243            result.resolution,
2244            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2245        );
2246    }
2247
2248    #[test]
2249    fn test_resolve_long_to_float_promotion() {
2250        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2251        assert!(matches!(result.codec, Codec::Float32));
2252        assert_eq!(
2253            result.resolution,
2254            Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2255        );
2256    }
2257
2258    #[test]
2259    fn test_resolve_long_to_double_promotion() {
2260        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2261        assert!(matches!(result.codec, Codec::Float64));
2262        assert_eq!(
2263            result.resolution,
2264            Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2265        );
2266    }
2267
2268    #[test]
2269    fn test_resolve_float_to_double_promotion() {
2270        let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2271        assert!(matches!(result.codec, Codec::Float64));
2272        assert_eq!(
2273            result.resolution,
2274            Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2275        );
2276    }
2277
2278    #[test]
2279    fn test_resolve_string_to_bytes_promotion() {
2280        let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2281        assert!(matches!(result.codec, Codec::Binary));
2282        assert_eq!(
2283            result.resolution,
2284            Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2285        );
2286    }
2287
2288    #[test]
2289    fn test_resolve_bytes_to_string_promotion() {
2290        let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2291        assert!(matches!(result.codec, Codec::Utf8));
2292        assert_eq!(
2293            result.resolution,
2294            Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2295        );
2296    }
2297
2298    #[test]
2299    fn test_resolve_illegal_promotion_double_to_float_errors() {
2300        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2301        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2302        let mut maker = Maker::new(false, false);
2303        let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2304        assert!(result.is_err());
2305        match result {
2306            Err(ArrowError::ParseError(msg)) => {
2307                assert!(msg.contains("Illegal promotion"));
2308            }
2309            _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2310        }
2311    }
2312
2313    #[test]
2314    fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2315        let writer = Schema::Union(vec![
2316            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2317            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2318        ]);
2319        let reader = Schema::Union(vec![
2320            Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2321            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2322        ]);
2323        let mut maker = Maker::new(false, false);
2324        let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2325        assert!(matches!(result.codec, Codec::Float64));
2326        assert_eq!(
2327            result.resolution,
2328            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2329        );
2330        assert_eq!(result.nullability, Some(Nullability::NullFirst));
2331    }
2332
2333    #[test]
2334    fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2335        let writer = mk_union(vec![
2336            mk_primitive(PrimitiveType::String),
2337            mk_primitive(PrimitiveType::Long),
2338        ]);
2339        let reader = mk_primitive(PrimitiveType::Bytes);
2340        let mut maker = Maker::new(false, false);
2341        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2342        assert!(matches!(dt.codec(), Codec::Binary));
2343        let resolved = match dt.resolution {
2344            Some(ResolutionInfo::Union(u)) => u,
2345            other => panic!("expected union resolution info, got {other:?}"),
2346        };
2347        assert!(resolved.writer_is_union && !resolved.reader_is_union);
2348        assert_eq!(
2349            resolved.writer_to_reader.as_ref(),
2350            &[Some((0, Promotion::StringToBytes)), None]
2351        );
2352    }
2353
2354    #[test]
2355    fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2356        let writer = mk_primitive(PrimitiveType::Long);
2357        let reader = mk_union(vec![
2358            mk_primitive(PrimitiveType::Long),
2359            mk_primitive(PrimitiveType::Double),
2360        ]);
2361        let mut maker = Maker::new(false, false);
2362        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2363        let resolved = match dt.resolution {
2364            Some(ResolutionInfo::Union(u)) => u,
2365            other => panic!("expected union resolution info, got {other:?}"),
2366        };
2367        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2368        assert_eq!(
2369            resolved.writer_to_reader.as_ref(),
2370            &[Some((0, Promotion::Direct))]
2371        );
2372    }
2373
2374    #[test]
2375    fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2376        let writer = mk_primitive(PrimitiveType::Int);
2377        let reader = mk_union(vec![
2378            mk_primitive(PrimitiveType::Null),
2379            mk_primitive(PrimitiveType::Long),
2380            mk_primitive(PrimitiveType::String),
2381        ]);
2382        let mut maker = Maker::new(false, false);
2383        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2384        let resolved = match dt.resolution {
2385            Some(ResolutionInfo::Union(u)) => u,
2386            other => panic!("expected union resolution info, got {other:?}"),
2387        };
2388        assert_eq!(
2389            resolved.writer_to_reader.as_ref(),
2390            &[Some((1, Promotion::IntToLong))]
2391        );
2392    }
2393
2394    #[test]
2395    fn test_resolve_both_nullable_unions_direct_match() {
2396        let writer = mk_union(vec![
2397            mk_primitive(PrimitiveType::Null),
2398            mk_primitive(PrimitiveType::String),
2399        ]);
2400        let reader = mk_union(vec![
2401            mk_primitive(PrimitiveType::String),
2402            mk_primitive(PrimitiveType::Null),
2403        ]);
2404        let mut maker = Maker::new(false, false);
2405        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2406        assert!(matches!(dt.codec(), Codec::Utf8));
2407        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2408        assert!(dt.resolution.is_none());
2409    }
2410
2411    #[test]
2412    fn test_resolve_both_nullable_unions_with_promotion() {
2413        let writer = mk_union(vec![
2414            mk_primitive(PrimitiveType::Null),
2415            mk_primitive(PrimitiveType::Int),
2416        ]);
2417        let reader = mk_union(vec![
2418            mk_primitive(PrimitiveType::Double),
2419            mk_primitive(PrimitiveType::Null),
2420        ]);
2421        let mut maker = Maker::new(false, false);
2422        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2423        assert!(matches!(dt.codec(), Codec::Float64));
2424        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2425        assert_eq!(
2426            dt.resolution,
2427            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2428        );
2429    }
2430
2431    #[test]
2432    fn test_resolve_type_promotion() {
2433        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2434        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2435        let mut maker = Maker::new(false, false);
2436        let result = maker
2437            .make_data_type(&writer_schema, Some(&reader_schema), None)
2438            .unwrap();
2439        assert!(matches!(result.codec, Codec::Int64));
2440        assert_eq!(
2441            result.resolution,
2442            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2443        );
2444    }
2445
2446    #[test]
2447    fn test_nested_record_type_reuse_without_namespace() {
2448        let schema_str = r#"
2449        {
2450          "type": "record",
2451          "name": "Record",
2452          "fields": [
2453            {
2454              "name": "nested",
2455              "type": {
2456                "type": "record",
2457                "name": "Nested",
2458                "fields": [
2459                  { "name": "nested_int", "type": "int" }
2460                ]
2461              }
2462            },
2463            { "name": "nestedRecord", "type": "Nested" },
2464            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
2465            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
2466          ]
2467        }
2468        "#;
2469
2470        let schema: Schema = serde_json::from_str(schema_str).unwrap();
2471
2472        let mut maker = Maker::new(false, false);
2473        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2474
2475        if let Codec::Struct(fields) = avro_data_type.codec() {
2476            assert_eq!(fields.len(), 4);
2477
2478            // nested
2479            assert_eq!(fields[0].name(), "nested");
2480            let nested_data_type = fields[0].data_type();
2481            if let Codec::Struct(nested_fields) = nested_data_type.codec() {
2482                assert_eq!(nested_fields.len(), 1);
2483                assert_eq!(nested_fields[0].name(), "nested_int");
2484                assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
2485            } else {
2486                panic!(
2487                    "'nested' field is not a struct but {:?}",
2488                    nested_data_type.codec()
2489                );
2490            }
2491
2492            // nestedRecord
2493            assert_eq!(fields[1].name(), "nestedRecord");
2494            let nested_record_data_type = fields[1].data_type();
2495            assert_eq!(
2496                nested_record_data_type.codec().data_type(),
2497                nested_data_type.codec().data_type()
2498            );
2499
2500            // nestedArray
2501            assert_eq!(fields[2].name(), "nestedArray");
2502            if let Codec::List(item_type) = fields[2].data_type().codec() {
2503                assert_eq!(
2504                    item_type.codec().data_type(),
2505                    nested_data_type.codec().data_type()
2506                );
2507            } else {
2508                panic!("'nestedArray' field is not a list");
2509            }
2510
2511            // nestedMap
2512            assert_eq!(fields[3].name(), "nestedMap");
2513            if let Codec::Map(value_type) = fields[3].data_type().codec() {
2514                assert_eq!(
2515                    value_type.codec().data_type(),
2516                    nested_data_type.codec().data_type()
2517                );
2518            } else {
2519                panic!("'nestedMap' field is not a map");
2520            }
2521        } else {
2522            panic!("Top-level schema is not a struct");
2523        }
2524    }
2525
2526    #[test]
2527    fn test_nested_enum_type_reuse_with_namespace() {
2528        let schema_str = r#"
2529        {
2530          "type": "record",
2531          "name": "Record",
2532          "namespace": "record_ns",
2533          "fields": [
2534            {
2535              "name": "status",
2536              "type": {
2537                "type": "enum",
2538                "name": "Status",
2539                "namespace": "enum_ns",
2540                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
2541              }
2542            },
2543            { "name": "backupStatus", "type": "enum_ns.Status" },
2544            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
2545            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
2546          ]
2547        }
2548        "#;
2549
2550        let schema: Schema = serde_json::from_str(schema_str).unwrap();
2551
2552        let mut maker = Maker::new(false, false);
2553        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2554
2555        if let Codec::Struct(fields) = avro_data_type.codec() {
2556            assert_eq!(fields.len(), 4);
2557
2558            // status
2559            assert_eq!(fields[0].name(), "status");
2560            let status_data_type = fields[0].data_type();
2561            if let Codec::Enum(symbols) = status_data_type.codec() {
2562                assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
2563            } else {
2564                panic!(
2565                    "'status' field is not an enum but {:?}",
2566                    status_data_type.codec()
2567                );
2568            }
2569
2570            // backupStatus
2571            assert_eq!(fields[1].name(), "backupStatus");
2572            let backup_status_data_type = fields[1].data_type();
2573            assert_eq!(
2574                backup_status_data_type.codec().data_type(),
2575                status_data_type.codec().data_type()
2576            );
2577
2578            // statusHistory
2579            assert_eq!(fields[2].name(), "statusHistory");
2580            if let Codec::List(item_type) = fields[2].data_type().codec() {
2581                assert_eq!(
2582                    item_type.codec().data_type(),
2583                    status_data_type.codec().data_type()
2584                );
2585            } else {
2586                panic!("'statusHistory' field is not a list");
2587            }
2588
2589            // statusMap
2590            assert_eq!(fields[3].name(), "statusMap");
2591            if let Codec::Map(value_type) = fields[3].data_type().codec() {
2592                assert_eq!(
2593                    value_type.codec().data_type(),
2594                    status_data_type.codec().data_type()
2595                );
2596            } else {
2597                panic!("'statusMap' field is not a map");
2598            }
2599        } else {
2600            panic!("Top-level schema is not a struct");
2601        }
2602    }
2603
2604    #[test]
2605    fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
2606        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2607        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2608        let mut maker = Maker::new(false, false);
2609        let data_type = maker
2610            .make_data_type(&writer_schema, Some(&reader_schema), None)
2611            .expect("resolution should succeed");
2612        let field = AvroField {
2613            name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
2614            data_type,
2615        };
2616        assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
2617        assert!(matches!(field.data_type().codec(), Codec::Utf8));
2618    }
2619
2620    fn json_string(s: &str) -> Value {
2621        Value::String(s.to_string())
2622    }
2623
2624    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
2625        let stored = dt
2626            .metadata
2627            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
2628            .cloned()
2629            .unwrap_or_default();
2630        let expected = serde_json::to_string(default_json).unwrap();
2631        assert_eq!(stored, expected, "stored default metadata should match");
2632    }
2633
2634    #[test]
2635    fn test_validate_and_store_default_null_and_nullability_rules() {
2636        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
2637        let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
2638        assert_eq!(lit, AvroLiteral::Null);
2639        assert_default_stored(&dt_null, &Value::Null);
2640        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
2641        let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
2642        assert!(
2643            err.to_string()
2644                .contains("JSON null default is only valid for `null` type"),
2645            "unexpected error: {err}"
2646        );
2647        let mut dt_int_nf =
2648            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
2649        let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
2650        assert_eq!(lit2, AvroLiteral::Null);
2651        assert_default_stored(&dt_int_nf, &Value::Null);
2652        let mut dt_int_ns =
2653            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
2654        let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
2655        assert!(
2656            err2.to_string()
2657                .contains("JSON null default is only valid for `null` type"),
2658            "unexpected error: {err2}"
2659        );
2660    }
2661
2662    #[test]
2663    fn test_validate_and_store_default_primitives_and_temporal() {
2664        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
2665        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
2666        assert_eq!(lit, AvroLiteral::Boolean(true));
2667        assert_default_stored(&dt_bool, &Value::Bool(true));
2668        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
2669        let lit = dt_i32
2670            .parse_and_store_default(&serde_json::json!(123))
2671            .unwrap();
2672        assert_eq!(lit, AvroLiteral::Int(123));
2673        assert_default_stored(&dt_i32, &serde_json::json!(123));
2674        let err = dt_i32
2675            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
2676            .unwrap_err();
2677        assert!(format!("{err}").contains("out of i32 range"));
2678        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
2679        let lit = dt_i64
2680            .parse_and_store_default(&serde_json::json!(1234567890))
2681            .unwrap();
2682        assert_eq!(lit, AvroLiteral::Long(1234567890));
2683        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
2684        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
2685        let lit = dt_f32
2686            .parse_and_store_default(&serde_json::json!(1.25))
2687            .unwrap();
2688        assert_eq!(lit, AvroLiteral::Float(1.25));
2689        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
2690        let err = dt_f32
2691            .parse_and_store_default(&serde_json::json!(1e39))
2692            .unwrap_err();
2693        assert!(format!("{err}").contains("out of f32 range"));
2694        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
2695        let lit = dt_f64
2696            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
2697            .unwrap();
2698        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
2699        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
2700        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
2701        let l = dt_str
2702            .parse_and_store_default(&json_string("hello"))
2703            .unwrap();
2704        assert_eq!(l, AvroLiteral::String("hello".into()));
2705        assert_default_stored(&dt_str, &json_string("hello"));
2706        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
2707        let l = dt_strv
2708            .parse_and_store_default(&json_string("view"))
2709            .unwrap();
2710        assert_eq!(l, AvroLiteral::String("view".into()));
2711        assert_default_stored(&dt_strv, &json_string("view"));
2712        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
2713        let l = dt_uuid
2714            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
2715            .unwrap();
2716        assert_eq!(
2717            l,
2718            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
2719        );
2720        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
2721        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
2722        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
2723        let err = dt_bin
2724            .parse_and_store_default(&json_string("€")) // U+20AC
2725            .unwrap_err();
2726        assert!(format!("{err}").contains("Invalid codepoint"));
2727        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
2728        let ld = dt_date
2729            .parse_and_store_default(&serde_json::json!(1))
2730            .unwrap();
2731        assert_eq!(ld, AvroLiteral::Int(1));
2732        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
2733        let lt = dt_tmill
2734            .parse_and_store_default(&serde_json::json!(86_400_000))
2735            .unwrap();
2736        assert_eq!(lt, AvroLiteral::Int(86_400_000));
2737        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
2738        let ltm = dt_tmicros
2739            .parse_and_store_default(&serde_json::json!(1_000_000))
2740            .unwrap();
2741        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
2742        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
2743        let l1 = dt_ts_milli
2744            .parse_and_store_default(&serde_json::json!(123))
2745            .unwrap();
2746        assert_eq!(l1, AvroLiteral::Long(123));
2747        let mut dt_ts_micro =
2748            AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
2749        let l2 = dt_ts_micro
2750            .parse_and_store_default(&serde_json::json!(456))
2751            .unwrap();
2752        assert_eq!(l2, AvroLiteral::Long(456));
2753    }
2754
2755    #[test]
2756    fn test_validate_and_store_default_fixed_decimal_interval() {
2757        let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
2758        let l = dt_fixed
2759            .parse_and_store_default(&json_string("WXYZ"))
2760            .unwrap();
2761        assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
2762        let err = dt_fixed
2763            .parse_and_store_default(&json_string("TOO LONG"))
2764            .unwrap_err();
2765        assert!(err.to_string().contains("Default length"));
2766        let mut dt_dec_fixed =
2767            AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
2768        let l = dt_dec_fixed
2769            .parse_and_store_default(&json_string("abc"))
2770            .unwrap();
2771        assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
2772        let err = dt_dec_fixed
2773            .parse_and_store_default(&json_string("toolong"))
2774            .unwrap_err();
2775        assert!(err.to_string().contains("Default length"));
2776        let mut dt_dec_bytes =
2777            AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
2778        let l = dt_dec_bytes
2779            .parse_and_store_default(&json_string("freeform"))
2780            .unwrap();
2781        assert_eq!(
2782            l,
2783            AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
2784        );
2785        let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
2786        let l = dt_interval
2787            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
2788            .unwrap();
2789        assert_eq!(
2790            l,
2791            AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
2792        );
2793        let err = dt_interval
2794            .parse_and_store_default(&json_string("short"))
2795            .unwrap_err();
2796        assert!(err.to_string().contains("Default length"));
2797    }
2798
2799    #[test]
2800    fn test_validate_and_store_default_enum_list_map_struct() {
2801        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
2802            .into_iter()
2803            .collect();
2804        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
2805        let l = dt_enum
2806            .parse_and_store_default(&json_string("GREEN"))
2807            .unwrap();
2808        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
2809        let err = dt_enum
2810            .parse_and_store_default(&json_string("YELLOW"))
2811            .unwrap_err();
2812        assert!(err.to_string().contains("Default enum symbol"));
2813        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
2814        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
2815        let val = serde_json::json!([1, 2, 3]);
2816        let l = dt_list.parse_and_store_default(&val).unwrap();
2817        assert_eq!(
2818            l,
2819            AvroLiteral::Array(vec![
2820                AvroLiteral::Long(1),
2821                AvroLiteral::Long(2),
2822                AvroLiteral::Long(3)
2823            ])
2824        );
2825        let err = dt_list
2826            .parse_and_store_default(&serde_json::json!({"not":"array"}))
2827            .unwrap_err();
2828        assert!(err.to_string().contains("JSON array"));
2829        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
2830        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
2831        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
2832        let l = dt_map.parse_and_store_default(&mv).unwrap();
2833        let mut expected = IndexMap::new();
2834        expected.insert("x".into(), AvroLiteral::Double(1.5));
2835        expected.insert("y".into(), AvroLiteral::Double(2.5));
2836        assert_eq!(l, AvroLiteral::Map(expected));
2837        // Not object -> error
2838        let err = dt_map
2839            .parse_and_store_default(&serde_json::json!(123))
2840            .unwrap_err();
2841        assert!(err.to_string().contains("JSON object"));
2842        let mut field_a = AvroField {
2843            name: "a".into(),
2844            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
2845        };
2846        let field_b = AvroField {
2847            name: "b".into(),
2848            data_type: AvroDataType::new(
2849                Codec::Int64,
2850                HashMap::new(),
2851                Some(Nullability::NullFirst),
2852            ),
2853        };
2854        let mut c_md = HashMap::new();
2855        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
2856        let field_c = AvroField {
2857            name: "c".into(),
2858            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
2859        };
2860        field_a.data_type.metadata.insert("doc".into(), "na".into());
2861        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
2862        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
2863        let default_obj = serde_json::json!({"a": 7});
2864        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
2865        let mut expected = IndexMap::new();
2866        expected.insert("a".into(), AvroLiteral::Int(7));
2867        expected.insert("b".into(), AvroLiteral::Null);
2868        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
2869        assert_eq!(l, AvroLiteral::Map(expected));
2870        assert_default_stored(&dt_struct, &default_obj);
2871        let req_field = AvroField {
2872            name: "req".into(),
2873            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
2874        };
2875        let mut dt_bad = AvroDataType::new(
2876            Codec::Struct(Arc::from(vec![req_field])),
2877            HashMap::new(),
2878            None,
2879        );
2880        let err = dt_bad
2881            .parse_and_store_default(&serde_json::json!({}))
2882            .unwrap_err();
2883        assert!(
2884            err.to_string().contains("missing required subfield 'req'"),
2885            "unexpected error: {err}"
2886        );
2887        let err = dt_struct
2888            .parse_and_store_default(&serde_json::json!(10))
2889            .unwrap_err();
2890        err.to_string().contains("must be a JSON object");
2891    }
2892
2893    #[test]
2894    fn test_resolve_array_promotion_and_reader_metadata() {
2895        let mut w_add: HashMap<&str, Value> = HashMap::new();
2896        w_add.insert("who", json_string("writer"));
2897        let mut r_add: HashMap<&str, Value> = HashMap::new();
2898        r_add.insert("who", json_string("reader"));
2899        let writer_schema = Schema::Complex(ComplexType::Array(Array {
2900            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2901            attributes: Attributes {
2902                logical_type: None,
2903                additional: w_add,
2904            },
2905        }));
2906        let reader_schema = Schema::Complex(ComplexType::Array(Array {
2907            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
2908            attributes: Attributes {
2909                logical_type: None,
2910                additional: r_add,
2911            },
2912        }));
2913        let mut maker = Maker::new(false, false);
2914        let dt = maker
2915            .make_data_type(&writer_schema, Some(&reader_schema), None)
2916            .unwrap();
2917        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
2918        if let Codec::List(inner) = dt.codec() {
2919            assert!(matches!(inner.codec(), Codec::Int64));
2920            assert_eq!(
2921                inner.resolution,
2922                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2923            );
2924        } else {
2925            panic!("expected list codec");
2926        }
2927    }
2928
2929    #[test]
2930    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
2931        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
2932            name: "MD5",
2933            namespace: None,
2934            aliases: vec!["Hash16"],
2935            size: 16,
2936            attributes: Attributes::default(),
2937        }));
2938        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
2939            name: "Hash16",
2940            namespace: None,
2941            aliases: vec![],
2942            size: 16,
2943            attributes: Attributes::default(),
2944        }));
2945        let mut maker = Maker::new(false, false);
2946        let dt = maker
2947            .make_data_type(&writer_schema, Some(&reader_schema), None)
2948            .unwrap();
2949        assert!(matches!(dt.codec(), Codec::Fixed(16)));
2950    }
2951
2952    #[test]
2953    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
2954        let writer = Schema::Complex(ComplexType::Record(Record {
2955            name: "R",
2956            namespace: None,
2957            doc: None,
2958            aliases: vec![],
2959            fields: vec![
2960                crate::schema::Field {
2961                    name: "a",
2962                    doc: None,
2963                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2964                    default: None,
2965                    aliases: vec![],
2966                },
2967                crate::schema::Field {
2968                    name: "skipme",
2969                    doc: None,
2970                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2971                    default: None,
2972                    aliases: vec![],
2973                },
2974                crate::schema::Field {
2975                    name: "b",
2976                    doc: None,
2977                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2978                    default: None,
2979                    aliases: vec![],
2980                },
2981            ],
2982            attributes: Attributes::default(),
2983        }));
2984        let reader = Schema::Complex(ComplexType::Record(Record {
2985            name: "R",
2986            namespace: None,
2987            doc: None,
2988            aliases: vec![],
2989            fields: vec![
2990                crate::schema::Field {
2991                    name: "b",
2992                    doc: None,
2993                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2994                    default: None,
2995                    aliases: vec![],
2996                },
2997                crate::schema::Field {
2998                    name: "a",
2999                    doc: None,
3000                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3001                    default: None,
3002                    aliases: vec![],
3003                },
3004                crate::schema::Field {
3005                    name: "name",
3006                    doc: None,
3007                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3008                    default: Some(json_string("anon")),
3009                    aliases: vec![],
3010                },
3011                crate::schema::Field {
3012                    name: "opt",
3013                    doc: None,
3014                    r#type: Schema::Union(vec![
3015                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3016                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3017                    ]),
3018                    default: None, // should default to null because NullFirst
3019                    aliases: vec![],
3020                },
3021            ],
3022            attributes: Attributes::default(),
3023        }));
3024        let mut maker = Maker::new(false, false);
3025        let dt = maker
3026            .make_data_type(&writer, Some(&reader), None)
3027            .expect("record resolution");
3028        let fields = match dt.codec() {
3029            Codec::Struct(f) => f,
3030            other => panic!("expected struct, got {other:?}"),
3031        };
3032        assert_eq!(fields.len(), 4);
3033        assert_eq!(fields[0].name(), "b");
3034        assert_eq!(fields[1].name(), "a");
3035        assert_eq!(fields[2].name(), "name");
3036        assert_eq!(fields[3].name(), "opt");
3037        assert!(matches!(
3038            fields[1].data_type().resolution,
3039            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3040        ));
3041        let rec = match dt.resolution {
3042            Some(ResolutionInfo::Record(ref r)) => r.clone(),
3043            other => panic!("expected record resolution, got {other:?}"),
3044        };
3045        assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
3046        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3047        assert!(rec.skip_fields[0].is_none());
3048        assert!(rec.skip_fields[2].is_none());
3049        let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
3050        assert!(matches!(skip1.codec(), Codec::Utf8));
3051        let name_md = &fields[2].data_type().metadata;
3052        assert_eq!(
3053            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3054            Some(&"\"anon\"".to_string())
3055        );
3056        let opt_md = &fields[3].data_type().metadata;
3057        assert_eq!(
3058            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3059            Some(&"null".to_string())
3060        );
3061    }
3062
3063    #[test]
3064    fn test_named_type_alias_resolution_record_cross_namespace() {
3065        let writer_record = Record {
3066            name: "PersonV2",
3067            namespace: Some("com.example.v2"),
3068            doc: None,
3069            aliases: vec!["com.example.Person"],
3070            fields: vec![
3071                AvroFieldSchema {
3072                    name: "name",
3073                    doc: None,
3074                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3075                    default: None,
3076                    aliases: vec![],
3077                },
3078                AvroFieldSchema {
3079                    name: "age",
3080                    doc: None,
3081                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3082                    default: None,
3083                    aliases: vec![],
3084                },
3085            ],
3086            attributes: Attributes::default(),
3087        };
3088        let reader_record = Record {
3089            name: "Person",
3090            namespace: Some("com.example"),
3091            doc: None,
3092            aliases: vec![],
3093            fields: writer_record.fields.clone(),
3094            attributes: Attributes::default(),
3095        };
3096        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3097        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3098        let mut maker = Maker::new(false, false);
3099        let result = maker
3100            .make_data_type(&writer_schema, Some(&reader_schema), None)
3101            .expect("record alias resolution should succeed");
3102        match result.codec {
3103            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3104            other => panic!("expected struct, got {other:?}"),
3105        }
3106    }
3107
3108    #[test]
3109    fn test_named_type_alias_resolution_enum_cross_namespace() {
3110        let writer_enum = Enum {
3111            name: "ColorV2",
3112            namespace: Some("org.example.v2"),
3113            doc: None,
3114            aliases: vec!["org.example.Color"],
3115            symbols: vec!["RED", "GREEN", "BLUE"],
3116            default: None,
3117            attributes: Attributes::default(),
3118        };
3119        let reader_enum = Enum {
3120            name: "Color",
3121            namespace: Some("org.example"),
3122            doc: None,
3123            aliases: vec![],
3124            symbols: vec!["RED", "GREEN", "BLUE"],
3125            default: None,
3126            attributes: Attributes::default(),
3127        };
3128        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3129        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3130        let mut maker = Maker::new(false, false);
3131        maker
3132            .make_data_type(&writer_schema, Some(&reader_schema), None)
3133            .expect("enum alias resolution should succeed");
3134    }
3135
3136    #[test]
3137    fn test_named_type_alias_resolution_fixed_cross_namespace() {
3138        let writer_fixed = Fixed {
3139            name: "Fx10V2",
3140            namespace: Some("ns.v2"),
3141            aliases: vec!["ns.Fx10"],
3142            size: 10,
3143            attributes: Attributes::default(),
3144        };
3145        let reader_fixed = Fixed {
3146            name: "Fx10",
3147            namespace: Some("ns"),
3148            aliases: vec![],
3149            size: 10,
3150            attributes: Attributes::default(),
3151        };
3152        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3153        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3154        let mut maker = Maker::new(false, false);
3155        maker
3156            .make_data_type(&writer_schema, Some(&reader_schema), None)
3157            .expect("fixed alias resolution should succeed");
3158    }
3159}