Skip to main content

arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec for Mapping Avro and Arrow types.
19
20use crate::schema::{
21    AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22    AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23    PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27    IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40/// Contains information about how to resolve differences between a writer's and a reader's schema.
41#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43    /// Indicates that the writer's type should be promoted to the reader's type.
44    Promotion(Promotion),
45    /// Indicates that a default value should be used for a field.
46    DefaultValue(AvroLiteral),
47    /// Provides mapping information for resolving enums.
48    EnumMapping(EnumMapping),
49    /// Provides resolution information for record fields.
50    Record(ResolvedRecord),
51    /// Provides mapping and shape info for resolving unions.
52    Union(ResolvedUnion),
53}
54
55/// Represents a literal Avro value.
56///
57/// This is used to represent default values in an Avro schema.
58#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60    /// Represents a null value.
61    Null,
62    /// Represents a boolean value.
63    Boolean(bool),
64    /// Represents an integer value.
65    Int(i32),
66    /// Represents a long value.
67    Long(i64),
68    /// Represents a float value.
69    Float(f32),
70    /// Represents a double value.
71    Double(f64),
72    /// Represents a bytes value.
73    Bytes(Vec<u8>),
74    /// Represents a string value.
75    String(String),
76    /// Represents an enum symbol.
77    Enum(String),
78    /// Represents a JSON array default for an Avro array, containing element literals.
79    Array(Vec<AvroLiteral>),
80    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
81    Map(IndexMap<String, AvroLiteral>),
82}
83
84/// Contains the necessary information to resolve a writer's record against a reader's record schema.
85#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87    /// Maps a writer's field index to the field's resolution against the reader's schema.
88    pub(crate) writer_fields: Arc<[ResolvedField]>,
89    /// A list of indices in the reader's schema for fields that have a default value.
90    pub(crate) default_fields: Arc<[usize]>,
91}
92
93/// Resolution information for record fields in the writer schema.
94#[derive(Debug, Clone, PartialEq)]
95pub(crate) enum ResolvedField {
96    /// Resolves to a field indexed in the reader schema.
97    ToReader(usize),
98    /// For fields present in the writer's schema but not the reader's, this stores their data type.
99    /// This is needed to correctly skip over these fields during deserialization.
100    Skip(AvroDataType),
101}
102
103/// Defines the type of promotion to be applied during schema resolution.
104///
105/// Schema resolution may require promoting a writer's data type to a reader's data type.
106/// For example, an `int` can be promoted to a `long`, `float`, or `double`.
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub(crate) enum Promotion {
109    /// Direct read with no data type promotion.
110    Direct,
111    /// Promotes an `int` to a `long`.
112    IntToLong,
113    /// Promotes an `int` to a `float`.
114    IntToFloat,
115    /// Promotes an `int` to a `double`.
116    IntToDouble,
117    /// Promotes a `long` to a `float`.
118    LongToFloat,
119    /// Promotes a `long` to a `double`.
120    LongToDouble,
121    /// Promotes a `float` to a `double`.
122    FloatToDouble,
123    /// Promotes a `string` to `bytes`.
124    StringToBytes,
125    /// Promotes `bytes` to a `string`.
126    BytesToString,
127}
128
129impl Display for Promotion {
130    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
131        match self {
132            Self::Direct => write!(formatter, "Direct"),
133            Self::IntToLong => write!(formatter, "Int->Long"),
134            Self::IntToFloat => write!(formatter, "Int->Float"),
135            Self::IntToDouble => write!(formatter, "Int->Double"),
136            Self::LongToFloat => write!(formatter, "Long->Float"),
137            Self::LongToDouble => write!(formatter, "Long->Double"),
138            Self::FloatToDouble => write!(formatter, "Float->Double"),
139            Self::StringToBytes => write!(formatter, "String->Bytes"),
140            Self::BytesToString => write!(formatter, "Bytes->String"),
141        }
142    }
143}
144
145/// Information required to resolve a writer union against a reader union (or single type).
146#[derive(Debug, Clone, PartialEq)]
147pub(crate) struct ResolvedUnion {
148    /// For each writer branch index, the reader branch index and how to read it.
149    /// `None` means the writer branch doesn't resolve against the reader.
150    pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
151    /// Whether the writer schema at this site is a union
152    pub(crate) writer_is_union: bool,
153    /// Whether the reader schema at this site is a union
154    pub(crate) reader_is_union: bool,
155}
156
157/// Holds the mapping information for resolving Avro enums.
158///
159/// When resolving schemas, the writer's enum symbols must be mapped to the reader's symbols.
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub(crate) struct EnumMapping {
162    /// A mapping from the writer's symbol index to the reader's symbol index.
163    pub(crate) mapping: Arc<[i32]>,
164    /// The index to use for a writer's symbol that is not present in the reader's enum
165    /// and a default value is specified in the reader's schema.
166    pub(crate) default_index: i32,
167}
168
169#[cfg(feature = "canonical_extension_types")]
170fn with_extension_type(codec: &Codec, field: Field) -> Field {
171    match codec {
172        Codec::Uuid => field.with_extension_type(arrow_schema::extension::Uuid),
173        _ => field,
174    }
175}
176
177/// An Avro datatype mapped to the arrow data model
178#[derive(Debug, Clone, PartialEq)]
179pub(crate) struct AvroDataType {
180    nullability: Option<Nullability>,
181    metadata: HashMap<String, String>,
182    codec: Codec,
183    pub(crate) resolution: Option<ResolutionInfo>,
184}
185
186impl AvroDataType {
187    /// Create a new [`AvroDataType`] with the given parts.
188    pub(crate) fn new(
189        codec: Codec,
190        metadata: HashMap<String, String>,
191        nullability: Option<Nullability>,
192    ) -> Self {
193        AvroDataType {
194            codec,
195            metadata,
196            nullability,
197            resolution: None,
198        }
199    }
200
201    #[inline]
202    fn new_with_resolution(
203        codec: Codec,
204        metadata: HashMap<String, String>,
205        nullability: Option<Nullability>,
206        resolution: Option<ResolutionInfo>,
207    ) -> Self {
208        Self {
209            codec,
210            metadata,
211            nullability,
212            resolution,
213        }
214    }
215
216    /// Returns an arrow [`Field`] with the given name
217    pub(crate) fn field_with_name(&self, name: &str) -> Field {
218        let mut nullable = self.nullability.is_some();
219        if !nullable {
220            if let Codec::Union(children, _, _) = self.codec() {
221                // If any encoded branch is `null`, mark field as nullable
222                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
223                    nullable = true;
224                }
225            }
226        }
227        let data_type = self.codec.data_type();
228        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
229        #[cfg(feature = "canonical_extension_types")]
230        return with_extension_type(&self.codec, field);
231        #[cfg(not(feature = "canonical_extension_types"))]
232        field
233    }
234
235    /// Returns a reference to the codec used by this data type
236    ///
237    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
238    /// This is useful when we need to inspect or use the specific encoding of a field.
239    pub(crate) fn codec(&self) -> &Codec {
240        &self.codec
241    }
242
243    /// Returns the nullability status of this data type
244    ///
245    /// In Avro, nullability is represented through unions with null types.
246    /// The returned value indicates how nulls are encoded in the Avro format:
247    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
248    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
249    /// - `None` - The type is not nullable
250    pub(crate) fn nullability(&self) -> Option<Nullability> {
251        self.nullability
252    }
253
254    #[inline]
255    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
256        fn expect_string<'v>(
257            default_json: &'v Value,
258            data_type: &str,
259        ) -> Result<&'v str, ArrowError> {
260            match default_json {
261                Value::String(s) => Ok(s.as_str()),
262                _ => Err(ArrowError::SchemaError(format!(
263                    "Default value must be a JSON string for {data_type}"
264                ))),
265            }
266        }
267
268        fn parse_bytes_default(
269            default_json: &Value,
270            expected_len: Option<usize>,
271        ) -> Result<Vec<u8>, ArrowError> {
272            let s = expect_string(default_json, "bytes/fixed logical types")?;
273            let mut out = Vec::with_capacity(s.len());
274            for ch in s.chars() {
275                let cp = ch as u32;
276                if cp > 0xFF {
277                    return Err(ArrowError::SchemaError(format!(
278                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
279                    )));
280                }
281                out.push(cp as u8);
282            }
283            if let Some(len) = expected_len {
284                if out.len() != len {
285                    return Err(ArrowError::SchemaError(format!(
286                        "Default length {} does not match expected fixed size {len}",
287                        out.len(),
288                    )));
289                }
290            }
291            Ok(out)
292        }
293
294        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
295            match default_json {
296                Value::Number(n) => n.as_i64().ok_or_else(|| {
297                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
298                }),
299                _ => Err(ArrowError::SchemaError(format!(
300                    "Default {data_type} must be a JSON integer"
301                ))),
302            }
303        }
304
305        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
306            match default_json {
307                Value::Number(n) => n.as_f64().ok_or_else(|| {
308                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
309                }),
310                _ => Err(ArrowError::SchemaError(format!(
311                    "Default {data_type} must be a JSON number"
312                ))),
313            }
314        }
315
316        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
317        if default_json.is_null() {
318            return match self.codec() {
319                Codec::Null => Ok(AvroLiteral::Null),
320                Codec::Union(encodings, _, _) if !encodings.is_empty()
321                    && matches!(encodings[0].codec(), Codec::Null) =>
322                    {
323                        Ok(AvroLiteral::Null)
324                    }
325                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
326                _ => Err(ArrowError::SchemaError(
327                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
328                        .to_string(),
329                )),
330            };
331        }
332        let lit = match self.codec() {
333            Codec::Null => {
334                return Err(ArrowError::SchemaError(
335                    "Default for `null` type must be JSON null".to_string(),
336                ));
337            }
338            Codec::Boolean => match default_json {
339                Value::Bool(b) => AvroLiteral::Boolean(*b),
340                _ => {
341                    return Err(ArrowError::SchemaError(
342                        "Boolean default must be a JSON boolean".to_string(),
343                    ));
344                }
345            },
346            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
347                let i = parse_json_i64(default_json, "int")?;
348                if i < i32::MIN as i64 || i > i32::MAX as i64 {
349                    return Err(ArrowError::SchemaError(format!(
350                        "Default int {i} out of i32 range"
351                    )));
352                }
353                AvroLiteral::Int(i as i32)
354            }
355            Codec::Int64
356            | Codec::TimeMicros
357            | Codec::TimestampMillis(_)
358            | Codec::TimestampMicros(_)
359            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
360            #[cfg(feature = "avro_custom_types")]
361            Codec::DurationNanos
362            | Codec::DurationMicros
363            | Codec::DurationMillis
364            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
365            #[cfg(feature = "avro_custom_types")]
366            Codec::Int8 => {
367                let i = parse_json_i64(default_json, "int")?;
368                if i < i8::MIN as i64 || i > i8::MAX as i64 {
369                    return Err(ArrowError::SchemaError(format!(
370                        "Default int8 {i} out of i8 range"
371                    )));
372                }
373                AvroLiteral::Int(i as i32)
374            }
375            #[cfg(feature = "avro_custom_types")]
376            Codec::Int16 => {
377                let i = parse_json_i64(default_json, "int")?;
378                if i < i16::MIN as i64 || i > i16::MAX as i64 {
379                    return Err(ArrowError::SchemaError(format!(
380                        "Default int16 {i} out of i16 range"
381                    )));
382                }
383                AvroLiteral::Int(i as i32)
384            }
385            #[cfg(feature = "avro_custom_types")]
386            Codec::UInt8 => {
387                let i = parse_json_i64(default_json, "int")?;
388                if i < 0 || i > u8::MAX as i64 {
389                    return Err(ArrowError::SchemaError(format!(
390                        "Default uint8 {i} out of u8 range"
391                    )));
392                }
393                AvroLiteral::Int(i as i32)
394            }
395            #[cfg(feature = "avro_custom_types")]
396            Codec::UInt16 => {
397                let i = parse_json_i64(default_json, "int")?;
398                if i < 0 || i > u16::MAX as i64 {
399                    return Err(ArrowError::SchemaError(format!(
400                        "Default uint16 {i} out of u16 range"
401                    )));
402                }
403                AvroLiteral::Int(i as i32)
404            }
405            #[cfg(feature = "avro_custom_types")]
406            Codec::UInt32 => {
407                let i = parse_json_i64(default_json, "long")?;
408                if i < 0 || i > u32::MAX as i64 {
409                    return Err(ArrowError::SchemaError(format!(
410                        "Default uint32 {i} out of u32 range"
411                    )));
412                }
413                AvroLiteral::Long(i)
414            }
415            #[cfg(feature = "avro_custom_types")]
416            Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
417                AvroLiteral::Long(parse_json_i64(default_json, "long")?)
418            }
419            #[cfg(feature = "avro_custom_types")]
420            Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
421            #[cfg(feature = "avro_custom_types")]
422            Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
423            #[cfg(feature = "avro_custom_types")]
424            Codec::Time32Secs => {
425                let i = parse_json_i64(default_json, "int")?;
426                if i < i32::MIN as i64 || i > i32::MAX as i64 {
427                    return Err(ArrowError::SchemaError(format!(
428                        "Default time32-secs {i} out of i32 range"
429                    )));
430                }
431                AvroLiteral::Int(i as i32)
432            }
433            #[cfg(feature = "avro_custom_types")]
434            Codec::IntervalYearMonth => {
435                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
436            }
437            #[cfg(feature = "avro_custom_types")]
438            Codec::IntervalMonthDayNano => {
439                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
440            }
441            #[cfg(feature = "avro_custom_types")]
442            Codec::IntervalDayTime => {
443                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
444            }
445            Codec::Float32 => {
446                let f = parse_json_f64(default_json, "float")?;
447                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
448                    return Err(ArrowError::SchemaError(format!(
449                        "Default float {f} out of f32 range or not finite"
450                    )));
451                }
452                AvroLiteral::Float(f as f32)
453            }
454            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
455            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
456                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
457            }
458            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
459            Codec::Fixed(sz) => {
460                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
461            }
462            Codec::Decimal(_, _, fixed_size) => {
463                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
464            }
465            Codec::Enum(symbols) => {
466                let s = expect_string(default_json, "enum")?;
467                if symbols.iter().any(|sym| sym == s) {
468                    AvroLiteral::Enum(s.to_string())
469                } else {
470                    return Err(ArrowError::SchemaError(format!(
471                        "Default enum symbol {s:?} not found in reader enum symbols"
472                    )));
473                }
474            }
475            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
476            Codec::List(item_dt) => match default_json {
477                Value::Array(items) => AvroLiteral::Array(
478                    items
479                        .iter()
480                        .map(|v| item_dt.parse_default_literal(v))
481                        .collect::<Result<_, _>>()?,
482                ),
483                _ => {
484                    return Err(ArrowError::SchemaError(
485                        "Default value must be a JSON array for Avro array type".to_string(),
486                    ));
487                }
488            },
489            Codec::Map(val_dt) => match default_json {
490                Value::Object(map) => {
491                    let mut out = IndexMap::with_capacity(map.len());
492                    for (k, v) in map {
493                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
494                    }
495                    AvroLiteral::Map(out)
496                }
497                _ => {
498                    return Err(ArrowError::SchemaError(
499                        "Default value must be a JSON object for Avro map type".to_string(),
500                    ));
501                }
502            },
503            Codec::Struct(fields) => match default_json {
504                Value::Object(obj) => {
505                    let mut out: IndexMap<String, AvroLiteral> =
506                        IndexMap::with_capacity(fields.len());
507                    for f in fields.as_ref() {
508                        let name = f.name().to_string();
509                        if let Some(sub) = obj.get(&name) {
510                            out.insert(name, f.data_type().parse_default_literal(sub)?);
511                        } else {
512                            // Cache metadata lookup once
513                            let stored_default =
514                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
515                            if stored_default.is_none()
516                                && f.data_type().nullability() == Some(Nullability::default())
517                            {
518                                out.insert(name, AvroLiteral::Null);
519                            } else if let Some(default_json) = stored_default {
520                                let v: Value =
521                                    serde_json::from_str(default_json).map_err(|e| {
522                                        ArrowError::SchemaError(format!(
523                                            "Failed to parse stored subfield default JSON for '{}': {e}",
524                                            f.name(),
525                                        ))
526                                    })?;
527                                out.insert(name, f.data_type().parse_default_literal(&v)?);
528                            } else {
529                                return Err(ArrowError::SchemaError(format!(
530                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
531                                    f.name(),
532                                    f.data_type().codec()
533                                )));
534                            }
535                        }
536                    }
537                    AvroLiteral::Map(out)
538                }
539                _ => {
540                    return Err(ArrowError::SchemaError(
541                        "Default value for record/struct must be a JSON object".to_string(),
542                    ));
543                }
544            },
545            Codec::Union(encodings, _, _) => {
546                let Some(default_encoding) = encodings.first() else {
547                    return Err(ArrowError::SchemaError(
548                        "Union with no branches cannot have a default".to_string(),
549                    ));
550                };
551                default_encoding.parse_default_literal(default_json)?
552            }
553            #[cfg(feature = "avro_custom_types")]
554            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
555        };
556        Ok(lit)
557    }
558
559    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
560        let json_text = serde_json::to_string(default_json).map_err(|e| {
561            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
562        })?;
563        self.metadata
564            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
565        Ok(())
566    }
567
568    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
569        let lit = self.parse_default_literal(default_json)?;
570        self.store_default(default_json)?;
571        Ok(lit)
572    }
573}
574
575/// A named [`AvroDataType`]
576#[derive(Debug, Clone, PartialEq)]
577pub(crate) struct AvroField {
578    name: String,
579    data_type: AvroDataType,
580}
581
582impl AvroField {
583    /// Returns the arrow [`Field`]
584    pub(crate) fn field(&self) -> Field {
585        self.data_type.field_with_name(&self.name)
586    }
587
588    /// Returns the [`AvroDataType`]
589    pub(crate) fn data_type(&self) -> &AvroDataType {
590        &self.data_type
591    }
592
593    /// Returns a new [`AvroField`] with Utf8View support enabled
594    ///
595    /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to
596    /// enable potential performance optimizations in string-heavy workloads by using
597    /// Arrow's StringViewArray data structure.
598    ///
599    /// Returns a new `AvroField` with the same structure, but with string types
600    /// converted to use `Utf8View` instead of `Utf8`.
601    pub(crate) fn with_utf8view(&self) -> Self {
602        let mut field = self.clone();
603        if let Codec::Utf8 = field.data_type.codec {
604            field.data_type.codec = Codec::Utf8View;
605        }
606        field
607    }
608
609    /// Returns the name of this Avro field
610    ///
611    /// This is the field name as defined in the Avro schema.
612    /// It's used to identify fields within a record structure.
613    pub(crate) fn name(&self) -> &str {
614        &self.name
615    }
616}
617
618impl<'a> TryFrom<&Schema<'a>> for AvroField {
619    type Error = ArrowError;
620
621    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
622        match schema {
623            Schema::Complex(ComplexType::Record(r)) => {
624                let mut resolver = Maker::new(false, false, Tz::default());
625                let data_type = resolver.make_data_type(schema, None, None)?;
626                Ok(AvroField {
627                    data_type,
628                    name: r.name.to_string(),
629                })
630            }
631            _ => Err(ArrowError::ParseError(format!(
632                "Expected record got {schema:?}"
633            ))),
634        }
635    }
636}
637
638/// Builder for an [`AvroField`]
639#[derive(Debug)]
640pub(crate) struct AvroFieldBuilder<'a> {
641    writer_schema: &'a Schema<'a>,
642    reader_schema: Option<&'a Schema<'a>>,
643    use_utf8view: bool,
644    strict_mode: bool,
645    tz: Tz,
646}
647
648impl<'a> AvroFieldBuilder<'a> {
649    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
650    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
651        Self {
652            writer_schema,
653            reader_schema: None,
654            use_utf8view: false,
655            strict_mode: false,
656            tz: Tz::default(),
657        }
658    }
659
660    /// Sets the reader schema for schema resolution.
661    ///
662    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
663    /// that can handle differences between the writer's and reader's schemas.
664    #[inline]
665    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
666        self.reader_schema = Some(reader_schema);
667        self
668    }
669
670    /// Enable or disable Utf8View support
671    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
672        self.use_utf8view = use_utf8view;
673        self
674    }
675
676    /// Enable or disable strict mode.
677    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
678        self.strict_mode = strict_mode;
679        self
680    }
681
682    /// Sets the timezone representation for timestamps.
683    pub(crate) fn with_tz(mut self, tz: Tz) -> Self {
684        self.tz = tz;
685        self
686    }
687
688    /// Build an [`AvroField`] from the builder
689    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
690        match self.writer_schema {
691            Schema::Complex(ComplexType::Record(r)) => {
692                let mut resolver = Maker::new(self.use_utf8view, self.strict_mode, self.tz);
693                let data_type =
694                    resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
695                Ok(AvroField {
696                    name: r.name.to_string(),
697                    data_type,
698                })
699            }
700            _ => Err(ArrowError::ParseError(format!(
701                "Expected a Record schema to build an AvroField, but got {:?}",
702                self.writer_schema
703            ))),
704        }
705    }
706}
707
708/// Timezone representation for timestamps.
709///
710/// Avro only distinguishes between UTC and local time (no timezone), but Arrow supports
711/// any of the two identifiers of the UTC timezone: "+00:00" and "UTC".
712/// The data types using these time zone IDs behave identically, but are not logically equal.
713#[derive(Debug, Copy, Clone, PartialEq, Default)]
714pub enum Tz {
715    /// Represent Avro `timestamp-*` logical types with "+00:00" timezone ID
716    #[default]
717    OffsetZero,
718    /// Represent Avro `timestamp-*` logical types with "UTC" timezone ID
719    Utc,
720}
721
722impl Tz {
723    /// Returns the string identifier for this timezone representation
724    pub fn as_str(&self) -> &'static str {
725        match self {
726            Self::OffsetZero => "+00:00",
727            Self::Utc => "UTC",
728        }
729    }
730}
731
732impl Display for Tz {
733    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
734        f.write_str(self.as_str())
735    }
736}
737
738/// An Avro encoding
739///
740/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
741#[derive(Debug, Clone, PartialEq)]
742pub(crate) enum Codec {
743    /// Represents Avro null type, maps to Arrow's Null data type
744    Null,
745    /// Represents Avro boolean type, maps to Arrow's Boolean data type
746    Boolean,
747    /// Represents Avro int type, maps to Arrow's Int32 data type
748    Int32,
749    /// Represents Avro long type, maps to Arrow's Int64 data type
750    Int64,
751    /// Represents Avro float type, maps to Arrow's Float32 data type
752    Float32,
753    /// Represents Avro double type, maps to Arrow's Float64 data type
754    Float64,
755    /// Represents Avro bytes type, maps to Arrow's Binary data type
756    Binary,
757    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
758    Utf8,
759    /// String data represented as UTF-8 encoded bytes with an optimized view representation,
760    /// corresponding to Arrow's StringViewArray which provides better performance for string operations
761    ///
762    /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`.
763    Utf8View,
764    /// Represents Avro date logical type, maps to Arrow's Date32 data type
765    Date32,
766    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
767    TimeMillis,
768    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
769    TimeMicros,
770    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
771    ///
772    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type
773    /// The parameter indicates whether the timestamp has a UTC timezone (Some) or is local time (None)
774    TimestampMillis(Option<Tz>),
775    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
776    ///
777    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
778    /// The parameter indicates whether the timestamp has a UTC timezone (Some) or is local time (None)
779    TimestampMicros(Option<Tz>),
780    /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
781    ///
782    /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
783    /// The parameter indicates whether the timestamp has a UTC timezone (Some) or is local time (None)
784    TimestampNanos(Option<Tz>),
785    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
786    /// The i32 parameter indicates the fixed binary size
787    Fixed(i32),
788    /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
789    ///
790    /// The fields are `(precision, scale, fixed_size)`.
791    /// - `precision` (`usize`): Total number of digits.
792    /// - `scale` (`Option<usize>`): Number of fractional digits.
793    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
794    Decimal(usize, Option<usize>, Option<usize>),
795    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
796    Uuid,
797    /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
798    ///
799    /// The enclosed value contains the enum's symbols.
800    Enum(Arc<[String]>),
801    /// Represents Avro array type, maps to Arrow's List data type
802    List(Arc<AvroDataType>),
803    /// Represents Avro record type, maps to Arrow's Struct data type
804    Struct(Arc<[AvroField]>),
805    /// Represents Avro map type, maps to Arrow's Map data type
806    Map(Arc<AvroDataType>),
807    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
808    Interval,
809    /// Represents Avro union type, maps to Arrow's Union data type
810    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
811    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
812    #[cfg(feature = "avro_custom_types")]
813    DurationNanos,
814    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
815    #[cfg(feature = "avro_custom_types")]
816    DurationMicros,
817    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
818    #[cfg(feature = "avro_custom_types")]
819    DurationMillis,
820    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
821    #[cfg(feature = "avro_custom_types")]
822    DurationSeconds,
823    #[cfg(feature = "avro_custom_types")]
824    RunEndEncoded(Arc<AvroDataType>, u8),
825    /// Arrow Int8 custom logical type (arrow.int8)
826    #[cfg(feature = "avro_custom_types")]
827    Int8,
828    /// Arrow Int16 custom logical type (arrow.int16)
829    #[cfg(feature = "avro_custom_types")]
830    Int16,
831    /// Arrow UInt8 custom logical type (arrow.uint8)
832    #[cfg(feature = "avro_custom_types")]
833    UInt8,
834    /// Arrow UInt16 custom logical type (arrow.uint16)
835    #[cfg(feature = "avro_custom_types")]
836    UInt16,
837    /// Arrow UInt32 custom logical type (arrow.uint32)
838    #[cfg(feature = "avro_custom_types")]
839    UInt32,
840    /// Arrow UInt64 custom logical type (arrow.uint64) - stored as fixed(8)
841    #[cfg(feature = "avro_custom_types")]
842    UInt64,
843    /// Arrow Float16 custom logical type (arrow.float16) - stored as fixed(2)
844    #[cfg(feature = "avro_custom_types")]
845    Float16,
846    /// Arrow Date64 custom logical type (arrow.date64)
847    #[cfg(feature = "avro_custom_types")]
848    Date64,
849    /// Arrow Time64(Nanosecond) custom logical type (arrow.time64-nanosecond)
850    #[cfg(feature = "avro_custom_types")]
851    TimeNanos,
852    /// Arrow Time32(Second) custom logical type (arrow.time32-second)
853    #[cfg(feature = "avro_custom_types")]
854    Time32Secs,
855    /// Arrow Timestamp(Second) custom logical type (arrow.timestamp-second)
856    /// The bool indicates UTC (true) or local (false)
857    #[cfg(feature = "avro_custom_types")]
858    TimestampSecs(bool),
859    /// Arrow Interval(YearMonth) custom logical type (arrow.interval-year-month)
860    #[cfg(feature = "avro_custom_types")]
861    IntervalYearMonth,
862    /// Arrow Interval(MonthDayNano) custom logical type (arrow.interval-month-day-nano)
863    #[cfg(feature = "avro_custom_types")]
864    IntervalMonthDayNano,
865    /// Arrow Interval(DayTime) custom logical type (arrow.interval-day-time)
866    #[cfg(feature = "avro_custom_types")]
867    IntervalDayTime,
868}
869
870impl Codec {
871    fn data_type(&self) -> DataType {
872        match self {
873            Self::Null => DataType::Null,
874            Self::Boolean => DataType::Boolean,
875            Self::Int32 => DataType::Int32,
876            Self::Int64 => DataType::Int64,
877            Self::Float32 => DataType::Float32,
878            Self::Float64 => DataType::Float64,
879            Self::Binary => DataType::Binary,
880            Self::Utf8 => DataType::Utf8,
881            Self::Utf8View => DataType::Utf8View,
882            Self::Date32 => DataType::Date32,
883            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
884            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
885            Self::TimestampMillis(tz) => DataType::Timestamp(
886                TimeUnit::Millisecond,
887                tz.as_ref().map(|tz| tz.as_str().into()),
888            ),
889            Self::TimestampMicros(tz) => DataType::Timestamp(
890                TimeUnit::Microsecond,
891                tz.as_ref().map(|tz| tz.as_str().into()),
892            ),
893            Self::TimestampNanos(tz) => DataType::Timestamp(
894                TimeUnit::Nanosecond,
895                tz.as_ref().map(|tz| tz.as_str().into()),
896            ),
897            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
898            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
899            Self::Decimal(precision, scale, _size) => {
900                let p = *precision as u8;
901                let s = scale.unwrap_or(0) as i8;
902                #[cfg(feature = "small_decimals")]
903                {
904                    if *precision <= DECIMAL32_MAX_PRECISION as usize {
905                        DataType::Decimal32(p, s)
906                    } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
907                        DataType::Decimal64(p, s)
908                    } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
909                        DataType::Decimal128(p, s)
910                    } else {
911                        DataType::Decimal256(p, s)
912                    }
913                }
914                #[cfg(not(feature = "small_decimals"))]
915                {
916                    if *precision <= DECIMAL128_MAX_PRECISION as usize {
917                        DataType::Decimal128(p, s)
918                    } else {
919                        DataType::Decimal256(p, s)
920                    }
921                }
922            }
923            Self::Uuid => DataType::FixedSizeBinary(16),
924            Self::Enum(_) => {
925                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
926            }
927            Self::List(f) => {
928                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
929            }
930            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
931            Self::Map(value_type) => {
932                let val_field = value_type.field_with_name("value");
933                DataType::Map(
934                    Arc::new(Field::new(
935                        "entries",
936                        DataType::Struct(Fields::from(vec![
937                            Field::new("key", DataType::Utf8, false),
938                            val_field,
939                        ])),
940                        false,
941                    )),
942                    false,
943                )
944            }
945            Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
946            #[cfg(feature = "avro_custom_types")]
947            Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
948            #[cfg(feature = "avro_custom_types")]
949            Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
950            #[cfg(feature = "avro_custom_types")]
951            Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
952            #[cfg(feature = "avro_custom_types")]
953            Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
954            #[cfg(feature = "avro_custom_types")]
955            Self::RunEndEncoded(values, bits) => {
956                let run_ends_dt = match *bits {
957                    16 => DataType::Int16,
958                    32 => DataType::Int32,
959                    64 => DataType::Int64,
960                    _ => unreachable!(),
961                };
962                DataType::RunEndEncoded(
963                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
964                    Arc::new(Field::new("values", values.codec().data_type(), true)),
965                )
966            }
967            #[cfg(feature = "avro_custom_types")]
968            Self::Int8 => DataType::Int8,
969            #[cfg(feature = "avro_custom_types")]
970            Self::Int16 => DataType::Int16,
971            #[cfg(feature = "avro_custom_types")]
972            Self::UInt8 => DataType::UInt8,
973            #[cfg(feature = "avro_custom_types")]
974            Self::UInt16 => DataType::UInt16,
975            #[cfg(feature = "avro_custom_types")]
976            Self::UInt32 => DataType::UInt32,
977            #[cfg(feature = "avro_custom_types")]
978            Self::UInt64 => DataType::UInt64,
979            #[cfg(feature = "avro_custom_types")]
980            Self::Float16 => DataType::Float16,
981            #[cfg(feature = "avro_custom_types")]
982            Self::Date64 => DataType::Date64,
983            #[cfg(feature = "avro_custom_types")]
984            Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
985            #[cfg(feature = "avro_custom_types")]
986            Self::Time32Secs => DataType::Time32(TimeUnit::Second),
987            #[cfg(feature = "avro_custom_types")]
988            Self::TimestampSecs(is_utc) => {
989                DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
990            }
991            #[cfg(feature = "avro_custom_types")]
992            Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
993            #[cfg(feature = "avro_custom_types")]
994            Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
995            #[cfg(feature = "avro_custom_types")]
996            Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
997        }
998    }
999
1000    /// Converts a string codec to use Utf8View if requested
1001    ///
1002    /// The conversion only happens if both:
1003    /// 1. `use_utf8view` is true
1004    /// 2. The codec is currently `Utf8`
1005    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
1006        if use_utf8view && matches!(self, Self::Utf8) {
1007            Self::Utf8View
1008        } else {
1009            self
1010        }
1011    }
1012
1013    #[inline]
1014    fn union_field_name(&self) -> String {
1015        UnionFieldKind::from(self).as_ref().to_owned()
1016    }
1017}
1018
1019impl From<PrimitiveType> for Codec {
1020    fn from(value: PrimitiveType) -> Self {
1021        match value {
1022            PrimitiveType::Null => Self::Null,
1023            PrimitiveType::Boolean => Self::Boolean,
1024            PrimitiveType::Int => Self::Int32,
1025            PrimitiveType::Long => Self::Int64,
1026            PrimitiveType::Float => Self::Float32,
1027            PrimitiveType::Double => Self::Float64,
1028            PrimitiveType::Bytes => Self::Binary,
1029            PrimitiveType::String => Self::Utf8,
1030        }
1031    }
1032}
1033
1034/// Compute the exact maximum base‑10 precision that fits in `n` bytes for Avro
1035/// `fixed` decimals stored as two's‑complement unscaled integers (big‑endian).
1036///
1037/// Per Avro spec (Decimal logical type), for a fixed length `n`:
1038/// max precision = ⌊log₁₀(2^(8n − 1) − 1)⌋.
1039///
1040/// This function returns `None` if `n` is 0 or greater than 32 (Arrow supports
1041/// Decimal256, which is 32 bytes and has max precision 76).
1042const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
1043    // Precomputed exact table for n = 1..=32
1044    // 1:2, 2:4, 3:6, 4:9, 5:11, 6:14, 7:16, 8:18, 9:21, 10:23, 11:26, 12:28,
1045    // 13:31, 14:33, 15:35, 16:38, 17:40, 18:43, 19:45, 20:47, 21:50, 22:52,
1046    // 23:55, 24:57, 25:59, 26:62, 27:64, 28:67, 29:69, 30:71, 31:74, 32:76
1047    const MAX_P: [usize; 32] = [
1048        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
1049        59, 62, 64, 67, 69, 71, 74, 76,
1050    ];
1051    match n {
1052        1..=32 => Some(MAX_P[n - 1]),
1053        _ => None,
1054    }
1055}
1056
1057fn parse_decimal_attributes(
1058    attributes: &Attributes,
1059    fallback_size: Option<usize>,
1060    precision_required: bool,
1061) -> Result<(usize, usize, Option<usize>), ArrowError> {
1062    let precision = attributes
1063        .additional
1064        .get("precision")
1065        .and_then(|v| v.as_u64())
1066        .or(if precision_required { None } else { Some(10) })
1067        .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1068        as usize;
1069    let scale = attributes
1070        .additional
1071        .get("scale")
1072        .and_then(|v| v.as_u64())
1073        .unwrap_or(0) as usize;
1074    let size = attributes
1075        .additional
1076        .get("size")
1077        .and_then(|v| v.as_u64())
1078        .map(|s| s as usize)
1079        .or(fallback_size);
1080    if precision == 0 {
1081        return Err(ArrowError::ParseError(
1082            "Decimal requires precision > 0".to_string(),
1083        ));
1084    }
1085    if scale > precision {
1086        return Err(ArrowError::ParseError(format!(
1087            "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1088        )));
1089    }
1090    if precision > DECIMAL256_MAX_PRECISION as usize {
1091        return Err(ArrowError::ParseError(format!(
1092            "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1093            DECIMAL256_MAX_PRECISION
1094        )));
1095    }
1096    if let Some(sz) = size {
1097        let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1098            ArrowError::ParseError(format!(
1099                "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1100            ))
1101        })?;
1102        if precision > max_p {
1103            return Err(ArrowError::ParseError(format!(
1104                "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1105            )));
1106        }
1107    }
1108    Ok((precision, scale, size))
1109}
1110
1111#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
1112#[strum(serialize_all = "snake_case")]
1113enum UnionFieldKind {
1114    Null,
1115    Boolean,
1116    Int,
1117    Long,
1118    Float,
1119    Double,
1120    Bytes,
1121    String,
1122    Date,
1123    TimeMillis,
1124    TimeMicros,
1125    TimestampMillisUtc,
1126    TimestampMillisLocal,
1127    TimestampMicrosUtc,
1128    TimestampMicrosLocal,
1129    TimestampNanosUtc,
1130    TimestampNanosLocal,
1131    Duration,
1132    Fixed,
1133    Decimal,
1134    Enum,
1135    Array,
1136    Record,
1137    Map,
1138    Uuid,
1139    Union,
1140}
1141
1142impl From<&Codec> for UnionFieldKind {
1143    fn from(c: &Codec) -> Self {
1144        match c {
1145            Codec::Null => Self::Null,
1146            Codec::Boolean => Self::Boolean,
1147            Codec::Int32 => Self::Int,
1148            Codec::Int64 => Self::Long,
1149            Codec::Float32 => Self::Float,
1150            Codec::Float64 => Self::Double,
1151            Codec::Binary => Self::Bytes,
1152            Codec::Utf8 | Codec::Utf8View => Self::String,
1153            Codec::Date32 => Self::Date,
1154            Codec::TimeMillis => Self::TimeMillis,
1155            Codec::TimeMicros => Self::TimeMicros,
1156            Codec::TimestampMillis(Some(Tz::OffsetZero)) => Self::TimestampMillisUtc,
1157            Codec::TimestampMillis(Some(Tz::Utc)) => Self::TimestampMillisUtc,
1158            Codec::TimestampMillis(None) => Self::TimestampMillisLocal,
1159            Codec::TimestampMicros(Some(Tz::OffsetZero)) => Self::TimestampMicrosUtc,
1160            Codec::TimestampMicros(Some(Tz::Utc)) => Self::TimestampMicrosUtc,
1161            Codec::TimestampMicros(None) => Self::TimestampMicrosLocal,
1162            Codec::TimestampNanos(Some(Tz::OffsetZero)) => Self::TimestampNanosUtc,
1163            Codec::TimestampNanos(Some(Tz::Utc)) => Self::TimestampNanosUtc,
1164            Codec::TimestampNanos(None) => Self::TimestampNanosLocal,
1165            Codec::Interval => Self::Duration,
1166            Codec::Fixed(_) => Self::Fixed,
1167            Codec::Decimal(..) => Self::Decimal,
1168            Codec::Enum(_) => Self::Enum,
1169            Codec::List(_) => Self::Array,
1170            Codec::Struct(_) => Self::Record,
1171            Codec::Map(_) => Self::Map,
1172            Codec::Uuid => Self::Uuid,
1173            Codec::Union(..) => Self::Union,
1174            #[cfg(feature = "avro_custom_types")]
1175            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
1176            #[cfg(feature = "avro_custom_types")]
1177            Codec::DurationNanos
1178            | Codec::DurationMicros
1179            | Codec::DurationMillis
1180            | Codec::DurationSeconds => Self::Duration,
1181            #[cfg(feature = "avro_custom_types")]
1182            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
1183            #[cfg(feature = "avro_custom_types")]
1184            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
1185                Self::Long
1186            }
1187            #[cfg(feature = "avro_custom_types")]
1188            Codec::Time32Secs => Self::TimeMillis, // Closest standard type
1189            #[cfg(feature = "avro_custom_types")]
1190            Codec::UInt64
1191            | Codec::Float16
1192            | Codec::IntervalYearMonth
1193            | Codec::IntervalMonthDayNano
1194            | Codec::IntervalDayTime => Self::Fixed,
1195        }
1196    }
1197}
1198
1199fn union_branch_name(dt: &AvroDataType) -> String {
1200    if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1201        if name.contains(".") {
1202            // Full name
1203            return name.to_string();
1204        }
1205        if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1206            return format!("{ns}.{name}");
1207        }
1208        return name.to_string();
1209    }
1210    dt.codec.union_field_name()
1211}
1212
1213fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1214    let arrow_fields: Vec<Field> = encodings
1215        .iter()
1216        .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1217        .collect();
1218    let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1219    UnionFields::try_new(type_ids, arrow_fields)
1220}
1221
1222/// Resolves Avro type names to [`AvroDataType`]
1223///
1224/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1225#[derive(Debug, Default)]
1226struct Resolver<'a> {
1227    map: HashMap<(&'a str, &'a str), AvroDataType>,
1228}
1229
1230impl<'a> Resolver<'a> {
1231    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1232        self.map.insert((namespace.unwrap_or(""), name), schema);
1233    }
1234
1235    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1236        let (namespace, name) = name
1237            .rsplit_once('.')
1238            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1239        self.map
1240            .get(&(namespace, name))
1241            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1242            .cloned()
1243    }
1244}
1245
1246fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1247    let mut out = HashSet::with_capacity(1 + aliases.len());
1248    let (full, _) = make_full_name(name, ns, None);
1249    out.insert(full);
1250    for a in aliases {
1251        let (fa, _) = make_full_name(a, None, ns);
1252        out.insert(fa);
1253    }
1254    out
1255}
1256
1257fn names_match(
1258    writer_name: &str,
1259    writer_namespace: Option<&str>,
1260    writer_aliases: &[&str],
1261    reader_name: &str,
1262    reader_namespace: Option<&str>,
1263    reader_aliases: &[&str],
1264) -> bool {
1265    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1266    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1267    // If the canonical full names match, or any alias matches cross-wise.
1268    !writer_set.is_disjoint(&reader_set)
1269}
1270
1271fn ensure_names_match(
1272    data_type: &str,
1273    writer_name: &str,
1274    writer_namespace: Option<&str>,
1275    writer_aliases: &[&str],
1276    reader_name: &str,
1277    reader_namespace: Option<&str>,
1278    reader_aliases: &[&str],
1279) -> Result<(), ArrowError> {
1280    if names_match(
1281        writer_name,
1282        writer_namespace,
1283        writer_aliases,
1284        reader_name,
1285        reader_namespace,
1286        reader_aliases,
1287    ) {
1288        Ok(())
1289    } else {
1290        Err(ArrowError::ParseError(format!(
1291            "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1292        )))
1293    }
1294}
1295
1296fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1297    match schema {
1298        Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1299        Schema::Type(Type {
1300            r#type: TypeName::Primitive(primitive),
1301            ..
1302        }) => Some(*primitive),
1303        _ => None,
1304    }
1305}
1306
1307fn nullable_union_variants<'x, 'y>(
1308    variant: &'y [Schema<'x>],
1309) -> Option<(Nullability, &'y Schema<'x>)> {
1310    if variant.len() != 2 {
1311        return None;
1312    }
1313    let is_null = |schema: &Schema<'x>| {
1314        matches!(
1315            schema,
1316            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1317        )
1318    };
1319    match (is_null(&variant[0]), is_null(&variant[1])) {
1320        (true, false) => Some((Nullability::NullFirst, &variant[1])),
1321        (false, true) => Some((Nullability::NullSecond, &variant[0])),
1322        _ => None,
1323    }
1324}
1325
1326#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1327enum UnionBranchKey {
1328    Named(String),
1329    Primitive(PrimitiveType),
1330    Array,
1331    Map,
1332}
1333
1334fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1335    let (name, namespace) = match s {
1336        Schema::TypeName(TypeName::Primitive(p))
1337        | Schema::Type(Type {
1338            r#type: TypeName::Primitive(p),
1339            ..
1340        }) => return Some(UnionBranchKey::Primitive(*p)),
1341        Schema::TypeName(TypeName::Ref(name))
1342        | Schema::Type(Type {
1343            r#type: TypeName::Ref(name),
1344            ..
1345        }) => (name, None),
1346        Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1347        Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1348        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1349        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1350        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1351        Schema::Union(_) => return None,
1352    };
1353    let (full, _) = make_full_name(name, namespace, enclosing_ns);
1354    Some(UnionBranchKey::Named(full))
1355}
1356
1357fn union_first_duplicate<'a>(
1358    branches: &'a [Schema<'a>],
1359    enclosing_ns: Option<&'a str>,
1360) -> Option<String> {
1361    let mut seen = HashSet::with_capacity(branches.len());
1362    for schema in branches {
1363        if let Some(key) = branch_key_of(schema, enclosing_ns) {
1364            if !seen.insert(key.clone()) {
1365                let msg = match key {
1366                    UnionBranchKey::Named(full) => format!("named type {full}"),
1367                    UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1368                    UnionBranchKey::Array => "array".to_string(),
1369                    UnionBranchKey::Map => "map".to_string(),
1370                };
1371                return Some(msg);
1372            }
1373        }
1374    }
1375    None
1376}
1377
1378/// Resolves Avro type names to [`AvroDataType`]
1379///
1380/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1381struct Maker<'a> {
1382    resolver: Resolver<'a>,
1383    use_utf8view: bool,
1384    strict_mode: bool,
1385    tz: Tz,
1386}
1387
1388impl<'a> Maker<'a> {
1389    fn new(use_utf8view: bool, strict_mode: bool, tz: Tz) -> Self {
1390        Self {
1391            resolver: Default::default(),
1392            use_utf8view,
1393            strict_mode,
1394            tz,
1395        }
1396    }
1397
1398    #[cfg(feature = "avro_custom_types")]
1399    #[inline]
1400    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1401        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1402            let mut inner = (*values).clone();
1403            inner.nullability = Some(nb);
1404            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1405        }
1406    }
1407
1408    fn make_data_type<'s>(
1409        &mut self,
1410        writer_schema: &'s Schema<'a>,
1411        reader_schema: Option<&'s Schema<'a>>,
1412        namespace: Option<&'a str>,
1413    ) -> Result<AvroDataType, ArrowError> {
1414        match reader_schema {
1415            Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1416            None => self.parse_type(writer_schema, namespace),
1417        }
1418    }
1419
1420    /// Parses a [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`
1421    ///
1422    /// `name`: is the name used to refer to `schema` in its parent
1423    /// `namespace`: an optional qualifier used as part of a type hierarchy
1424    /// If the data type is a string, convert to use Utf8View if requested
1425    ///
1426    /// This function is used during the schema conversion process to determine whether
1427    /// string data should be represented as StringArray (default) or StringViewArray.
1428    ///
1429    /// `use_utf8view`: if true, use Utf8View instead of Utf8 for string types
1430    ///
1431    /// See [`Resolver`] for more information
1432    fn parse_type<'s>(
1433        &mut self,
1434        schema: &'s Schema<'a>,
1435        namespace: Option<&'a str>,
1436    ) -> Result<AvroDataType, ArrowError> {
1437        match schema {
1438            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1439                Codec::from(*p).with_utf8view(self.use_utf8view),
1440                Default::default(),
1441                None,
1442            )),
1443            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1444            Schema::Union(f) => {
1445                let null = f
1446                    .iter()
1447                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1448                match (f.len() == 2, null) {
1449                    (true, Some(0)) => {
1450                        let mut field = self.parse_type(&f[1], namespace)?;
1451                        field.nullability = Some(Nullability::NullFirst);
1452                        #[cfg(feature = "avro_custom_types")]
1453                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1454                        return Ok(field);
1455                    }
1456                    (true, Some(1)) => {
1457                        if self.strict_mode {
1458                            return Err(ArrowError::SchemaError(
1459                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1460                                    .to_string(),
1461                            ));
1462                        }
1463                        let mut field = self.parse_type(&f[0], namespace)?;
1464                        field.nullability = Some(Nullability::NullSecond);
1465                        #[cfg(feature = "avro_custom_types")]
1466                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1467                        return Ok(field);
1468                    }
1469                    _ => {}
1470                }
1471                // Validate: unions may not immediately contain unions
1472                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1473                    return Err(ArrowError::SchemaError(
1474                        "Avro unions may not immediately contain other unions".to_string(),
1475                    ));
1476                }
1477                // Validate: duplicates (named by full name; non-named by kind)
1478                if let Some(dup) = union_first_duplicate(f, namespace) {
1479                    return Err(ArrowError::SchemaError(format!(
1480                        "Avro union contains duplicate branch type: {dup}"
1481                    )));
1482                }
1483                // Parse all branches
1484                let children: Vec<AvroDataType> = f
1485                    .iter()
1486                    .map(|s| self.parse_type(s, namespace))
1487                    .collect::<Result<_, _>>()?;
1488                // Build Arrow layout once here
1489                let union_fields = build_union_fields(&children)?;
1490                Ok(AvroDataType::new(
1491                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1492                    Default::default(),
1493                    None,
1494                ))
1495            }
1496            Schema::Complex(c) => match c {
1497                ComplexType::Record(r) => {
1498                    let namespace = r.namespace.or(namespace);
1499                    let mut metadata = r.attributes.field_metadata();
1500                    let fields = r
1501                        .fields
1502                        .iter()
1503                        .map(|field| {
1504                            Ok(AvroField {
1505                                name: field.name.to_string(),
1506                                data_type: self.parse_type(&field.r#type, namespace)?,
1507                            })
1508                        })
1509                        .collect::<Result<_, ArrowError>>()?;
1510                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1511                    if let Some(ns) = namespace {
1512                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1513                    }
1514                    let field = AvroDataType {
1515                        nullability: None,
1516                        codec: Codec::Struct(fields),
1517                        metadata,
1518                        resolution: None,
1519                    };
1520                    self.resolver.register(r.name, namespace, field.clone());
1521                    Ok(field)
1522                }
1523                ComplexType::Array(a) => {
1524                    let field = self.parse_type(a.items.as_ref(), namespace)?;
1525                    Ok(AvroDataType {
1526                        nullability: None,
1527                        metadata: a.attributes.field_metadata(),
1528                        codec: Codec::List(Arc::new(field)),
1529                        resolution: None,
1530                    })
1531                }
1532                ComplexType::Fixed(f) => {
1533                    let size = f.size.try_into().map_err(|e| {
1534                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1535                    })?;
1536                    let namespace = f.namespace.or(namespace);
1537                    let mut metadata = f.attributes.field_metadata();
1538                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1539                    if let Some(ns) = namespace {
1540                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1541                    }
1542                    let field = match f.attributes.logical_type {
1543                        Some("decimal") => {
1544                            let (precision, scale, _) =
1545                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1546                            AvroDataType {
1547                                nullability: None,
1548                                metadata,
1549                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1550                                resolution: None,
1551                            }
1552                        }
1553                        Some("duration") => {
1554                            if size != 12 {
1555                                return Err(ArrowError::ParseError(format!(
1556                                    "Invalid fixed size for Duration: {size}, must be 12"
1557                                )));
1558                            };
1559                            AvroDataType {
1560                                nullability: None,
1561                                metadata,
1562                                codec: Codec::Interval,
1563                                resolution: None,
1564                            }
1565                        }
1566                        #[cfg(feature = "avro_custom_types")]
1567                        Some("arrow.uint64") if size == 8 => AvroDataType {
1568                            nullability: None,
1569                            metadata,
1570                            codec: Codec::UInt64,
1571                            resolution: None,
1572                        },
1573                        #[cfg(feature = "avro_custom_types")]
1574                        Some("arrow.float16") if size == 2 => AvroDataType {
1575                            nullability: None,
1576                            metadata,
1577                            codec: Codec::Float16,
1578                            resolution: None,
1579                        },
1580                        #[cfg(feature = "avro_custom_types")]
1581                        Some("arrow.interval-year-month") if size == 4 => AvroDataType {
1582                            nullability: None,
1583                            metadata,
1584                            codec: Codec::IntervalYearMonth,
1585                            resolution: None,
1586                        },
1587                        #[cfg(feature = "avro_custom_types")]
1588                        Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
1589                            nullability: None,
1590                            metadata,
1591                            codec: Codec::IntervalMonthDayNano,
1592                            resolution: None,
1593                        },
1594                        #[cfg(feature = "avro_custom_types")]
1595                        Some("arrow.interval-day-time") if size == 8 => AvroDataType {
1596                            nullability: None,
1597                            metadata,
1598                            codec: Codec::IntervalDayTime,
1599                            resolution: None,
1600                        },
1601                        _ => AvroDataType {
1602                            nullability: None,
1603                            metadata,
1604                            codec: Codec::Fixed(size),
1605                            resolution: None,
1606                        },
1607                    };
1608                    self.resolver.register(f.name, namespace, field.clone());
1609                    Ok(field)
1610                }
1611                ComplexType::Enum(e) => {
1612                    let namespace = e.namespace.or(namespace);
1613                    let symbols = e
1614                        .symbols
1615                        .iter()
1616                        .map(|s| s.to_string())
1617                        .collect::<Arc<[String]>>();
1618                    let mut metadata = e.attributes.field_metadata();
1619                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1620                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1621                    })?;
1622                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1623                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1624                    if let Some(ns) = namespace {
1625                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1626                    }
1627                    let field = AvroDataType {
1628                        nullability: None,
1629                        metadata,
1630                        codec: Codec::Enum(symbols),
1631                        resolution: None,
1632                    };
1633                    self.resolver.register(e.name, namespace, field.clone());
1634                    Ok(field)
1635                }
1636                ComplexType::Map(m) => {
1637                    let val = self.parse_type(&m.values, namespace)?;
1638                    Ok(AvroDataType {
1639                        nullability: None,
1640                        metadata: m.attributes.field_metadata(),
1641                        codec: Codec::Map(Arc::new(val)),
1642                        resolution: None,
1643                    })
1644                }
1645            },
1646            Schema::Type(t) => {
1647                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1648                // https://avro.apache.org/docs/1.11.1/specification/#logical-types
1649                match (t.attributes.logical_type, &mut field.codec) {
1650                    (Some("decimal"), c @ Codec::Binary) => {
1651                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1652                        *c = Codec::Decimal(prec, Some(sc), None);
1653                    }
1654                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1655                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1656                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1657                    (Some("timestamp-millis"), c @ Codec::Int64) => {
1658                        *c = Codec::TimestampMillis(Some(self.tz))
1659                    }
1660                    (Some("timestamp-micros"), c @ Codec::Int64) => {
1661                        *c = Codec::TimestampMicros(Some(self.tz))
1662                    }
1663                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1664                        *c = Codec::TimestampMillis(None)
1665                    }
1666                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1667                        *c = Codec::TimestampMicros(None)
1668                    }
1669                    (Some("timestamp-nanos"), c @ Codec::Int64) => {
1670                        *c = Codec::TimestampNanos(Some(self.tz))
1671                    }
1672                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1673                        *c = Codec::TimestampNanos(None)
1674                    }
1675                    (Some("uuid"), c @ Codec::Utf8) => {
1676                        // Map Avro string+logicalType=uuid into the UUID Codec,
1677                        // and preserve the logicalType in Arrow field metadata
1678                        // so writers can round-trip it correctly.
1679                        *c = Codec::Uuid;
1680                        field.metadata.insert("logicalType".into(), "uuid".into());
1681                    }
1682                    #[cfg(feature = "avro_custom_types")]
1683                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1684                    #[cfg(feature = "avro_custom_types")]
1685                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1686                    #[cfg(feature = "avro_custom_types")]
1687                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1688                    #[cfg(feature = "avro_custom_types")]
1689                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1690                        *c = Codec::DurationSeconds
1691                    }
1692                    #[cfg(feature = "avro_custom_types")]
1693                    (Some("arrow.run-end-encoded"), _) => {
1694                        let bits_u8: u8 = t
1695                            .attributes
1696                            .additional
1697                            .get("arrow.runEndIndexBits")
1698                            .and_then(|v| v.as_u64())
1699                            .and_then(|n| u8::try_from(n).ok())
1700                            .ok_or_else(|| ArrowError::ParseError(
1701                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1702                                    .to_string(),
1703                            ))?;
1704                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1705                            return Err(ArrowError::ParseError(format!(
1706                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1707                            )));
1708                        }
1709                        // Wrap the parsed underlying site as REE
1710                        let values_site = field.clone();
1711                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1712                    }
1713                    // Arrow-specific integer width types
1714                    #[cfg(feature = "avro_custom_types")]
1715                    (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
1716                    #[cfg(feature = "avro_custom_types")]
1717                    (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
1718                    #[cfg(feature = "avro_custom_types")]
1719                    (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
1720                    #[cfg(feature = "avro_custom_types")]
1721                    (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
1722                    #[cfg(feature = "avro_custom_types")]
1723                    (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
1724                    #[cfg(feature = "avro_custom_types")]
1725                    (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
1726                    // Arrow Float16 stored as fixed(2)
1727                    #[cfg(feature = "avro_custom_types")]
1728                    (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
1729                    // Arrow Date64 custom type
1730                    #[cfg(feature = "avro_custom_types")]
1731                    (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
1732                    // Arrow time/timestamp types with second precision
1733                    #[cfg(feature = "avro_custom_types")]
1734                    (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
1735                    #[cfg(feature = "avro_custom_types")]
1736                    (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
1737                    #[cfg(feature = "avro_custom_types")]
1738                    (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
1739                        *c = Codec::TimestampSecs(true)
1740                    }
1741                    #[cfg(feature = "avro_custom_types")]
1742                    (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
1743                        *c = Codec::TimestampSecs(false)
1744                    }
1745                    // Arrow interval types
1746                    #[cfg(feature = "avro_custom_types")]
1747                    (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
1748                        *c = Codec::IntervalYearMonth
1749                    }
1750                    #[cfg(feature = "avro_custom_types")]
1751                    (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
1752                        *c = Codec::IntervalMonthDayNano
1753                    }
1754                    #[cfg(feature = "avro_custom_types")]
1755                    (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
1756                        *c = Codec::IntervalDayTime
1757                    }
1758                    (Some(logical), _) => {
1759                        // Insert unrecognized logical type into metadata map
1760                        field.metadata.insert("logicalType".into(), logical.into());
1761                    }
1762                    (None, _) => {}
1763                }
1764                if matches!(field.codec, Codec::Int64) {
1765                    if let Some(unit) = t
1766                        .attributes
1767                        .additional
1768                        .get("arrowTimeUnit")
1769                        .and_then(|v| v.as_str())
1770                    {
1771                        if unit == "nanosecond" {
1772                            field.codec = Codec::TimestampNanos(Some(self.tz));
1773                        }
1774                    }
1775                }
1776                if !t.attributes.additional.is_empty() {
1777                    for (k, v) in &t.attributes.additional {
1778                        field.metadata.insert(k.to_string(), v.to_string());
1779                    }
1780                }
1781                Ok(field)
1782            }
1783        }
1784    }
1785
1786    fn resolve_type<'s>(
1787        &mut self,
1788        writer_schema: &'s Schema<'a>,
1789        reader_schema: &'s Schema<'a>,
1790        namespace: Option<&'a str>,
1791    ) -> Result<AvroDataType, ArrowError> {
1792        if let (Some(write_primitive), Some(read_primitive)) =
1793            (primitive_of(writer_schema), primitive_of(reader_schema))
1794        {
1795            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1796        }
1797        match (writer_schema, reader_schema) {
1798            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1799                let writer_variants = writer_variants.as_slice();
1800                let reader_variants = reader_variants.as_slice();
1801                match (
1802                    nullable_union_variants(writer_variants),
1803                    nullable_union_variants(reader_variants),
1804                ) {
1805                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
1806                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
1807                        let mut writer_to_reader = vec![None, None];
1808                        writer_to_reader[w_nb.non_null_index()] = Some((
1809                            r_nb.non_null_index(),
1810                            dt.resolution
1811                                .take()
1812                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
1813                        ));
1814                        dt.nullability = Some(w_nb);
1815                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1816                            writer_to_reader: Arc::from(writer_to_reader),
1817                            writer_is_union: true,
1818                            reader_is_union: true,
1819                        }));
1820                        #[cfg(feature = "avro_custom_types")]
1821                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
1822                        Ok(dt)
1823                    }
1824                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1825                }
1826            }
1827            (Schema::Union(writer_variants), reader_non_union) => {
1828                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1829                    .iter()
1830                    .map(|writer| {
1831                        self.resolve_type(writer, reader_non_union, namespace)
1832                            .ok()
1833                            .map(|tmp| {
1834                                let resolution = tmp
1835                                    .resolution
1836                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1837                                (0usize, resolution)
1838                            })
1839                    })
1840                    .collect();
1841                let mut dt = self.parse_type(reader_non_union, namespace)?;
1842                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1843                    writer_to_reader: Arc::from(writer_to_reader),
1844                    writer_is_union: true,
1845                    reader_is_union: false,
1846                }));
1847                Ok(dt)
1848            }
1849            (writer_non_union, Schema::Union(reader_variants)) => {
1850                if let Some((nullability, non_null_branch)) =
1851                    nullable_union_variants(reader_variants)
1852                {
1853                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
1854                    #[cfg(feature = "avro_custom_types")]
1855                    Self::propagate_nullability_into_ree(&mut dt, nullability);
1856                    dt.nullability = Some(nullability);
1857                    // Ensure resolution is set to a non-Union variant to suppress
1858                    // reading the union tag which is the default behavior.
1859                    if dt.resolution.is_none() {
1860                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
1861                    }
1862                    Ok(dt)
1863                } else {
1864                    let Some((match_idx, mut match_dt)) =
1865                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
1866                    else {
1867                        return Err(ArrowError::SchemaError(
1868                            "Writer schema does not match any reader union branch".to_string(),
1869                        ));
1870                    };
1871                    // Steal the resolution info from the matching reader branch
1872                    // for the Union resolution, but preserve possible resolution
1873                    // information on its inner types.
1874                    // For other branches, resolution is irrelevant,
1875                    // so just parse them.
1876                    let resolution = match_dt
1877                        .resolution
1878                        .take()
1879                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1880                    let mut match_dt = Some(match_dt);
1881                    let children = reader_variants
1882                        .iter()
1883                        .enumerate()
1884                        .map(|(idx, variant)| {
1885                            if idx == match_idx {
1886                                Ok(match_dt.take().unwrap())
1887                            } else {
1888                                self.parse_type(variant, namespace)
1889                            }
1890                        })
1891                        .collect::<Result<Vec<_>, _>>()?;
1892                    let union_fields = build_union_fields(&children)?;
1893                    let mut dt = AvroDataType::new(
1894                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
1895                        Default::default(),
1896                        None,
1897                    );
1898                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1899                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
1900                        writer_is_union: false,
1901                        reader_is_union: true,
1902                    }));
1903                    Ok(dt)
1904                }
1905            }
1906            (
1907                Schema::Complex(ComplexType::Array(writer_array)),
1908                Schema::Complex(ComplexType::Array(reader_array)),
1909            ) => self.resolve_array(writer_array, reader_array, namespace),
1910            (
1911                Schema::Complex(ComplexType::Map(writer_map)),
1912                Schema::Complex(ComplexType::Map(reader_map)),
1913            ) => self.resolve_map(writer_map, reader_map, namespace),
1914            (
1915                Schema::Complex(ComplexType::Fixed(writer_fixed)),
1916                Schema::Complex(ComplexType::Fixed(reader_fixed)),
1917            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1918            (
1919                Schema::Complex(ComplexType::Record(writer_record)),
1920                Schema::Complex(ComplexType::Record(reader_record)),
1921            ) => self.resolve_records(writer_record, reader_record, namespace),
1922            (
1923                Schema::Complex(ComplexType::Enum(writer_enum)),
1924                Schema::Complex(ComplexType::Enum(reader_enum)),
1925            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1926            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1927            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1928            _ => Err(ArrowError::NotYetImplemented(
1929                "Other resolutions not yet implemented".to_string(),
1930            )),
1931        }
1932    }
1933
1934    fn find_best_union_match(
1935        &mut self,
1936        writer: &Schema<'a>,
1937        reader_variants: &[Schema<'a>],
1938        namespace: Option<&'a str>,
1939    ) -> Option<(usize, AvroDataType)> {
1940        let mut first_resolution = None;
1941        for (reader_index, reader) in reader_variants.iter().enumerate() {
1942            if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1943                match &dt.resolution {
1944                    None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1945                        // An exact match is best, return immediately.
1946                        return Some((reader_index, dt));
1947                    }
1948                    Some(_) => {
1949                        if first_resolution.is_none() {
1950                            // Store the first valid promotion but keep searching for a direct match.
1951                            first_resolution = Some((reader_index, dt));
1952                        }
1953                    }
1954                };
1955            }
1956        }
1957        first_resolution
1958    }
1959
1960    fn resolve_unions<'s>(
1961        &mut self,
1962        writer_variants: &'s [Schema<'a>],
1963        reader_variants: &'s [Schema<'a>],
1964        namespace: Option<&'a str>,
1965    ) -> Result<AvroDataType, ArrowError> {
1966        let mut resolved_reader_encodings = HashMap::new();
1967        let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1968            .iter()
1969            .map(|writer| {
1970                self.find_best_union_match(writer, reader_variants, namespace)
1971                    .map(|(match_idx, mut match_dt)| {
1972                        let resolution = match_dt
1973                            .resolution
1974                            .take()
1975                            .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1976                        // TODO: check for overlapping reader variants?
1977                        // They should not be possible in a valid schema.
1978                        resolved_reader_encodings.insert(match_idx, match_dt);
1979                        (match_idx, resolution)
1980                    })
1981            })
1982            .collect();
1983        let reader_encodings: Vec<AvroDataType> = reader_variants
1984            .iter()
1985            .enumerate()
1986            .map(|(reader_idx, reader_schema)| {
1987                if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1988                    Ok(resolved)
1989                } else {
1990                    self.parse_type(reader_schema, namespace)
1991                }
1992            })
1993            .collect::<Result<_, _>>()?;
1994        let union_fields = build_union_fields(&reader_encodings)?;
1995        let mut dt = AvroDataType::new(
1996            Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1997            Default::default(),
1998            None,
1999        );
2000        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
2001            writer_to_reader: Arc::from(writer_to_reader),
2002            writer_is_union: true,
2003            reader_is_union: true,
2004        }));
2005        Ok(dt)
2006    }
2007
2008    fn resolve_array(
2009        &mut self,
2010        writer_array: &Array<'a>,
2011        reader_array: &Array<'a>,
2012        namespace: Option<&'a str>,
2013    ) -> Result<AvroDataType, ArrowError> {
2014        Ok(AvroDataType {
2015            nullability: None,
2016            metadata: reader_array.attributes.field_metadata(),
2017            codec: Codec::List(Arc::new(self.make_data_type(
2018                writer_array.items.as_ref(),
2019                Some(reader_array.items.as_ref()),
2020                namespace,
2021            )?)),
2022            resolution: None,
2023        })
2024    }
2025
2026    fn resolve_map(
2027        &mut self,
2028        writer_map: &Map<'a>,
2029        reader_map: &Map<'a>,
2030        namespace: Option<&'a str>,
2031    ) -> Result<AvroDataType, ArrowError> {
2032        Ok(AvroDataType {
2033            nullability: None,
2034            metadata: reader_map.attributes.field_metadata(),
2035            codec: Codec::Map(Arc::new(self.make_data_type(
2036                &writer_map.values,
2037                Some(&reader_map.values),
2038                namespace,
2039            )?)),
2040            resolution: None,
2041        })
2042    }
2043
2044    fn resolve_fixed<'s>(
2045        &mut self,
2046        writer_fixed: &Fixed<'a>,
2047        reader_fixed: &Fixed<'a>,
2048        reader_schema: &'s Schema<'a>,
2049        namespace: Option<&'a str>,
2050    ) -> Result<AvroDataType, ArrowError> {
2051        ensure_names_match(
2052            "Fixed",
2053            writer_fixed.name,
2054            writer_fixed.namespace,
2055            &writer_fixed.aliases,
2056            reader_fixed.name,
2057            reader_fixed.namespace,
2058            &reader_fixed.aliases,
2059        )?;
2060        if writer_fixed.size != reader_fixed.size {
2061            return Err(ArrowError::SchemaError(format!(
2062                "Fixed size mismatch for {}: writer={}, reader={}",
2063                reader_fixed.name, writer_fixed.size, reader_fixed.size
2064            )));
2065        }
2066        self.parse_type(reader_schema, namespace)
2067    }
2068
2069    fn resolve_primitives(
2070        &mut self,
2071        write_primitive: PrimitiveType,
2072        read_primitive: PrimitiveType,
2073        reader_schema: &Schema<'a>,
2074    ) -> Result<AvroDataType, ArrowError> {
2075        if write_primitive == read_primitive {
2076            return self.parse_type(reader_schema, None);
2077        }
2078        let promotion = match (write_primitive, read_primitive) {
2079            (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2080            (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2081            (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2082            (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2083            (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2084            (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2085            (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2086            (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2087            _ => {
2088                return Err(ArrowError::ParseError(format!(
2089                    "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2090                )));
2091            }
2092        };
2093        let mut datatype = self.parse_type(reader_schema, None)?;
2094        datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2095        Ok(datatype)
2096    }
2097
2098    // Resolve writer vs. reader enum schemas according to Avro 1.11.1.
2099    //
2100    // # How enums resolve (writer to reader)
2101    // Per “Schema Resolution”:
2102    // * The two schemas must refer to the same (unqualified) enum name (or match
2103    //   via alias rewriting).
2104    // * If the writer’s symbol is not present in the reader’s enum and the reader
2105    //   enum has a `default`, that `default` symbol must be used; otherwise,
2106    //   error.
2107    //   https://avro.apache.org/docs/1.11.1/specification/#schema-resolution
2108    // * Avro “Aliases” are applied from the reader side to rewrite the writer’s
2109    //   names during resolution. For robustness across ecosystems, we also accept
2110    //   symmetry here (see note below).
2111    //   https://avro.apache.org/docs/1.11.1/specification/#aliases
2112    //
2113    // # Rationale for this code path
2114    // 1. Do the work once at schema‑resolution time. Avro serializes an enum as a
2115    //    writer‑side position. Mapping positions on the hot decoder path is expensive
2116    //    if done with string lookups. This method builds a `writer_index to reader_index`
2117    //    vector once, so decoding just does an O(1) table lookup.
2118    // 2. Adopt the reader’s symbol set and order. We return an Arrow
2119    //    `Dictionary(Int32, Utf8)` whose dictionary values are the reader enum
2120    //    symbols. This makes downstream semantics match the reader schema, including
2121    //    Avro’s sort order rule that orders enums by symbol position in the schema.
2122    //    https://avro.apache.org/docs/1.11.1/specification/#sort-order
2123    // 3. Honor Avro’s `default` for enums. Avro 1.9+ allows a type‑level default
2124    //    on the enum. When the writer emits a symbol unknown to the reader, we map it
2125    //    to the reader’s validated `default` symbol if present; otherwise we signal an
2126    //    error at decoding time.
2127    //    https://avro.apache.org/docs/1.11.1/specification/#enums
2128    //
2129    // # Implementation notes
2130    // * We first check that enum names match or are*alias‑equivalent. The Avro
2131    //   spec describes alias rewriting using reader aliases; this implementation
2132    //   additionally treats writer aliases as acceptable for name matching to be
2133    //   resilient with schemas produced by different tooling.
2134    // * We build `EnumMapping`:
2135    //   - `mapping[i]` = reader index of the writer symbol at writer index `i`.
2136    //   - If the writer symbol is absent and the reader has a default, we store the
2137    //     reader index of that default.
2138    //   - Otherwise we store `-1` as a sentinel meaning unresolvable; the decoder
2139    //     must treat encountering such a value as an error, per the spec.
2140    // * We persist the reader symbol list in field metadata under
2141    //   `AVRO_ENUM_SYMBOLS_METADATA_KEY`, so consumers can inspect the dictionary
2142    //   without needing the original Avro schema.
2143    // * The Arrow representation is `Dictionary(Int32, Utf8)`, which aligns with
2144    //   Avro’s integer index encoding for enums.
2145    //
2146    // # Examples
2147    // * Writer `["A","B","C"]`, Reader `["A","B"]`, Reader default `"A"`
2148    //     `mapping = [0, 1, 0]`, `default_index = 0`.
2149    // * Writer `["A","B"]`, Reader `["B","A"]` (no default)
2150    //     `mapping = [1, 0]`, `default_index = -1`.
2151    // * Writer `["A","B","C"]`, Reader `["A","B"]` (no default)
2152    //     `mapping = [0, 1, -1]` (decode must error on `"C"`).
2153    fn resolve_enums(
2154        &mut self,
2155        writer_enum: &Enum<'a>,
2156        reader_enum: &Enum<'a>,
2157        reader_schema: &Schema<'a>,
2158        namespace: Option<&'a str>,
2159    ) -> Result<AvroDataType, ArrowError> {
2160        ensure_names_match(
2161            "Enum",
2162            writer_enum.name,
2163            writer_enum.namespace,
2164            &writer_enum.aliases,
2165            reader_enum.name,
2166            reader_enum.namespace,
2167            &reader_enum.aliases,
2168        )?;
2169        if writer_enum.symbols == reader_enum.symbols {
2170            return self.parse_type(reader_schema, namespace);
2171        }
2172        let reader_index: HashMap<&str, i32> = reader_enum
2173            .symbols
2174            .iter()
2175            .enumerate()
2176            .map(|(index, &symbol)| (symbol, index as i32))
2177            .collect();
2178        let default_index: i32 = match reader_enum.default {
2179            Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2180                ArrowError::SchemaError(format!(
2181                    "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2182                    reader_enum.name,
2183                ))
2184            })?,
2185            None => -1,
2186        };
2187        let mapping: Vec<i32> = writer_enum
2188            .symbols
2189            .iter()
2190            .map(|&write_symbol| {
2191                reader_index
2192                    .get(write_symbol)
2193                    .copied()
2194                    .unwrap_or(default_index)
2195            })
2196            .collect();
2197        if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2198            return Err(ArrowError::SchemaError(format!(
2199                "Reader enum '{}' does not cover all writer symbols and no default is provided",
2200                reader_enum.name
2201            )));
2202        }
2203        let mut dt = self.parse_type(reader_schema, namespace)?;
2204        dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2205            mapping: Arc::from(mapping),
2206            default_index,
2207        }));
2208        let reader_ns = reader_enum.namespace.or(namespace);
2209        self.resolver
2210            .register(reader_enum.name, reader_ns, dt.clone());
2211        Ok(dt)
2212    }
2213
2214    #[inline]
2215    fn build_writer_lookup(
2216        writer_record: &Record<'a>,
2217    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2218        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2219        for (idx, wf) in writer_record.fields.iter().enumerate() {
2220            // Avro field names are unique; last-in wins are acceptable and match previous behavior.
2221            map.insert(wf.name, idx);
2222        }
2223        // Track ambiguous writer aliases (alias used by multiple writer fields)
2224        let mut ambiguous: HashSet<&str> = HashSet::new();
2225        for (idx, wf) in writer_record.fields.iter().enumerate() {
2226            for &alias in &wf.aliases {
2227                match map.entry(alias) {
2228                    Entry::Occupied(e) if *e.get() != idx => {
2229                        ambiguous.insert(alias);
2230                    }
2231                    Entry::Vacant(e) => {
2232                        e.insert(idx);
2233                    }
2234                    _ => {}
2235                }
2236            }
2237        }
2238        (map, ambiguous)
2239    }
2240
2241    fn resolve_records(
2242        &mut self,
2243        writer_record: &Record<'a>,
2244        reader_record: &Record<'a>,
2245        namespace: Option<&'a str>,
2246    ) -> Result<AvroDataType, ArrowError> {
2247        ensure_names_match(
2248            "Record",
2249            writer_record.name,
2250            writer_record.namespace,
2251            &writer_record.aliases,
2252            reader_record.name,
2253            reader_record.namespace,
2254            &reader_record.aliases,
2255        )?;
2256        let writer_ns = writer_record.namespace.or(namespace);
2257        let reader_ns = reader_record.namespace.or(namespace);
2258        let mut reader_md = reader_record.attributes.field_metadata();
2259        reader_md.insert(
2260            AVRO_NAME_METADATA_KEY.to_string(),
2261            reader_record.name.to_string(),
2262        );
2263        if let Some(ns) = reader_ns {
2264            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
2265        }
2266        // Build writer lookup and ambiguous alias set.
2267        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
2268        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
2269        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
2270        // Capture default field indices during the main loop (one pass).
2271        let mut default_fields: Vec<usize> = Vec::new();
2272        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
2273            // Direct name match, then reader aliases (a writer alias map is pre-populated).
2274            let mut match_idx = writer_lookup.get(r_field.name).copied();
2275            let mut matched_via_alias: Option<&str> = None;
2276            if match_idx.is_none() {
2277                for &alias in &r_field.aliases {
2278                    if let Some(i) = writer_lookup.get(alias).copied() {
2279                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
2280                            return Err(ArrowError::SchemaError(format!(
2281                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
2282                                r_field.name
2283                            )));
2284                        }
2285                        match_idx = Some(i);
2286                        matched_via_alias = Some(alias);
2287                        break;
2288                    }
2289                }
2290            }
2291            if let Some(wi) = match_idx {
2292                if writer_to_reader[wi].is_none() {
2293                    let w_schema = &writer_record.fields[wi].r#type;
2294                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
2295                    writer_to_reader[wi] = Some(reader_idx);
2296                    reader_fields.push(AvroField {
2297                        name: r_field.name.to_owned(),
2298                        data_type: dt,
2299                    });
2300                    continue;
2301                } else if self.strict_mode {
2302                    // Writer field already mapped and strict_mode => error
2303                    let existing_reader = writer_to_reader[wi].unwrap();
2304                    let via = matched_via_alias
2305                        .map(|a| format!("alias '{a}'"))
2306                        .unwrap_or_else(|| "name match".to_string());
2307                    return Err(ArrowError::SchemaError(format!(
2308                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
2309                        writer_record.fields[wi].name
2310                    )));
2311                }
2312                // Non-strict and already mapped -> fall through to defaulting logic
2313            }
2314            // No match (or conflicted in non-strict mode): attach default per Avro spec.
2315            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
2316            if let Some(default_json) = r_field.default.as_ref() {
2317                dt.resolution = Some(ResolutionInfo::DefaultValue(
2318                    dt.parse_and_store_default(default_json)?,
2319                ));
2320                default_fields.push(reader_idx);
2321            } else if dt.nullability() == Some(Nullability::NullFirst) {
2322                // The only valid implicit default for a union is the first branch (null-first case).
2323                dt.resolution = Some(ResolutionInfo::DefaultValue(
2324                    dt.parse_and_store_default(&Value::Null)?,
2325                ));
2326                default_fields.push(reader_idx);
2327            } else {
2328                return Err(ArrowError::SchemaError(format!(
2329                    "Reader field '{}' not present in writer schema must have a default value",
2330                    r_field.name
2331                )));
2332            }
2333            reader_fields.push(AvroField {
2334                name: r_field.name.to_owned(),
2335                data_type: dt,
2336            });
2337        }
2338        // Build writer field map.
2339        let writer_fields = writer_record
2340            .fields
2341            .iter()
2342            .enumerate()
2343            .map(|(writer_index, writer_field)| {
2344                if let Some(reader_index) = writer_to_reader[writer_index] {
2345                    Ok(ResolvedField::ToReader(reader_index))
2346                } else {
2347                    let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
2348                    Ok(ResolvedField::Skip(dt))
2349                }
2350            })
2351            .collect::<Result<_, ArrowError>>()?;
2352        let resolved = AvroDataType::new_with_resolution(
2353            Codec::Struct(Arc::from(reader_fields)),
2354            reader_md,
2355            None,
2356            Some(ResolutionInfo::Record(ResolvedRecord {
2357                writer_fields,
2358                default_fields: Arc::from(default_fields),
2359            })),
2360        );
2361        // Register a resolved record by reader name+namespace for potential named type refs.
2362        self.resolver
2363            .register(reader_record.name, reader_ns, resolved.clone());
2364        Ok(resolved)
2365    }
2366}
2367
2368#[cfg(test)]
2369mod tests {
2370    use super::*;
2371    use crate::schema::{
2372        AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2373        Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2374    };
2375    use indexmap::IndexMap;
2376    use serde_json::{self, Value};
2377
2378    fn create_schema_with_logical_type(
2379        primitive_type: PrimitiveType,
2380        logical_type: &'static str,
2381    ) -> Schema<'static> {
2382        let attributes = Attributes {
2383            logical_type: Some(logical_type),
2384            additional: Default::default(),
2385        };
2386
2387        Schema::Type(Type {
2388            r#type: TypeName::Primitive(primitive_type),
2389            attributes,
2390        })
2391    }
2392
2393    fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2394        let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2395        let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2396        let mut maker = Maker::new(false, false, Tz::default());
2397        maker
2398            .make_data_type(&writer_schema, Some(&reader_schema), None)
2399            .expect("promotion should resolve")
2400    }
2401
2402    fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2403        Schema::TypeName(TypeName::Primitive(pt))
2404    }
2405    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2406        Schema::Union(branches)
2407    }
2408
2409    #[test]
2410    fn test_date_logical_type() {
2411        let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2412
2413        let mut maker = Maker::new(false, false, Tz::default());
2414        let result = maker.make_data_type(&schema, None, None).unwrap();
2415
2416        assert!(matches!(result.codec, Codec::Date32));
2417    }
2418
2419    #[test]
2420    fn test_time_millis_logical_type() {
2421        let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2422
2423        let mut maker = Maker::new(false, false, Tz::default());
2424        let result = maker.make_data_type(&schema, None, None).unwrap();
2425
2426        assert!(matches!(result.codec, Codec::TimeMillis));
2427    }
2428
2429    #[test]
2430    fn test_time_micros_logical_type() {
2431        let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2432
2433        let mut maker = Maker::new(false, false, Tz::default());
2434        let result = maker.make_data_type(&schema, None, None).unwrap();
2435
2436        assert!(matches!(result.codec, Codec::TimeMicros));
2437    }
2438
2439    #[test]
2440    fn test_timestamp_millis_logical_type() {
2441        for tz in [Tz::OffsetZero, Tz::Utc] {
2442            let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2443
2444            let mut maker = Maker::new(false, false, tz);
2445            let result = maker.make_data_type(&schema, None, None).unwrap();
2446
2447            let Codec::TimestampMillis(Some(actual_tz)) = result.codec else {
2448                panic!("Expected TimestampMillis codec");
2449            };
2450            assert_eq!(actual_tz, tz);
2451        }
2452    }
2453
2454    #[test]
2455    fn test_timestamp_micros_logical_type() {
2456        for tz in [Tz::OffsetZero, Tz::Utc] {
2457            let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2458
2459            let mut maker = Maker::new(false, false, tz);
2460            let result = maker.make_data_type(&schema, None, None).unwrap();
2461
2462            let Codec::TimestampMicros(Some(actual_tz)) = result.codec else {
2463                panic!("Expected TimestampMicros codec");
2464            };
2465            assert_eq!(actual_tz, tz);
2466        }
2467    }
2468
2469    #[test]
2470    fn test_timestamp_nanos_logical_type() {
2471        for tz in [Tz::OffsetZero, Tz::Utc] {
2472            let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-nanos");
2473
2474            let mut maker = Maker::new(false, false, tz);
2475            let result = maker.make_data_type(&schema, None, None).unwrap();
2476
2477            let Codec::TimestampNanos(Some(actual_tz)) = result.codec else {
2478                panic!("Expected TimestampNanos codec");
2479            };
2480            assert_eq!(actual_tz, tz);
2481        }
2482    }
2483
2484    #[test]
2485    fn test_local_timestamp_millis_logical_type() {
2486        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2487
2488        let mut maker = Maker::new(false, false, Tz::default());
2489        let result = maker.make_data_type(&schema, None, None).unwrap();
2490
2491        assert!(matches!(result.codec, Codec::TimestampMillis(None)));
2492    }
2493
2494    #[test]
2495    fn test_local_timestamp_micros_logical_type() {
2496        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2497
2498        let mut maker = Maker::new(false, false, Tz::default());
2499        let result = maker.make_data_type(&schema, None, None).unwrap();
2500
2501        assert!(matches!(result.codec, Codec::TimestampMicros(None)));
2502    }
2503
2504    #[test]
2505    fn test_local_timestamp_nanos_logical_type() {
2506        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-nanos");
2507
2508        let mut maker = Maker::new(false, false, Tz::default());
2509        let result = maker.make_data_type(&schema, None, None).unwrap();
2510
2511        assert!(matches!(result.codec, Codec::TimestampNanos(None)));
2512    }
2513
2514    #[test]
2515    fn test_uuid_type() {
2516        let mut codec = Codec::Fixed(16);
2517        if let c @ Codec::Fixed(16) = &mut codec {
2518            *c = Codec::Uuid;
2519        }
2520        assert!(matches!(codec, Codec::Uuid));
2521    }
2522
2523    #[test]
2524    fn test_duration_logical_type() {
2525        let mut codec = Codec::Fixed(12);
2526
2527        if let c @ Codec::Fixed(12) = &mut codec {
2528            *c = Codec::Interval;
2529        }
2530
2531        assert!(matches!(codec, Codec::Interval));
2532    }
2533
2534    #[test]
2535    fn test_decimal_logical_type_not_implemented() {
2536        let codec = Codec::Fixed(16);
2537
2538        let process_decimal = || -> Result<(), ArrowError> {
2539            if let Codec::Fixed(_) = codec {
2540                return Err(ArrowError::NotYetImplemented(
2541                    "Decimals are not currently supported".to_string(),
2542                ));
2543            }
2544            Ok(())
2545        };
2546
2547        let result = process_decimal();
2548
2549        assert!(result.is_err());
2550        if let Err(ArrowError::NotYetImplemented(msg)) = result {
2551            assert!(msg.contains("Decimals are not currently supported"));
2552        } else {
2553            panic!("Expected NotYetImplemented error");
2554        }
2555    }
2556    #[test]
2557    fn test_unknown_logical_type_added_to_metadata() {
2558        let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2559
2560        let mut maker = Maker::new(false, false, Tz::default());
2561        let result = maker.make_data_type(&schema, None, None).unwrap();
2562
2563        assert_eq!(
2564            result.metadata.get("logicalType"),
2565            Some(&"custom-type".to_string())
2566        );
2567    }
2568
2569    #[test]
2570    fn test_string_with_utf8view_enabled() {
2571        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2572
2573        let mut maker = Maker::new(true, false, Tz::default());
2574        let result = maker.make_data_type(&schema, None, None).unwrap();
2575
2576        assert!(matches!(result.codec, Codec::Utf8View));
2577    }
2578
2579    #[test]
2580    fn test_string_without_utf8view_enabled() {
2581        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2582
2583        let mut maker = Maker::new(false, false, Tz::default());
2584        let result = maker.make_data_type(&schema, None, None).unwrap();
2585
2586        assert!(matches!(result.codec, Codec::Utf8));
2587    }
2588
2589    #[test]
2590    fn test_record_with_string_and_utf8view_enabled() {
2591        let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2592
2593        let avro_field = crate::schema::Field {
2594            name: "string_field",
2595            r#type: field_schema,
2596            default: None,
2597            doc: None,
2598            aliases: vec![],
2599        };
2600
2601        let record = Record {
2602            name: "test_record",
2603            namespace: None,
2604            aliases: vec![],
2605            doc: None,
2606            fields: vec![avro_field],
2607            attributes: Attributes::default(),
2608        };
2609
2610        let schema = Schema::Complex(ComplexType::Record(record));
2611
2612        let mut maker = Maker::new(true, false, Tz::default());
2613        let result = maker.make_data_type(&schema, None, None).unwrap();
2614
2615        if let Codec::Struct(fields) = &result.codec {
2616            let first_field_codec = &fields[0].data_type().codec;
2617            assert!(matches!(first_field_codec, Codec::Utf8View));
2618        } else {
2619            panic!("Expected Struct codec");
2620        }
2621    }
2622
2623    #[test]
2624    fn test_union_with_strict_mode() {
2625        let schema = Schema::Union(vec![
2626            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2627            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2628        ]);
2629
2630        let mut maker = Maker::new(false, true, Tz::default());
2631        let result = maker.make_data_type(&schema, None, None);
2632
2633        assert!(result.is_err());
2634        match result {
2635            Err(ArrowError::SchemaError(msg)) => {
2636                assert!(msg.contains(
2637                    "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2638                ));
2639            }
2640            _ => panic!("Expected SchemaError"),
2641        }
2642    }
2643
2644    #[test]
2645    fn test_resolve_int_to_float_promotion() {
2646        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2647        assert!(matches!(result.codec, Codec::Float32));
2648        assert_eq!(
2649            result.resolution,
2650            Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2651        );
2652    }
2653
2654    #[test]
2655    fn test_resolve_int_to_double_promotion() {
2656        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2657        assert!(matches!(result.codec, Codec::Float64));
2658        assert_eq!(
2659            result.resolution,
2660            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2661        );
2662    }
2663
2664    #[test]
2665    fn test_resolve_long_to_float_promotion() {
2666        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2667        assert!(matches!(result.codec, Codec::Float32));
2668        assert_eq!(
2669            result.resolution,
2670            Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2671        );
2672    }
2673
2674    #[test]
2675    fn test_resolve_long_to_double_promotion() {
2676        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2677        assert!(matches!(result.codec, Codec::Float64));
2678        assert_eq!(
2679            result.resolution,
2680            Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2681        );
2682    }
2683
2684    #[test]
2685    fn test_resolve_float_to_double_promotion() {
2686        let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2687        assert!(matches!(result.codec, Codec::Float64));
2688        assert_eq!(
2689            result.resolution,
2690            Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2691        );
2692    }
2693
2694    #[test]
2695    fn test_resolve_string_to_bytes_promotion() {
2696        let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2697        assert!(matches!(result.codec, Codec::Binary));
2698        assert_eq!(
2699            result.resolution,
2700            Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2701        );
2702    }
2703
2704    #[test]
2705    fn test_resolve_bytes_to_string_promotion() {
2706        let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2707        assert!(matches!(result.codec, Codec::Utf8));
2708        assert_eq!(
2709            result.resolution,
2710            Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2711        );
2712    }
2713
2714    #[test]
2715    fn test_resolve_illegal_promotion_double_to_float_errors() {
2716        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2717        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2718        let mut maker = Maker::new(false, false, Tz::default());
2719        let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2720        assert!(result.is_err());
2721        match result {
2722            Err(ArrowError::ParseError(msg)) => {
2723                assert!(msg.contains("Illegal promotion"));
2724            }
2725            _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2726        }
2727    }
2728
2729    #[test]
2730    fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2731        let writer = Schema::Union(vec![
2732            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2733            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2734        ]);
2735        let reader = Schema::Union(vec![
2736            Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2737            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2738        ]);
2739        let mut maker = Maker::new(false, false, Tz::default());
2740        let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2741        assert!(matches!(result.codec, Codec::Float64));
2742        assert_eq!(
2743            result.resolution,
2744            Some(ResolutionInfo::Union(ResolvedUnion {
2745                writer_to_reader: [
2746                    None,
2747                    Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2748                ]
2749                .into(),
2750                writer_is_union: true,
2751                reader_is_union: true,
2752            }))
2753        );
2754        assert_eq!(result.nullability, Some(Nullability::NullFirst));
2755    }
2756
2757    #[test]
2758    fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2759        let writer = mk_union(vec![
2760            mk_primitive(PrimitiveType::String),
2761            mk_primitive(PrimitiveType::Long),
2762        ]);
2763        let reader = mk_primitive(PrimitiveType::Bytes);
2764        let mut maker = Maker::new(false, false, Tz::default());
2765        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2766        assert!(matches!(dt.codec(), Codec::Binary));
2767        let resolved = match dt.resolution {
2768            Some(ResolutionInfo::Union(u)) => u,
2769            other => panic!("expected union resolution info, got {other:?}"),
2770        };
2771        assert!(resolved.writer_is_union && !resolved.reader_is_union);
2772        assert_eq!(
2773            resolved.writer_to_reader.as_ref(),
2774            &[
2775                Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
2776                None
2777            ]
2778        );
2779    }
2780
2781    #[test]
2782    fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2783        let writer = mk_primitive(PrimitiveType::Long);
2784        let reader = mk_union(vec![
2785            mk_primitive(PrimitiveType::Long),
2786            mk_primitive(PrimitiveType::Double),
2787        ]);
2788        let mut maker = Maker::new(false, false, Tz::default());
2789        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2790        let resolved = match dt.resolution {
2791            Some(ResolutionInfo::Union(u)) => u,
2792            other => panic!("expected union resolution info, got {other:?}"),
2793        };
2794        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2795        assert_eq!(
2796            resolved.writer_to_reader.as_ref(),
2797            &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
2798        );
2799    }
2800
2801    #[test]
2802    fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2803        let writer = mk_primitive(PrimitiveType::Int);
2804        let reader = mk_union(vec![
2805            mk_primitive(PrimitiveType::Null),
2806            mk_primitive(PrimitiveType::Long),
2807            mk_primitive(PrimitiveType::String),
2808        ]);
2809        let mut maker = Maker::new(false, false, Tz::default());
2810        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2811        let resolved = match dt.resolution {
2812            Some(ResolutionInfo::Union(u)) => u,
2813            other => panic!("expected union resolution info, got {other:?}"),
2814        };
2815        assert_eq!(
2816            resolved.writer_to_reader.as_ref(),
2817            &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
2818        );
2819    }
2820
2821    #[test]
2822    fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
2823        // Writer: record Inner{a: int}
2824        // Reader: union [Inner{a: int, b: int default 42}, string]
2825        // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
2826        let writer = Schema::Complex(ComplexType::Record(Record {
2827            name: "Inner",
2828            namespace: None,
2829            doc: None,
2830            aliases: vec![],
2831            fields: vec![AvroFieldSchema {
2832                name: "a",
2833                doc: None,
2834                r#type: mk_primitive(PrimitiveType::Int),
2835                default: None,
2836                aliases: vec![],
2837            }],
2838            attributes: Attributes::default(),
2839        }));
2840        let reader = mk_union(vec![
2841            Schema::Complex(ComplexType::Record(Record {
2842                name: "Inner",
2843                namespace: None,
2844                doc: None,
2845                aliases: vec![],
2846                fields: vec![
2847                    AvroFieldSchema {
2848                        name: "a",
2849                        doc: None,
2850                        r#type: mk_primitive(PrimitiveType::Int),
2851                        default: None,
2852                        aliases: vec![],
2853                    },
2854                    AvroFieldSchema {
2855                        name: "b",
2856                        doc: None,
2857                        r#type: mk_primitive(PrimitiveType::Int),
2858                        default: Some(Value::Number(serde_json::Number::from(42))),
2859                        aliases: vec![],
2860                    },
2861                ],
2862                attributes: Attributes::default(),
2863            })),
2864            mk_primitive(PrimitiveType::String),
2865        ]);
2866        let mut maker = Maker::new(false, false, Default::default());
2867        let dt = maker
2868            .make_data_type(&writer, Some(&reader), None)
2869            .expect("resolution should succeed");
2870        // Verify the union resolution structure
2871        let resolved = match dt.resolution.as_ref() {
2872            Some(ResolutionInfo::Union(u)) => u,
2873            other => panic!("expected union resolution info, got {other:?}"),
2874        };
2875        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2876        assert_eq!(
2877            resolved.writer_to_reader.len(),
2878            1,
2879            "expected the non-union record to resolve to a union variant"
2880        );
2881        let resolution = match resolved.writer_to_reader.first().unwrap() {
2882            Some((0, resolution)) => resolution,
2883            other => panic!("unexpected writer-to-reader table value {other:?}"),
2884        };
2885        match resolution {
2886            ResolutionInfo::Record(ResolvedRecord {
2887                writer_fields,
2888                default_fields,
2889            }) => {
2890                assert_eq!(writer_fields.len(), 1);
2891                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
2892                assert_eq!(default_fields.len(), 1);
2893                assert_eq!(default_fields[0], 1);
2894            }
2895            other => panic!("unexpected resolution {other:?}"),
2896        }
2897        // The matching child (Inner at index 0) should have field b with DefaultValue
2898        let children = match dt.codec() {
2899            Codec::Union(children, _, _) => children,
2900            other => panic!("expected union codec, got {other:?}"),
2901        };
2902        let inner_fields = match children[0].codec() {
2903            Codec::Struct(f) => f,
2904            other => panic!("expected struct codec for Inner, got {other:?}"),
2905        };
2906        assert_eq!(inner_fields.len(), 2);
2907        assert_eq!(inner_fields[1].name(), "b");
2908        assert_eq!(
2909            inner_fields[1].data_type().resolution,
2910            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
2911            "field b should have DefaultValue(Int(42)) from schema resolution"
2912        );
2913    }
2914
2915    #[test]
2916    fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
2917        // Writer: record [string, Inner{a: int}]
2918        // Reader: union [Inner{a: int, b: int default 42}, string]
2919        // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
2920        let writer = mk_union(vec![
2921            mk_primitive(PrimitiveType::String),
2922            Schema::Complex(ComplexType::Record(Record {
2923                name: "Inner",
2924                namespace: None,
2925                doc: None,
2926                aliases: vec![],
2927                fields: vec![AvroFieldSchema {
2928                    name: "a",
2929                    doc: None,
2930                    r#type: mk_primitive(PrimitiveType::Int),
2931                    default: None,
2932                    aliases: vec![],
2933                }],
2934                attributes: Attributes::default(),
2935            })),
2936        ]);
2937        let reader = mk_union(vec![
2938            Schema::Complex(ComplexType::Record(Record {
2939                name: "Inner",
2940                namespace: None,
2941                doc: None,
2942                aliases: vec![],
2943                fields: vec![
2944                    AvroFieldSchema {
2945                        name: "a",
2946                        doc: None,
2947                        r#type: mk_primitive(PrimitiveType::Int),
2948                        default: None,
2949                        aliases: vec![],
2950                    },
2951                    AvroFieldSchema {
2952                        name: "b",
2953                        doc: None,
2954                        r#type: mk_primitive(PrimitiveType::Int),
2955                        default: Some(Value::Number(serde_json::Number::from(42))),
2956                        aliases: vec![],
2957                    },
2958                ],
2959                attributes: Attributes::default(),
2960            })),
2961            mk_primitive(PrimitiveType::String),
2962        ]);
2963        let mut maker = Maker::new(false, false, Default::default());
2964        let dt = maker
2965            .make_data_type(&writer, Some(&reader), None)
2966            .expect("resolution should succeed");
2967        // Verify the union resolution structure
2968        let resolved = match dt.resolution.as_ref() {
2969            Some(ResolutionInfo::Union(u)) => u,
2970            other => panic!("expected union resolution info, got {other:?}"),
2971        };
2972        assert!(resolved.writer_is_union && resolved.reader_is_union);
2973        assert_eq!(resolved.writer_to_reader.len(), 2);
2974        let resolution = match resolved.writer_to_reader[1].as_ref() {
2975            Some((0, resolution)) => resolution,
2976            other => panic!("unexpected writer-to-reader table value {other:?}"),
2977        };
2978        match resolution {
2979            ResolutionInfo::Record(ResolvedRecord {
2980                writer_fields,
2981                default_fields,
2982            }) => {
2983                assert_eq!(writer_fields.len(), 1);
2984                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
2985                assert_eq!(default_fields.len(), 1);
2986                assert_eq!(default_fields[0], 1);
2987            }
2988            other => panic!("unexpected resolution {other:?}"),
2989        }
2990        // The matching child (Inner at index 0) should have field b with DefaultValue
2991        let children = match dt.codec() {
2992            Codec::Union(children, _, _) => children,
2993            other => panic!("expected union codec, got {other:?}"),
2994        };
2995        let inner_fields = match children[0].codec() {
2996            Codec::Struct(f) => f,
2997            other => panic!("expected struct codec for Inner, got {other:?}"),
2998        };
2999        assert_eq!(inner_fields.len(), 2);
3000        assert_eq!(inner_fields[1].name(), "b");
3001        assert_eq!(
3002            inner_fields[1].data_type().resolution,
3003            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
3004            "field b should have DefaultValue(Int(42)) from schema resolution"
3005        );
3006    }
3007
3008    #[test]
3009    fn test_resolve_both_nullable_unions_direct_match() {
3010        let writer = mk_union(vec![
3011            mk_primitive(PrimitiveType::Null),
3012            mk_primitive(PrimitiveType::String),
3013        ]);
3014        let reader = mk_union(vec![
3015            mk_primitive(PrimitiveType::String),
3016            mk_primitive(PrimitiveType::Null),
3017        ]);
3018        let mut maker = Maker::new(false, false, Tz::default());
3019        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
3020        assert!(matches!(dt.codec(), Codec::Utf8));
3021        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
3022        assert_eq!(
3023            dt.resolution,
3024            Some(ResolutionInfo::Union(ResolvedUnion {
3025                writer_to_reader: [
3026                    None,
3027                    Some((0, ResolutionInfo::Promotion(Promotion::Direct)))
3028                ]
3029                .into(),
3030                writer_is_union: true,
3031                reader_is_union: true
3032            }))
3033        );
3034    }
3035
3036    #[test]
3037    fn test_resolve_both_nullable_unions_with_promotion() {
3038        let writer = mk_union(vec![
3039            mk_primitive(PrimitiveType::Null),
3040            mk_primitive(PrimitiveType::Int),
3041        ]);
3042        let reader = mk_union(vec![
3043            mk_primitive(PrimitiveType::Double),
3044            mk_primitive(PrimitiveType::Null),
3045        ]);
3046        let mut maker = Maker::new(false, false, Tz::default());
3047        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
3048        assert!(matches!(dt.codec(), Codec::Float64));
3049        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
3050        assert_eq!(
3051            dt.resolution,
3052            Some(ResolutionInfo::Union(ResolvedUnion {
3053                writer_to_reader: [
3054                    None,
3055                    Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
3056                ]
3057                .into(),
3058                writer_is_union: true,
3059                reader_is_union: true
3060            }))
3061        );
3062    }
3063
3064    #[test]
3065    fn test_resolve_type_promotion() {
3066        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3067        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
3068        let mut maker = Maker::new(false, false, Tz::default());
3069        let result = maker
3070            .make_data_type(&writer_schema, Some(&reader_schema), None)
3071            .unwrap();
3072        assert!(matches!(result.codec, Codec::Int64));
3073        assert_eq!(
3074            result.resolution,
3075            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3076        );
3077    }
3078
3079    #[test]
3080    fn test_nested_record_type_reuse_without_namespace() {
3081        let schema_str = r#"
3082        {
3083          "type": "record",
3084          "name": "Record",
3085          "fields": [
3086            {
3087              "name": "nested",
3088              "type": {
3089                "type": "record",
3090                "name": "Nested",
3091                "fields": [
3092                  { "name": "nested_int", "type": "int" }
3093                ]
3094              }
3095            },
3096            { "name": "nestedRecord", "type": "Nested" },
3097            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
3098            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
3099          ]
3100        }
3101        "#;
3102
3103        let schema: Schema = serde_json::from_str(schema_str).unwrap();
3104
3105        let mut maker = Maker::new(false, false, Tz::default());
3106        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3107
3108        if let Codec::Struct(fields) = avro_data_type.codec() {
3109            assert_eq!(fields.len(), 4);
3110
3111            // nested
3112            assert_eq!(fields[0].name(), "nested");
3113            let nested_data_type = fields[0].data_type();
3114            if let Codec::Struct(nested_fields) = nested_data_type.codec() {
3115                assert_eq!(nested_fields.len(), 1);
3116                assert_eq!(nested_fields[0].name(), "nested_int");
3117                assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
3118            } else {
3119                panic!(
3120                    "'nested' field is not a struct but {:?}",
3121                    nested_data_type.codec()
3122                );
3123            }
3124
3125            // nestedRecord
3126            assert_eq!(fields[1].name(), "nestedRecord");
3127            let nested_record_data_type = fields[1].data_type();
3128            assert_eq!(
3129                nested_record_data_type.codec().data_type(),
3130                nested_data_type.codec().data_type()
3131            );
3132
3133            // nestedArray
3134            assert_eq!(fields[2].name(), "nestedArray");
3135            if let Codec::List(item_type) = fields[2].data_type().codec() {
3136                assert_eq!(
3137                    item_type.codec().data_type(),
3138                    nested_data_type.codec().data_type()
3139                );
3140            } else {
3141                panic!("'nestedArray' field is not a list");
3142            }
3143
3144            // nestedMap
3145            assert_eq!(fields[3].name(), "nestedMap");
3146            if let Codec::Map(value_type) = fields[3].data_type().codec() {
3147                assert_eq!(
3148                    value_type.codec().data_type(),
3149                    nested_data_type.codec().data_type()
3150                );
3151            } else {
3152                panic!("'nestedMap' field is not a map");
3153            }
3154        } else {
3155            panic!("Top-level schema is not a struct");
3156        }
3157    }
3158
3159    #[test]
3160    fn test_nested_enum_type_reuse_with_namespace() {
3161        let schema_str = r#"
3162        {
3163          "type": "record",
3164          "name": "Record",
3165          "namespace": "record_ns",
3166          "fields": [
3167            {
3168              "name": "status",
3169              "type": {
3170                "type": "enum",
3171                "name": "Status",
3172                "namespace": "enum_ns",
3173                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
3174              }
3175            },
3176            { "name": "backupStatus", "type": "enum_ns.Status" },
3177            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
3178            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
3179          ]
3180        }
3181        "#;
3182
3183        let schema: Schema = serde_json::from_str(schema_str).unwrap();
3184
3185        let mut maker = Maker::new(false, false, Tz::default());
3186        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3187
3188        if let Codec::Struct(fields) = avro_data_type.codec() {
3189            assert_eq!(fields.len(), 4);
3190
3191            // status
3192            assert_eq!(fields[0].name(), "status");
3193            let status_data_type = fields[0].data_type();
3194            if let Codec::Enum(symbols) = status_data_type.codec() {
3195                assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
3196            } else {
3197                panic!(
3198                    "'status' field is not an enum but {:?}",
3199                    status_data_type.codec()
3200                );
3201            }
3202
3203            // backupStatus
3204            assert_eq!(fields[1].name(), "backupStatus");
3205            let backup_status_data_type = fields[1].data_type();
3206            assert_eq!(
3207                backup_status_data_type.codec().data_type(),
3208                status_data_type.codec().data_type()
3209            );
3210
3211            // statusHistory
3212            assert_eq!(fields[2].name(), "statusHistory");
3213            if let Codec::List(item_type) = fields[2].data_type().codec() {
3214                assert_eq!(
3215                    item_type.codec().data_type(),
3216                    status_data_type.codec().data_type()
3217                );
3218            } else {
3219                panic!("'statusHistory' field is not a list");
3220            }
3221
3222            // statusMap
3223            assert_eq!(fields[3].name(), "statusMap");
3224            if let Codec::Map(value_type) = fields[3].data_type().codec() {
3225                assert_eq!(
3226                    value_type.codec().data_type(),
3227                    status_data_type.codec().data_type()
3228                );
3229            } else {
3230                panic!("'statusMap' field is not a map");
3231            }
3232        } else {
3233            panic!("Top-level schema is not a struct");
3234        }
3235    }
3236
3237    #[test]
3238    fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
3239        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3240        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3241        let mut maker = Maker::new(false, false, Tz::default());
3242        let data_type = maker
3243            .make_data_type(&writer_schema, Some(&reader_schema), None)
3244            .expect("resolution should succeed");
3245        let field = AvroField {
3246            name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
3247            data_type,
3248        };
3249        assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
3250        assert!(matches!(field.data_type().codec(), Codec::Utf8));
3251    }
3252
3253    fn json_string(s: &str) -> Value {
3254        Value::String(s.to_string())
3255    }
3256
3257    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3258        let stored = dt
3259            .metadata
3260            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3261            .cloned()
3262            .unwrap_or_default();
3263        let expected = serde_json::to_string(default_json).unwrap();
3264        assert_eq!(stored, expected, "stored default metadata should match");
3265    }
3266
3267    #[test]
3268    fn test_validate_and_store_default_null_and_nullability_rules() {
3269        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
3270        let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
3271        assert_eq!(lit, AvroLiteral::Null);
3272        assert_default_stored(&dt_null, &Value::Null);
3273        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3274        let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
3275        assert!(
3276            err.to_string()
3277                .contains("JSON null default is only valid for `null` type"),
3278            "unexpected error: {err}"
3279        );
3280        let mut dt_int_nf =
3281            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
3282        let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
3283        assert_eq!(lit2, AvroLiteral::Null);
3284        assert_default_stored(&dt_int_nf, &Value::Null);
3285        let mut dt_int_ns =
3286            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
3287        let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
3288        assert!(
3289            err2.to_string()
3290                .contains("JSON null default is only valid for `null` type"),
3291            "unexpected error: {err2}"
3292        );
3293    }
3294
3295    #[test]
3296    fn test_validate_and_store_default_primitives_and_temporal() {
3297        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
3298        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
3299        assert_eq!(lit, AvroLiteral::Boolean(true));
3300        assert_default_stored(&dt_bool, &Value::Bool(true));
3301        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3302        let lit = dt_i32
3303            .parse_and_store_default(&serde_json::json!(123))
3304            .unwrap();
3305        assert_eq!(lit, AvroLiteral::Int(123));
3306        assert_default_stored(&dt_i32, &serde_json::json!(123));
3307        let err = dt_i32
3308            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
3309            .unwrap_err();
3310        assert!(format!("{err}").contains("out of i32 range"));
3311        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3312        let lit = dt_i64
3313            .parse_and_store_default(&serde_json::json!(1234567890))
3314            .unwrap();
3315        assert_eq!(lit, AvroLiteral::Long(1234567890));
3316        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
3317        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
3318        let lit = dt_f32
3319            .parse_and_store_default(&serde_json::json!(1.25))
3320            .unwrap();
3321        assert_eq!(lit, AvroLiteral::Float(1.25));
3322        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
3323        let err = dt_f32
3324            .parse_and_store_default(&serde_json::json!(1e39))
3325            .unwrap_err();
3326        assert!(format!("{err}").contains("out of f32 range"));
3327        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3328        let lit = dt_f64
3329            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
3330            .unwrap();
3331        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
3332        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
3333        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
3334        let l = dt_str
3335            .parse_and_store_default(&json_string("hello"))
3336            .unwrap();
3337        assert_eq!(l, AvroLiteral::String("hello".into()));
3338        assert_default_stored(&dt_str, &json_string("hello"));
3339        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
3340        let l = dt_strv
3341            .parse_and_store_default(&json_string("view"))
3342            .unwrap();
3343        assert_eq!(l, AvroLiteral::String("view".into()));
3344        assert_default_stored(&dt_strv, &json_string("view"));
3345        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
3346        let l = dt_uuid
3347            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
3348            .unwrap();
3349        assert_eq!(
3350            l,
3351            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
3352        );
3353        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
3354        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
3355        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
3356        let err = dt_bin
3357            .parse_and_store_default(&json_string("€")) // U+20AC
3358            .unwrap_err();
3359        assert!(format!("{err}").contains("Invalid codepoint"));
3360        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
3361        let ld = dt_date
3362            .parse_and_store_default(&serde_json::json!(1))
3363            .unwrap();
3364        assert_eq!(ld, AvroLiteral::Int(1));
3365        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
3366        let lt = dt_tmill
3367            .parse_and_store_default(&serde_json::json!(86_400_000))
3368            .unwrap();
3369        assert_eq!(lt, AvroLiteral::Int(86_400_000));
3370        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
3371        let ltm = dt_tmicros
3372            .parse_and_store_default(&serde_json::json!(1_000_000))
3373            .unwrap();
3374        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
3375        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(None), HashMap::new(), None);
3376        let l1 = dt_ts_milli
3377            .parse_and_store_default(&serde_json::json!(123))
3378            .unwrap();
3379        assert_eq!(l1, AvroLiteral::Long(123));
3380        let mut dt_ts_micro = AvroDataType::new(Codec::TimestampMicros(None), HashMap::new(), None);
3381        let l2 = dt_ts_micro
3382            .parse_and_store_default(&serde_json::json!(456))
3383            .unwrap();
3384        assert_eq!(l2, AvroLiteral::Long(456));
3385    }
3386
3387    #[cfg(feature = "avro_custom_types")]
3388    #[test]
3389    fn test_validate_and_store_default_custom_integer_ranges() {
3390        let mut dt_i8 = AvroDataType::new(Codec::Int8, HashMap::new(), None);
3391        let lit_i8 = dt_i8
3392            .parse_and_store_default(&serde_json::json!(i8::MAX))
3393            .unwrap();
3394        assert_eq!(lit_i8, AvroLiteral::Int(i8::MAX as i32));
3395        let err_i8_high = dt_i8
3396            .parse_and_store_default(&serde_json::json!(i8::MAX as i64 + 1))
3397            .unwrap_err();
3398        assert!(err_i8_high.to_string().contains("out of i8 range"));
3399        let err_i8_low = dt_i8
3400            .parse_and_store_default(&serde_json::json!(i8::MIN as i64 - 1))
3401            .unwrap_err();
3402        assert!(err_i8_low.to_string().contains("out of i8 range"));
3403
3404        let mut dt_i16 = AvroDataType::new(Codec::Int16, HashMap::new(), None);
3405        let lit_i16 = dt_i16
3406            .parse_and_store_default(&serde_json::json!(i16::MIN))
3407            .unwrap();
3408        assert_eq!(lit_i16, AvroLiteral::Int(i16::MIN as i32));
3409        let err_i16_high = dt_i16
3410            .parse_and_store_default(&serde_json::json!(i16::MAX as i64 + 1))
3411            .unwrap_err();
3412        assert!(err_i16_high.to_string().contains("out of i16 range"));
3413        let err_i16_low = dt_i16
3414            .parse_and_store_default(&serde_json::json!(i16::MIN as i64 - 1))
3415            .unwrap_err();
3416        assert!(err_i16_low.to_string().contains("out of i16 range"));
3417
3418        let mut dt_u8 = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
3419        let lit_u8 = dt_u8
3420            .parse_and_store_default(&serde_json::json!(u8::MAX))
3421            .unwrap();
3422        assert_eq!(lit_u8, AvroLiteral::Int(u8::MAX as i32));
3423        let err_u8_neg = dt_u8
3424            .parse_and_store_default(&serde_json::json!(-1))
3425            .unwrap_err();
3426        assert!(err_u8_neg.to_string().contains("out of u8 range"));
3427        let err_u8_high = dt_u8
3428            .parse_and_store_default(&serde_json::json!(u8::MAX as i64 + 1))
3429            .unwrap_err();
3430        assert!(err_u8_high.to_string().contains("out of u8 range"));
3431
3432        let mut dt_u16 = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
3433        let lit_u16 = dt_u16
3434            .parse_and_store_default(&serde_json::json!(u16::MAX))
3435            .unwrap();
3436        assert_eq!(lit_u16, AvroLiteral::Int(u16::MAX as i32));
3437        let err_u16_neg = dt_u16
3438            .parse_and_store_default(&serde_json::json!(-1))
3439            .unwrap_err();
3440        assert!(err_u16_neg.to_string().contains("out of u16 range"));
3441        let err_u16_high = dt_u16
3442            .parse_and_store_default(&serde_json::json!(u16::MAX as i64 + 1))
3443            .unwrap_err();
3444        assert!(err_u16_high.to_string().contains("out of u16 range"));
3445
3446        let mut dt_u32 = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
3447        let lit_u32 = dt_u32
3448            .parse_and_store_default(&serde_json::json!(u32::MAX as i64))
3449            .unwrap();
3450        assert_eq!(lit_u32, AvroLiteral::Long(u32::MAX as i64));
3451        let err_u32_neg = dt_u32
3452            .parse_and_store_default(&serde_json::json!(-1))
3453            .unwrap_err();
3454        assert!(err_u32_neg.to_string().contains("out of u32 range"));
3455        let err_u32_high = dt_u32
3456            .parse_and_store_default(&serde_json::json!(u32::MAX as i64 + 1))
3457            .unwrap_err();
3458        assert!(err_u32_high.to_string().contains("out of u32 range"));
3459    }
3460
3461    #[test]
3462    fn test_validate_and_store_default_fixed_decimal_interval() {
3463        let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
3464        let l = dt_fixed
3465            .parse_and_store_default(&json_string("WXYZ"))
3466            .unwrap();
3467        assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
3468        let err = dt_fixed
3469            .parse_and_store_default(&json_string("TOO LONG"))
3470            .unwrap_err();
3471        assert!(err.to_string().contains("Default length"));
3472        let mut dt_dec_fixed =
3473            AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
3474        let l = dt_dec_fixed
3475            .parse_and_store_default(&json_string("abc"))
3476            .unwrap();
3477        assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
3478        let err = dt_dec_fixed
3479            .parse_and_store_default(&json_string("toolong"))
3480            .unwrap_err();
3481        assert!(err.to_string().contains("Default length"));
3482        let mut dt_dec_bytes =
3483            AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
3484        let l = dt_dec_bytes
3485            .parse_and_store_default(&json_string("freeform"))
3486            .unwrap();
3487        assert_eq!(
3488            l,
3489            AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
3490        );
3491        let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
3492        let l = dt_interval
3493            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
3494            .unwrap();
3495        assert_eq!(
3496            l,
3497            AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
3498        );
3499        let err = dt_interval
3500            .parse_and_store_default(&json_string("short"))
3501            .unwrap_err();
3502        assert!(err.to_string().contains("Default length"));
3503    }
3504
3505    #[test]
3506    fn test_validate_and_store_default_enum_list_map_struct() {
3507        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
3508            .into_iter()
3509            .collect();
3510        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
3511        let l = dt_enum
3512            .parse_and_store_default(&json_string("GREEN"))
3513            .unwrap();
3514        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
3515        let err = dt_enum
3516            .parse_and_store_default(&json_string("YELLOW"))
3517            .unwrap_err();
3518        assert!(err.to_string().contains("Default enum symbol"));
3519        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3520        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
3521        let val = serde_json::json!([1, 2, 3]);
3522        let l = dt_list.parse_and_store_default(&val).unwrap();
3523        assert_eq!(
3524            l,
3525            AvroLiteral::Array(vec![
3526                AvroLiteral::Long(1),
3527                AvroLiteral::Long(2),
3528                AvroLiteral::Long(3)
3529            ])
3530        );
3531        let err = dt_list
3532            .parse_and_store_default(&serde_json::json!({"not":"array"}))
3533            .unwrap_err();
3534        assert!(err.to_string().contains("JSON array"));
3535        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3536        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
3537        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
3538        let l = dt_map.parse_and_store_default(&mv).unwrap();
3539        let mut expected = IndexMap::new();
3540        expected.insert("x".into(), AvroLiteral::Double(1.5));
3541        expected.insert("y".into(), AvroLiteral::Double(2.5));
3542        assert_eq!(l, AvroLiteral::Map(expected));
3543        // Not object -> error
3544        let err = dt_map
3545            .parse_and_store_default(&serde_json::json!(123))
3546            .unwrap_err();
3547        assert!(err.to_string().contains("JSON object"));
3548        let mut field_a = AvroField {
3549            name: "a".into(),
3550            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
3551        };
3552        let field_b = AvroField {
3553            name: "b".into(),
3554            data_type: AvroDataType::new(
3555                Codec::Int64,
3556                HashMap::new(),
3557                Some(Nullability::NullFirst),
3558            ),
3559        };
3560        let mut c_md = HashMap::new();
3561        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
3562        let field_c = AvroField {
3563            name: "c".into(),
3564            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
3565        };
3566        field_a.data_type.metadata.insert("doc".into(), "na".into());
3567        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
3568        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
3569        let default_obj = serde_json::json!({"a": 7});
3570        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
3571        let mut expected = IndexMap::new();
3572        expected.insert("a".into(), AvroLiteral::Int(7));
3573        expected.insert("b".into(), AvroLiteral::Null);
3574        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
3575        assert_eq!(l, AvroLiteral::Map(expected));
3576        assert_default_stored(&dt_struct, &default_obj);
3577        let req_field = AvroField {
3578            name: "req".into(),
3579            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
3580        };
3581        let mut dt_bad = AvroDataType::new(
3582            Codec::Struct(Arc::from(vec![req_field])),
3583            HashMap::new(),
3584            None,
3585        );
3586        let err = dt_bad
3587            .parse_and_store_default(&serde_json::json!({}))
3588            .unwrap_err();
3589        assert!(
3590            err.to_string().contains("missing required subfield 'req'"),
3591            "unexpected error: {err}"
3592        );
3593        let err = dt_struct
3594            .parse_and_store_default(&serde_json::json!(10))
3595            .unwrap_err();
3596        err.to_string().contains("must be a JSON object");
3597    }
3598
3599    #[test]
3600    fn test_resolve_array_promotion_and_reader_metadata() {
3601        let mut w_add: HashMap<&str, Value> = HashMap::new();
3602        w_add.insert("who", json_string("writer"));
3603        let mut r_add: HashMap<&str, Value> = HashMap::new();
3604        r_add.insert("who", json_string("reader"));
3605        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3606            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3607            attributes: Attributes {
3608                logical_type: None,
3609                additional: w_add,
3610            },
3611        }));
3612        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3613            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
3614            attributes: Attributes {
3615                logical_type: None,
3616                additional: r_add,
3617            },
3618        }));
3619        let mut maker = Maker::new(false, false, Tz::default());
3620        let dt = maker
3621            .make_data_type(&writer_schema, Some(&reader_schema), None)
3622            .unwrap();
3623        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
3624        if let Codec::List(inner) = dt.codec() {
3625            assert!(matches!(inner.codec(), Codec::Int64));
3626            assert_eq!(
3627                inner.resolution,
3628                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3629            );
3630        } else {
3631            panic!("expected list codec");
3632        }
3633    }
3634
3635    #[test]
3636    fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
3637        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3638            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3639            attributes: Attributes::default(),
3640        }));
3641        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3642            items: Box::new(mk_union(vec![
3643                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3644                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3645            ])),
3646            attributes: Attributes::default(),
3647        }));
3648        let mut maker = Maker::new(false, false, Tz::default());
3649        let dt = maker
3650            .make_data_type(&writer_schema, Some(&reader_schema), None)
3651            .unwrap();
3652        if let Codec::List(inner) = dt.codec() {
3653            assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
3654            assert!(matches!(inner.codec(), Codec::Int32));
3655            match inner.resolution.as_ref() {
3656                Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
3657                other => panic!("expected Union resolution, got {other:?}"),
3658            }
3659        } else {
3660            panic!("expected List codec");
3661        }
3662    }
3663
3664    #[test]
3665    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3666        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3667            name: "MD5",
3668            namespace: None,
3669            aliases: vec!["Hash16"],
3670            size: 16,
3671            attributes: Attributes::default(),
3672        }));
3673        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3674            name: "Hash16",
3675            namespace: None,
3676            aliases: vec![],
3677            size: 16,
3678            attributes: Attributes::default(),
3679        }));
3680        let mut maker = Maker::new(false, false, Tz::default());
3681        let dt = maker
3682            .make_data_type(&writer_schema, Some(&reader_schema), None)
3683            .unwrap();
3684        assert!(matches!(dt.codec(), Codec::Fixed(16)));
3685    }
3686
3687    #[cfg(feature = "avro_custom_types")]
3688    #[test]
3689    fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3690        let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3691            name: "ArrowIntervalMDN",
3692            namespace: None,
3693            aliases: vec![],
3694            size: 16,
3695            attributes: Attributes {
3696                logical_type: Some("arrow.interval-month-day-nano"),
3697                additional: Default::default(),
3698            },
3699        }));
3700        let mut maker = Maker::new(false, false, Default::default());
3701        let dt = maker.make_data_type(&schema, None, None).unwrap();
3702        assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3703        assert_eq!(
3704            dt.codec.data_type(),
3705            DataType::Interval(IntervalUnit::MonthDayNano)
3706        );
3707    }
3708
3709    #[test]
3710    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3711        let writer = Schema::Complex(ComplexType::Record(Record {
3712            name: "R",
3713            namespace: None,
3714            doc: None,
3715            aliases: vec![],
3716            fields: vec![
3717                crate::schema::Field {
3718                    name: "a",
3719                    doc: None,
3720                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3721                    default: None,
3722                    aliases: vec![],
3723                },
3724                crate::schema::Field {
3725                    name: "skipme",
3726                    doc: None,
3727                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3728                    default: None,
3729                    aliases: vec![],
3730                },
3731                crate::schema::Field {
3732                    name: "b",
3733                    doc: None,
3734                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3735                    default: None,
3736                    aliases: vec![],
3737                },
3738            ],
3739            attributes: Attributes::default(),
3740        }));
3741        let reader = Schema::Complex(ComplexType::Record(Record {
3742            name: "R",
3743            namespace: None,
3744            doc: None,
3745            aliases: vec![],
3746            fields: vec![
3747                crate::schema::Field {
3748                    name: "b",
3749                    doc: None,
3750                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3751                    default: None,
3752                    aliases: vec![],
3753                },
3754                crate::schema::Field {
3755                    name: "a",
3756                    doc: None,
3757                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3758                    default: None,
3759                    aliases: vec![],
3760                },
3761                crate::schema::Field {
3762                    name: "name",
3763                    doc: None,
3764                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3765                    default: Some(json_string("anon")),
3766                    aliases: vec![],
3767                },
3768                crate::schema::Field {
3769                    name: "opt",
3770                    doc: None,
3771                    r#type: Schema::Union(vec![
3772                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3773                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3774                    ]),
3775                    default: None, // should default to null because NullFirst
3776                    aliases: vec![],
3777                },
3778            ],
3779            attributes: Attributes::default(),
3780        }));
3781        let mut maker = Maker::new(false, false, Tz::default());
3782        let dt = maker
3783            .make_data_type(&writer, Some(&reader), None)
3784            .expect("record resolution");
3785        let fields = match dt.codec() {
3786            Codec::Struct(f) => f,
3787            other => panic!("expected struct, got {other:?}"),
3788        };
3789        assert_eq!(fields.len(), 4);
3790        assert_eq!(fields[0].name(), "b");
3791        assert_eq!(fields[1].name(), "a");
3792        assert_eq!(fields[2].name(), "name");
3793        assert_eq!(fields[3].name(), "opt");
3794        assert!(matches!(
3795            fields[1].data_type().resolution,
3796            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3797        ));
3798        let rec = match dt.resolution {
3799            Some(ResolutionInfo::Record(ref r)) => r.clone(),
3800            other => panic!("expected record resolution, got {other:?}"),
3801        };
3802        assert!(matches!(
3803            &rec.writer_fields[..],
3804            &[
3805                ResolvedField::ToReader(1),
3806                ResolvedField::Skip(_),
3807                ResolvedField::ToReader(0),
3808            ]
3809        ));
3810        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3811        let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
3812            panic!("should skip field 1")
3813        };
3814        assert!(matches!(skip1.codec(), Codec::Utf8));
3815        let name_md = &fields[2].data_type().metadata;
3816        assert_eq!(
3817            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3818            Some(&"\"anon\"".to_string())
3819        );
3820        let opt_md = &fields[3].data_type().metadata;
3821        assert_eq!(
3822            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3823            Some(&"null".to_string())
3824        );
3825    }
3826
3827    #[test]
3828    fn test_named_type_alias_resolution_record_cross_namespace() {
3829        let writer_record = Record {
3830            name: "PersonV2",
3831            namespace: Some("com.example.v2"),
3832            doc: None,
3833            aliases: vec!["com.example.Person"],
3834            fields: vec![
3835                AvroFieldSchema {
3836                    name: "name",
3837                    doc: None,
3838                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3839                    default: None,
3840                    aliases: vec![],
3841                },
3842                AvroFieldSchema {
3843                    name: "age",
3844                    doc: None,
3845                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3846                    default: None,
3847                    aliases: vec![],
3848                },
3849            ],
3850            attributes: Attributes::default(),
3851        };
3852        let reader_record = Record {
3853            name: "Person",
3854            namespace: Some("com.example"),
3855            doc: None,
3856            aliases: vec![],
3857            fields: writer_record.fields.clone(),
3858            attributes: Attributes::default(),
3859        };
3860        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3861        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3862        let mut maker = Maker::new(false, false, Tz::default());
3863        let result = maker
3864            .make_data_type(&writer_schema, Some(&reader_schema), None)
3865            .expect("record alias resolution should succeed");
3866        match result.codec {
3867            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3868            other => panic!("expected struct, got {other:?}"),
3869        }
3870    }
3871
3872    #[test]
3873    fn test_named_type_alias_resolution_enum_cross_namespace() {
3874        let writer_enum = Enum {
3875            name: "ColorV2",
3876            namespace: Some("org.example.v2"),
3877            doc: None,
3878            aliases: vec!["org.example.Color"],
3879            symbols: vec!["RED", "GREEN", "BLUE"],
3880            default: None,
3881            attributes: Attributes::default(),
3882        };
3883        let reader_enum = Enum {
3884            name: "Color",
3885            namespace: Some("org.example"),
3886            doc: None,
3887            aliases: vec![],
3888            symbols: vec!["RED", "GREEN", "BLUE"],
3889            default: None,
3890            attributes: Attributes::default(),
3891        };
3892        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3893        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3894        let mut maker = Maker::new(false, false, Tz::default());
3895        maker
3896            .make_data_type(&writer_schema, Some(&reader_schema), None)
3897            .expect("enum alias resolution should succeed");
3898    }
3899
3900    #[test]
3901    fn test_named_type_alias_resolution_fixed_cross_namespace() {
3902        let writer_fixed = Fixed {
3903            name: "Fx10V2",
3904            namespace: Some("ns.v2"),
3905            aliases: vec!["ns.Fx10"],
3906            size: 10,
3907            attributes: Attributes::default(),
3908        };
3909        let reader_fixed = Fixed {
3910            name: "Fx10",
3911            namespace: Some("ns"),
3912            aliases: vec![],
3913            size: 10,
3914            attributes: Attributes::default(),
3915        };
3916        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3917        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3918        let mut maker = Maker::new(false, false, Tz::default());
3919        maker
3920            .make_data_type(&writer_schema, Some(&reader_schema), None)
3921            .expect("fixed alias resolution should succeed");
3922    }
3923}