Skip to main content

arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec for Mapping Avro and Arrow types.
19
20use crate::schema::{
21    AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22    AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23    PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27    IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40/// Contains information about how to resolve differences between a writer's and a reader's schema.
41#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43    /// Indicates that the writer's type should be promoted to the reader's type.
44    Promotion(Promotion),
45    /// Indicates that a default value should be used for a field.
46    DefaultValue(AvroLiteral),
47    /// Provides mapping information for resolving enums.
48    EnumMapping(EnumMapping),
49    /// Provides resolution information for record fields.
50    Record(ResolvedRecord),
51    /// Provides mapping and shape info for resolving unions.
52    Union(ResolvedUnion),
53}
54
55/// Represents a literal Avro value.
56///
57/// This is used to represent default values in an Avro schema.
58#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60    /// Represents a null value.
61    Null,
62    /// Represents a boolean value.
63    Boolean(bool),
64    /// Represents an integer value.
65    Int(i32),
66    /// Represents a long value.
67    Long(i64),
68    /// Represents a float value.
69    Float(f32),
70    /// Represents a double value.
71    Double(f64),
72    /// Represents a bytes value.
73    Bytes(Vec<u8>),
74    /// Represents a string value.
75    String(String),
76    /// Represents an enum symbol.
77    Enum(String),
78    /// Represents a JSON array default for an Avro array, containing element literals.
79    Array(Vec<AvroLiteral>),
80    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
81    Map(IndexMap<String, AvroLiteral>),
82}
83
84/// Contains the necessary information to resolve a writer's record against a reader's record schema.
85#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87    /// Maps a writer's field index to the field's resolution against the reader's schema.
88    pub(crate) writer_fields: Arc<[ResolvedField]>,
89    /// A list of indices in the reader's schema for fields that have a default value.
90    pub(crate) default_fields: Arc<[usize]>,
91}
92
93/// Resolution information for record fields in the writer schema.
94#[derive(Debug, Clone, PartialEq)]
95pub(crate) enum ResolvedField {
96    /// Resolves to a field indexed in the reader schema.
97    ToReader(usize),
98    /// For fields present in the writer's schema but not the reader's, this stores their data type.
99    /// This is needed to correctly skip over these fields during deserialization.
100    Skip(AvroDataType),
101}
102
103/// Defines the type of promotion to be applied during schema resolution.
104///
105/// Schema resolution may require promoting a writer's data type to a reader's data type.
106/// For example, an `int` can be promoted to a `long`, `float`, or `double`.
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub(crate) enum Promotion {
109    /// Direct read with no data type promotion.
110    Direct,
111    /// Promotes an `int` to a `long`.
112    IntToLong,
113    /// Promotes an `int` to a `float`.
114    IntToFloat,
115    /// Promotes an `int` to a `double`.
116    IntToDouble,
117    /// Promotes a `long` to a `float`.
118    LongToFloat,
119    /// Promotes a `long` to a `double`.
120    LongToDouble,
121    /// Promotes a `float` to a `double`.
122    FloatToDouble,
123    /// Promotes a `string` to `bytes`.
124    StringToBytes,
125    /// Promotes `bytes` to a `string`.
126    BytesToString,
127}
128
129impl Display for Promotion {
130    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
131        match self {
132            Self::Direct => write!(formatter, "Direct"),
133            Self::IntToLong => write!(formatter, "Int->Long"),
134            Self::IntToFloat => write!(formatter, "Int->Float"),
135            Self::IntToDouble => write!(formatter, "Int->Double"),
136            Self::LongToFloat => write!(formatter, "Long->Float"),
137            Self::LongToDouble => write!(formatter, "Long->Double"),
138            Self::FloatToDouble => write!(formatter, "Float->Double"),
139            Self::StringToBytes => write!(formatter, "String->Bytes"),
140            Self::BytesToString => write!(formatter, "Bytes->String"),
141        }
142    }
143}
144
145/// Information required to resolve a writer union against a reader union (or single type).
146#[derive(Debug, Clone, PartialEq)]
147pub(crate) struct ResolvedUnion {
148    /// For each writer branch index, the reader branch index and how to read it.
149    /// `None` means the writer branch doesn't resolve against the reader.
150    pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
151    /// Whether the writer schema at this site is a union
152    pub(crate) writer_is_union: bool,
153    /// Whether the reader schema at this site is a union
154    pub(crate) reader_is_union: bool,
155}
156
157/// Holds the mapping information for resolving Avro enums.
158///
159/// When resolving schemas, the writer's enum symbols must be mapped to the reader's symbols.
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub(crate) struct EnumMapping {
162    /// A mapping from the writer's symbol index to the reader's symbol index.
163    pub(crate) mapping: Arc<[i32]>,
164    /// The index to use for a writer's symbol that is not present in the reader's enum
165    /// and a default value is specified in the reader's schema.
166    pub(crate) default_index: i32,
167}
168
169#[cfg(feature = "canonical_extension_types")]
170fn with_extension_type(codec: &Codec, field: Field) -> Field {
171    match codec {
172        Codec::Uuid => field.with_extension_type(arrow_schema::extension::Uuid),
173        _ => field,
174    }
175}
176
177/// An Avro datatype mapped to the arrow data model
178#[derive(Debug, Clone, PartialEq)]
179pub(crate) struct AvroDataType {
180    nullability: Option<Nullability>,
181    metadata: HashMap<String, String>,
182    codec: Codec,
183    pub(crate) resolution: Option<ResolutionInfo>,
184}
185
186impl AvroDataType {
187    /// Create a new [`AvroDataType`] with the given parts.
188    pub(crate) fn new(
189        codec: Codec,
190        metadata: HashMap<String, String>,
191        nullability: Option<Nullability>,
192    ) -> Self {
193        AvroDataType {
194            codec,
195            metadata,
196            nullability,
197            resolution: None,
198        }
199    }
200
201    #[inline]
202    fn new_with_resolution(
203        codec: Codec,
204        metadata: HashMap<String, String>,
205        nullability: Option<Nullability>,
206        resolution: Option<ResolutionInfo>,
207    ) -> Self {
208        Self {
209            codec,
210            metadata,
211            nullability,
212            resolution,
213        }
214    }
215
216    /// Returns an arrow [`Field`] with the given name
217    pub(crate) fn field_with_name(&self, name: &str) -> Field {
218        let mut nullable = self.nullability.is_some();
219        if !nullable {
220            if let Codec::Union(children, _, _) = self.codec() {
221                // If any encoded branch is `null`, mark field as nullable
222                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
223                    nullable = true;
224                }
225            }
226        }
227        let data_type = self.codec.data_type();
228        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
229        #[cfg(feature = "canonical_extension_types")]
230        return with_extension_type(&self.codec, field);
231        #[cfg(not(feature = "canonical_extension_types"))]
232        field
233    }
234
235    /// Returns a reference to the codec used by this data type
236    ///
237    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
238    /// This is useful when we need to inspect or use the specific encoding of a field.
239    pub(crate) fn codec(&self) -> &Codec {
240        &self.codec
241    }
242
243    /// Returns the nullability status of this data type
244    ///
245    /// In Avro, nullability is represented through unions with null types.
246    /// The returned value indicates how nulls are encoded in the Avro format:
247    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
248    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
249    /// - `None` - The type is not nullable
250    pub(crate) fn nullability(&self) -> Option<Nullability> {
251        self.nullability
252    }
253
254    #[inline]
255    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
256        fn expect_string<'v>(
257            default_json: &'v Value,
258            data_type: &str,
259        ) -> Result<&'v str, ArrowError> {
260            match default_json {
261                Value::String(s) => Ok(s.as_str()),
262                _ => Err(ArrowError::SchemaError(format!(
263                    "Default value must be a JSON string for {data_type}"
264                ))),
265            }
266        }
267
268        fn parse_bytes_default(
269            default_json: &Value,
270            expected_len: Option<usize>,
271        ) -> Result<Vec<u8>, ArrowError> {
272            let s = expect_string(default_json, "bytes/fixed logical types")?;
273            let mut out = Vec::with_capacity(s.len());
274            for ch in s.chars() {
275                let cp = ch as u32;
276                if cp > 0xFF {
277                    return Err(ArrowError::SchemaError(format!(
278                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
279                    )));
280                }
281                out.push(cp as u8);
282            }
283            if let Some(len) = expected_len {
284                if out.len() != len {
285                    return Err(ArrowError::SchemaError(format!(
286                        "Default length {} does not match expected fixed size {len}",
287                        out.len(),
288                    )));
289                }
290            }
291            Ok(out)
292        }
293
294        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
295            match default_json {
296                Value::Number(n) => n.as_i64().ok_or_else(|| {
297                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
298                }),
299                _ => Err(ArrowError::SchemaError(format!(
300                    "Default {data_type} must be a JSON integer"
301                ))),
302            }
303        }
304
305        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
306            match default_json {
307                Value::Number(n) => n.as_f64().ok_or_else(|| {
308                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
309                }),
310                _ => Err(ArrowError::SchemaError(format!(
311                    "Default {data_type} must be a JSON number"
312                ))),
313            }
314        }
315
316        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
317        if default_json.is_null() {
318            return match self.codec() {
319                Codec::Null => Ok(AvroLiteral::Null),
320                Codec::Union(encodings, _, _) if !encodings.is_empty()
321                    && matches!(encodings[0].codec(), Codec::Null) =>
322                    {
323                        Ok(AvroLiteral::Null)
324                    }
325                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
326                _ => Err(ArrowError::SchemaError(
327                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
328                        .to_string(),
329                )),
330            };
331        }
332        let lit = match self.codec() {
333            Codec::Null => {
334                return Err(ArrowError::SchemaError(
335                    "Default for `null` type must be JSON null".to_string(),
336                ));
337            }
338            Codec::Boolean => match default_json {
339                Value::Bool(b) => AvroLiteral::Boolean(*b),
340                _ => {
341                    return Err(ArrowError::SchemaError(
342                        "Boolean default must be a JSON boolean".to_string(),
343                    ));
344                }
345            },
346            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
347                let i = parse_json_i64(default_json, "int")?;
348                if i < i32::MIN as i64 || i > i32::MAX as i64 {
349                    return Err(ArrowError::SchemaError(format!(
350                        "Default int {i} out of i32 range"
351                    )));
352                }
353                AvroLiteral::Int(i as i32)
354            }
355            Codec::Int64
356            | Codec::TimeMicros
357            | Codec::TimestampMillis(_)
358            | Codec::TimestampMicros(_)
359            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
360            #[cfg(feature = "avro_custom_types")]
361            Codec::DurationNanos
362            | Codec::DurationMicros
363            | Codec::DurationMillis
364            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
365            #[cfg(feature = "avro_custom_types")]
366            Codec::Int8 => {
367                let i = parse_json_i64(default_json, "int")?;
368                if i < i8::MIN as i64 || i > i8::MAX as i64 {
369                    return Err(ArrowError::SchemaError(format!(
370                        "Default int8 {i} out of i8 range"
371                    )));
372                }
373                AvroLiteral::Int(i as i32)
374            }
375            #[cfg(feature = "avro_custom_types")]
376            Codec::Int16 => {
377                let i = parse_json_i64(default_json, "int")?;
378                if i < i16::MIN as i64 || i > i16::MAX as i64 {
379                    return Err(ArrowError::SchemaError(format!(
380                        "Default int16 {i} out of i16 range"
381                    )));
382                }
383                AvroLiteral::Int(i as i32)
384            }
385            #[cfg(feature = "avro_custom_types")]
386            Codec::UInt8 => {
387                let i = parse_json_i64(default_json, "int")?;
388                if i < 0 || i > u8::MAX as i64 {
389                    return Err(ArrowError::SchemaError(format!(
390                        "Default uint8 {i} out of u8 range"
391                    )));
392                }
393                AvroLiteral::Int(i as i32)
394            }
395            #[cfg(feature = "avro_custom_types")]
396            Codec::UInt16 => {
397                let i = parse_json_i64(default_json, "int")?;
398                if i < 0 || i > u16::MAX as i64 {
399                    return Err(ArrowError::SchemaError(format!(
400                        "Default uint16 {i} out of u16 range"
401                    )));
402                }
403                AvroLiteral::Int(i as i32)
404            }
405            #[cfg(feature = "avro_custom_types")]
406            Codec::UInt32 => {
407                let i = parse_json_i64(default_json, "long")?;
408                if i < 0 || i > u32::MAX as i64 {
409                    return Err(ArrowError::SchemaError(format!(
410                        "Default uint32 {i} out of u32 range"
411                    )));
412                }
413                AvroLiteral::Long(i)
414            }
415            #[cfg(feature = "avro_custom_types")]
416            Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
417                AvroLiteral::Long(parse_json_i64(default_json, "long")?)
418            }
419            #[cfg(feature = "avro_custom_types")]
420            Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
421            #[cfg(feature = "avro_custom_types")]
422            Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
423            #[cfg(feature = "avro_custom_types")]
424            Codec::Time32Secs => {
425                let i = parse_json_i64(default_json, "int")?;
426                if i < i32::MIN as i64 || i > i32::MAX as i64 {
427                    return Err(ArrowError::SchemaError(format!(
428                        "Default time32-secs {i} out of i32 range"
429                    )));
430                }
431                AvroLiteral::Int(i as i32)
432            }
433            #[cfg(feature = "avro_custom_types")]
434            Codec::IntervalYearMonth => {
435                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
436            }
437            #[cfg(feature = "avro_custom_types")]
438            Codec::IntervalMonthDayNano => {
439                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
440            }
441            #[cfg(feature = "avro_custom_types")]
442            Codec::IntervalDayTime => {
443                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
444            }
445            Codec::Float32 => {
446                let f = parse_json_f64(default_json, "float")?;
447                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
448                    return Err(ArrowError::SchemaError(format!(
449                        "Default float {f} out of f32 range or not finite"
450                    )));
451                }
452                AvroLiteral::Float(f as f32)
453            }
454            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
455            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
456                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
457            }
458            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
459            Codec::Fixed(sz) => {
460                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
461            }
462            Codec::Decimal(_, _, fixed_size) => {
463                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
464            }
465            Codec::Enum(symbols) => {
466                let s = expect_string(default_json, "enum")?;
467                if symbols.iter().any(|sym| sym == s) {
468                    AvroLiteral::Enum(s.to_string())
469                } else {
470                    return Err(ArrowError::SchemaError(format!(
471                        "Default enum symbol {s:?} not found in reader enum symbols"
472                    )));
473                }
474            }
475            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
476            Codec::List(item_dt) => match default_json {
477                Value::Array(items) => AvroLiteral::Array(
478                    items
479                        .iter()
480                        .map(|v| item_dt.parse_default_literal(v))
481                        .collect::<Result<_, _>>()?,
482                ),
483                _ => {
484                    return Err(ArrowError::SchemaError(
485                        "Default value must be a JSON array for Avro array type".to_string(),
486                    ));
487                }
488            },
489            Codec::Map(val_dt) => match default_json {
490                Value::Object(map) => {
491                    let mut out = IndexMap::with_capacity(map.len());
492                    for (k, v) in map {
493                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
494                    }
495                    AvroLiteral::Map(out)
496                }
497                _ => {
498                    return Err(ArrowError::SchemaError(
499                        "Default value must be a JSON object for Avro map type".to_string(),
500                    ));
501                }
502            },
503            Codec::Struct(fields) => match default_json {
504                Value::Object(obj) => {
505                    let mut out: IndexMap<String, AvroLiteral> =
506                        IndexMap::with_capacity(fields.len());
507                    for f in fields.as_ref() {
508                        let name = f.name().to_string();
509                        if let Some(sub) = obj.get(&name) {
510                            out.insert(name, f.data_type().parse_default_literal(sub)?);
511                        } else {
512                            // Cache metadata lookup once
513                            let stored_default =
514                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
515                            if stored_default.is_none()
516                                && f.data_type().nullability() == Some(Nullability::default())
517                            {
518                                out.insert(name, AvroLiteral::Null);
519                            } else if let Some(default_json) = stored_default {
520                                let v: Value =
521                                    serde_json::from_str(default_json).map_err(|e| {
522                                        ArrowError::SchemaError(format!(
523                                            "Failed to parse stored subfield default JSON for '{}': {e}",
524                                            f.name(),
525                                        ))
526                                    })?;
527                                out.insert(name, f.data_type().parse_default_literal(&v)?);
528                            } else {
529                                return Err(ArrowError::SchemaError(format!(
530                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
531                                    f.name(),
532                                    f.data_type().codec()
533                                )));
534                            }
535                        }
536                    }
537                    AvroLiteral::Map(out)
538                }
539                _ => {
540                    return Err(ArrowError::SchemaError(
541                        "Default value for record/struct must be a JSON object".to_string(),
542                    ));
543                }
544            },
545            Codec::Union(encodings, _, _) => {
546                let Some(default_encoding) = encodings.first() else {
547                    return Err(ArrowError::SchemaError(
548                        "Union with no branches cannot have a default".to_string(),
549                    ));
550                };
551                default_encoding.parse_default_literal(default_json)?
552            }
553            #[cfg(feature = "avro_custom_types")]
554            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
555        };
556        Ok(lit)
557    }
558
559    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
560        let json_text = serde_json::to_string(default_json).map_err(|e| {
561            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
562        })?;
563        self.metadata
564            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
565        Ok(())
566    }
567
568    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
569        let lit = self.parse_default_literal(default_json)?;
570        self.store_default(default_json)?;
571        Ok(lit)
572    }
573}
574
575/// A named [`AvroDataType`]
576#[derive(Debug, Clone, PartialEq)]
577pub(crate) struct AvroField {
578    name: String,
579    data_type: AvroDataType,
580}
581
582impl AvroField {
583    /// Returns the arrow [`Field`]
584    pub(crate) fn field(&self) -> Field {
585        self.data_type.field_with_name(&self.name)
586    }
587
588    /// Returns the [`AvroDataType`]
589    pub(crate) fn data_type(&self) -> &AvroDataType {
590        &self.data_type
591    }
592
593    /// Returns a new [`AvroField`] with Utf8View support enabled
594    ///
595    /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to
596    /// enable potential performance optimizations in string-heavy workloads by using
597    /// Arrow's StringViewArray data structure.
598    ///
599    /// Returns a new `AvroField` with the same structure, but with string types
600    /// converted to use `Utf8View` instead of `Utf8`.
601    pub(crate) fn with_utf8view(&self) -> Self {
602        let mut field = self.clone();
603        if let Codec::Utf8 = field.data_type.codec {
604            field.data_type.codec = Codec::Utf8View;
605        }
606        field
607    }
608
609    /// Returns the name of this Avro field
610    ///
611    /// This is the field name as defined in the Avro schema.
612    /// It's used to identify fields within a record structure.
613    pub(crate) fn name(&self) -> &str {
614        &self.name
615    }
616}
617
618impl<'a> TryFrom<&Schema<'a>> for AvroField {
619    type Error = ArrowError;
620
621    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
622        match schema {
623            Schema::Complex(ComplexType::Record(r)) => {
624                let mut resolver = Maker::new(false, false);
625                let data_type = resolver.make_data_type(schema, None, None)?;
626                Ok(AvroField {
627                    data_type,
628                    name: r.name.to_string(),
629                })
630            }
631            _ => Err(ArrowError::ParseError(format!(
632                "Expected record got {schema:?}"
633            ))),
634        }
635    }
636}
637
638/// Builder for an [`AvroField`]
639#[derive(Debug)]
640pub(crate) struct AvroFieldBuilder<'a> {
641    writer_schema: &'a Schema<'a>,
642    reader_schema: Option<&'a Schema<'a>>,
643    use_utf8view: bool,
644    strict_mode: bool,
645}
646
647impl<'a> AvroFieldBuilder<'a> {
648    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
649    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
650        Self {
651            writer_schema,
652            reader_schema: None,
653            use_utf8view: false,
654            strict_mode: false,
655        }
656    }
657
658    /// Sets the reader schema for schema resolution.
659    ///
660    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
661    /// that can handle differences between the writer's and reader's schemas.
662    #[inline]
663    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
664        self.reader_schema = Some(reader_schema);
665        self
666    }
667
668    /// Enable or disable Utf8View support
669    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
670        self.use_utf8view = use_utf8view;
671        self
672    }
673
674    /// Enable or disable strict mode.
675    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
676        self.strict_mode = strict_mode;
677        self
678    }
679
680    /// Build an [`AvroField`] from the builder
681    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
682        match self.writer_schema {
683            Schema::Complex(ComplexType::Record(r)) => {
684                let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
685                let data_type =
686                    resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
687                Ok(AvroField {
688                    name: r.name.to_string(),
689                    data_type,
690                })
691            }
692            _ => Err(ArrowError::ParseError(format!(
693                "Expected a Record schema to build an AvroField, but got {:?}",
694                self.writer_schema
695            ))),
696        }
697    }
698}
699
700/// An Avro encoding
701///
702/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
703#[derive(Debug, Clone, PartialEq)]
704pub(crate) enum Codec {
705    /// Represents Avro null type, maps to Arrow's Null data type
706    Null,
707    /// Represents Avro boolean type, maps to Arrow's Boolean data type
708    Boolean,
709    /// Represents Avro int type, maps to Arrow's Int32 data type
710    Int32,
711    /// Represents Avro long type, maps to Arrow's Int64 data type
712    Int64,
713    /// Represents Avro float type, maps to Arrow's Float32 data type
714    Float32,
715    /// Represents Avro double type, maps to Arrow's Float64 data type
716    Float64,
717    /// Represents Avro bytes type, maps to Arrow's Binary data type
718    Binary,
719    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
720    Utf8,
721    /// String data represented as UTF-8 encoded bytes with an optimized view representation,
722    /// corresponding to Arrow's StringViewArray which provides better performance for string operations
723    ///
724    /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`.
725    Utf8View,
726    /// Represents Avro date logical type, maps to Arrow's Date32 data type
727    Date32,
728    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
729    TimeMillis,
730    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
731    TimeMicros,
732    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
733    ///
734    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type
735    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
736    TimestampMillis(bool),
737    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
738    ///
739    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
740    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
741    TimestampMicros(bool),
742    /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
743    ///
744    /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
745    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
746    TimestampNanos(bool),
747    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
748    /// The i32 parameter indicates the fixed binary size
749    Fixed(i32),
750    /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
751    ///
752    /// The fields are `(precision, scale, fixed_size)`.
753    /// - `precision` (`usize`): Total number of digits.
754    /// - `scale` (`Option<usize>`): Number of fractional digits.
755    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
756    Decimal(usize, Option<usize>, Option<usize>),
757    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
758    Uuid,
759    /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
760    ///
761    /// The enclosed value contains the enum's symbols.
762    Enum(Arc<[String]>),
763    /// Represents Avro array type, maps to Arrow's List data type
764    List(Arc<AvroDataType>),
765    /// Represents Avro record type, maps to Arrow's Struct data type
766    Struct(Arc<[AvroField]>),
767    /// Represents Avro map type, maps to Arrow's Map data type
768    Map(Arc<AvroDataType>),
769    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
770    Interval,
771    /// Represents Avro union type, maps to Arrow's Union data type
772    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
773    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
774    #[cfg(feature = "avro_custom_types")]
775    DurationNanos,
776    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
777    #[cfg(feature = "avro_custom_types")]
778    DurationMicros,
779    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
780    #[cfg(feature = "avro_custom_types")]
781    DurationMillis,
782    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
783    #[cfg(feature = "avro_custom_types")]
784    DurationSeconds,
785    #[cfg(feature = "avro_custom_types")]
786    RunEndEncoded(Arc<AvroDataType>, u8),
787    /// Arrow Int8 custom logical type (arrow.int8)
788    #[cfg(feature = "avro_custom_types")]
789    Int8,
790    /// Arrow Int16 custom logical type (arrow.int16)
791    #[cfg(feature = "avro_custom_types")]
792    Int16,
793    /// Arrow UInt8 custom logical type (arrow.uint8)
794    #[cfg(feature = "avro_custom_types")]
795    UInt8,
796    /// Arrow UInt16 custom logical type (arrow.uint16)
797    #[cfg(feature = "avro_custom_types")]
798    UInt16,
799    /// Arrow UInt32 custom logical type (arrow.uint32)
800    #[cfg(feature = "avro_custom_types")]
801    UInt32,
802    /// Arrow UInt64 custom logical type (arrow.uint64) - stored as fixed(8)
803    #[cfg(feature = "avro_custom_types")]
804    UInt64,
805    /// Arrow Float16 custom logical type (arrow.float16) - stored as fixed(2)
806    #[cfg(feature = "avro_custom_types")]
807    Float16,
808    /// Arrow Date64 custom logical type (arrow.date64)
809    #[cfg(feature = "avro_custom_types")]
810    Date64,
811    /// Arrow Time64(Nanosecond) custom logical type (arrow.time64-nanosecond)
812    #[cfg(feature = "avro_custom_types")]
813    TimeNanos,
814    /// Arrow Time32(Second) custom logical type (arrow.time32-second)
815    #[cfg(feature = "avro_custom_types")]
816    Time32Secs,
817    /// Arrow Timestamp(Second) custom logical type (arrow.timestamp-second)
818    /// The bool indicates UTC (true) or local (false)
819    #[cfg(feature = "avro_custom_types")]
820    TimestampSecs(bool),
821    /// Arrow Interval(YearMonth) custom logical type (arrow.interval-year-month)
822    #[cfg(feature = "avro_custom_types")]
823    IntervalYearMonth,
824    /// Arrow Interval(MonthDayNano) custom logical type (arrow.interval-month-day-nano)
825    #[cfg(feature = "avro_custom_types")]
826    IntervalMonthDayNano,
827    /// Arrow Interval(DayTime) custom logical type (arrow.interval-day-time)
828    #[cfg(feature = "avro_custom_types")]
829    IntervalDayTime,
830}
831
832impl Codec {
833    fn data_type(&self) -> DataType {
834        match self {
835            Self::Null => DataType::Null,
836            Self::Boolean => DataType::Boolean,
837            Self::Int32 => DataType::Int32,
838            Self::Int64 => DataType::Int64,
839            Self::Float32 => DataType::Float32,
840            Self::Float64 => DataType::Float64,
841            Self::Binary => DataType::Binary,
842            Self::Utf8 => DataType::Utf8,
843            Self::Utf8View => DataType::Utf8View,
844            Self::Date32 => DataType::Date32,
845            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
846            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
847            Self::TimestampMillis(is_utc) => {
848                DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
849            }
850            Self::TimestampMicros(is_utc) => {
851                DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
852            }
853            Self::TimestampNanos(is_utc) => {
854                DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
855            }
856            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
857            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
858            Self::Decimal(precision, scale, _size) => {
859                let p = *precision as u8;
860                let s = scale.unwrap_or(0) as i8;
861                #[cfg(feature = "small_decimals")]
862                {
863                    if *precision <= DECIMAL32_MAX_PRECISION as usize {
864                        DataType::Decimal32(p, s)
865                    } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
866                        DataType::Decimal64(p, s)
867                    } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
868                        DataType::Decimal128(p, s)
869                    } else {
870                        DataType::Decimal256(p, s)
871                    }
872                }
873                #[cfg(not(feature = "small_decimals"))]
874                {
875                    if *precision <= DECIMAL128_MAX_PRECISION as usize {
876                        DataType::Decimal128(p, s)
877                    } else {
878                        DataType::Decimal256(p, s)
879                    }
880                }
881            }
882            Self::Uuid => DataType::FixedSizeBinary(16),
883            Self::Enum(_) => {
884                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
885            }
886            Self::List(f) => {
887                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
888            }
889            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
890            Self::Map(value_type) => {
891                let val_field = value_type.field_with_name("value");
892                DataType::Map(
893                    Arc::new(Field::new(
894                        "entries",
895                        DataType::Struct(Fields::from(vec![
896                            Field::new("key", DataType::Utf8, false),
897                            val_field,
898                        ])),
899                        false,
900                    )),
901                    false,
902                )
903            }
904            Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
905            #[cfg(feature = "avro_custom_types")]
906            Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
907            #[cfg(feature = "avro_custom_types")]
908            Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
909            #[cfg(feature = "avro_custom_types")]
910            Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
911            #[cfg(feature = "avro_custom_types")]
912            Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
913            #[cfg(feature = "avro_custom_types")]
914            Self::RunEndEncoded(values, bits) => {
915                let run_ends_dt = match *bits {
916                    16 => DataType::Int16,
917                    32 => DataType::Int32,
918                    64 => DataType::Int64,
919                    _ => unreachable!(),
920                };
921                DataType::RunEndEncoded(
922                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
923                    Arc::new(Field::new("values", values.codec().data_type(), true)),
924                )
925            }
926            #[cfg(feature = "avro_custom_types")]
927            Self::Int8 => DataType::Int8,
928            #[cfg(feature = "avro_custom_types")]
929            Self::Int16 => DataType::Int16,
930            #[cfg(feature = "avro_custom_types")]
931            Self::UInt8 => DataType::UInt8,
932            #[cfg(feature = "avro_custom_types")]
933            Self::UInt16 => DataType::UInt16,
934            #[cfg(feature = "avro_custom_types")]
935            Self::UInt32 => DataType::UInt32,
936            #[cfg(feature = "avro_custom_types")]
937            Self::UInt64 => DataType::UInt64,
938            #[cfg(feature = "avro_custom_types")]
939            Self::Float16 => DataType::Float16,
940            #[cfg(feature = "avro_custom_types")]
941            Self::Date64 => DataType::Date64,
942            #[cfg(feature = "avro_custom_types")]
943            Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
944            #[cfg(feature = "avro_custom_types")]
945            Self::Time32Secs => DataType::Time32(TimeUnit::Second),
946            #[cfg(feature = "avro_custom_types")]
947            Self::TimestampSecs(is_utc) => {
948                DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
949            }
950            #[cfg(feature = "avro_custom_types")]
951            Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
952            #[cfg(feature = "avro_custom_types")]
953            Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
954            #[cfg(feature = "avro_custom_types")]
955            Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
956        }
957    }
958
959    /// Converts a string codec to use Utf8View if requested
960    ///
961    /// The conversion only happens if both:
962    /// 1. `use_utf8view` is true
963    /// 2. The codec is currently `Utf8`
964    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
965        if use_utf8view && matches!(self, Self::Utf8) {
966            Self::Utf8View
967        } else {
968            self
969        }
970    }
971
972    #[inline]
973    fn union_field_name(&self) -> String {
974        UnionFieldKind::from(self).as_ref().to_owned()
975    }
976}
977
978impl From<PrimitiveType> for Codec {
979    fn from(value: PrimitiveType) -> Self {
980        match value {
981            PrimitiveType::Null => Self::Null,
982            PrimitiveType::Boolean => Self::Boolean,
983            PrimitiveType::Int => Self::Int32,
984            PrimitiveType::Long => Self::Int64,
985            PrimitiveType::Float => Self::Float32,
986            PrimitiveType::Double => Self::Float64,
987            PrimitiveType::Bytes => Self::Binary,
988            PrimitiveType::String => Self::Utf8,
989        }
990    }
991}
992
993/// Compute the exact maximum base‑10 precision that fits in `n` bytes for Avro
994/// `fixed` decimals stored as two's‑complement unscaled integers (big‑endian).
995///
996/// Per Avro spec (Decimal logical type), for a fixed length `n`:
997/// max precision = ⌊log₁₀(2^(8n − 1) − 1)⌋.
998///
999/// This function returns `None` if `n` is 0 or greater than 32 (Arrow supports
1000/// Decimal256, which is 32 bytes and has max precision 76).
1001const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
1002    // Precomputed exact table for n = 1..=32
1003    // 1:2, 2:4, 3:6, 4:9, 5:11, 6:14, 7:16, 8:18, 9:21, 10:23, 11:26, 12:28,
1004    // 13:31, 14:33, 15:35, 16:38, 17:40, 18:43, 19:45, 20:47, 21:50, 22:52,
1005    // 23:55, 24:57, 25:59, 26:62, 27:64, 28:67, 29:69, 30:71, 31:74, 32:76
1006    const MAX_P: [usize; 32] = [
1007        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
1008        59, 62, 64, 67, 69, 71, 74, 76,
1009    ];
1010    match n {
1011        1..=32 => Some(MAX_P[n - 1]),
1012        _ => None,
1013    }
1014}
1015
1016fn parse_decimal_attributes(
1017    attributes: &Attributes,
1018    fallback_size: Option<usize>,
1019    precision_required: bool,
1020) -> Result<(usize, usize, Option<usize>), ArrowError> {
1021    let precision = attributes
1022        .additional
1023        .get("precision")
1024        .and_then(|v| v.as_u64())
1025        .or(if precision_required { None } else { Some(10) })
1026        .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1027        as usize;
1028    let scale = attributes
1029        .additional
1030        .get("scale")
1031        .and_then(|v| v.as_u64())
1032        .unwrap_or(0) as usize;
1033    let size = attributes
1034        .additional
1035        .get("size")
1036        .and_then(|v| v.as_u64())
1037        .map(|s| s as usize)
1038        .or(fallback_size);
1039    if precision == 0 {
1040        return Err(ArrowError::ParseError(
1041            "Decimal requires precision > 0".to_string(),
1042        ));
1043    }
1044    if scale > precision {
1045        return Err(ArrowError::ParseError(format!(
1046            "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1047        )));
1048    }
1049    if precision > DECIMAL256_MAX_PRECISION as usize {
1050        return Err(ArrowError::ParseError(format!(
1051            "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1052            DECIMAL256_MAX_PRECISION
1053        )));
1054    }
1055    if let Some(sz) = size {
1056        let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1057            ArrowError::ParseError(format!(
1058                "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1059            ))
1060        })?;
1061        if precision > max_p {
1062            return Err(ArrowError::ParseError(format!(
1063                "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1064            )));
1065        }
1066    }
1067    Ok((precision, scale, size))
1068}
1069
1070#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
1071#[strum(serialize_all = "snake_case")]
1072enum UnionFieldKind {
1073    Null,
1074    Boolean,
1075    Int,
1076    Long,
1077    Float,
1078    Double,
1079    Bytes,
1080    String,
1081    Date,
1082    TimeMillis,
1083    TimeMicros,
1084    TimestampMillisUtc,
1085    TimestampMillisLocal,
1086    TimestampMicrosUtc,
1087    TimestampMicrosLocal,
1088    TimestampNanosUtc,
1089    TimestampNanosLocal,
1090    Duration,
1091    Fixed,
1092    Decimal,
1093    Enum,
1094    Array,
1095    Record,
1096    Map,
1097    Uuid,
1098    Union,
1099}
1100
1101impl From<&Codec> for UnionFieldKind {
1102    fn from(c: &Codec) -> Self {
1103        match c {
1104            Codec::Null => Self::Null,
1105            Codec::Boolean => Self::Boolean,
1106            Codec::Int32 => Self::Int,
1107            Codec::Int64 => Self::Long,
1108            Codec::Float32 => Self::Float,
1109            Codec::Float64 => Self::Double,
1110            Codec::Binary => Self::Bytes,
1111            Codec::Utf8 | Codec::Utf8View => Self::String,
1112            Codec::Date32 => Self::Date,
1113            Codec::TimeMillis => Self::TimeMillis,
1114            Codec::TimeMicros => Self::TimeMicros,
1115            Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
1116            Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
1117            Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
1118            Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
1119            Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
1120            Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
1121            Codec::Interval => Self::Duration,
1122            Codec::Fixed(_) => Self::Fixed,
1123            Codec::Decimal(..) => Self::Decimal,
1124            Codec::Enum(_) => Self::Enum,
1125            Codec::List(_) => Self::Array,
1126            Codec::Struct(_) => Self::Record,
1127            Codec::Map(_) => Self::Map,
1128            Codec::Uuid => Self::Uuid,
1129            Codec::Union(..) => Self::Union,
1130            #[cfg(feature = "avro_custom_types")]
1131            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
1132            #[cfg(feature = "avro_custom_types")]
1133            Codec::DurationNanos
1134            | Codec::DurationMicros
1135            | Codec::DurationMillis
1136            | Codec::DurationSeconds => Self::Duration,
1137            #[cfg(feature = "avro_custom_types")]
1138            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
1139            #[cfg(feature = "avro_custom_types")]
1140            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
1141                Self::Long
1142            }
1143            #[cfg(feature = "avro_custom_types")]
1144            Codec::Time32Secs => Self::TimeMillis, // Closest standard type
1145            #[cfg(feature = "avro_custom_types")]
1146            Codec::UInt64
1147            | Codec::Float16
1148            | Codec::IntervalYearMonth
1149            | Codec::IntervalMonthDayNano
1150            | Codec::IntervalDayTime => Self::Fixed,
1151        }
1152    }
1153}
1154
1155fn union_branch_name(dt: &AvroDataType) -> String {
1156    if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1157        if name.contains(".") {
1158            // Full name
1159            return name.to_string();
1160        }
1161        if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1162            return format!("{ns}.{name}");
1163        }
1164        return name.to_string();
1165    }
1166    dt.codec.union_field_name()
1167}
1168
1169fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1170    let arrow_fields: Vec<Field> = encodings
1171        .iter()
1172        .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1173        .collect();
1174    let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1175    UnionFields::try_new(type_ids, arrow_fields)
1176}
1177
1178/// Resolves Avro type names to [`AvroDataType`]
1179///
1180/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1181#[derive(Debug, Default)]
1182struct Resolver<'a> {
1183    map: HashMap<(&'a str, &'a str), AvroDataType>,
1184}
1185
1186impl<'a> Resolver<'a> {
1187    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1188        self.map.insert((namespace.unwrap_or(""), name), schema);
1189    }
1190
1191    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1192        let (namespace, name) = name
1193            .rsplit_once('.')
1194            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1195        self.map
1196            .get(&(namespace, name))
1197            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1198            .cloned()
1199    }
1200}
1201
1202fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1203    let mut out = HashSet::with_capacity(1 + aliases.len());
1204    let (full, _) = make_full_name(name, ns, None);
1205    out.insert(full);
1206    for a in aliases {
1207        let (fa, _) = make_full_name(a, None, ns);
1208        out.insert(fa);
1209    }
1210    out
1211}
1212
1213fn names_match(
1214    writer_name: &str,
1215    writer_namespace: Option<&str>,
1216    writer_aliases: &[&str],
1217    reader_name: &str,
1218    reader_namespace: Option<&str>,
1219    reader_aliases: &[&str],
1220) -> bool {
1221    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1222    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1223    // If the canonical full names match, or any alias matches cross-wise.
1224    !writer_set.is_disjoint(&reader_set)
1225}
1226
1227fn ensure_names_match(
1228    data_type: &str,
1229    writer_name: &str,
1230    writer_namespace: Option<&str>,
1231    writer_aliases: &[&str],
1232    reader_name: &str,
1233    reader_namespace: Option<&str>,
1234    reader_aliases: &[&str],
1235) -> Result<(), ArrowError> {
1236    if names_match(
1237        writer_name,
1238        writer_namespace,
1239        writer_aliases,
1240        reader_name,
1241        reader_namespace,
1242        reader_aliases,
1243    ) {
1244        Ok(())
1245    } else {
1246        Err(ArrowError::ParseError(format!(
1247            "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1248        )))
1249    }
1250}
1251
1252fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1253    match schema {
1254        Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1255        Schema::Type(Type {
1256            r#type: TypeName::Primitive(primitive),
1257            ..
1258        }) => Some(*primitive),
1259        _ => None,
1260    }
1261}
1262
1263fn nullable_union_variants<'x, 'y>(
1264    variant: &'y [Schema<'x>],
1265) -> Option<(Nullability, &'y Schema<'x>)> {
1266    if variant.len() != 2 {
1267        return None;
1268    }
1269    let is_null = |schema: &Schema<'x>| {
1270        matches!(
1271            schema,
1272            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1273        )
1274    };
1275    match (is_null(&variant[0]), is_null(&variant[1])) {
1276        (true, false) => Some((Nullability::NullFirst, &variant[1])),
1277        (false, true) => Some((Nullability::NullSecond, &variant[0])),
1278        _ => None,
1279    }
1280}
1281
1282#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1283enum UnionBranchKey {
1284    Named(String),
1285    Primitive(PrimitiveType),
1286    Array,
1287    Map,
1288}
1289
1290fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1291    let (name, namespace) = match s {
1292        Schema::TypeName(TypeName::Primitive(p))
1293        | Schema::Type(Type {
1294            r#type: TypeName::Primitive(p),
1295            ..
1296        }) => return Some(UnionBranchKey::Primitive(*p)),
1297        Schema::TypeName(TypeName::Ref(name))
1298        | Schema::Type(Type {
1299            r#type: TypeName::Ref(name),
1300            ..
1301        }) => (name, None),
1302        Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1303        Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1304        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1305        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1306        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1307        Schema::Union(_) => return None,
1308    };
1309    let (full, _) = make_full_name(name, namespace, enclosing_ns);
1310    Some(UnionBranchKey::Named(full))
1311}
1312
1313fn union_first_duplicate<'a>(
1314    branches: &'a [Schema<'a>],
1315    enclosing_ns: Option<&'a str>,
1316) -> Option<String> {
1317    let mut seen = HashSet::with_capacity(branches.len());
1318    for schema in branches {
1319        if let Some(key) = branch_key_of(schema, enclosing_ns) {
1320            if !seen.insert(key.clone()) {
1321                let msg = match key {
1322                    UnionBranchKey::Named(full) => format!("named type {full}"),
1323                    UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1324                    UnionBranchKey::Array => "array".to_string(),
1325                    UnionBranchKey::Map => "map".to_string(),
1326                };
1327                return Some(msg);
1328            }
1329        }
1330    }
1331    None
1332}
1333
1334/// Resolves Avro type names to [`AvroDataType`]
1335///
1336/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1337struct Maker<'a> {
1338    resolver: Resolver<'a>,
1339    use_utf8view: bool,
1340    strict_mode: bool,
1341}
1342
1343impl<'a> Maker<'a> {
1344    fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1345        Self {
1346            resolver: Default::default(),
1347            use_utf8view,
1348            strict_mode,
1349        }
1350    }
1351
1352    #[cfg(feature = "avro_custom_types")]
1353    #[inline]
1354    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1355        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1356            let mut inner = (*values).clone();
1357            inner.nullability = Some(nb);
1358            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1359        }
1360    }
1361
1362    fn make_data_type<'s>(
1363        &mut self,
1364        writer_schema: &'s Schema<'a>,
1365        reader_schema: Option<&'s Schema<'a>>,
1366        namespace: Option<&'a str>,
1367    ) -> Result<AvroDataType, ArrowError> {
1368        match reader_schema {
1369            Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1370            None => self.parse_type(writer_schema, namespace),
1371        }
1372    }
1373
1374    /// Parses a [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`
1375    ///
1376    /// `name`: is the name used to refer to `schema` in its parent
1377    /// `namespace`: an optional qualifier used as part of a type hierarchy
1378    /// If the data type is a string, convert to use Utf8View if requested
1379    ///
1380    /// This function is used during the schema conversion process to determine whether
1381    /// string data should be represented as StringArray (default) or StringViewArray.
1382    ///
1383    /// `use_utf8view`: if true, use Utf8View instead of Utf8 for string types
1384    ///
1385    /// See [`Resolver`] for more information
1386    fn parse_type<'s>(
1387        &mut self,
1388        schema: &'s Schema<'a>,
1389        namespace: Option<&'a str>,
1390    ) -> Result<AvroDataType, ArrowError> {
1391        match schema {
1392            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1393                Codec::from(*p).with_utf8view(self.use_utf8view),
1394                Default::default(),
1395                None,
1396            )),
1397            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1398            Schema::Union(f) => {
1399                let null = f
1400                    .iter()
1401                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1402                match (f.len() == 2, null) {
1403                    (true, Some(0)) => {
1404                        let mut field = self.parse_type(&f[1], namespace)?;
1405                        field.nullability = Some(Nullability::NullFirst);
1406                        #[cfg(feature = "avro_custom_types")]
1407                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1408                        return Ok(field);
1409                    }
1410                    (true, Some(1)) => {
1411                        if self.strict_mode {
1412                            return Err(ArrowError::SchemaError(
1413                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1414                                    .to_string(),
1415                            ));
1416                        }
1417                        let mut field = self.parse_type(&f[0], namespace)?;
1418                        field.nullability = Some(Nullability::NullSecond);
1419                        #[cfg(feature = "avro_custom_types")]
1420                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1421                        return Ok(field);
1422                    }
1423                    _ => {}
1424                }
1425                // Validate: unions may not immediately contain unions
1426                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1427                    return Err(ArrowError::SchemaError(
1428                        "Avro unions may not immediately contain other unions".to_string(),
1429                    ));
1430                }
1431                // Validate: duplicates (named by full name; non-named by kind)
1432                if let Some(dup) = union_first_duplicate(f, namespace) {
1433                    return Err(ArrowError::SchemaError(format!(
1434                        "Avro union contains duplicate branch type: {dup}"
1435                    )));
1436                }
1437                // Parse all branches
1438                let children: Vec<AvroDataType> = f
1439                    .iter()
1440                    .map(|s| self.parse_type(s, namespace))
1441                    .collect::<Result<_, _>>()?;
1442                // Build Arrow layout once here
1443                let union_fields = build_union_fields(&children)?;
1444                Ok(AvroDataType::new(
1445                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1446                    Default::default(),
1447                    None,
1448                ))
1449            }
1450            Schema::Complex(c) => match c {
1451                ComplexType::Record(r) => {
1452                    let namespace = r.namespace.or(namespace);
1453                    let mut metadata = r.attributes.field_metadata();
1454                    let fields = r
1455                        .fields
1456                        .iter()
1457                        .map(|field| {
1458                            Ok(AvroField {
1459                                name: field.name.to_string(),
1460                                data_type: self.parse_type(&field.r#type, namespace)?,
1461                            })
1462                        })
1463                        .collect::<Result<_, ArrowError>>()?;
1464                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1465                    if let Some(ns) = namespace {
1466                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1467                    }
1468                    let field = AvroDataType {
1469                        nullability: None,
1470                        codec: Codec::Struct(fields),
1471                        metadata,
1472                        resolution: None,
1473                    };
1474                    self.resolver.register(r.name, namespace, field.clone());
1475                    Ok(field)
1476                }
1477                ComplexType::Array(a) => {
1478                    let field = self.parse_type(a.items.as_ref(), namespace)?;
1479                    Ok(AvroDataType {
1480                        nullability: None,
1481                        metadata: a.attributes.field_metadata(),
1482                        codec: Codec::List(Arc::new(field)),
1483                        resolution: None,
1484                    })
1485                }
1486                ComplexType::Fixed(f) => {
1487                    let size = f.size.try_into().map_err(|e| {
1488                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1489                    })?;
1490                    let namespace = f.namespace.or(namespace);
1491                    let mut metadata = f.attributes.field_metadata();
1492                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1493                    if let Some(ns) = namespace {
1494                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1495                    }
1496                    let field = match f.attributes.logical_type {
1497                        Some("decimal") => {
1498                            let (precision, scale, _) =
1499                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1500                            AvroDataType {
1501                                nullability: None,
1502                                metadata,
1503                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1504                                resolution: None,
1505                            }
1506                        }
1507                        Some("duration") => {
1508                            if size != 12 {
1509                                return Err(ArrowError::ParseError(format!(
1510                                    "Invalid fixed size for Duration: {size}, must be 12"
1511                                )));
1512                            };
1513                            AvroDataType {
1514                                nullability: None,
1515                                metadata,
1516                                codec: Codec::Interval,
1517                                resolution: None,
1518                            }
1519                        }
1520                        #[cfg(feature = "avro_custom_types")]
1521                        Some("arrow.uint64") if size == 8 => AvroDataType {
1522                            nullability: None,
1523                            metadata,
1524                            codec: Codec::UInt64,
1525                            resolution: None,
1526                        },
1527                        #[cfg(feature = "avro_custom_types")]
1528                        Some("arrow.float16") if size == 2 => AvroDataType {
1529                            nullability: None,
1530                            metadata,
1531                            codec: Codec::Float16,
1532                            resolution: None,
1533                        },
1534                        #[cfg(feature = "avro_custom_types")]
1535                        Some("arrow.interval-year-month") if size == 4 => AvroDataType {
1536                            nullability: None,
1537                            metadata,
1538                            codec: Codec::IntervalYearMonth,
1539                            resolution: None,
1540                        },
1541                        #[cfg(feature = "avro_custom_types")]
1542                        Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
1543                            nullability: None,
1544                            metadata,
1545                            codec: Codec::IntervalMonthDayNano,
1546                            resolution: None,
1547                        },
1548                        #[cfg(feature = "avro_custom_types")]
1549                        Some("arrow.interval-day-time") if size == 8 => AvroDataType {
1550                            nullability: None,
1551                            metadata,
1552                            codec: Codec::IntervalDayTime,
1553                            resolution: None,
1554                        },
1555                        _ => AvroDataType {
1556                            nullability: None,
1557                            metadata,
1558                            codec: Codec::Fixed(size),
1559                            resolution: None,
1560                        },
1561                    };
1562                    self.resolver.register(f.name, namespace, field.clone());
1563                    Ok(field)
1564                }
1565                ComplexType::Enum(e) => {
1566                    let namespace = e.namespace.or(namespace);
1567                    let symbols = e
1568                        .symbols
1569                        .iter()
1570                        .map(|s| s.to_string())
1571                        .collect::<Arc<[String]>>();
1572                    let mut metadata = e.attributes.field_metadata();
1573                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1574                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1575                    })?;
1576                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1577                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1578                    if let Some(ns) = namespace {
1579                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1580                    }
1581                    let field = AvroDataType {
1582                        nullability: None,
1583                        metadata,
1584                        codec: Codec::Enum(symbols),
1585                        resolution: None,
1586                    };
1587                    self.resolver.register(e.name, namespace, field.clone());
1588                    Ok(field)
1589                }
1590                ComplexType::Map(m) => {
1591                    let val = self.parse_type(&m.values, namespace)?;
1592                    Ok(AvroDataType {
1593                        nullability: None,
1594                        metadata: m.attributes.field_metadata(),
1595                        codec: Codec::Map(Arc::new(val)),
1596                        resolution: None,
1597                    })
1598                }
1599            },
1600            Schema::Type(t) => {
1601                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1602                // https://avro.apache.org/docs/1.11.1/specification/#logical-types
1603                match (t.attributes.logical_type, &mut field.codec) {
1604                    (Some("decimal"), c @ Codec::Binary) => {
1605                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1606                        *c = Codec::Decimal(prec, Some(sc), None);
1607                    }
1608                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1609                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1610                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1611                    (Some("timestamp-millis"), c @ Codec::Int64) => {
1612                        *c = Codec::TimestampMillis(true)
1613                    }
1614                    (Some("timestamp-micros"), c @ Codec::Int64) => {
1615                        *c = Codec::TimestampMicros(true)
1616                    }
1617                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1618                        *c = Codec::TimestampMillis(false)
1619                    }
1620                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1621                        *c = Codec::TimestampMicros(false)
1622                    }
1623                    (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
1624                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1625                        *c = Codec::TimestampNanos(false)
1626                    }
1627                    (Some("uuid"), c @ Codec::Utf8) => {
1628                        // Map Avro string+logicalType=uuid into the UUID Codec,
1629                        // and preserve the logicalType in Arrow field metadata
1630                        // so writers can round-trip it correctly.
1631                        *c = Codec::Uuid;
1632                        field.metadata.insert("logicalType".into(), "uuid".into());
1633                    }
1634                    #[cfg(feature = "avro_custom_types")]
1635                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1636                    #[cfg(feature = "avro_custom_types")]
1637                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1638                    #[cfg(feature = "avro_custom_types")]
1639                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1640                    #[cfg(feature = "avro_custom_types")]
1641                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1642                        *c = Codec::DurationSeconds
1643                    }
1644                    #[cfg(feature = "avro_custom_types")]
1645                    (Some("arrow.run-end-encoded"), _) => {
1646                        let bits_u8: u8 = t
1647                            .attributes
1648                            .additional
1649                            .get("arrow.runEndIndexBits")
1650                            .and_then(|v| v.as_u64())
1651                            .and_then(|n| u8::try_from(n).ok())
1652                            .ok_or_else(|| ArrowError::ParseError(
1653                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1654                                    .to_string(),
1655                            ))?;
1656                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1657                            return Err(ArrowError::ParseError(format!(
1658                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1659                            )));
1660                        }
1661                        // Wrap the parsed underlying site as REE
1662                        let values_site = field.clone();
1663                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1664                    }
1665                    // Arrow-specific integer width types
1666                    #[cfg(feature = "avro_custom_types")]
1667                    (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
1668                    #[cfg(feature = "avro_custom_types")]
1669                    (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
1670                    #[cfg(feature = "avro_custom_types")]
1671                    (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
1672                    #[cfg(feature = "avro_custom_types")]
1673                    (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
1674                    #[cfg(feature = "avro_custom_types")]
1675                    (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
1676                    #[cfg(feature = "avro_custom_types")]
1677                    (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
1678                    // Arrow Float16 stored as fixed(2)
1679                    #[cfg(feature = "avro_custom_types")]
1680                    (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
1681                    // Arrow Date64 custom type
1682                    #[cfg(feature = "avro_custom_types")]
1683                    (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
1684                    // Arrow time/timestamp types with second precision
1685                    #[cfg(feature = "avro_custom_types")]
1686                    (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
1687                    #[cfg(feature = "avro_custom_types")]
1688                    (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
1689                    #[cfg(feature = "avro_custom_types")]
1690                    (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
1691                        *c = Codec::TimestampSecs(true)
1692                    }
1693                    #[cfg(feature = "avro_custom_types")]
1694                    (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
1695                        *c = Codec::TimestampSecs(false)
1696                    }
1697                    // Arrow interval types
1698                    #[cfg(feature = "avro_custom_types")]
1699                    (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
1700                        *c = Codec::IntervalYearMonth
1701                    }
1702                    #[cfg(feature = "avro_custom_types")]
1703                    (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
1704                        *c = Codec::IntervalMonthDayNano
1705                    }
1706                    #[cfg(feature = "avro_custom_types")]
1707                    (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
1708                        *c = Codec::IntervalDayTime
1709                    }
1710                    (Some(logical), _) => {
1711                        // Insert unrecognized logical type into metadata map
1712                        field.metadata.insert("logicalType".into(), logical.into());
1713                    }
1714                    (None, _) => {}
1715                }
1716                if matches!(field.codec, Codec::Int64) {
1717                    if let Some(unit) = t
1718                        .attributes
1719                        .additional
1720                        .get("arrowTimeUnit")
1721                        .and_then(|v| v.as_str())
1722                    {
1723                        if unit == "nanosecond" {
1724                            field.codec = Codec::TimestampNanos(false);
1725                        }
1726                    }
1727                }
1728                if !t.attributes.additional.is_empty() {
1729                    for (k, v) in &t.attributes.additional {
1730                        field.metadata.insert(k.to_string(), v.to_string());
1731                    }
1732                }
1733                Ok(field)
1734            }
1735        }
1736    }
1737
1738    fn resolve_type<'s>(
1739        &mut self,
1740        writer_schema: &'s Schema<'a>,
1741        reader_schema: &'s Schema<'a>,
1742        namespace: Option<&'a str>,
1743    ) -> Result<AvroDataType, ArrowError> {
1744        if let (Some(write_primitive), Some(read_primitive)) =
1745            (primitive_of(writer_schema), primitive_of(reader_schema))
1746        {
1747            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1748        }
1749        match (writer_schema, reader_schema) {
1750            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1751                let writer_variants = writer_variants.as_slice();
1752                let reader_variants = reader_variants.as_slice();
1753                match (
1754                    nullable_union_variants(writer_variants),
1755                    nullable_union_variants(reader_variants),
1756                ) {
1757                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
1758                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
1759                        let mut writer_to_reader = vec![None, None];
1760                        writer_to_reader[w_nb.non_null_index()] = Some((
1761                            r_nb.non_null_index(),
1762                            dt.resolution
1763                                .take()
1764                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
1765                        ));
1766                        dt.nullability = Some(w_nb);
1767                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1768                            writer_to_reader: Arc::from(writer_to_reader),
1769                            writer_is_union: true,
1770                            reader_is_union: true,
1771                        }));
1772                        #[cfg(feature = "avro_custom_types")]
1773                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
1774                        Ok(dt)
1775                    }
1776                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1777                }
1778            }
1779            (Schema::Union(writer_variants), reader_non_union) => {
1780                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1781                    .iter()
1782                    .map(|writer| {
1783                        self.resolve_type(writer, reader_non_union, namespace)
1784                            .ok()
1785                            .map(|tmp| {
1786                                let resolution = tmp
1787                                    .resolution
1788                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1789                                (0usize, resolution)
1790                            })
1791                    })
1792                    .collect();
1793                let mut dt = self.parse_type(reader_non_union, namespace)?;
1794                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1795                    writer_to_reader: Arc::from(writer_to_reader),
1796                    writer_is_union: true,
1797                    reader_is_union: false,
1798                }));
1799                Ok(dt)
1800            }
1801            (writer_non_union, Schema::Union(reader_variants)) => {
1802                if let Some((nullability, non_null_branch)) =
1803                    nullable_union_variants(reader_variants)
1804                {
1805                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
1806                    #[cfg(feature = "avro_custom_types")]
1807                    Self::propagate_nullability_into_ree(&mut dt, nullability);
1808                    dt.nullability = Some(nullability);
1809                    // Ensure resolution is set to a non-Union variant to suppress
1810                    // reading the union tag which is the default behavior.
1811                    if dt.resolution.is_none() {
1812                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
1813                    }
1814                    Ok(dt)
1815                } else {
1816                    let Some((match_idx, mut match_dt)) =
1817                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
1818                    else {
1819                        return Err(ArrowError::SchemaError(
1820                            "Writer schema does not match any reader union branch".to_string(),
1821                        ));
1822                    };
1823                    // Steal the resolution info from the matching reader branch
1824                    // for the Union resolution, but preserve possible resolution
1825                    // information on its inner types.
1826                    // For other branches, resolution is irrelevant,
1827                    // so just parse them.
1828                    let resolution = match_dt
1829                        .resolution
1830                        .take()
1831                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1832                    let mut match_dt = Some(match_dt);
1833                    let children = reader_variants
1834                        .iter()
1835                        .enumerate()
1836                        .map(|(idx, variant)| {
1837                            if idx == match_idx {
1838                                Ok(match_dt.take().unwrap())
1839                            } else {
1840                                self.parse_type(variant, namespace)
1841                            }
1842                        })
1843                        .collect::<Result<Vec<_>, _>>()?;
1844                    let union_fields = build_union_fields(&children)?;
1845                    let mut dt = AvroDataType::new(
1846                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
1847                        Default::default(),
1848                        None,
1849                    );
1850                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1851                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
1852                        writer_is_union: false,
1853                        reader_is_union: true,
1854                    }));
1855                    Ok(dt)
1856                }
1857            }
1858            (
1859                Schema::Complex(ComplexType::Array(writer_array)),
1860                Schema::Complex(ComplexType::Array(reader_array)),
1861            ) => self.resolve_array(writer_array, reader_array, namespace),
1862            (
1863                Schema::Complex(ComplexType::Map(writer_map)),
1864                Schema::Complex(ComplexType::Map(reader_map)),
1865            ) => self.resolve_map(writer_map, reader_map, namespace),
1866            (
1867                Schema::Complex(ComplexType::Fixed(writer_fixed)),
1868                Schema::Complex(ComplexType::Fixed(reader_fixed)),
1869            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1870            (
1871                Schema::Complex(ComplexType::Record(writer_record)),
1872                Schema::Complex(ComplexType::Record(reader_record)),
1873            ) => self.resolve_records(writer_record, reader_record, namespace),
1874            (
1875                Schema::Complex(ComplexType::Enum(writer_enum)),
1876                Schema::Complex(ComplexType::Enum(reader_enum)),
1877            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1878            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1879            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1880            _ => Err(ArrowError::NotYetImplemented(
1881                "Other resolutions not yet implemented".to_string(),
1882            )),
1883        }
1884    }
1885
1886    fn find_best_union_match(
1887        &mut self,
1888        writer: &Schema<'a>,
1889        reader_variants: &[Schema<'a>],
1890        namespace: Option<&'a str>,
1891    ) -> Option<(usize, AvroDataType)> {
1892        let mut first_resolution = None;
1893        for (reader_index, reader) in reader_variants.iter().enumerate() {
1894            if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1895                match &dt.resolution {
1896                    None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1897                        // An exact match is best, return immediately.
1898                        return Some((reader_index, dt));
1899                    }
1900                    Some(_) => {
1901                        if first_resolution.is_none() {
1902                            // Store the first valid promotion but keep searching for a direct match.
1903                            first_resolution = Some((reader_index, dt));
1904                        }
1905                    }
1906                };
1907            }
1908        }
1909        first_resolution
1910    }
1911
1912    fn resolve_unions<'s>(
1913        &mut self,
1914        writer_variants: &'s [Schema<'a>],
1915        reader_variants: &'s [Schema<'a>],
1916        namespace: Option<&'a str>,
1917    ) -> Result<AvroDataType, ArrowError> {
1918        let mut resolved_reader_encodings = HashMap::new();
1919        let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1920            .iter()
1921            .map(|writer| {
1922                self.find_best_union_match(writer, reader_variants, namespace)
1923                    .map(|(match_idx, mut match_dt)| {
1924                        let resolution = match_dt
1925                            .resolution
1926                            .take()
1927                            .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1928                        // TODO: check for overlapping reader variants?
1929                        // They should not be possible in a valid schema.
1930                        resolved_reader_encodings.insert(match_idx, match_dt);
1931                        (match_idx, resolution)
1932                    })
1933            })
1934            .collect();
1935        let reader_encodings: Vec<AvroDataType> = reader_variants
1936            .iter()
1937            .enumerate()
1938            .map(|(reader_idx, reader_schema)| {
1939                if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1940                    Ok(resolved)
1941                } else {
1942                    self.parse_type(reader_schema, namespace)
1943                }
1944            })
1945            .collect::<Result<_, _>>()?;
1946        let union_fields = build_union_fields(&reader_encodings)?;
1947        let mut dt = AvroDataType::new(
1948            Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1949            Default::default(),
1950            None,
1951        );
1952        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1953            writer_to_reader: Arc::from(writer_to_reader),
1954            writer_is_union: true,
1955            reader_is_union: true,
1956        }));
1957        Ok(dt)
1958    }
1959
1960    fn resolve_array(
1961        &mut self,
1962        writer_array: &Array<'a>,
1963        reader_array: &Array<'a>,
1964        namespace: Option<&'a str>,
1965    ) -> Result<AvroDataType, ArrowError> {
1966        Ok(AvroDataType {
1967            nullability: None,
1968            metadata: reader_array.attributes.field_metadata(),
1969            codec: Codec::List(Arc::new(self.make_data_type(
1970                writer_array.items.as_ref(),
1971                Some(reader_array.items.as_ref()),
1972                namespace,
1973            )?)),
1974            resolution: None,
1975        })
1976    }
1977
1978    fn resolve_map(
1979        &mut self,
1980        writer_map: &Map<'a>,
1981        reader_map: &Map<'a>,
1982        namespace: Option<&'a str>,
1983    ) -> Result<AvroDataType, ArrowError> {
1984        Ok(AvroDataType {
1985            nullability: None,
1986            metadata: reader_map.attributes.field_metadata(),
1987            codec: Codec::Map(Arc::new(self.make_data_type(
1988                &writer_map.values,
1989                Some(&reader_map.values),
1990                namespace,
1991            )?)),
1992            resolution: None,
1993        })
1994    }
1995
1996    fn resolve_fixed<'s>(
1997        &mut self,
1998        writer_fixed: &Fixed<'a>,
1999        reader_fixed: &Fixed<'a>,
2000        reader_schema: &'s Schema<'a>,
2001        namespace: Option<&'a str>,
2002    ) -> Result<AvroDataType, ArrowError> {
2003        ensure_names_match(
2004            "Fixed",
2005            writer_fixed.name,
2006            writer_fixed.namespace,
2007            &writer_fixed.aliases,
2008            reader_fixed.name,
2009            reader_fixed.namespace,
2010            &reader_fixed.aliases,
2011        )?;
2012        if writer_fixed.size != reader_fixed.size {
2013            return Err(ArrowError::SchemaError(format!(
2014                "Fixed size mismatch for {}: writer={}, reader={}",
2015                reader_fixed.name, writer_fixed.size, reader_fixed.size
2016            )));
2017        }
2018        self.parse_type(reader_schema, namespace)
2019    }
2020
2021    fn resolve_primitives(
2022        &mut self,
2023        write_primitive: PrimitiveType,
2024        read_primitive: PrimitiveType,
2025        reader_schema: &Schema<'a>,
2026    ) -> Result<AvroDataType, ArrowError> {
2027        if write_primitive == read_primitive {
2028            return self.parse_type(reader_schema, None);
2029        }
2030        let promotion = match (write_primitive, read_primitive) {
2031            (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2032            (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2033            (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2034            (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2035            (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2036            (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2037            (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2038            (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2039            _ => {
2040                return Err(ArrowError::ParseError(format!(
2041                    "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2042                )));
2043            }
2044        };
2045        let mut datatype = self.parse_type(reader_schema, None)?;
2046        datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2047        Ok(datatype)
2048    }
2049
2050    // Resolve writer vs. reader enum schemas according to Avro 1.11.1.
2051    //
2052    // # How enums resolve (writer to reader)
2053    // Per “Schema Resolution”:
2054    // * The two schemas must refer to the same (unqualified) enum name (or match
2055    //   via alias rewriting).
2056    // * If the writer’s symbol is not present in the reader’s enum and the reader
2057    //   enum has a `default`, that `default` symbol must be used; otherwise,
2058    //   error.
2059    //   https://avro.apache.org/docs/1.11.1/specification/#schema-resolution
2060    // * Avro “Aliases” are applied from the reader side to rewrite the writer’s
2061    //   names during resolution. For robustness across ecosystems, we also accept
2062    //   symmetry here (see note below).
2063    //   https://avro.apache.org/docs/1.11.1/specification/#aliases
2064    //
2065    // # Rationale for this code path
2066    // 1. Do the work once at schema‑resolution time. Avro serializes an enum as a
2067    //    writer‑side position. Mapping positions on the hot decoder path is expensive
2068    //    if done with string lookups. This method builds a `writer_index to reader_index`
2069    //    vector once, so decoding just does an O(1) table lookup.
2070    // 2. Adopt the reader’s symbol set and order. We return an Arrow
2071    //    `Dictionary(Int32, Utf8)` whose dictionary values are the reader enum
2072    //    symbols. This makes downstream semantics match the reader schema, including
2073    //    Avro’s sort order rule that orders enums by symbol position in the schema.
2074    //    https://avro.apache.org/docs/1.11.1/specification/#sort-order
2075    // 3. Honor Avro’s `default` for enums. Avro 1.9+ allows a type‑level default
2076    //    on the enum. When the writer emits a symbol unknown to the reader, we map it
2077    //    to the reader’s validated `default` symbol if present; otherwise we signal an
2078    //    error at decoding time.
2079    //    https://avro.apache.org/docs/1.11.1/specification/#enums
2080    //
2081    // # Implementation notes
2082    // * We first check that enum names match or are*alias‑equivalent. The Avro
2083    //   spec describes alias rewriting using reader aliases; this implementation
2084    //   additionally treats writer aliases as acceptable for name matching to be
2085    //   resilient with schemas produced by different tooling.
2086    // * We build `EnumMapping`:
2087    //   - `mapping[i]` = reader index of the writer symbol at writer index `i`.
2088    //   - If the writer symbol is absent and the reader has a default, we store the
2089    //     reader index of that default.
2090    //   - Otherwise we store `-1` as a sentinel meaning unresolvable; the decoder
2091    //     must treat encountering such a value as an error, per the spec.
2092    // * We persist the reader symbol list in field metadata under
2093    //   `AVRO_ENUM_SYMBOLS_METADATA_KEY`, so consumers can inspect the dictionary
2094    //   without needing the original Avro schema.
2095    // * The Arrow representation is `Dictionary(Int32, Utf8)`, which aligns with
2096    //   Avro’s integer index encoding for enums.
2097    //
2098    // # Examples
2099    // * Writer `["A","B","C"]`, Reader `["A","B"]`, Reader default `"A"`
2100    //     `mapping = [0, 1, 0]`, `default_index = 0`.
2101    // * Writer `["A","B"]`, Reader `["B","A"]` (no default)
2102    //     `mapping = [1, 0]`, `default_index = -1`.
2103    // * Writer `["A","B","C"]`, Reader `["A","B"]` (no default)
2104    //     `mapping = [0, 1, -1]` (decode must error on `"C"`).
2105    fn resolve_enums(
2106        &mut self,
2107        writer_enum: &Enum<'a>,
2108        reader_enum: &Enum<'a>,
2109        reader_schema: &Schema<'a>,
2110        namespace: Option<&'a str>,
2111    ) -> Result<AvroDataType, ArrowError> {
2112        ensure_names_match(
2113            "Enum",
2114            writer_enum.name,
2115            writer_enum.namespace,
2116            &writer_enum.aliases,
2117            reader_enum.name,
2118            reader_enum.namespace,
2119            &reader_enum.aliases,
2120        )?;
2121        if writer_enum.symbols == reader_enum.symbols {
2122            return self.parse_type(reader_schema, namespace);
2123        }
2124        let reader_index: HashMap<&str, i32> = reader_enum
2125            .symbols
2126            .iter()
2127            .enumerate()
2128            .map(|(index, &symbol)| (symbol, index as i32))
2129            .collect();
2130        let default_index: i32 = match reader_enum.default {
2131            Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2132                ArrowError::SchemaError(format!(
2133                    "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2134                    reader_enum.name,
2135                ))
2136            })?,
2137            None => -1,
2138        };
2139        let mapping: Vec<i32> = writer_enum
2140            .symbols
2141            .iter()
2142            .map(|&write_symbol| {
2143                reader_index
2144                    .get(write_symbol)
2145                    .copied()
2146                    .unwrap_or(default_index)
2147            })
2148            .collect();
2149        if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2150            return Err(ArrowError::SchemaError(format!(
2151                "Reader enum '{}' does not cover all writer symbols and no default is provided",
2152                reader_enum.name
2153            )));
2154        }
2155        let mut dt = self.parse_type(reader_schema, namespace)?;
2156        dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2157            mapping: Arc::from(mapping),
2158            default_index,
2159        }));
2160        let reader_ns = reader_enum.namespace.or(namespace);
2161        self.resolver
2162            .register(reader_enum.name, reader_ns, dt.clone());
2163        Ok(dt)
2164    }
2165
2166    #[inline]
2167    fn build_writer_lookup(
2168        writer_record: &Record<'a>,
2169    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2170        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2171        for (idx, wf) in writer_record.fields.iter().enumerate() {
2172            // Avro field names are unique; last-in wins are acceptable and match previous behavior.
2173            map.insert(wf.name, idx);
2174        }
2175        // Track ambiguous writer aliases (alias used by multiple writer fields)
2176        let mut ambiguous: HashSet<&str> = HashSet::new();
2177        for (idx, wf) in writer_record.fields.iter().enumerate() {
2178            for &alias in &wf.aliases {
2179                match map.entry(alias) {
2180                    Entry::Occupied(e) if *e.get() != idx => {
2181                        ambiguous.insert(alias);
2182                    }
2183                    Entry::Vacant(e) => {
2184                        e.insert(idx);
2185                    }
2186                    _ => {}
2187                }
2188            }
2189        }
2190        (map, ambiguous)
2191    }
2192
2193    fn resolve_records(
2194        &mut self,
2195        writer_record: &Record<'a>,
2196        reader_record: &Record<'a>,
2197        namespace: Option<&'a str>,
2198    ) -> Result<AvroDataType, ArrowError> {
2199        ensure_names_match(
2200            "Record",
2201            writer_record.name,
2202            writer_record.namespace,
2203            &writer_record.aliases,
2204            reader_record.name,
2205            reader_record.namespace,
2206            &reader_record.aliases,
2207        )?;
2208        let writer_ns = writer_record.namespace.or(namespace);
2209        let reader_ns = reader_record.namespace.or(namespace);
2210        let mut reader_md = reader_record.attributes.field_metadata();
2211        reader_md.insert(
2212            AVRO_NAME_METADATA_KEY.to_string(),
2213            reader_record.name.to_string(),
2214        );
2215        if let Some(ns) = reader_ns {
2216            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
2217        }
2218        // Build writer lookup and ambiguous alias set.
2219        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
2220        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
2221        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
2222        // Capture default field indices during the main loop (one pass).
2223        let mut default_fields: Vec<usize> = Vec::new();
2224        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
2225            // Direct name match, then reader aliases (a writer alias map is pre-populated).
2226            let mut match_idx = writer_lookup.get(r_field.name).copied();
2227            let mut matched_via_alias: Option<&str> = None;
2228            if match_idx.is_none() {
2229                for &alias in &r_field.aliases {
2230                    if let Some(i) = writer_lookup.get(alias).copied() {
2231                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
2232                            return Err(ArrowError::SchemaError(format!(
2233                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
2234                                r_field.name
2235                            )));
2236                        }
2237                        match_idx = Some(i);
2238                        matched_via_alias = Some(alias);
2239                        break;
2240                    }
2241                }
2242            }
2243            if let Some(wi) = match_idx {
2244                if writer_to_reader[wi].is_none() {
2245                    let w_schema = &writer_record.fields[wi].r#type;
2246                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
2247                    writer_to_reader[wi] = Some(reader_idx);
2248                    reader_fields.push(AvroField {
2249                        name: r_field.name.to_owned(),
2250                        data_type: dt,
2251                    });
2252                    continue;
2253                } else if self.strict_mode {
2254                    // Writer field already mapped and strict_mode => error
2255                    let existing_reader = writer_to_reader[wi].unwrap();
2256                    let via = matched_via_alias
2257                        .map(|a| format!("alias '{a}'"))
2258                        .unwrap_or_else(|| "name match".to_string());
2259                    return Err(ArrowError::SchemaError(format!(
2260                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
2261                        writer_record.fields[wi].name
2262                    )));
2263                }
2264                // Non-strict and already mapped -> fall through to defaulting logic
2265            }
2266            // No match (or conflicted in non-strict mode): attach default per Avro spec.
2267            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
2268            if let Some(default_json) = r_field.default.as_ref() {
2269                dt.resolution = Some(ResolutionInfo::DefaultValue(
2270                    dt.parse_and_store_default(default_json)?,
2271                ));
2272                default_fields.push(reader_idx);
2273            } else if dt.nullability() == Some(Nullability::NullFirst) {
2274                // The only valid implicit default for a union is the first branch (null-first case).
2275                dt.resolution = Some(ResolutionInfo::DefaultValue(
2276                    dt.parse_and_store_default(&Value::Null)?,
2277                ));
2278                default_fields.push(reader_idx);
2279            } else {
2280                return Err(ArrowError::SchemaError(format!(
2281                    "Reader field '{}' not present in writer schema must have a default value",
2282                    r_field.name
2283                )));
2284            }
2285            reader_fields.push(AvroField {
2286                name: r_field.name.to_owned(),
2287                data_type: dt,
2288            });
2289        }
2290        // Build writer field map.
2291        let writer_fields = writer_record
2292            .fields
2293            .iter()
2294            .enumerate()
2295            .map(|(writer_index, writer_field)| {
2296                if let Some(reader_index) = writer_to_reader[writer_index] {
2297                    Ok(ResolvedField::ToReader(reader_index))
2298                } else {
2299                    let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
2300                    Ok(ResolvedField::Skip(dt))
2301                }
2302            })
2303            .collect::<Result<_, ArrowError>>()?;
2304        let resolved = AvroDataType::new_with_resolution(
2305            Codec::Struct(Arc::from(reader_fields)),
2306            reader_md,
2307            None,
2308            Some(ResolutionInfo::Record(ResolvedRecord {
2309                writer_fields,
2310                default_fields: Arc::from(default_fields),
2311            })),
2312        );
2313        // Register a resolved record by reader name+namespace for potential named type refs.
2314        self.resolver
2315            .register(reader_record.name, reader_ns, resolved.clone());
2316        Ok(resolved)
2317    }
2318}
2319
2320#[cfg(test)]
2321mod tests {
2322    use super::*;
2323    use crate::schema::{
2324        AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2325        Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2326    };
2327    use indexmap::IndexMap;
2328    use serde_json::{self, Value};
2329
2330    fn create_schema_with_logical_type(
2331        primitive_type: PrimitiveType,
2332        logical_type: &'static str,
2333    ) -> Schema<'static> {
2334        let attributes = Attributes {
2335            logical_type: Some(logical_type),
2336            additional: Default::default(),
2337        };
2338
2339        Schema::Type(Type {
2340            r#type: TypeName::Primitive(primitive_type),
2341            attributes,
2342        })
2343    }
2344
2345    fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2346        let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2347        let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2348        let mut maker = Maker::new(false, false);
2349        maker
2350            .make_data_type(&writer_schema, Some(&reader_schema), None)
2351            .expect("promotion should resolve")
2352    }
2353
2354    fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2355        Schema::TypeName(TypeName::Primitive(pt))
2356    }
2357    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2358        Schema::Union(branches)
2359    }
2360
2361    #[test]
2362    fn test_date_logical_type() {
2363        let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2364
2365        let mut maker = Maker::new(false, false);
2366        let result = maker.make_data_type(&schema, None, None).unwrap();
2367
2368        assert!(matches!(result.codec, Codec::Date32));
2369    }
2370
2371    #[test]
2372    fn test_time_millis_logical_type() {
2373        let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2374
2375        let mut maker = Maker::new(false, false);
2376        let result = maker.make_data_type(&schema, None, None).unwrap();
2377
2378        assert!(matches!(result.codec, Codec::TimeMillis));
2379    }
2380
2381    #[test]
2382    fn test_time_micros_logical_type() {
2383        let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2384
2385        let mut maker = Maker::new(false, false);
2386        let result = maker.make_data_type(&schema, None, None).unwrap();
2387
2388        assert!(matches!(result.codec, Codec::TimeMicros));
2389    }
2390
2391    #[test]
2392    fn test_timestamp_millis_logical_type() {
2393        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2394
2395        let mut maker = Maker::new(false, false);
2396        let result = maker.make_data_type(&schema, None, None).unwrap();
2397
2398        assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2399    }
2400
2401    #[test]
2402    fn test_timestamp_micros_logical_type() {
2403        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2404
2405        let mut maker = Maker::new(false, false);
2406        let result = maker.make_data_type(&schema, None, None).unwrap();
2407
2408        assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2409    }
2410
2411    #[test]
2412    fn test_local_timestamp_millis_logical_type() {
2413        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2414
2415        let mut maker = Maker::new(false, false);
2416        let result = maker.make_data_type(&schema, None, None).unwrap();
2417
2418        assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2419    }
2420
2421    #[test]
2422    fn test_local_timestamp_micros_logical_type() {
2423        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2424
2425        let mut maker = Maker::new(false, false);
2426        let result = maker.make_data_type(&schema, None, None).unwrap();
2427
2428        assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2429    }
2430
2431    #[test]
2432    fn test_uuid_type() {
2433        let mut codec = Codec::Fixed(16);
2434        if let c @ Codec::Fixed(16) = &mut codec {
2435            *c = Codec::Uuid;
2436        }
2437        assert!(matches!(codec, Codec::Uuid));
2438    }
2439
2440    #[test]
2441    fn test_duration_logical_type() {
2442        let mut codec = Codec::Fixed(12);
2443
2444        if let c @ Codec::Fixed(12) = &mut codec {
2445            *c = Codec::Interval;
2446        }
2447
2448        assert!(matches!(codec, Codec::Interval));
2449    }
2450
2451    #[test]
2452    fn test_decimal_logical_type_not_implemented() {
2453        let codec = Codec::Fixed(16);
2454
2455        let process_decimal = || -> Result<(), ArrowError> {
2456            if let Codec::Fixed(_) = codec {
2457                return Err(ArrowError::NotYetImplemented(
2458                    "Decimals are not currently supported".to_string(),
2459                ));
2460            }
2461            Ok(())
2462        };
2463
2464        let result = process_decimal();
2465
2466        assert!(result.is_err());
2467        if let Err(ArrowError::NotYetImplemented(msg)) = result {
2468            assert!(msg.contains("Decimals are not currently supported"));
2469        } else {
2470            panic!("Expected NotYetImplemented error");
2471        }
2472    }
2473    #[test]
2474    fn test_unknown_logical_type_added_to_metadata() {
2475        let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2476
2477        let mut maker = Maker::new(false, false);
2478        let result = maker.make_data_type(&schema, None, None).unwrap();
2479
2480        assert_eq!(
2481            result.metadata.get("logicalType"),
2482            Some(&"custom-type".to_string())
2483        );
2484    }
2485
2486    #[test]
2487    fn test_string_with_utf8view_enabled() {
2488        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2489
2490        let mut maker = Maker::new(true, false);
2491        let result = maker.make_data_type(&schema, None, None).unwrap();
2492
2493        assert!(matches!(result.codec, Codec::Utf8View));
2494    }
2495
2496    #[test]
2497    fn test_string_without_utf8view_enabled() {
2498        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2499
2500        let mut maker = Maker::new(false, false);
2501        let result = maker.make_data_type(&schema, None, None).unwrap();
2502
2503        assert!(matches!(result.codec, Codec::Utf8));
2504    }
2505
2506    #[test]
2507    fn test_record_with_string_and_utf8view_enabled() {
2508        let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2509
2510        let avro_field = crate::schema::Field {
2511            name: "string_field",
2512            r#type: field_schema,
2513            default: None,
2514            doc: None,
2515            aliases: vec![],
2516        };
2517
2518        let record = Record {
2519            name: "test_record",
2520            namespace: None,
2521            aliases: vec![],
2522            doc: None,
2523            fields: vec![avro_field],
2524            attributes: Attributes::default(),
2525        };
2526
2527        let schema = Schema::Complex(ComplexType::Record(record));
2528
2529        let mut maker = Maker::new(true, false);
2530        let result = maker.make_data_type(&schema, None, None).unwrap();
2531
2532        if let Codec::Struct(fields) = &result.codec {
2533            let first_field_codec = &fields[0].data_type().codec;
2534            assert!(matches!(first_field_codec, Codec::Utf8View));
2535        } else {
2536            panic!("Expected Struct codec");
2537        }
2538    }
2539
2540    #[test]
2541    fn test_union_with_strict_mode() {
2542        let schema = Schema::Union(vec![
2543            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2544            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2545        ]);
2546
2547        let mut maker = Maker::new(false, true);
2548        let result = maker.make_data_type(&schema, None, None);
2549
2550        assert!(result.is_err());
2551        match result {
2552            Err(ArrowError::SchemaError(msg)) => {
2553                assert!(msg.contains(
2554                    "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2555                ));
2556            }
2557            _ => panic!("Expected SchemaError"),
2558        }
2559    }
2560
2561    #[test]
2562    fn test_resolve_int_to_float_promotion() {
2563        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2564        assert!(matches!(result.codec, Codec::Float32));
2565        assert_eq!(
2566            result.resolution,
2567            Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2568        );
2569    }
2570
2571    #[test]
2572    fn test_resolve_int_to_double_promotion() {
2573        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2574        assert!(matches!(result.codec, Codec::Float64));
2575        assert_eq!(
2576            result.resolution,
2577            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2578        );
2579    }
2580
2581    #[test]
2582    fn test_resolve_long_to_float_promotion() {
2583        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2584        assert!(matches!(result.codec, Codec::Float32));
2585        assert_eq!(
2586            result.resolution,
2587            Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2588        );
2589    }
2590
2591    #[test]
2592    fn test_resolve_long_to_double_promotion() {
2593        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2594        assert!(matches!(result.codec, Codec::Float64));
2595        assert_eq!(
2596            result.resolution,
2597            Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2598        );
2599    }
2600
2601    #[test]
2602    fn test_resolve_float_to_double_promotion() {
2603        let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2604        assert!(matches!(result.codec, Codec::Float64));
2605        assert_eq!(
2606            result.resolution,
2607            Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2608        );
2609    }
2610
2611    #[test]
2612    fn test_resolve_string_to_bytes_promotion() {
2613        let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2614        assert!(matches!(result.codec, Codec::Binary));
2615        assert_eq!(
2616            result.resolution,
2617            Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2618        );
2619    }
2620
2621    #[test]
2622    fn test_resolve_bytes_to_string_promotion() {
2623        let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2624        assert!(matches!(result.codec, Codec::Utf8));
2625        assert_eq!(
2626            result.resolution,
2627            Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2628        );
2629    }
2630
2631    #[test]
2632    fn test_resolve_illegal_promotion_double_to_float_errors() {
2633        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2634        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2635        let mut maker = Maker::new(false, false);
2636        let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2637        assert!(result.is_err());
2638        match result {
2639            Err(ArrowError::ParseError(msg)) => {
2640                assert!(msg.contains("Illegal promotion"));
2641            }
2642            _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2643        }
2644    }
2645
2646    #[test]
2647    fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2648        let writer = Schema::Union(vec![
2649            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2650            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2651        ]);
2652        let reader = Schema::Union(vec![
2653            Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2654            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2655        ]);
2656        let mut maker = Maker::new(false, false);
2657        let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2658        assert!(matches!(result.codec, Codec::Float64));
2659        assert_eq!(
2660            result.resolution,
2661            Some(ResolutionInfo::Union(ResolvedUnion {
2662                writer_to_reader: [
2663                    None,
2664                    Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2665                ]
2666                .into(),
2667                writer_is_union: true,
2668                reader_is_union: true,
2669            }))
2670        );
2671        assert_eq!(result.nullability, Some(Nullability::NullFirst));
2672    }
2673
2674    #[test]
2675    fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2676        let writer = mk_union(vec![
2677            mk_primitive(PrimitiveType::String),
2678            mk_primitive(PrimitiveType::Long),
2679        ]);
2680        let reader = mk_primitive(PrimitiveType::Bytes);
2681        let mut maker = Maker::new(false, false);
2682        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2683        assert!(matches!(dt.codec(), Codec::Binary));
2684        let resolved = match dt.resolution {
2685            Some(ResolutionInfo::Union(u)) => u,
2686            other => panic!("expected union resolution info, got {other:?}"),
2687        };
2688        assert!(resolved.writer_is_union && !resolved.reader_is_union);
2689        assert_eq!(
2690            resolved.writer_to_reader.as_ref(),
2691            &[
2692                Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
2693                None
2694            ]
2695        );
2696    }
2697
2698    #[test]
2699    fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2700        let writer = mk_primitive(PrimitiveType::Long);
2701        let reader = mk_union(vec![
2702            mk_primitive(PrimitiveType::Long),
2703            mk_primitive(PrimitiveType::Double),
2704        ]);
2705        let mut maker = Maker::new(false, false);
2706        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2707        let resolved = match dt.resolution {
2708            Some(ResolutionInfo::Union(u)) => u,
2709            other => panic!("expected union resolution info, got {other:?}"),
2710        };
2711        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2712        assert_eq!(
2713            resolved.writer_to_reader.as_ref(),
2714            &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
2715        );
2716    }
2717
2718    #[test]
2719    fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2720        let writer = mk_primitive(PrimitiveType::Int);
2721        let reader = mk_union(vec![
2722            mk_primitive(PrimitiveType::Null),
2723            mk_primitive(PrimitiveType::Long),
2724            mk_primitive(PrimitiveType::String),
2725        ]);
2726        let mut maker = Maker::new(false, false);
2727        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2728        let resolved = match dt.resolution {
2729            Some(ResolutionInfo::Union(u)) => u,
2730            other => panic!("expected union resolution info, got {other:?}"),
2731        };
2732        assert_eq!(
2733            resolved.writer_to_reader.as_ref(),
2734            &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
2735        );
2736    }
2737
2738    #[test]
2739    fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
2740        // Writer: record Inner{a: int}
2741        // Reader: union [Inner{a: int, b: int default 42}, string]
2742        // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
2743        let writer = Schema::Complex(ComplexType::Record(Record {
2744            name: "Inner",
2745            namespace: None,
2746            doc: None,
2747            aliases: vec![],
2748            fields: vec![AvroFieldSchema {
2749                name: "a",
2750                doc: None,
2751                r#type: mk_primitive(PrimitiveType::Int),
2752                default: None,
2753                aliases: vec![],
2754            }],
2755            attributes: Attributes::default(),
2756        }));
2757        let reader = mk_union(vec![
2758            Schema::Complex(ComplexType::Record(Record {
2759                name: "Inner",
2760                namespace: None,
2761                doc: None,
2762                aliases: vec![],
2763                fields: vec![
2764                    AvroFieldSchema {
2765                        name: "a",
2766                        doc: None,
2767                        r#type: mk_primitive(PrimitiveType::Int),
2768                        default: None,
2769                        aliases: vec![],
2770                    },
2771                    AvroFieldSchema {
2772                        name: "b",
2773                        doc: None,
2774                        r#type: mk_primitive(PrimitiveType::Int),
2775                        default: Some(Value::Number(serde_json::Number::from(42))),
2776                        aliases: vec![],
2777                    },
2778                ],
2779                attributes: Attributes::default(),
2780            })),
2781            mk_primitive(PrimitiveType::String),
2782        ]);
2783        let mut maker = Maker::new(false, false);
2784        let dt = maker
2785            .make_data_type(&writer, Some(&reader), None)
2786            .expect("resolution should succeed");
2787        // Verify the union resolution structure
2788        let resolved = match dt.resolution.as_ref() {
2789            Some(ResolutionInfo::Union(u)) => u,
2790            other => panic!("expected union resolution info, got {other:?}"),
2791        };
2792        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2793        assert_eq!(
2794            resolved.writer_to_reader.len(),
2795            1,
2796            "expected the non-union record to resolve to a union variant"
2797        );
2798        let resolution = match resolved.writer_to_reader.first().unwrap() {
2799            Some((0, resolution)) => resolution,
2800            other => panic!("unexpected writer-to-reader table value {other:?}"),
2801        };
2802        match resolution {
2803            ResolutionInfo::Record(ResolvedRecord {
2804                writer_fields,
2805                default_fields,
2806            }) => {
2807                assert_eq!(writer_fields.len(), 1);
2808                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
2809                assert_eq!(default_fields.len(), 1);
2810                assert_eq!(default_fields[0], 1);
2811            }
2812            other => panic!("unexpected resolution {other:?}"),
2813        }
2814        // The matching child (Inner at index 0) should have field b with DefaultValue
2815        let children = match dt.codec() {
2816            Codec::Union(children, _, _) => children,
2817            other => panic!("expected union codec, got {other:?}"),
2818        };
2819        let inner_fields = match children[0].codec() {
2820            Codec::Struct(f) => f,
2821            other => panic!("expected struct codec for Inner, got {other:?}"),
2822        };
2823        assert_eq!(inner_fields.len(), 2);
2824        assert_eq!(inner_fields[1].name(), "b");
2825        assert_eq!(
2826            inner_fields[1].data_type().resolution,
2827            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
2828            "field b should have DefaultValue(Int(42)) from schema resolution"
2829        );
2830    }
2831
2832    #[test]
2833    fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
2834        // Writer: record [string, Inner{a: int}]
2835        // Reader: union [Inner{a: int, b: int default 42}, string]
2836        // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
2837        let writer = mk_union(vec![
2838            mk_primitive(PrimitiveType::String),
2839            Schema::Complex(ComplexType::Record(Record {
2840                name: "Inner",
2841                namespace: None,
2842                doc: None,
2843                aliases: vec![],
2844                fields: vec![AvroFieldSchema {
2845                    name: "a",
2846                    doc: None,
2847                    r#type: mk_primitive(PrimitiveType::Int),
2848                    default: None,
2849                    aliases: vec![],
2850                }],
2851                attributes: Attributes::default(),
2852            })),
2853        ]);
2854        let reader = mk_union(vec![
2855            Schema::Complex(ComplexType::Record(Record {
2856                name: "Inner",
2857                namespace: None,
2858                doc: None,
2859                aliases: vec![],
2860                fields: vec![
2861                    AvroFieldSchema {
2862                        name: "a",
2863                        doc: None,
2864                        r#type: mk_primitive(PrimitiveType::Int),
2865                        default: None,
2866                        aliases: vec![],
2867                    },
2868                    AvroFieldSchema {
2869                        name: "b",
2870                        doc: None,
2871                        r#type: mk_primitive(PrimitiveType::Int),
2872                        default: Some(Value::Number(serde_json::Number::from(42))),
2873                        aliases: vec![],
2874                    },
2875                ],
2876                attributes: Attributes::default(),
2877            })),
2878            mk_primitive(PrimitiveType::String),
2879        ]);
2880        let mut maker = Maker::new(false, false);
2881        let dt = maker
2882            .make_data_type(&writer, Some(&reader), None)
2883            .expect("resolution should succeed");
2884        // Verify the union resolution structure
2885        let resolved = match dt.resolution.as_ref() {
2886            Some(ResolutionInfo::Union(u)) => u,
2887            other => panic!("expected union resolution info, got {other:?}"),
2888        };
2889        assert!(resolved.writer_is_union && resolved.reader_is_union);
2890        assert_eq!(resolved.writer_to_reader.len(), 2);
2891        let resolution = match resolved.writer_to_reader[1].as_ref() {
2892            Some((0, resolution)) => resolution,
2893            other => panic!("unexpected writer-to-reader table value {other:?}"),
2894        };
2895        match resolution {
2896            ResolutionInfo::Record(ResolvedRecord {
2897                writer_fields,
2898                default_fields,
2899            }) => {
2900                assert_eq!(writer_fields.len(), 1);
2901                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
2902                assert_eq!(default_fields.len(), 1);
2903                assert_eq!(default_fields[0], 1);
2904            }
2905            other => panic!("unexpected resolution {other:?}"),
2906        }
2907        // The matching child (Inner at index 0) should have field b with DefaultValue
2908        let children = match dt.codec() {
2909            Codec::Union(children, _, _) => children,
2910            other => panic!("expected union codec, got {other:?}"),
2911        };
2912        let inner_fields = match children[0].codec() {
2913            Codec::Struct(f) => f,
2914            other => panic!("expected struct codec for Inner, got {other:?}"),
2915        };
2916        assert_eq!(inner_fields.len(), 2);
2917        assert_eq!(inner_fields[1].name(), "b");
2918        assert_eq!(
2919            inner_fields[1].data_type().resolution,
2920            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
2921            "field b should have DefaultValue(Int(42)) from schema resolution"
2922        );
2923    }
2924
2925    #[test]
2926    fn test_resolve_both_nullable_unions_direct_match() {
2927        let writer = mk_union(vec![
2928            mk_primitive(PrimitiveType::Null),
2929            mk_primitive(PrimitiveType::String),
2930        ]);
2931        let reader = mk_union(vec![
2932            mk_primitive(PrimitiveType::String),
2933            mk_primitive(PrimitiveType::Null),
2934        ]);
2935        let mut maker = Maker::new(false, false);
2936        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2937        assert!(matches!(dt.codec(), Codec::Utf8));
2938        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2939        assert_eq!(
2940            dt.resolution,
2941            Some(ResolutionInfo::Union(ResolvedUnion {
2942                writer_to_reader: [
2943                    None,
2944                    Some((0, ResolutionInfo::Promotion(Promotion::Direct)))
2945                ]
2946                .into(),
2947                writer_is_union: true,
2948                reader_is_union: true
2949            }))
2950        );
2951    }
2952
2953    #[test]
2954    fn test_resolve_both_nullable_unions_with_promotion() {
2955        let writer = mk_union(vec![
2956            mk_primitive(PrimitiveType::Null),
2957            mk_primitive(PrimitiveType::Int),
2958        ]);
2959        let reader = mk_union(vec![
2960            mk_primitive(PrimitiveType::Double),
2961            mk_primitive(PrimitiveType::Null),
2962        ]);
2963        let mut maker = Maker::new(false, false);
2964        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2965        assert!(matches!(dt.codec(), Codec::Float64));
2966        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2967        assert_eq!(
2968            dt.resolution,
2969            Some(ResolutionInfo::Union(ResolvedUnion {
2970                writer_to_reader: [
2971                    None,
2972                    Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2973                ]
2974                .into(),
2975                writer_is_union: true,
2976                reader_is_union: true
2977            }))
2978        );
2979    }
2980
2981    #[test]
2982    fn test_resolve_type_promotion() {
2983        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2984        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2985        let mut maker = Maker::new(false, false);
2986        let result = maker
2987            .make_data_type(&writer_schema, Some(&reader_schema), None)
2988            .unwrap();
2989        assert!(matches!(result.codec, Codec::Int64));
2990        assert_eq!(
2991            result.resolution,
2992            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2993        );
2994    }
2995
2996    #[test]
2997    fn test_nested_record_type_reuse_without_namespace() {
2998        let schema_str = r#"
2999        {
3000          "type": "record",
3001          "name": "Record",
3002          "fields": [
3003            {
3004              "name": "nested",
3005              "type": {
3006                "type": "record",
3007                "name": "Nested",
3008                "fields": [
3009                  { "name": "nested_int", "type": "int" }
3010                ]
3011              }
3012            },
3013            { "name": "nestedRecord", "type": "Nested" },
3014            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
3015            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
3016          ]
3017        }
3018        "#;
3019
3020        let schema: Schema = serde_json::from_str(schema_str).unwrap();
3021
3022        let mut maker = Maker::new(false, false);
3023        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3024
3025        if let Codec::Struct(fields) = avro_data_type.codec() {
3026            assert_eq!(fields.len(), 4);
3027
3028            // nested
3029            assert_eq!(fields[0].name(), "nested");
3030            let nested_data_type = fields[0].data_type();
3031            if let Codec::Struct(nested_fields) = nested_data_type.codec() {
3032                assert_eq!(nested_fields.len(), 1);
3033                assert_eq!(nested_fields[0].name(), "nested_int");
3034                assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
3035            } else {
3036                panic!(
3037                    "'nested' field is not a struct but {:?}",
3038                    nested_data_type.codec()
3039                );
3040            }
3041
3042            // nestedRecord
3043            assert_eq!(fields[1].name(), "nestedRecord");
3044            let nested_record_data_type = fields[1].data_type();
3045            assert_eq!(
3046                nested_record_data_type.codec().data_type(),
3047                nested_data_type.codec().data_type()
3048            );
3049
3050            // nestedArray
3051            assert_eq!(fields[2].name(), "nestedArray");
3052            if let Codec::List(item_type) = fields[2].data_type().codec() {
3053                assert_eq!(
3054                    item_type.codec().data_type(),
3055                    nested_data_type.codec().data_type()
3056                );
3057            } else {
3058                panic!("'nestedArray' field is not a list");
3059            }
3060
3061            // nestedMap
3062            assert_eq!(fields[3].name(), "nestedMap");
3063            if let Codec::Map(value_type) = fields[3].data_type().codec() {
3064                assert_eq!(
3065                    value_type.codec().data_type(),
3066                    nested_data_type.codec().data_type()
3067                );
3068            } else {
3069                panic!("'nestedMap' field is not a map");
3070            }
3071        } else {
3072            panic!("Top-level schema is not a struct");
3073        }
3074    }
3075
3076    #[test]
3077    fn test_nested_enum_type_reuse_with_namespace() {
3078        let schema_str = r#"
3079        {
3080          "type": "record",
3081          "name": "Record",
3082          "namespace": "record_ns",
3083          "fields": [
3084            {
3085              "name": "status",
3086              "type": {
3087                "type": "enum",
3088                "name": "Status",
3089                "namespace": "enum_ns",
3090                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
3091              }
3092            },
3093            { "name": "backupStatus", "type": "enum_ns.Status" },
3094            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
3095            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
3096          ]
3097        }
3098        "#;
3099
3100        let schema: Schema = serde_json::from_str(schema_str).unwrap();
3101
3102        let mut maker = Maker::new(false, false);
3103        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3104
3105        if let Codec::Struct(fields) = avro_data_type.codec() {
3106            assert_eq!(fields.len(), 4);
3107
3108            // status
3109            assert_eq!(fields[0].name(), "status");
3110            let status_data_type = fields[0].data_type();
3111            if let Codec::Enum(symbols) = status_data_type.codec() {
3112                assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
3113            } else {
3114                panic!(
3115                    "'status' field is not an enum but {:?}",
3116                    status_data_type.codec()
3117                );
3118            }
3119
3120            // backupStatus
3121            assert_eq!(fields[1].name(), "backupStatus");
3122            let backup_status_data_type = fields[1].data_type();
3123            assert_eq!(
3124                backup_status_data_type.codec().data_type(),
3125                status_data_type.codec().data_type()
3126            );
3127
3128            // statusHistory
3129            assert_eq!(fields[2].name(), "statusHistory");
3130            if let Codec::List(item_type) = fields[2].data_type().codec() {
3131                assert_eq!(
3132                    item_type.codec().data_type(),
3133                    status_data_type.codec().data_type()
3134                );
3135            } else {
3136                panic!("'statusHistory' field is not a list");
3137            }
3138
3139            // statusMap
3140            assert_eq!(fields[3].name(), "statusMap");
3141            if let Codec::Map(value_type) = fields[3].data_type().codec() {
3142                assert_eq!(
3143                    value_type.codec().data_type(),
3144                    status_data_type.codec().data_type()
3145                );
3146            } else {
3147                panic!("'statusMap' field is not a map");
3148            }
3149        } else {
3150            panic!("Top-level schema is not a struct");
3151        }
3152    }
3153
3154    #[test]
3155    fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
3156        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3157        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3158        let mut maker = Maker::new(false, false);
3159        let data_type = maker
3160            .make_data_type(&writer_schema, Some(&reader_schema), None)
3161            .expect("resolution should succeed");
3162        let field = AvroField {
3163            name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
3164            data_type,
3165        };
3166        assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
3167        assert!(matches!(field.data_type().codec(), Codec::Utf8));
3168    }
3169
3170    fn json_string(s: &str) -> Value {
3171        Value::String(s.to_string())
3172    }
3173
3174    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3175        let stored = dt
3176            .metadata
3177            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3178            .cloned()
3179            .unwrap_or_default();
3180        let expected = serde_json::to_string(default_json).unwrap();
3181        assert_eq!(stored, expected, "stored default metadata should match");
3182    }
3183
3184    #[test]
3185    fn test_validate_and_store_default_null_and_nullability_rules() {
3186        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
3187        let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
3188        assert_eq!(lit, AvroLiteral::Null);
3189        assert_default_stored(&dt_null, &Value::Null);
3190        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3191        let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
3192        assert!(
3193            err.to_string()
3194                .contains("JSON null default is only valid for `null` type"),
3195            "unexpected error: {err}"
3196        );
3197        let mut dt_int_nf =
3198            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
3199        let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
3200        assert_eq!(lit2, AvroLiteral::Null);
3201        assert_default_stored(&dt_int_nf, &Value::Null);
3202        let mut dt_int_ns =
3203            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
3204        let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
3205        assert!(
3206            err2.to_string()
3207                .contains("JSON null default is only valid for `null` type"),
3208            "unexpected error: {err2}"
3209        );
3210    }
3211
3212    #[test]
3213    fn test_validate_and_store_default_primitives_and_temporal() {
3214        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
3215        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
3216        assert_eq!(lit, AvroLiteral::Boolean(true));
3217        assert_default_stored(&dt_bool, &Value::Bool(true));
3218        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3219        let lit = dt_i32
3220            .parse_and_store_default(&serde_json::json!(123))
3221            .unwrap();
3222        assert_eq!(lit, AvroLiteral::Int(123));
3223        assert_default_stored(&dt_i32, &serde_json::json!(123));
3224        let err = dt_i32
3225            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
3226            .unwrap_err();
3227        assert!(format!("{err}").contains("out of i32 range"));
3228        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3229        let lit = dt_i64
3230            .parse_and_store_default(&serde_json::json!(1234567890))
3231            .unwrap();
3232        assert_eq!(lit, AvroLiteral::Long(1234567890));
3233        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
3234        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
3235        let lit = dt_f32
3236            .parse_and_store_default(&serde_json::json!(1.25))
3237            .unwrap();
3238        assert_eq!(lit, AvroLiteral::Float(1.25));
3239        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
3240        let err = dt_f32
3241            .parse_and_store_default(&serde_json::json!(1e39))
3242            .unwrap_err();
3243        assert!(format!("{err}").contains("out of f32 range"));
3244        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3245        let lit = dt_f64
3246            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
3247            .unwrap();
3248        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
3249        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
3250        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
3251        let l = dt_str
3252            .parse_and_store_default(&json_string("hello"))
3253            .unwrap();
3254        assert_eq!(l, AvroLiteral::String("hello".into()));
3255        assert_default_stored(&dt_str, &json_string("hello"));
3256        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
3257        let l = dt_strv
3258            .parse_and_store_default(&json_string("view"))
3259            .unwrap();
3260        assert_eq!(l, AvroLiteral::String("view".into()));
3261        assert_default_stored(&dt_strv, &json_string("view"));
3262        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
3263        let l = dt_uuid
3264            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
3265            .unwrap();
3266        assert_eq!(
3267            l,
3268            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
3269        );
3270        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
3271        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
3272        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
3273        let err = dt_bin
3274            .parse_and_store_default(&json_string("€")) // U+20AC
3275            .unwrap_err();
3276        assert!(format!("{err}").contains("Invalid codepoint"));
3277        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
3278        let ld = dt_date
3279            .parse_and_store_default(&serde_json::json!(1))
3280            .unwrap();
3281        assert_eq!(ld, AvroLiteral::Int(1));
3282        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
3283        let lt = dt_tmill
3284            .parse_and_store_default(&serde_json::json!(86_400_000))
3285            .unwrap();
3286        assert_eq!(lt, AvroLiteral::Int(86_400_000));
3287        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
3288        let ltm = dt_tmicros
3289            .parse_and_store_default(&serde_json::json!(1_000_000))
3290            .unwrap();
3291        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
3292        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
3293        let l1 = dt_ts_milli
3294            .parse_and_store_default(&serde_json::json!(123))
3295            .unwrap();
3296        assert_eq!(l1, AvroLiteral::Long(123));
3297        let mut dt_ts_micro =
3298            AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
3299        let l2 = dt_ts_micro
3300            .parse_and_store_default(&serde_json::json!(456))
3301            .unwrap();
3302        assert_eq!(l2, AvroLiteral::Long(456));
3303    }
3304
3305    #[cfg(feature = "avro_custom_types")]
3306    #[test]
3307    fn test_validate_and_store_default_custom_integer_ranges() {
3308        let mut dt_i8 = AvroDataType::new(Codec::Int8, HashMap::new(), None);
3309        let lit_i8 = dt_i8
3310            .parse_and_store_default(&serde_json::json!(i8::MAX))
3311            .unwrap();
3312        assert_eq!(lit_i8, AvroLiteral::Int(i8::MAX as i32));
3313        let err_i8_high = dt_i8
3314            .parse_and_store_default(&serde_json::json!(i8::MAX as i64 + 1))
3315            .unwrap_err();
3316        assert!(err_i8_high.to_string().contains("out of i8 range"));
3317        let err_i8_low = dt_i8
3318            .parse_and_store_default(&serde_json::json!(i8::MIN as i64 - 1))
3319            .unwrap_err();
3320        assert!(err_i8_low.to_string().contains("out of i8 range"));
3321
3322        let mut dt_i16 = AvroDataType::new(Codec::Int16, HashMap::new(), None);
3323        let lit_i16 = dt_i16
3324            .parse_and_store_default(&serde_json::json!(i16::MIN))
3325            .unwrap();
3326        assert_eq!(lit_i16, AvroLiteral::Int(i16::MIN as i32));
3327        let err_i16_high = dt_i16
3328            .parse_and_store_default(&serde_json::json!(i16::MAX as i64 + 1))
3329            .unwrap_err();
3330        assert!(err_i16_high.to_string().contains("out of i16 range"));
3331        let err_i16_low = dt_i16
3332            .parse_and_store_default(&serde_json::json!(i16::MIN as i64 - 1))
3333            .unwrap_err();
3334        assert!(err_i16_low.to_string().contains("out of i16 range"));
3335
3336        let mut dt_u8 = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
3337        let lit_u8 = dt_u8
3338            .parse_and_store_default(&serde_json::json!(u8::MAX))
3339            .unwrap();
3340        assert_eq!(lit_u8, AvroLiteral::Int(u8::MAX as i32));
3341        let err_u8_neg = dt_u8
3342            .parse_and_store_default(&serde_json::json!(-1))
3343            .unwrap_err();
3344        assert!(err_u8_neg.to_string().contains("out of u8 range"));
3345        let err_u8_high = dt_u8
3346            .parse_and_store_default(&serde_json::json!(u8::MAX as i64 + 1))
3347            .unwrap_err();
3348        assert!(err_u8_high.to_string().contains("out of u8 range"));
3349
3350        let mut dt_u16 = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
3351        let lit_u16 = dt_u16
3352            .parse_and_store_default(&serde_json::json!(u16::MAX))
3353            .unwrap();
3354        assert_eq!(lit_u16, AvroLiteral::Int(u16::MAX as i32));
3355        let err_u16_neg = dt_u16
3356            .parse_and_store_default(&serde_json::json!(-1))
3357            .unwrap_err();
3358        assert!(err_u16_neg.to_string().contains("out of u16 range"));
3359        let err_u16_high = dt_u16
3360            .parse_and_store_default(&serde_json::json!(u16::MAX as i64 + 1))
3361            .unwrap_err();
3362        assert!(err_u16_high.to_string().contains("out of u16 range"));
3363
3364        let mut dt_u32 = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
3365        let lit_u32 = dt_u32
3366            .parse_and_store_default(&serde_json::json!(u32::MAX as i64))
3367            .unwrap();
3368        assert_eq!(lit_u32, AvroLiteral::Long(u32::MAX as i64));
3369        let err_u32_neg = dt_u32
3370            .parse_and_store_default(&serde_json::json!(-1))
3371            .unwrap_err();
3372        assert!(err_u32_neg.to_string().contains("out of u32 range"));
3373        let err_u32_high = dt_u32
3374            .parse_and_store_default(&serde_json::json!(u32::MAX as i64 + 1))
3375            .unwrap_err();
3376        assert!(err_u32_high.to_string().contains("out of u32 range"));
3377    }
3378
3379    #[test]
3380    fn test_validate_and_store_default_fixed_decimal_interval() {
3381        let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
3382        let l = dt_fixed
3383            .parse_and_store_default(&json_string("WXYZ"))
3384            .unwrap();
3385        assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
3386        let err = dt_fixed
3387            .parse_and_store_default(&json_string("TOO LONG"))
3388            .unwrap_err();
3389        assert!(err.to_string().contains("Default length"));
3390        let mut dt_dec_fixed =
3391            AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
3392        let l = dt_dec_fixed
3393            .parse_and_store_default(&json_string("abc"))
3394            .unwrap();
3395        assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
3396        let err = dt_dec_fixed
3397            .parse_and_store_default(&json_string("toolong"))
3398            .unwrap_err();
3399        assert!(err.to_string().contains("Default length"));
3400        let mut dt_dec_bytes =
3401            AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
3402        let l = dt_dec_bytes
3403            .parse_and_store_default(&json_string("freeform"))
3404            .unwrap();
3405        assert_eq!(
3406            l,
3407            AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
3408        );
3409        let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
3410        let l = dt_interval
3411            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
3412            .unwrap();
3413        assert_eq!(
3414            l,
3415            AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
3416        );
3417        let err = dt_interval
3418            .parse_and_store_default(&json_string("short"))
3419            .unwrap_err();
3420        assert!(err.to_string().contains("Default length"));
3421    }
3422
3423    #[test]
3424    fn test_validate_and_store_default_enum_list_map_struct() {
3425        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
3426            .into_iter()
3427            .collect();
3428        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
3429        let l = dt_enum
3430            .parse_and_store_default(&json_string("GREEN"))
3431            .unwrap();
3432        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
3433        let err = dt_enum
3434            .parse_and_store_default(&json_string("YELLOW"))
3435            .unwrap_err();
3436        assert!(err.to_string().contains("Default enum symbol"));
3437        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3438        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
3439        let val = serde_json::json!([1, 2, 3]);
3440        let l = dt_list.parse_and_store_default(&val).unwrap();
3441        assert_eq!(
3442            l,
3443            AvroLiteral::Array(vec![
3444                AvroLiteral::Long(1),
3445                AvroLiteral::Long(2),
3446                AvroLiteral::Long(3)
3447            ])
3448        );
3449        let err = dt_list
3450            .parse_and_store_default(&serde_json::json!({"not":"array"}))
3451            .unwrap_err();
3452        assert!(err.to_string().contains("JSON array"));
3453        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3454        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
3455        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
3456        let l = dt_map.parse_and_store_default(&mv).unwrap();
3457        let mut expected = IndexMap::new();
3458        expected.insert("x".into(), AvroLiteral::Double(1.5));
3459        expected.insert("y".into(), AvroLiteral::Double(2.5));
3460        assert_eq!(l, AvroLiteral::Map(expected));
3461        // Not object -> error
3462        let err = dt_map
3463            .parse_and_store_default(&serde_json::json!(123))
3464            .unwrap_err();
3465        assert!(err.to_string().contains("JSON object"));
3466        let mut field_a = AvroField {
3467            name: "a".into(),
3468            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
3469        };
3470        let field_b = AvroField {
3471            name: "b".into(),
3472            data_type: AvroDataType::new(
3473                Codec::Int64,
3474                HashMap::new(),
3475                Some(Nullability::NullFirst),
3476            ),
3477        };
3478        let mut c_md = HashMap::new();
3479        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
3480        let field_c = AvroField {
3481            name: "c".into(),
3482            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
3483        };
3484        field_a.data_type.metadata.insert("doc".into(), "na".into());
3485        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
3486        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
3487        let default_obj = serde_json::json!({"a": 7});
3488        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
3489        let mut expected = IndexMap::new();
3490        expected.insert("a".into(), AvroLiteral::Int(7));
3491        expected.insert("b".into(), AvroLiteral::Null);
3492        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
3493        assert_eq!(l, AvroLiteral::Map(expected));
3494        assert_default_stored(&dt_struct, &default_obj);
3495        let req_field = AvroField {
3496            name: "req".into(),
3497            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
3498        };
3499        let mut dt_bad = AvroDataType::new(
3500            Codec::Struct(Arc::from(vec![req_field])),
3501            HashMap::new(),
3502            None,
3503        );
3504        let err = dt_bad
3505            .parse_and_store_default(&serde_json::json!({}))
3506            .unwrap_err();
3507        assert!(
3508            err.to_string().contains("missing required subfield 'req'"),
3509            "unexpected error: {err}"
3510        );
3511        let err = dt_struct
3512            .parse_and_store_default(&serde_json::json!(10))
3513            .unwrap_err();
3514        err.to_string().contains("must be a JSON object");
3515    }
3516
3517    #[test]
3518    fn test_resolve_array_promotion_and_reader_metadata() {
3519        let mut w_add: HashMap<&str, Value> = HashMap::new();
3520        w_add.insert("who", json_string("writer"));
3521        let mut r_add: HashMap<&str, Value> = HashMap::new();
3522        r_add.insert("who", json_string("reader"));
3523        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3524            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3525            attributes: Attributes {
3526                logical_type: None,
3527                additional: w_add,
3528            },
3529        }));
3530        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3531            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
3532            attributes: Attributes {
3533                logical_type: None,
3534                additional: r_add,
3535            },
3536        }));
3537        let mut maker = Maker::new(false, false);
3538        let dt = maker
3539            .make_data_type(&writer_schema, Some(&reader_schema), None)
3540            .unwrap();
3541        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
3542        if let Codec::List(inner) = dt.codec() {
3543            assert!(matches!(inner.codec(), Codec::Int64));
3544            assert_eq!(
3545                inner.resolution,
3546                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3547            );
3548        } else {
3549            panic!("expected list codec");
3550        }
3551    }
3552
3553    #[test]
3554    fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
3555        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3556            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3557            attributes: Attributes::default(),
3558        }));
3559        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3560            items: Box::new(mk_union(vec![
3561                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3562                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3563            ])),
3564            attributes: Attributes::default(),
3565        }));
3566        let mut maker = Maker::new(false, false);
3567        let dt = maker
3568            .make_data_type(&writer_schema, Some(&reader_schema), None)
3569            .unwrap();
3570        if let Codec::List(inner) = dt.codec() {
3571            assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
3572            assert!(matches!(inner.codec(), Codec::Int32));
3573            match inner.resolution.as_ref() {
3574                Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
3575                other => panic!("expected Union resolution, got {other:?}"),
3576            }
3577        } else {
3578            panic!("expected List codec");
3579        }
3580    }
3581
3582    #[test]
3583    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3584        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3585            name: "MD5",
3586            namespace: None,
3587            aliases: vec!["Hash16"],
3588            size: 16,
3589            attributes: Attributes::default(),
3590        }));
3591        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3592            name: "Hash16",
3593            namespace: None,
3594            aliases: vec![],
3595            size: 16,
3596            attributes: Attributes::default(),
3597        }));
3598        let mut maker = Maker::new(false, false);
3599        let dt = maker
3600            .make_data_type(&writer_schema, Some(&reader_schema), None)
3601            .unwrap();
3602        assert!(matches!(dt.codec(), Codec::Fixed(16)));
3603    }
3604
3605    #[cfg(feature = "avro_custom_types")]
3606    #[test]
3607    fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3608        let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3609            name: "ArrowIntervalMDN",
3610            namespace: None,
3611            aliases: vec![],
3612            size: 16,
3613            attributes: Attributes {
3614                logical_type: Some("arrow.interval-month-day-nano"),
3615                additional: Default::default(),
3616            },
3617        }));
3618        let mut maker = Maker::new(false, false);
3619        let dt = maker.make_data_type(&schema, None, None).unwrap();
3620        assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3621        assert_eq!(
3622            dt.codec.data_type(),
3623            DataType::Interval(IntervalUnit::MonthDayNano)
3624        );
3625    }
3626
3627    #[test]
3628    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3629        let writer = Schema::Complex(ComplexType::Record(Record {
3630            name: "R",
3631            namespace: None,
3632            doc: None,
3633            aliases: vec![],
3634            fields: vec![
3635                crate::schema::Field {
3636                    name: "a",
3637                    doc: None,
3638                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3639                    default: None,
3640                    aliases: vec![],
3641                },
3642                crate::schema::Field {
3643                    name: "skipme",
3644                    doc: None,
3645                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3646                    default: None,
3647                    aliases: vec![],
3648                },
3649                crate::schema::Field {
3650                    name: "b",
3651                    doc: None,
3652                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3653                    default: None,
3654                    aliases: vec![],
3655                },
3656            ],
3657            attributes: Attributes::default(),
3658        }));
3659        let reader = Schema::Complex(ComplexType::Record(Record {
3660            name: "R",
3661            namespace: None,
3662            doc: None,
3663            aliases: vec![],
3664            fields: vec![
3665                crate::schema::Field {
3666                    name: "b",
3667                    doc: None,
3668                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3669                    default: None,
3670                    aliases: vec![],
3671                },
3672                crate::schema::Field {
3673                    name: "a",
3674                    doc: None,
3675                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3676                    default: None,
3677                    aliases: vec![],
3678                },
3679                crate::schema::Field {
3680                    name: "name",
3681                    doc: None,
3682                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3683                    default: Some(json_string("anon")),
3684                    aliases: vec![],
3685                },
3686                crate::schema::Field {
3687                    name: "opt",
3688                    doc: None,
3689                    r#type: Schema::Union(vec![
3690                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3691                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3692                    ]),
3693                    default: None, // should default to null because NullFirst
3694                    aliases: vec![],
3695                },
3696            ],
3697            attributes: Attributes::default(),
3698        }));
3699        let mut maker = Maker::new(false, false);
3700        let dt = maker
3701            .make_data_type(&writer, Some(&reader), None)
3702            .expect("record resolution");
3703        let fields = match dt.codec() {
3704            Codec::Struct(f) => f,
3705            other => panic!("expected struct, got {other:?}"),
3706        };
3707        assert_eq!(fields.len(), 4);
3708        assert_eq!(fields[0].name(), "b");
3709        assert_eq!(fields[1].name(), "a");
3710        assert_eq!(fields[2].name(), "name");
3711        assert_eq!(fields[3].name(), "opt");
3712        assert!(matches!(
3713            fields[1].data_type().resolution,
3714            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3715        ));
3716        let rec = match dt.resolution {
3717            Some(ResolutionInfo::Record(ref r)) => r.clone(),
3718            other => panic!("expected record resolution, got {other:?}"),
3719        };
3720        assert!(matches!(
3721            &rec.writer_fields[..],
3722            &[
3723                ResolvedField::ToReader(1),
3724                ResolvedField::Skip(_),
3725                ResolvedField::ToReader(0),
3726            ]
3727        ));
3728        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3729        let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
3730            panic!("should skip field 1")
3731        };
3732        assert!(matches!(skip1.codec(), Codec::Utf8));
3733        let name_md = &fields[2].data_type().metadata;
3734        assert_eq!(
3735            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3736            Some(&"\"anon\"".to_string())
3737        );
3738        let opt_md = &fields[3].data_type().metadata;
3739        assert_eq!(
3740            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3741            Some(&"null".to_string())
3742        );
3743    }
3744
3745    #[test]
3746    fn test_named_type_alias_resolution_record_cross_namespace() {
3747        let writer_record = Record {
3748            name: "PersonV2",
3749            namespace: Some("com.example.v2"),
3750            doc: None,
3751            aliases: vec!["com.example.Person"],
3752            fields: vec![
3753                AvroFieldSchema {
3754                    name: "name",
3755                    doc: None,
3756                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3757                    default: None,
3758                    aliases: vec![],
3759                },
3760                AvroFieldSchema {
3761                    name: "age",
3762                    doc: None,
3763                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3764                    default: None,
3765                    aliases: vec![],
3766                },
3767            ],
3768            attributes: Attributes::default(),
3769        };
3770        let reader_record = Record {
3771            name: "Person",
3772            namespace: Some("com.example"),
3773            doc: None,
3774            aliases: vec![],
3775            fields: writer_record.fields.clone(),
3776            attributes: Attributes::default(),
3777        };
3778        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3779        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3780        let mut maker = Maker::new(false, false);
3781        let result = maker
3782            .make_data_type(&writer_schema, Some(&reader_schema), None)
3783            .expect("record alias resolution should succeed");
3784        match result.codec {
3785            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3786            other => panic!("expected struct, got {other:?}"),
3787        }
3788    }
3789
3790    #[test]
3791    fn test_named_type_alias_resolution_enum_cross_namespace() {
3792        let writer_enum = Enum {
3793            name: "ColorV2",
3794            namespace: Some("org.example.v2"),
3795            doc: None,
3796            aliases: vec!["org.example.Color"],
3797            symbols: vec!["RED", "GREEN", "BLUE"],
3798            default: None,
3799            attributes: Attributes::default(),
3800        };
3801        let reader_enum = Enum {
3802            name: "Color",
3803            namespace: Some("org.example"),
3804            doc: None,
3805            aliases: vec![],
3806            symbols: vec!["RED", "GREEN", "BLUE"],
3807            default: None,
3808            attributes: Attributes::default(),
3809        };
3810        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3811        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3812        let mut maker = Maker::new(false, false);
3813        maker
3814            .make_data_type(&writer_schema, Some(&reader_schema), None)
3815            .expect("enum alias resolution should succeed");
3816    }
3817
3818    #[test]
3819    fn test_named_type_alias_resolution_fixed_cross_namespace() {
3820        let writer_fixed = Fixed {
3821            name: "Fx10V2",
3822            namespace: Some("ns.v2"),
3823            aliases: vec!["ns.Fx10"],
3824            size: 10,
3825            attributes: Attributes::default(),
3826        };
3827        let reader_fixed = Fixed {
3828            name: "Fx10",
3829            namespace: Some("ns"),
3830            aliases: vec![],
3831            size: 10,
3832            attributes: Attributes::default(),
3833        };
3834        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3835        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3836        let mut maker = Maker::new(false, false);
3837        maker
3838            .make_data_type(&writer_schema, Some(&reader_schema), None)
3839            .expect("fixed alias resolution should succeed");
3840    }
3841}