Skip to main content

arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec for Mapping Avro and Arrow types.
19
20use crate::schema::{
21    AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22    AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23    PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27    IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40/// Contains information about how to resolve differences between a writer's and a reader's schema.
41#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43    /// Indicates that the writer's type should be promoted to the reader's type.
44    Promotion(Promotion),
45    /// Indicates that a default value should be used for a field.
46    DefaultValue(AvroLiteral),
47    /// Provides mapping information for resolving enums.
48    EnumMapping(EnumMapping),
49    /// Provides resolution information for record fields.
50    Record(ResolvedRecord),
51    /// Provides mapping and shape info for resolving unions.
52    Union(ResolvedUnion),
53}
54
55/// Represents a literal Avro value.
56///
57/// This is used to represent default values in an Avro schema.
58#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60    /// Represents a null value.
61    Null,
62    /// Represents a boolean value.
63    Boolean(bool),
64    /// Represents an integer value.
65    Int(i32),
66    /// Represents a long value.
67    Long(i64),
68    /// Represents a float value.
69    Float(f32),
70    /// Represents a double value.
71    Double(f64),
72    /// Represents a bytes value.
73    Bytes(Vec<u8>),
74    /// Represents a string value.
75    String(String),
76    /// Represents an enum symbol.
77    Enum(String),
78    /// Represents a JSON array default for an Avro array, containing element literals.
79    Array(Vec<AvroLiteral>),
80    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
81    Map(IndexMap<String, AvroLiteral>),
82}
83
84/// Contains the necessary information to resolve a writer's record against a reader's record schema.
85#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87    /// Maps a writer's field index to the field's resolution against the reader's schema.
88    pub(crate) writer_fields: Arc<[ResolvedField]>,
89    /// A list of indices in the reader's schema for fields that have a default value.
90    pub(crate) default_fields: Arc<[usize]>,
91}
92
93/// Resolution information for record fields in the writer schema.
94#[derive(Debug, Clone, PartialEq)]
95pub(crate) enum ResolvedField {
96    /// Resolves to a field indexed in the reader schema.
97    /// The `AvroDataType` is the writer's type for this field, used by the Skipper
98    /// to correctly consume writer bytes when the whole record is being skipped.
99    ToReader(usize, AvroDataType),
100    /// For fields present in the writer's schema but not the reader's, this stores their data type.
101    /// This is needed to correctly skip over these fields during deserialization.
102    Skip(AvroDataType),
103}
104
/// The kind of type promotion applied during schema resolution.
///
/// Resolving schemas may require promoting the writer's data type to the reader's
/// data type; an `int`, for instance, can be promoted to a `long`, `float`, or `double`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No promotion: the value is read directly.
    Direct,
    /// `int` promoted to `long`.
    IntToLong,
    /// `int` promoted to `float`.
    IntToFloat,
    /// `int` promoted to `double`.
    IntToDouble,
    /// `long` promoted to `float`.
    LongToFloat,
    /// `long` promoted to `double`.
    LongToDouble,
    /// `float` promoted to `double`.
    FloatToDouble,
    /// `string` promoted to `bytes`.
    StringToBytes,
    /// `bytes` promoted to `string`.
    BytesToString,
}

impl Display for Promotion {
    /// Writes a short `From->To` label for the promotion (`Direct` when there is none).
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Direct => "Direct",
            Self::IntToLong => "Int->Long",
            Self::IntToFloat => "Int->Float",
            Self::IntToDouble => "Int->Double",
            Self::LongToFloat => "Long->Float",
            Self::LongToDouble => "Long->Double",
            Self::FloatToDouble => "Float->Double",
            Self::StringToBytes => "String->Bytes",
            Self::BytesToString => "Bytes->String",
        };
        formatter.write_str(label)
    }
}
146
147/// Information required to resolve a writer union against a reader union (or single type).
148#[derive(Debug, Clone, PartialEq)]
149pub(crate) struct ResolvedUnion {
150    /// For each writer branch index, the reader branch index and how to read it.
151    /// `None` means the writer branch doesn't resolve against the reader.
152    pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
153    /// Whether the writer schema at this site is a union
154    pub(crate) writer_is_union: bool,
155    /// Whether the reader schema at this site is a union
156    pub(crate) reader_is_union: bool,
157}
158
/// Mapping information used to resolve Avro enums.
///
/// During schema resolution the writer's enum symbols have to be mapped onto the
/// reader's symbols.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Reader symbol index for each writer symbol index.
    pub(crate) mapping: Arc<[i32]>,
    /// Index used for a writer symbol that is missing from the reader's enum when the
    /// reader's schema specifies a default value.
    pub(crate) default_index: i32,
}
170
/// Attaches the canonical arrow extension type that corresponds to `codec`, if any.
///
/// Only [`Codec::Uuid`] has one; for every other codec the field is returned unchanged.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
178
179/// An Avro datatype mapped to the arrow data model
180#[derive(Debug, Clone, PartialEq)]
181pub(crate) struct AvroDataType {
182    nullability: Option<Nullability>,
183    metadata: HashMap<String, String>,
184    codec: Codec,
185    pub(crate) resolution: Option<ResolutionInfo>,
186}
187
188impl AvroDataType {
189    /// Create a new [`AvroDataType`] with the given parts.
190    pub(crate) fn new(
191        codec: Codec,
192        metadata: HashMap<String, String>,
193        nullability: Option<Nullability>,
194    ) -> Self {
195        AvroDataType {
196            codec,
197            metadata,
198            nullability,
199            resolution: None,
200        }
201    }
202
203    #[inline]
204    fn new_with_resolution(
205        codec: Codec,
206        metadata: HashMap<String, String>,
207        nullability: Option<Nullability>,
208        resolution: Option<ResolutionInfo>,
209    ) -> Self {
210        Self {
211            codec,
212            metadata,
213            nullability,
214            resolution,
215        }
216    }
217
218    /// Returns an arrow [`Field`] with the given name
219    pub(crate) fn field_with_name(&self, name: &str) -> Field {
220        let mut nullable = self.nullability.is_some();
221        if !nullable {
222            if let Codec::Union(children, _, _) = self.codec() {
223                // If any encoded branch is `null`, mark field as nullable
224                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
225                    nullable = true;
226                }
227            }
228        }
229        let data_type = self.codec.data_type();
230        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
231        #[cfg(feature = "canonical_extension_types")]
232        return with_extension_type(&self.codec, field);
233        #[cfg(not(feature = "canonical_extension_types"))]
234        field
235    }
236
237    /// Returns a reference to the codec used by this data type
238    ///
239    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
240    /// This is useful when we need to inspect or use the specific encoding of a field.
241    pub(crate) fn codec(&self) -> &Codec {
242        &self.codec
243    }
244
245    /// Returns the nullability status of this data type
246    ///
247    /// In Avro, nullability is represented through unions with null types.
248    /// The returned value indicates how nulls are encoded in the Avro format:
249    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
250    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
251    /// - `None` - The type is not nullable
252    pub(crate) fn nullability(&self) -> Option<Nullability> {
253        self.nullability
254    }
255
256    #[inline]
257    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
258        fn expect_string<'v>(
259            default_json: &'v Value,
260            data_type: &str,
261        ) -> Result<&'v str, ArrowError> {
262            match default_json {
263                Value::String(s) => Ok(s.as_str()),
264                _ => Err(ArrowError::SchemaError(format!(
265                    "Default value must be a JSON string for {data_type}"
266                ))),
267            }
268        }
269
270        fn parse_bytes_default(
271            default_json: &Value,
272            expected_len: Option<usize>,
273        ) -> Result<Vec<u8>, ArrowError> {
274            let s = expect_string(default_json, "bytes/fixed logical types")?;
275            let mut out = Vec::with_capacity(s.len());
276            for ch in s.chars() {
277                let cp = ch as u32;
278                if cp > 0xFF {
279                    return Err(ArrowError::SchemaError(format!(
280                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
281                    )));
282                }
283                out.push(cp as u8);
284            }
285            if let Some(len) = expected_len {
286                if out.len() != len {
287                    return Err(ArrowError::SchemaError(format!(
288                        "Default length {} does not match expected fixed size {len}",
289                        out.len(),
290                    )));
291                }
292            }
293            Ok(out)
294        }
295
296        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
297            match default_json {
298                Value::Number(n) => n.as_i64().ok_or_else(|| {
299                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
300                }),
301                _ => Err(ArrowError::SchemaError(format!(
302                    "Default {data_type} must be a JSON integer"
303                ))),
304            }
305        }
306
307        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
308            match default_json {
309                Value::Number(n) => n.as_f64().ok_or_else(|| {
310                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
311                }),
312                _ => Err(ArrowError::SchemaError(format!(
313                    "Default {data_type} must be a JSON number"
314                ))),
315            }
316        }
317
318        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
319        if default_json.is_null() {
320            return match self.codec() {
321                Codec::Null => Ok(AvroLiteral::Null),
322                Codec::Union(encodings, _, _) if !encodings.is_empty()
323                    && matches!(encodings[0].codec(), Codec::Null) =>
324                    {
325                        Ok(AvroLiteral::Null)
326                    }
327                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
328                _ => Err(ArrowError::SchemaError(
329                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
330                        .to_string(),
331                )),
332            };
333        }
334        let lit = match self.codec() {
335            Codec::Null => {
336                return Err(ArrowError::SchemaError(
337                    "Default for `null` type must be JSON null".to_string(),
338                ));
339            }
340            Codec::Boolean => match default_json {
341                Value::Bool(b) => AvroLiteral::Boolean(*b),
342                _ => {
343                    return Err(ArrowError::SchemaError(
344                        "Boolean default must be a JSON boolean".to_string(),
345                    ));
346                }
347            },
348            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
349                let i = parse_json_i64(default_json, "int")?;
350                if i < i32::MIN as i64 || i > i32::MAX as i64 {
351                    return Err(ArrowError::SchemaError(format!(
352                        "Default int {i} out of i32 range"
353                    )));
354                }
355                AvroLiteral::Int(i as i32)
356            }
357            Codec::Int64
358            | Codec::TimeMicros
359            | Codec::TimestampMillis(_)
360            | Codec::TimestampMicros(_)
361            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
362            #[cfg(feature = "avro_custom_types")]
363            Codec::DurationNanos
364            | Codec::DurationMicros
365            | Codec::DurationMillis
366            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
367            #[cfg(feature = "avro_custom_types")]
368            Codec::Int8 => {
369                let i = parse_json_i64(default_json, "int")?;
370                if i < i8::MIN as i64 || i > i8::MAX as i64 {
371                    return Err(ArrowError::SchemaError(format!(
372                        "Default int8 {i} out of i8 range"
373                    )));
374                }
375                AvroLiteral::Int(i as i32)
376            }
377            #[cfg(feature = "avro_custom_types")]
378            Codec::Int16 => {
379                let i = parse_json_i64(default_json, "int")?;
380                if i < i16::MIN as i64 || i > i16::MAX as i64 {
381                    return Err(ArrowError::SchemaError(format!(
382                        "Default int16 {i} out of i16 range"
383                    )));
384                }
385                AvroLiteral::Int(i as i32)
386            }
387            #[cfg(feature = "avro_custom_types")]
388            Codec::UInt8 => {
389                let i = parse_json_i64(default_json, "int")?;
390                if i < 0 || i > u8::MAX as i64 {
391                    return Err(ArrowError::SchemaError(format!(
392                        "Default uint8 {i} out of u8 range"
393                    )));
394                }
395                AvroLiteral::Int(i as i32)
396            }
397            #[cfg(feature = "avro_custom_types")]
398            Codec::UInt16 => {
399                let i = parse_json_i64(default_json, "int")?;
400                if i < 0 || i > u16::MAX as i64 {
401                    return Err(ArrowError::SchemaError(format!(
402                        "Default uint16 {i} out of u16 range"
403                    )));
404                }
405                AvroLiteral::Int(i as i32)
406            }
407            #[cfg(feature = "avro_custom_types")]
408            Codec::UInt32 => {
409                let i = parse_json_i64(default_json, "long")?;
410                if i < 0 || i > u32::MAX as i64 {
411                    return Err(ArrowError::SchemaError(format!(
412                        "Default uint32 {i} out of u32 range"
413                    )));
414                }
415                AvroLiteral::Long(i)
416            }
417            #[cfg(feature = "avro_custom_types")]
418            Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
419                AvroLiteral::Long(parse_json_i64(default_json, "long")?)
420            }
421            #[cfg(feature = "avro_custom_types")]
422            Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
423            #[cfg(feature = "avro_custom_types")]
424            Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
425            #[cfg(feature = "avro_custom_types")]
426            Codec::Time32Secs => {
427                let i = parse_json_i64(default_json, "int")?;
428                if i < i32::MIN as i64 || i > i32::MAX as i64 {
429                    return Err(ArrowError::SchemaError(format!(
430                        "Default time32-secs {i} out of i32 range"
431                    )));
432                }
433                AvroLiteral::Int(i as i32)
434            }
435            #[cfg(feature = "avro_custom_types")]
436            Codec::IntervalYearMonth => {
437                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
438            }
439            #[cfg(feature = "avro_custom_types")]
440            Codec::IntervalMonthDayNano => {
441                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
442            }
443            #[cfg(feature = "avro_custom_types")]
444            Codec::IntervalDayTime => {
445                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
446            }
447            Codec::Float32 => {
448                let f = parse_json_f64(default_json, "float")?;
449                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
450                    return Err(ArrowError::SchemaError(format!(
451                        "Default float {f} out of f32 range or not finite"
452                    )));
453                }
454                AvroLiteral::Float(f as f32)
455            }
456            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
457            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
458                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
459            }
460            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
461            Codec::Fixed(sz) => {
462                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
463            }
464            Codec::Decimal(_, _, fixed_size) => {
465                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
466            }
467            Codec::Enum(symbols) => {
468                let s = expect_string(default_json, "enum")?;
469                if symbols.iter().any(|sym| sym == s) {
470                    AvroLiteral::Enum(s.to_string())
471                } else {
472                    return Err(ArrowError::SchemaError(format!(
473                        "Default enum symbol {s:?} not found in reader enum symbols"
474                    )));
475                }
476            }
477            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
478            Codec::List(item_dt) => match default_json {
479                Value::Array(items) => AvroLiteral::Array(
480                    items
481                        .iter()
482                        .map(|v| item_dt.parse_default_literal(v))
483                        .collect::<Result<_, _>>()?,
484                ),
485                _ => {
486                    return Err(ArrowError::SchemaError(
487                        "Default value must be a JSON array for Avro array type".to_string(),
488                    ));
489                }
490            },
491            Codec::Map(val_dt) => match default_json {
492                Value::Object(map) => {
493                    let mut out = IndexMap::with_capacity(map.len());
494                    for (k, v) in map {
495                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
496                    }
497                    AvroLiteral::Map(out)
498                }
499                _ => {
500                    return Err(ArrowError::SchemaError(
501                        "Default value must be a JSON object for Avro map type".to_string(),
502                    ));
503                }
504            },
505            Codec::Struct(fields) => match default_json {
506                Value::Object(obj) => {
507                    let mut out: IndexMap<String, AvroLiteral> =
508                        IndexMap::with_capacity(fields.len());
509                    for f in fields.as_ref() {
510                        let name = f.name().to_string();
511                        if let Some(sub) = obj.get(&name) {
512                            out.insert(name, f.data_type().parse_default_literal(sub)?);
513                        } else {
514                            // Cache metadata lookup once
515                            let stored_default =
516                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
517                            if stored_default.is_none()
518                                && f.data_type().nullability() == Some(Nullability::default())
519                            {
520                                out.insert(name, AvroLiteral::Null);
521                            } else if let Some(default_json) = stored_default {
522                                let v: Value =
523                                    serde_json::from_str(default_json).map_err(|e| {
524                                        ArrowError::SchemaError(format!(
525                                            "Failed to parse stored subfield default JSON for '{}': {e}",
526                                            f.name(),
527                                        ))
528                                    })?;
529                                out.insert(name, f.data_type().parse_default_literal(&v)?);
530                            } else {
531                                return Err(ArrowError::SchemaError(format!(
532                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
533                                    f.name(),
534                                    f.data_type().codec()
535                                )));
536                            }
537                        }
538                    }
539                    AvroLiteral::Map(out)
540                }
541                _ => {
542                    return Err(ArrowError::SchemaError(
543                        "Default value for record/struct must be a JSON object".to_string(),
544                    ));
545                }
546            },
547            Codec::Union(encodings, _, _) => {
548                let Some(default_encoding) = encodings.first() else {
549                    return Err(ArrowError::SchemaError(
550                        "Union with no branches cannot have a default".to_string(),
551                    ));
552                };
553                default_encoding.parse_default_literal(default_json)?
554            }
555            #[cfg(feature = "avro_custom_types")]
556            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
557        };
558        Ok(lit)
559    }
560
561    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
562        let json_text = serde_json::to_string(default_json).map_err(|e| {
563            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
564        })?;
565        self.metadata
566            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
567        Ok(())
568    }
569
570    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
571        let lit = self.parse_default_literal(default_json)?;
572        self.store_default(default_json)?;
573        Ok(lit)
574    }
575}
576
577/// A named [`AvroDataType`]
578#[derive(Debug, Clone, PartialEq)]
579pub(crate) struct AvroField {
580    name: String,
581    data_type: AvroDataType,
582}
583
584impl AvroField {
585    /// Returns the arrow [`Field`]
586    pub(crate) fn field(&self) -> Field {
587        self.data_type.field_with_name(&self.name)
588    }
589
590    /// Returns the [`AvroDataType`]
591    pub(crate) fn data_type(&self) -> &AvroDataType {
592        &self.data_type
593    }
594
595    /// Returns a new [`AvroField`] with Utf8View support enabled
596    ///
597    /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to
598    /// enable potential performance optimizations in string-heavy workloads by using
599    /// Arrow's StringViewArray data structure.
600    ///
601    /// Returns a new `AvroField` with the same structure, but with string types
602    /// converted to use `Utf8View` instead of `Utf8`.
603    pub(crate) fn with_utf8view(&self) -> Self {
604        let mut field = self.clone();
605        if let Codec::Utf8 = field.data_type.codec {
606            field.data_type.codec = Codec::Utf8View;
607        }
608        field
609    }
610
611    /// Returns the name of this Avro field
612    ///
613    /// This is the field name as defined in the Avro schema.
614    /// It's used to identify fields within a record structure.
615    pub(crate) fn name(&self) -> &str {
616        &self.name
617    }
618}
619
620impl<'a> TryFrom<&Schema<'a>> for AvroField {
621    type Error = ArrowError;
622
623    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
624        match schema {
625            Schema::Complex(ComplexType::Record(r)) => {
626                let mut resolver = Maker::new(false, false, Tz::default());
627                let data_type = resolver.make_data_type(schema, None, None)?;
628                Ok(AvroField {
629                    data_type,
630                    name: r.name.to_string(),
631                })
632            }
633            _ => Err(ArrowError::ParseError(format!(
634                "Expected record got {schema:?}"
635            ))),
636        }
637    }
638}
639
640/// Builder for an [`AvroField`]
641#[derive(Debug)]
642pub(crate) struct AvroFieldBuilder<'a> {
643    writer_schema: &'a Schema<'a>,
644    reader_schema: Option<&'a Schema<'a>>,
645    use_utf8view: bool,
646    strict_mode: bool,
647    tz: Tz,
648}
649
650impl<'a> AvroFieldBuilder<'a> {
651    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
652    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
653        Self {
654            writer_schema,
655            reader_schema: None,
656            use_utf8view: false,
657            strict_mode: false,
658            tz: Tz::default(),
659        }
660    }
661
662    /// Sets the reader schema for schema resolution.
663    ///
664    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
665    /// that can handle differences between the writer's and reader's schemas.
666    #[inline]
667    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
668        self.reader_schema = Some(reader_schema);
669        self
670    }
671
672    /// Enable or disable Utf8View support
673    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
674        self.use_utf8view = use_utf8view;
675        self
676    }
677
678    /// Enable or disable strict mode.
679    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
680        self.strict_mode = strict_mode;
681        self
682    }
683
684    /// Sets the timezone representation for timestamps.
685    pub(crate) fn with_tz(mut self, tz: Tz) -> Self {
686        self.tz = tz;
687        self
688    }
689
690    /// Build an [`AvroField`] from the builder
691    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
692        match self.writer_schema {
693            Schema::Complex(ComplexType::Record(r)) => {
694                let mut resolver = Maker::new(self.use_utf8view, self.strict_mode, self.tz);
695                let data_type =
696                    resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
697                Ok(AvroField {
698                    name: r.name.to_string(),
699                    data_type,
700                })
701            }
702            _ => Err(ArrowError::ParseError(format!(
703                "Expected a Record schema to build an AvroField, but got {:?}",
704                self.writer_schema
705            ))),
706        }
707    }
708}
709
/// Timezone representation for timestamps.
///
/// Avro only distinguishes between UTC and local time (no timezone), whereas Arrow accepts
/// either of two identifiers for the UTC timezone: "+00:00" and "UTC".
/// Data types using these two time zone IDs behave identically, but are not logically equal.
///
/// The type is `Eq` and `Hash` in addition to `PartialEq`, so it can be used as a map or
/// set key and embedded in types that derive those traits.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
pub enum Tz {
    /// Represent Avro `timestamp-*` logical types with the "+00:00" timezone ID
    #[default]
    OffsetZero,
    /// Represent Avro `timestamp-*` logical types with the "UTC" timezone ID
    Utc,
}

impl Tz {
    /// Returns the string identifier for this timezone representation:
    /// `"+00:00"` for [`Tz::OffsetZero`] and `"UTC"` for [`Tz::Utc`].
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::OffsetZero => "+00:00",
            Self::Utc => "UTC",
        }
    }
}

impl Display for Tz {
    /// Writes the same identifier that [`Tz::as_str`] returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
739
/// An Avro encoding
///
/// Each variant describes how one Avro type (or logical type) is represented in Arrow.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Codec {
    /// Represents Avro null type, maps to Arrow's Null data type
    Null,
    /// Represents Avro boolean type, maps to Arrow's Boolean data type
    Boolean,
    /// Represents Avro int type, maps to Arrow's Int32 data type
    Int32,
    /// Represents Avro long type, maps to Arrow's Int64 data type
    Int64,
    /// Represents Avro float type, maps to Arrow's Float32 data type
    Float32,
    /// Represents Avro double type, maps to Arrow's Float64 data type
    Float64,
    /// Represents Avro bytes type, maps to Arrow's Binary data type
    Binary,
    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
    Utf8,
    /// String data represented as UTF-8 encoded bytes with an optimized view representation,
    /// corresponding to Arrow's StringViewArray which provides better performance for string operations
    ///
    /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`.
    Utf8View,
    /// Represents Avro date logical type, maps to Arrow's Date32 data type
    Date32,
    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
    TimeMillis,
    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
    TimeMicros,
    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
    ///
    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type.
    /// The parameter indicates whether the timestamp has a UTC timezone (`Some`, carrying the
    /// time zone ID to use) or is local time (`None`).
    TimestampMillis(Option<Tz>),
    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
    ///
    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type.
    /// The parameter indicates whether the timestamp has a UTC timezone (`Some`, carrying the
    /// time zone ID to use) or is local time (`None`).
    TimestampMicros(Option<Tz>),
    /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
    ///
    /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type.
    /// The parameter indicates whether the timestamp has a UTC timezone (`Some`, carrying the
    /// time zone ID to use) or is local time (`None`).
    TimestampNanos(Option<Tz>),
    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
    /// The i32 parameter indicates the fixed binary size
    Fixed(i32),
    /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
    ///
    /// The fields are `(precision, scale, fixed_size)`.
    /// - `precision` (`usize`): Total number of digits.
    /// - `scale` (`Option<usize>`): Number of fractional digits.
    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
    Decimal(usize, Option<usize>, Option<usize>),
    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
    Uuid,
    /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
    ///
    /// The enclosed value contains the enum's symbols.
    Enum(Arc<[String]>),
    /// Represents Avro array type, maps to Arrow's List data type
    List(Arc<AvroDataType>),
    /// Represents Avro record type, maps to Arrow's Struct data type
    Struct(Arc<[AvroField]>),
    /// Represents Avro map type, maps to Arrow's Map data type
    Map(Arc<AvroDataType>),
    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
    Interval,
    /// Represents Avro union type, maps to Arrow's Union data type
    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
    #[cfg(feature = "avro_custom_types")]
    DurationNanos,
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
    #[cfg(feature = "avro_custom_types")]
    DurationMicros,
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
    #[cfg(feature = "avro_custom_types")]
    DurationMillis,
    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
    #[cfg(feature = "avro_custom_types")]
    DurationSeconds,
    /// Represents Avro custom logical type to map to Arrow's RunEndEncoded data type
    ///
    /// The fields are `(values, bits)`.
    /// - `values`: The data type of the run values.
    /// - `bits` (`u8`): Width of the run-end integers. `Codec::data_type` maps 16, 32 and 64
    ///   to Int16, Int32 and Int64 run ends and treats any other value as unreachable.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Arc<AvroDataType>, u8),
    /// Arrow Int8 custom logical type (arrow.int8)
    #[cfg(feature = "avro_custom_types")]
    Int8,
    /// Arrow Int16 custom logical type (arrow.int16)
    #[cfg(feature = "avro_custom_types")]
    Int16,
    /// Arrow UInt8 custom logical type (arrow.uint8)
    #[cfg(feature = "avro_custom_types")]
    UInt8,
    /// Arrow UInt16 custom logical type (arrow.uint16)
    #[cfg(feature = "avro_custom_types")]
    UInt16,
    /// Arrow UInt32 custom logical type (arrow.uint32)
    #[cfg(feature = "avro_custom_types")]
    UInt32,
    /// Arrow UInt64 custom logical type (arrow.uint64) - stored as fixed(8)
    #[cfg(feature = "avro_custom_types")]
    UInt64,
    /// Arrow Float16 custom logical type (arrow.float16) - stored as fixed(2)
    #[cfg(feature = "avro_custom_types")]
    Float16,
    /// Arrow Date64 custom logical type (arrow.date64)
    #[cfg(feature = "avro_custom_types")]
    Date64,
    /// Arrow Time64(Nanosecond) custom logical type (arrow.time64-nanosecond)
    #[cfg(feature = "avro_custom_types")]
    TimeNanos,
    /// Arrow Time32(Second) custom logical type (arrow.time32-second)
    #[cfg(feature = "avro_custom_types")]
    Time32Secs,
    /// Arrow Timestamp(Second) custom logical type (arrow.timestamp-second)
    /// The bool indicates UTC (true) or local (false); `Codec::data_type` uses the
    /// "+00:00" time zone ID for the UTC case.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool),
    /// Arrow Interval(YearMonth) custom logical type (arrow.interval-year-month)
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth,
    /// Arrow Interval(MonthDayNano) custom logical type (arrow.interval-month-day-nano)
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano,
    /// Arrow Interval(DayTime) custom logical type (arrow.interval-day-time)
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime,
}
871
872impl Codec {
873    fn data_type(&self) -> DataType {
874        match self {
875            Self::Null => DataType::Null,
876            Self::Boolean => DataType::Boolean,
877            Self::Int32 => DataType::Int32,
878            Self::Int64 => DataType::Int64,
879            Self::Float32 => DataType::Float32,
880            Self::Float64 => DataType::Float64,
881            Self::Binary => DataType::Binary,
882            Self::Utf8 => DataType::Utf8,
883            Self::Utf8View => DataType::Utf8View,
884            Self::Date32 => DataType::Date32,
885            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
886            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
887            Self::TimestampMillis(tz) => DataType::Timestamp(
888                TimeUnit::Millisecond,
889                tz.as_ref().map(|tz| tz.as_str().into()),
890            ),
891            Self::TimestampMicros(tz) => DataType::Timestamp(
892                TimeUnit::Microsecond,
893                tz.as_ref().map(|tz| tz.as_str().into()),
894            ),
895            Self::TimestampNanos(tz) => DataType::Timestamp(
896                TimeUnit::Nanosecond,
897                tz.as_ref().map(|tz| tz.as_str().into()),
898            ),
899            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
900            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
901            Self::Decimal(precision, scale, _size) => {
902                let p = *precision as u8;
903                let s = scale.unwrap_or(0) as i8;
904                #[cfg(feature = "small_decimals")]
905                {
906                    if *precision <= DECIMAL32_MAX_PRECISION as usize {
907                        DataType::Decimal32(p, s)
908                    } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
909                        DataType::Decimal64(p, s)
910                    } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
911                        DataType::Decimal128(p, s)
912                    } else {
913                        DataType::Decimal256(p, s)
914                    }
915                }
916                #[cfg(not(feature = "small_decimals"))]
917                {
918                    if *precision <= DECIMAL128_MAX_PRECISION as usize {
919                        DataType::Decimal128(p, s)
920                    } else {
921                        DataType::Decimal256(p, s)
922                    }
923                }
924            }
925            Self::Uuid => DataType::FixedSizeBinary(16),
926            Self::Enum(_) => {
927                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
928            }
929            Self::List(f) => {
930                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
931            }
932            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
933            Self::Map(value_type) => {
934                let val_field = value_type.field_with_name("value");
935                DataType::Map(
936                    Arc::new(Field::new(
937                        "entries",
938                        DataType::Struct(Fields::from(vec![
939                            Field::new("key", DataType::Utf8, false),
940                            val_field,
941                        ])),
942                        false,
943                    )),
944                    false,
945                )
946            }
947            Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
948            #[cfg(feature = "avro_custom_types")]
949            Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
950            #[cfg(feature = "avro_custom_types")]
951            Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
952            #[cfg(feature = "avro_custom_types")]
953            Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
954            #[cfg(feature = "avro_custom_types")]
955            Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
956            #[cfg(feature = "avro_custom_types")]
957            Self::RunEndEncoded(values, bits) => {
958                let run_ends_dt = match *bits {
959                    16 => DataType::Int16,
960                    32 => DataType::Int32,
961                    64 => DataType::Int64,
962                    _ => unreachable!(),
963                };
964                DataType::RunEndEncoded(
965                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
966                    Arc::new(Field::new("values", values.codec().data_type(), true)),
967                )
968            }
969            #[cfg(feature = "avro_custom_types")]
970            Self::Int8 => DataType::Int8,
971            #[cfg(feature = "avro_custom_types")]
972            Self::Int16 => DataType::Int16,
973            #[cfg(feature = "avro_custom_types")]
974            Self::UInt8 => DataType::UInt8,
975            #[cfg(feature = "avro_custom_types")]
976            Self::UInt16 => DataType::UInt16,
977            #[cfg(feature = "avro_custom_types")]
978            Self::UInt32 => DataType::UInt32,
979            #[cfg(feature = "avro_custom_types")]
980            Self::UInt64 => DataType::UInt64,
981            #[cfg(feature = "avro_custom_types")]
982            Self::Float16 => DataType::Float16,
983            #[cfg(feature = "avro_custom_types")]
984            Self::Date64 => DataType::Date64,
985            #[cfg(feature = "avro_custom_types")]
986            Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
987            #[cfg(feature = "avro_custom_types")]
988            Self::Time32Secs => DataType::Time32(TimeUnit::Second),
989            #[cfg(feature = "avro_custom_types")]
990            Self::TimestampSecs(is_utc) => {
991                DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
992            }
993            #[cfg(feature = "avro_custom_types")]
994            Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
995            #[cfg(feature = "avro_custom_types")]
996            Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
997            #[cfg(feature = "avro_custom_types")]
998            Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
999        }
1000    }
1001
1002    /// Converts a string codec to use Utf8View if requested
1003    ///
1004    /// The conversion only happens if both:
1005    /// 1. `use_utf8view` is true
1006    /// 2. The codec is currently `Utf8`
1007    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
1008        if use_utf8view && matches!(self, Self::Utf8) {
1009            Self::Utf8View
1010        } else {
1011            self
1012        }
1013    }
1014
1015    #[inline]
1016    fn union_field_name(&self) -> String {
1017        UnionFieldKind::from(self).as_ref().to_owned()
1018    }
1019}
1020
1021impl From<PrimitiveType> for Codec {
1022    fn from(value: PrimitiveType) -> Self {
1023        match value {
1024            PrimitiveType::Null => Self::Null,
1025            PrimitiveType::Boolean => Self::Boolean,
1026            PrimitiveType::Int => Self::Int32,
1027            PrimitiveType::Long => Self::Int64,
1028            PrimitiveType::Float => Self::Float32,
1029            PrimitiveType::Double => Self::Float64,
1030            PrimitiveType::Bytes => Self::Binary,
1031            PrimitiveType::String => Self::Utf8,
1032        }
1033    }
1034}
1035
/// Returns the largest base-10 precision that an Avro `fixed` decimal of `n` bytes can
/// hold, where the bytes store a big-endian two's-complement unscaled integer.
///
/// Per the Avro spec (Decimal logical type), a fixed of length `n` supports at most
/// floor(log10(2^(8n - 1) - 1)) digits.
///
/// Returns `None` when `n` is 0 or larger than 32 bytes (32 bytes corresponds to Arrow's
/// Decimal256, whose maximum precision is 76).
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    /// Widest supported fixed size, in bytes.
    const MAX_BYTES: usize = 32;
    /// Exact precomputed results; entry `i` is the answer for a width of `i + 1` bytes.
    const DIGITS_BY_WIDTH: [usize; MAX_BYTES] = [
        2, 4, 6, 9, 11, 14, 16, 18, // 1..=8 bytes
        21, 23, 26, 28, 31, 33, 35, 38, // 9..=16 bytes
        40, 43, 45, 47, 50, 52, 55, 57, // 17..=24 bytes
        59, 62, 64, 67, 69, 71, 74, 76, // 25..=32 bytes
    ];
    if n == 0 || n > MAX_BYTES {
        return None;
    }
    Some(DIGITS_BY_WIDTH[n - 1])
}
1058
1059fn parse_decimal_attributes(
1060    attributes: &Attributes,
1061    fallback_size: Option<usize>,
1062    precision_required: bool,
1063) -> Result<(usize, usize, Option<usize>), ArrowError> {
1064    let precision = attributes
1065        .additional
1066        .get("precision")
1067        .and_then(|v| v.as_u64())
1068        .or(if precision_required { None } else { Some(10) })
1069        .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1070        as usize;
1071    let scale = attributes
1072        .additional
1073        .get("scale")
1074        .and_then(|v| v.as_u64())
1075        .unwrap_or(0) as usize;
1076    let size = attributes
1077        .additional
1078        .get("size")
1079        .and_then(|v| v.as_u64())
1080        .map(|s| s as usize)
1081        .or(fallback_size);
1082    if precision == 0 {
1083        return Err(ArrowError::ParseError(
1084            "Decimal requires precision > 0".to_string(),
1085        ));
1086    }
1087    if scale > precision {
1088        return Err(ArrowError::ParseError(format!(
1089            "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1090        )));
1091    }
1092    if precision > DECIMAL256_MAX_PRECISION as usize {
1093        return Err(ArrowError::ParseError(format!(
1094            "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1095            DECIMAL256_MAX_PRECISION
1096        )));
1097    }
1098    if let Some(sz) = size {
1099        let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1100            ArrowError::ParseError(format!(
1101                "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1102            ))
1103        })?;
1104        if precision > max_p {
1105            return Err(ArrowError::ParseError(format!(
1106                "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1107            )));
1108        }
1109    }
1110    Ok((precision, scale, size))
1111}
1112
/// Kind label for a union branch, derived from the branch's [`Codec`].
///
/// `Codec::union_field_name` converts a codec into this enum and uses its `AsRef<str>`
/// form (derived via strum with `snake_case` serialization) as the field name of a union
/// branch that carries no Avro name metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Date,
    TimeMillis,
    TimeMicros,
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    TimestampNanosUtc,
    TimestampNanosLocal,
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
1143
1144impl From<&Codec> for UnionFieldKind {
1145    fn from(c: &Codec) -> Self {
1146        match c {
1147            Codec::Null => Self::Null,
1148            Codec::Boolean => Self::Boolean,
1149            Codec::Int32 => Self::Int,
1150            Codec::Int64 => Self::Long,
1151            Codec::Float32 => Self::Float,
1152            Codec::Float64 => Self::Double,
1153            Codec::Binary => Self::Bytes,
1154            Codec::Utf8 | Codec::Utf8View => Self::String,
1155            Codec::Date32 => Self::Date,
1156            Codec::TimeMillis => Self::TimeMillis,
1157            Codec::TimeMicros => Self::TimeMicros,
1158            Codec::TimestampMillis(Some(Tz::OffsetZero)) => Self::TimestampMillisUtc,
1159            Codec::TimestampMillis(Some(Tz::Utc)) => Self::TimestampMillisUtc,
1160            Codec::TimestampMillis(None) => Self::TimestampMillisLocal,
1161            Codec::TimestampMicros(Some(Tz::OffsetZero)) => Self::TimestampMicrosUtc,
1162            Codec::TimestampMicros(Some(Tz::Utc)) => Self::TimestampMicrosUtc,
1163            Codec::TimestampMicros(None) => Self::TimestampMicrosLocal,
1164            Codec::TimestampNanos(Some(Tz::OffsetZero)) => Self::TimestampNanosUtc,
1165            Codec::TimestampNanos(Some(Tz::Utc)) => Self::TimestampNanosUtc,
1166            Codec::TimestampNanos(None) => Self::TimestampNanosLocal,
1167            Codec::Interval => Self::Duration,
1168            Codec::Fixed(_) => Self::Fixed,
1169            Codec::Decimal(..) => Self::Decimal,
1170            Codec::Enum(_) => Self::Enum,
1171            Codec::List(_) => Self::Array,
1172            Codec::Struct(_) => Self::Record,
1173            Codec::Map(_) => Self::Map,
1174            Codec::Uuid => Self::Uuid,
1175            Codec::Union(..) => Self::Union,
1176            #[cfg(feature = "avro_custom_types")]
1177            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
1178            #[cfg(feature = "avro_custom_types")]
1179            Codec::DurationNanos
1180            | Codec::DurationMicros
1181            | Codec::DurationMillis
1182            | Codec::DurationSeconds => Self::Duration,
1183            #[cfg(feature = "avro_custom_types")]
1184            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
1185            #[cfg(feature = "avro_custom_types")]
1186            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
1187                Self::Long
1188            }
1189            #[cfg(feature = "avro_custom_types")]
1190            Codec::Time32Secs => Self::TimeMillis, // Closest standard type
1191            #[cfg(feature = "avro_custom_types")]
1192            Codec::UInt64
1193            | Codec::Float16
1194            | Codec::IntervalYearMonth
1195            | Codec::IntervalMonthDayNano
1196            | Codec::IntervalDayTime => Self::Fixed,
1197        }
1198    }
1199}
1200
1201fn union_branch_name(dt: &AvroDataType) -> String {
1202    if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1203        if name.contains(".") {
1204            // Full name
1205            return name.to_string();
1206        }
1207        if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1208            return format!("{ns}.{name}");
1209        }
1210        return name.to_string();
1211    }
1212    dt.codec.union_field_name()
1213}
1214
1215fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1216    let arrow_fields: Vec<Field> = encodings
1217        .iter()
1218        .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1219        .collect();
1220    let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1221    UnionFields::try_new(type_ids, arrow_fields)
1222}
1223
/// Resolves Avro type names to [`AvroDataType`]
///
/// Named types are registered under their namespace and name and can later be looked
/// up by reference.
///
/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Default)]
struct Resolver<'a> {
    /// Registered types keyed by `(namespace, name)`; a missing namespace is stored as `""`.
    map: HashMap<(&'a str, &'a str), AvroDataType>,
}
1231
1232impl<'a> Resolver<'a> {
1233    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1234        self.map.insert((namespace.unwrap_or(""), name), schema);
1235    }
1236
1237    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1238        let (namespace, name) = name
1239            .rsplit_once('.')
1240            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1241        self.map
1242            .get(&(namespace, name))
1243            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1244            .cloned()
1245    }
1246}
1247
1248fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1249    let mut out = HashSet::with_capacity(1 + aliases.len());
1250    let (full, _) = make_full_name(name, ns, None);
1251    out.insert(full);
1252    for a in aliases {
1253        let (fa, _) = make_full_name(a, None, ns);
1254        out.insert(fa);
1255    }
1256    out
1257}
1258
1259fn names_match(
1260    writer_name: &str,
1261    writer_namespace: Option<&str>,
1262    writer_aliases: &[&str],
1263    reader_name: &str,
1264    reader_namespace: Option<&str>,
1265    reader_aliases: &[&str],
1266) -> bool {
1267    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1268    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1269    // If the canonical full names match, or any alias matches cross-wise.
1270    !writer_set.is_disjoint(&reader_set)
1271}
1272
1273fn ensure_names_match(
1274    data_type: &str,
1275    writer_name: &str,
1276    writer_namespace: Option<&str>,
1277    writer_aliases: &[&str],
1278    reader_name: &str,
1279    reader_namespace: Option<&str>,
1280    reader_aliases: &[&str],
1281) -> Result<(), ArrowError> {
1282    if names_match(
1283        writer_name,
1284        writer_namespace,
1285        writer_aliases,
1286        reader_name,
1287        reader_namespace,
1288        reader_aliases,
1289    ) {
1290        Ok(())
1291    } else {
1292        Err(ArrowError::ParseError(format!(
1293            "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1294        )))
1295    }
1296}
1297
1298fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1299    match schema {
1300        Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1301        Schema::Type(Type {
1302            r#type: TypeName::Primitive(primitive),
1303            ..
1304        }) => Some(*primitive),
1305        _ => None,
1306    }
1307}
1308
1309fn nullable_union_variants<'x, 'y>(
1310    variant: &'y [Schema<'x>],
1311) -> Option<(Nullability, &'y Schema<'x>)> {
1312    if variant.len() != 2 {
1313        return None;
1314    }
1315    let is_null = |schema: &Schema<'x>| {
1316        matches!(
1317            schema,
1318            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1319        )
1320    };
1321    match (is_null(&variant[0]), is_null(&variant[1])) {
1322        (true, false) => Some((Nullability::NullFirst, &variant[1])),
1323        (false, true) => Some((Nullability::NullSecond, &variant[0])),
1324        _ => None,
1325    }
1326}
1327
/// Identity of a union branch, used to detect duplicate branches in a union.
///
/// Two branches with equal keys are reported as duplicates by `union_first_duplicate`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    /// A named type or a reference to one, identified by its full name.
    Named(String),
    /// A primitive type, identified by which primitive it is.
    Primitive(PrimitiveType),
    /// Any array branch; all arrays share this key.
    Array,
    /// Any map branch; all maps share this key.
    Map,
}
1335
1336fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1337    let (name, namespace) = match s {
1338        Schema::TypeName(TypeName::Primitive(p))
1339        | Schema::Type(Type {
1340            r#type: TypeName::Primitive(p),
1341            ..
1342        }) => return Some(UnionBranchKey::Primitive(*p)),
1343        Schema::TypeName(TypeName::Ref(name))
1344        | Schema::Type(Type {
1345            r#type: TypeName::Ref(name),
1346            ..
1347        }) => (name, None),
1348        Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1349        Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1350        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1351        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1352        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1353        Schema::Union(_) => return None,
1354    };
1355    let (full, _) = make_full_name(name, namespace, enclosing_ns);
1356    Some(UnionBranchKey::Named(full))
1357}
1358
1359fn union_first_duplicate<'a>(
1360    branches: &'a [Schema<'a>],
1361    enclosing_ns: Option<&'a str>,
1362) -> Option<String> {
1363    let mut seen = HashSet::with_capacity(branches.len());
1364    for schema in branches {
1365        if let Some(key) = branch_key_of(schema, enclosing_ns) {
1366            if !seen.insert(key.clone()) {
1367                let msg = match key {
1368                    UnionBranchKey::Named(full) => format!("named type {full}"),
1369                    UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1370                    UnionBranchKey::Array => "array".to_string(),
1371                    UnionBranchKey::Map => "map".to_string(),
1372                };
1373                return Some(msg);
1374            }
1375        }
1376    }
1377    None
1378}
1379
/// Builds [`AvroDataType`]s from Avro schemas
///
/// Named type references encountered while parsing are looked up through the
/// contained [`Resolver`].
///
/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
struct Maker<'a> {
    /// Lookup table used to resolve references to named types.
    resolver: Resolver<'a>,
    /// When true, string codecs are converted to `Utf8View` (see `Codec::with_utf8view`).
    use_utf8view: bool,
    /// When true, two-branch unions of the form `['T','null']` are rejected while parsing.
    strict_mode: bool,
    /// Time zone ID choice for timestamps.
    // NOTE(review): where `tz` is applied is outside this part of the file; presumably
    // it selects the ID for UTC `timestamp-*` logical types. Confirm.
    tz: Tz,
}
1389
1390impl<'a> Maker<'a> {
1391    fn new(use_utf8view: bool, strict_mode: bool, tz: Tz) -> Self {
1392        Self {
1393            resolver: Default::default(),
1394            use_utf8view,
1395            strict_mode,
1396            tz,
1397        }
1398    }
1399
1400    #[cfg(feature = "avro_custom_types")]
1401    #[inline]
1402    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1403        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1404            let mut inner = (*values).clone();
1405            inner.nullability = Some(nb);
1406            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1407        }
1408    }
1409
1410    fn make_data_type<'s>(
1411        &mut self,
1412        writer_schema: &'s Schema<'a>,
1413        reader_schema: Option<&'s Schema<'a>>,
1414        namespace: Option<&'a str>,
1415    ) -> Result<AvroDataType, ArrowError> {
1416        match reader_schema {
1417            Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1418            None => self.parse_type(writer_schema, namespace),
1419        }
1420    }
1421
1422    /// Parses a [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`
1423    ///
1424    /// `name`: is the name used to refer to `schema` in its parent
1425    /// `namespace`: an optional qualifier used as part of a type hierarchy
1426    /// If the data type is a string, convert to use Utf8View if requested
1427    ///
1428    /// This function is used during the schema conversion process to determine whether
1429    /// string data should be represented as StringArray (default) or StringViewArray.
1430    ///
1431    /// `use_utf8view`: if true, use Utf8View instead of Utf8 for string types
1432    ///
1433    /// See [`Resolver`] for more information
1434    fn parse_type<'s>(
1435        &mut self,
1436        schema: &'s Schema<'a>,
1437        namespace: Option<&'a str>,
1438    ) -> Result<AvroDataType, ArrowError> {
1439        match schema {
1440            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1441                Codec::from(*p).with_utf8view(self.use_utf8view),
1442                Default::default(),
1443                None,
1444            )),
1445            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1446            Schema::Union(f) => {
1447                let null = f
1448                    .iter()
1449                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1450                match (f.len() == 2, null) {
1451                    (true, Some(0)) => {
1452                        let mut field = self.parse_type(&f[1], namespace)?;
1453                        field.nullability = Some(Nullability::NullFirst);
1454                        #[cfg(feature = "avro_custom_types")]
1455                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1456                        return Ok(field);
1457                    }
1458                    (true, Some(1)) => {
1459                        if self.strict_mode {
1460                            return Err(ArrowError::SchemaError(
1461                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1462                                    .to_string(),
1463                            ));
1464                        }
1465                        let mut field = self.parse_type(&f[0], namespace)?;
1466                        field.nullability = Some(Nullability::NullSecond);
1467                        #[cfg(feature = "avro_custom_types")]
1468                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1469                        return Ok(field);
1470                    }
1471                    _ => {}
1472                }
1473                // Validate: unions may not immediately contain unions
1474                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1475                    return Err(ArrowError::SchemaError(
1476                        "Avro unions may not immediately contain other unions".to_string(),
1477                    ));
1478                }
1479                // Validate: duplicates (named by full name; non-named by kind)
1480                if let Some(dup) = union_first_duplicate(f, namespace) {
1481                    return Err(ArrowError::SchemaError(format!(
1482                        "Avro union contains duplicate branch type: {dup}"
1483                    )));
1484                }
1485                // Parse all branches
1486                let children: Vec<AvroDataType> = f
1487                    .iter()
1488                    .map(|s| self.parse_type(s, namespace))
1489                    .collect::<Result<_, _>>()?;
1490                // Build Arrow layout once here
1491                let union_fields = build_union_fields(&children)?;
1492                Ok(AvroDataType::new(
1493                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1494                    Default::default(),
1495                    None,
1496                ))
1497            }
1498            Schema::Complex(c) => match c {
1499                ComplexType::Record(r) => {
1500                    let namespace = r.namespace.or(namespace);
1501                    let mut metadata = r.attributes.field_metadata();
1502                    let fields = r
1503                        .fields
1504                        .iter()
1505                        .map(|field| {
1506                            Ok(AvroField {
1507                                name: field.name.to_string(),
1508                                data_type: self.parse_type(&field.r#type, namespace)?,
1509                            })
1510                        })
1511                        .collect::<Result<_, ArrowError>>()?;
1512                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1513                    if let Some(ns) = namespace {
1514                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1515                    }
1516                    let field = AvroDataType {
1517                        nullability: None,
1518                        codec: Codec::Struct(fields),
1519                        metadata,
1520                        resolution: None,
1521                    };
1522                    self.resolver.register(r.name, namespace, field.clone());
1523                    Ok(field)
1524                }
1525                ComplexType::Array(a) => {
1526                    let field = self.parse_type(a.items.as_ref(), namespace)?;
1527                    Ok(AvroDataType {
1528                        nullability: None,
1529                        metadata: a.attributes.field_metadata(),
1530                        codec: Codec::List(Arc::new(field)),
1531                        resolution: None,
1532                    })
1533                }
1534                ComplexType::Fixed(f) => {
1535                    let size = f.size.try_into().map_err(|e| {
1536                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1537                    })?;
1538                    let namespace = f.namespace.or(namespace);
1539                    let mut metadata = f.attributes.field_metadata();
1540                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1541                    if let Some(ns) = namespace {
1542                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1543                    }
1544                    let field = match f.attributes.logical_type {
1545                        Some("decimal") => {
1546                            let (precision, scale, _) =
1547                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1548                            AvroDataType {
1549                                nullability: None,
1550                                metadata,
1551                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1552                                resolution: None,
1553                            }
1554                        }
1555                        Some("duration") => {
1556                            if size != 12 {
1557                                return Err(ArrowError::ParseError(format!(
1558                                    "Invalid fixed size for Duration: {size}, must be 12"
1559                                )));
1560                            };
1561                            AvroDataType {
1562                                nullability: None,
1563                                metadata,
1564                                codec: Codec::Interval,
1565                                resolution: None,
1566                            }
1567                        }
1568                        #[cfg(feature = "avro_custom_types")]
1569                        Some("arrow.uint64") if size == 8 => AvroDataType {
1570                            nullability: None,
1571                            metadata,
1572                            codec: Codec::UInt64,
1573                            resolution: None,
1574                        },
1575                        #[cfg(feature = "avro_custom_types")]
1576                        Some("arrow.float16") if size == 2 => AvroDataType {
1577                            nullability: None,
1578                            metadata,
1579                            codec: Codec::Float16,
1580                            resolution: None,
1581                        },
1582                        #[cfg(feature = "avro_custom_types")]
1583                        Some("arrow.interval-year-month") if size == 4 => AvroDataType {
1584                            nullability: None,
1585                            metadata,
1586                            codec: Codec::IntervalYearMonth,
1587                            resolution: None,
1588                        },
1589                        #[cfg(feature = "avro_custom_types")]
1590                        Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
1591                            nullability: None,
1592                            metadata,
1593                            codec: Codec::IntervalMonthDayNano,
1594                            resolution: None,
1595                        },
1596                        #[cfg(feature = "avro_custom_types")]
1597                        Some("arrow.interval-day-time") if size == 8 => AvroDataType {
1598                            nullability: None,
1599                            metadata,
1600                            codec: Codec::IntervalDayTime,
1601                            resolution: None,
1602                        },
1603                        _ => AvroDataType {
1604                            nullability: None,
1605                            metadata,
1606                            codec: Codec::Fixed(size),
1607                            resolution: None,
1608                        },
1609                    };
1610                    self.resolver.register(f.name, namespace, field.clone());
1611                    Ok(field)
1612                }
1613                ComplexType::Enum(e) => {
1614                    let namespace = e.namespace.or(namespace);
1615                    let symbols = e
1616                        .symbols
1617                        .iter()
1618                        .map(|s| s.to_string())
1619                        .collect::<Arc<[String]>>();
1620                    let mut metadata = e.attributes.field_metadata();
1621                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1622                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1623                    })?;
1624                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1625                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1626                    if let Some(ns) = namespace {
1627                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1628                    }
1629                    let field = AvroDataType {
1630                        nullability: None,
1631                        metadata,
1632                        codec: Codec::Enum(symbols),
1633                        resolution: None,
1634                    };
1635                    self.resolver.register(e.name, namespace, field.clone());
1636                    Ok(field)
1637                }
1638                ComplexType::Map(m) => {
1639                    let val = self.parse_type(&m.values, namespace)?;
1640                    Ok(AvroDataType {
1641                        nullability: None,
1642                        metadata: m.attributes.field_metadata(),
1643                        codec: Codec::Map(Arc::new(val)),
1644                        resolution: None,
1645                    })
1646                }
1647            },
1648            Schema::Type(t) => {
1649                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1650                // https://avro.apache.org/docs/1.11.1/specification/#logical-types
1651                match (t.attributes.logical_type, &mut field.codec) {
1652                    (Some("decimal"), c @ Codec::Binary) => {
1653                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1654                        *c = Codec::Decimal(prec, Some(sc), None);
1655                    }
1656                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1657                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1658                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1659                    (Some("timestamp-millis"), c @ Codec::Int64) => {
1660                        *c = Codec::TimestampMillis(Some(self.tz))
1661                    }
1662                    (Some("timestamp-micros"), c @ Codec::Int64) => {
1663                        *c = Codec::TimestampMicros(Some(self.tz))
1664                    }
1665                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1666                        *c = Codec::TimestampMillis(None)
1667                    }
1668                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1669                        *c = Codec::TimestampMicros(None)
1670                    }
1671                    (Some("timestamp-nanos"), c @ Codec::Int64) => {
1672                        *c = Codec::TimestampNanos(Some(self.tz))
1673                    }
1674                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1675                        *c = Codec::TimestampNanos(None)
1676                    }
1677                    (Some("uuid"), c @ Codec::Utf8) => {
1678                        // Map Avro string+logicalType=uuid into the UUID Codec,
1679                        // and preserve the logicalType in Arrow field metadata
1680                        // so writers can round-trip it correctly.
1681                        *c = Codec::Uuid;
1682                        field.metadata.insert("logicalType".into(), "uuid".into());
1683                    }
1684                    #[cfg(feature = "avro_custom_types")]
1685                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1686                    #[cfg(feature = "avro_custom_types")]
1687                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1688                    #[cfg(feature = "avro_custom_types")]
1689                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1690                    #[cfg(feature = "avro_custom_types")]
1691                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1692                        *c = Codec::DurationSeconds
1693                    }
1694                    #[cfg(feature = "avro_custom_types")]
1695                    (Some("arrow.run-end-encoded"), _) => {
1696                        let bits_u8: u8 = t
1697                            .attributes
1698                            .additional
1699                            .get("arrow.runEndIndexBits")
1700                            .and_then(|v| v.as_u64())
1701                            .and_then(|n| u8::try_from(n).ok())
1702                            .ok_or_else(|| ArrowError::ParseError(
1703                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1704                                    .to_string(),
1705                            ))?;
1706                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1707                            return Err(ArrowError::ParseError(format!(
1708                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1709                            )));
1710                        }
1711                        // Wrap the parsed underlying site as REE
1712                        let values_site = field.clone();
1713                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1714                    }
1715                    // Arrow-specific integer width types
1716                    #[cfg(feature = "avro_custom_types")]
1717                    (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
1718                    #[cfg(feature = "avro_custom_types")]
1719                    (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
1720                    #[cfg(feature = "avro_custom_types")]
1721                    (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
1722                    #[cfg(feature = "avro_custom_types")]
1723                    (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
1724                    #[cfg(feature = "avro_custom_types")]
1725                    (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
1726                    #[cfg(feature = "avro_custom_types")]
1727                    (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
1728                    // Arrow Float16 stored as fixed(2)
1729                    #[cfg(feature = "avro_custom_types")]
1730                    (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
1731                    // Arrow Date64 custom type
1732                    #[cfg(feature = "avro_custom_types")]
1733                    (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
1734                    // Arrow time/timestamp types with second precision
1735                    #[cfg(feature = "avro_custom_types")]
1736                    (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
1737                    #[cfg(feature = "avro_custom_types")]
1738                    (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
1739                    #[cfg(feature = "avro_custom_types")]
1740                    (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
1741                        *c = Codec::TimestampSecs(true)
1742                    }
1743                    #[cfg(feature = "avro_custom_types")]
1744                    (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
1745                        *c = Codec::TimestampSecs(false)
1746                    }
1747                    // Arrow interval types
1748                    #[cfg(feature = "avro_custom_types")]
1749                    (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
1750                        *c = Codec::IntervalYearMonth
1751                    }
1752                    #[cfg(feature = "avro_custom_types")]
1753                    (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
1754                        *c = Codec::IntervalMonthDayNano
1755                    }
1756                    #[cfg(feature = "avro_custom_types")]
1757                    (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
1758                        *c = Codec::IntervalDayTime
1759                    }
1760                    (Some(logical), _) => {
1761                        // Insert unrecognized logical type into metadata map
1762                        field.metadata.insert("logicalType".into(), logical.into());
1763                    }
1764                    (None, _) => {}
1765                }
1766                if matches!(field.codec, Codec::Int64) {
1767                    if let Some(unit) = t
1768                        .attributes
1769                        .additional
1770                        .get("arrowTimeUnit")
1771                        .and_then(|v| v.as_str())
1772                    {
1773                        if unit == "nanosecond" {
1774                            field.codec = Codec::TimestampNanos(Some(self.tz));
1775                        }
1776                    }
1777                }
1778                if !t.attributes.additional.is_empty() {
1779                    for (k, v) in &t.attributes.additional {
1780                        field.metadata.insert(k.to_string(), v.to_string());
1781                    }
1782                }
1783                Ok(field)
1784            }
1785        }
1786    }
1787
1788    fn resolve_type<'s>(
1789        &mut self,
1790        writer_schema: &'s Schema<'a>,
1791        reader_schema: &'s Schema<'a>,
1792        namespace: Option<&'a str>,
1793    ) -> Result<AvroDataType, ArrowError> {
1794        if let (Some(write_primitive), Some(read_primitive)) =
1795            (primitive_of(writer_schema), primitive_of(reader_schema))
1796        {
1797            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1798        }
1799        match (writer_schema, reader_schema) {
1800            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1801                let writer_variants = writer_variants.as_slice();
1802                let reader_variants = reader_variants.as_slice();
1803                match (
1804                    nullable_union_variants(writer_variants),
1805                    nullable_union_variants(reader_variants),
1806                ) {
1807                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
1808                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
1809                        let mut writer_to_reader = vec![None, None];
1810                        writer_to_reader[w_nb.non_null_index()] = Some((
1811                            r_nb.non_null_index(),
1812                            dt.resolution
1813                                .take()
1814                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
1815                        ));
1816                        dt.nullability = Some(w_nb);
1817                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1818                            writer_to_reader: Arc::from(writer_to_reader),
1819                            writer_is_union: true,
1820                            reader_is_union: true,
1821                        }));
1822                        #[cfg(feature = "avro_custom_types")]
1823                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
1824                        Ok(dt)
1825                    }
1826                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1827                }
1828            }
1829            (Schema::Union(writer_variants), reader_non_union) => {
1830                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1831                    .iter()
1832                    .map(|writer| {
1833                        self.resolve_type(writer, reader_non_union, namespace)
1834                            .ok()
1835                            .map(|tmp| {
1836                                let resolution = tmp
1837                                    .resolution
1838                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1839                                (0usize, resolution)
1840                            })
1841                    })
1842                    .collect();
1843                let mut dt = self.parse_type(reader_non_union, namespace)?;
1844                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1845                    writer_to_reader: Arc::from(writer_to_reader),
1846                    writer_is_union: true,
1847                    reader_is_union: false,
1848                }));
1849                Ok(dt)
1850            }
1851            (writer_non_union, Schema::Union(reader_variants)) => {
1852                if let Some((nullability, non_null_branch)) =
1853                    nullable_union_variants(reader_variants)
1854                {
1855                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
1856                    #[cfg(feature = "avro_custom_types")]
1857                    Self::propagate_nullability_into_ree(&mut dt, nullability);
1858                    dt.nullability = Some(nullability);
1859                    // Ensure resolution is set to a non-Union variant to suppress
1860                    // reading the union tag which is the default behavior.
1861                    if dt.resolution.is_none() {
1862                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
1863                    }
1864                    Ok(dt)
1865                } else {
1866                    let Some((match_idx, mut match_dt)) =
1867                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
1868                    else {
1869                        return Err(ArrowError::SchemaError(
1870                            "Writer schema does not match any reader union branch".to_string(),
1871                        ));
1872                    };
1873                    // Steal the resolution info from the matching reader branch
1874                    // for the Union resolution, but preserve possible resolution
1875                    // information on its inner types.
1876                    // For other branches, resolution is irrelevant,
1877                    // so just parse them.
1878                    let resolution = match_dt
1879                        .resolution
1880                        .take()
1881                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1882                    let mut match_dt = Some(match_dt);
1883                    let children = reader_variants
1884                        .iter()
1885                        .enumerate()
1886                        .map(|(idx, variant)| {
1887                            if idx == match_idx {
1888                                Ok(match_dt.take().unwrap())
1889                            } else {
1890                                self.parse_type(variant, namespace)
1891                            }
1892                        })
1893                        .collect::<Result<Vec<_>, _>>()?;
1894                    let union_fields = build_union_fields(&children)?;
1895                    let mut dt = AvroDataType::new(
1896                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
1897                        Default::default(),
1898                        None,
1899                    );
1900                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1901                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
1902                        writer_is_union: false,
1903                        reader_is_union: true,
1904                    }));
1905                    Ok(dt)
1906                }
1907            }
1908            (
1909                Schema::Complex(ComplexType::Array(writer_array)),
1910                Schema::Complex(ComplexType::Array(reader_array)),
1911            ) => self.resolve_array(writer_array, reader_array, namespace),
1912            (
1913                Schema::Complex(ComplexType::Map(writer_map)),
1914                Schema::Complex(ComplexType::Map(reader_map)),
1915            ) => self.resolve_map(writer_map, reader_map, namespace),
1916            (
1917                Schema::Complex(ComplexType::Fixed(writer_fixed)),
1918                Schema::Complex(ComplexType::Fixed(reader_fixed)),
1919            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1920            (
1921                Schema::Complex(ComplexType::Record(writer_record)),
1922                Schema::Complex(ComplexType::Record(reader_record)),
1923            ) => self.resolve_records(writer_record, reader_record, namespace),
1924            (
1925                Schema::Complex(ComplexType::Enum(writer_enum)),
1926                Schema::Complex(ComplexType::Enum(reader_enum)),
1927            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1928            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1929            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1930            _ => Err(ArrowError::NotYetImplemented(
1931                "Other resolutions not yet implemented".to_string(),
1932            )),
1933        }
1934    }
1935
1936    fn find_best_union_match(
1937        &mut self,
1938        writer: &Schema<'a>,
1939        reader_variants: &[Schema<'a>],
1940        namespace: Option<&'a str>,
1941    ) -> Option<(usize, AvroDataType)> {
1942        let mut first_resolution = None;
1943        for (reader_index, reader) in reader_variants.iter().enumerate() {
1944            if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1945                match &dt.resolution {
1946                    None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1947                        // An exact match is best, return immediately.
1948                        return Some((reader_index, dt));
1949                    }
1950                    Some(_) => {
1951                        if first_resolution.is_none() {
1952                            // Store the first valid promotion but keep searching for a direct match.
1953                            first_resolution = Some((reader_index, dt));
1954                        }
1955                    }
1956                };
1957            }
1958        }
1959        first_resolution
1960    }
1961
1962    fn resolve_unions<'s>(
1963        &mut self,
1964        writer_variants: &'s [Schema<'a>],
1965        reader_variants: &'s [Schema<'a>],
1966        namespace: Option<&'a str>,
1967    ) -> Result<AvroDataType, ArrowError> {
1968        let mut resolved_reader_encodings = HashMap::new();
1969        let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1970            .iter()
1971            .map(|writer| {
1972                self.find_best_union_match(writer, reader_variants, namespace)
1973                    .map(|(match_idx, mut match_dt)| {
1974                        let resolution = match_dt
1975                            .resolution
1976                            .take()
1977                            .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1978                        // TODO: check for overlapping reader variants?
1979                        // They should not be possible in a valid schema.
1980                        resolved_reader_encodings.insert(match_idx, match_dt);
1981                        (match_idx, resolution)
1982                    })
1983            })
1984            .collect();
1985        let reader_encodings: Vec<AvroDataType> = reader_variants
1986            .iter()
1987            .enumerate()
1988            .map(|(reader_idx, reader_schema)| {
1989                if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1990                    Ok(resolved)
1991                } else {
1992                    self.parse_type(reader_schema, namespace)
1993                }
1994            })
1995            .collect::<Result<_, _>>()?;
1996        let union_fields = build_union_fields(&reader_encodings)?;
1997        let mut dt = AvroDataType::new(
1998            Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1999            Default::default(),
2000            None,
2001        );
2002        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
2003            writer_to_reader: Arc::from(writer_to_reader),
2004            writer_is_union: true,
2005            reader_is_union: true,
2006        }));
2007        Ok(dt)
2008    }
2009
2010    fn resolve_array(
2011        &mut self,
2012        writer_array: &Array<'a>,
2013        reader_array: &Array<'a>,
2014        namespace: Option<&'a str>,
2015    ) -> Result<AvroDataType, ArrowError> {
2016        Ok(AvroDataType {
2017            nullability: None,
2018            metadata: reader_array.attributes.field_metadata(),
2019            codec: Codec::List(Arc::new(self.make_data_type(
2020                writer_array.items.as_ref(),
2021                Some(reader_array.items.as_ref()),
2022                namespace,
2023            )?)),
2024            resolution: None,
2025        })
2026    }
2027
2028    fn resolve_map(
2029        &mut self,
2030        writer_map: &Map<'a>,
2031        reader_map: &Map<'a>,
2032        namespace: Option<&'a str>,
2033    ) -> Result<AvroDataType, ArrowError> {
2034        Ok(AvroDataType {
2035            nullability: None,
2036            metadata: reader_map.attributes.field_metadata(),
2037            codec: Codec::Map(Arc::new(self.make_data_type(
2038                &writer_map.values,
2039                Some(&reader_map.values),
2040                namespace,
2041            )?)),
2042            resolution: None,
2043        })
2044    }
2045
2046    fn resolve_fixed<'s>(
2047        &mut self,
2048        writer_fixed: &Fixed<'a>,
2049        reader_fixed: &Fixed<'a>,
2050        reader_schema: &'s Schema<'a>,
2051        namespace: Option<&'a str>,
2052    ) -> Result<AvroDataType, ArrowError> {
2053        ensure_names_match(
2054            "Fixed",
2055            writer_fixed.name,
2056            writer_fixed.namespace,
2057            &writer_fixed.aliases,
2058            reader_fixed.name,
2059            reader_fixed.namespace,
2060            &reader_fixed.aliases,
2061        )?;
2062        if writer_fixed.size != reader_fixed.size {
2063            return Err(ArrowError::SchemaError(format!(
2064                "Fixed size mismatch for {}: writer={}, reader={}",
2065                reader_fixed.name, writer_fixed.size, reader_fixed.size
2066            )));
2067        }
2068        self.parse_type(reader_schema, namespace)
2069    }
2070
2071    fn resolve_primitives(
2072        &mut self,
2073        write_primitive: PrimitiveType,
2074        read_primitive: PrimitiveType,
2075        reader_schema: &Schema<'a>,
2076    ) -> Result<AvroDataType, ArrowError> {
2077        if write_primitive == read_primitive {
2078            return self.parse_type(reader_schema, None);
2079        }
2080        let promotion = match (write_primitive, read_primitive) {
2081            (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2082            (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2083            (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2084            (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2085            (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2086            (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2087            (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2088            (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2089            _ => {
2090                return Err(ArrowError::ParseError(format!(
2091                    "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2092                )));
2093            }
2094        };
2095        let mut datatype = self.parse_type(reader_schema, None)?;
2096        datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2097        Ok(datatype)
2098    }
2099
2100    // Resolve writer vs. reader enum schemas according to Avro 1.11.1.
2101    //
2102    // # How enums resolve (writer to reader)
2103    // Per “Schema Resolution”:
2104    // * The two schemas must refer to the same (unqualified) enum name (or match
2105    //   via alias rewriting).
2106    // * If the writer’s symbol is not present in the reader’s enum and the reader
2107    //   enum has a `default`, that `default` symbol must be used; otherwise,
2108    //   error.
2109    //   https://avro.apache.org/docs/1.11.1/specification/#schema-resolution
2110    // * Avro “Aliases” are applied from the reader side to rewrite the writer’s
2111    //   names during resolution. For robustness across ecosystems, we also accept
2112    //   symmetry here (see note below).
2113    //   https://avro.apache.org/docs/1.11.1/specification/#aliases
2114    //
2115    // # Rationale for this code path
2116    // 1. Do the work once at schema‑resolution time. Avro serializes an enum as a
2117    //    writer‑side position. Mapping positions on the hot decoder path is expensive
2118    //    if done with string lookups. This method builds a `writer_index to reader_index`
2119    //    vector once, so decoding just does an O(1) table lookup.
2120    // 2. Adopt the reader’s symbol set and order. We return an Arrow
2121    //    `Dictionary(Int32, Utf8)` whose dictionary values are the reader enum
2122    //    symbols. This makes downstream semantics match the reader schema, including
2123    //    Avro’s sort order rule that orders enums by symbol position in the schema.
2124    //    https://avro.apache.org/docs/1.11.1/specification/#sort-order
2125    // 3. Honor Avro’s `default` for enums. Avro 1.9+ allows a type‑level default
2126    //    on the enum. When the writer emits a symbol unknown to the reader, we map it
2127    //    to the reader’s validated `default` symbol if present; otherwise we signal an
2128    //    error at decoding time.
2129    //    https://avro.apache.org/docs/1.11.1/specification/#enums
2130    //
2131    // # Implementation notes
2132    // * We first check that enum names match or are alias‑equivalent. The Avro
2133    //   spec describes alias rewriting using reader aliases; this implementation
2134    //   additionally treats writer aliases as acceptable for name matching to be
2135    //   resilient with schemas produced by different tooling.
2136    // * We build `EnumMapping`:
2137    //   - `mapping[i]` = reader index of the writer symbol at writer index `i`.
2138    //   - If the writer symbol is absent and the reader has a default, we store the
2139    //     reader index of that default.
2140    //   - Otherwise we store `-1` as a sentinel meaning unresolvable; the decoder
2141    //     must treat encountering such a value as an error, per the spec.
2142    // * We persist the reader symbol list in field metadata under
2143    //   `AVRO_ENUM_SYMBOLS_METADATA_KEY`, so consumers can inspect the dictionary
2144    //   without needing the original Avro schema.
2145    // * The Arrow representation is `Dictionary(Int32, Utf8)`, which aligns with
2146    //   Avro’s integer index encoding for enums.
2147    //
2148    // # Examples
2149    // * Writer `["A","B","C"]`, Reader `["A","B"]`, Reader default `"A"`
2150    //     `mapping = [0, 1, 0]`, `default_index = 0`.
2151    // * Writer `["A","B"]`, Reader `["B","A"]` (no default)
2152    //     `mapping = [1, 0]`, `default_index = -1`.
2153    // * Writer `["A","B","C"]`, Reader `["A","B"]` (no default)
2154    //     `mapping = [0, 1, -1]` (decode must error on `"C"`).
2155    fn resolve_enums(
2156        &mut self,
2157        writer_enum: &Enum<'a>,
2158        reader_enum: &Enum<'a>,
2159        reader_schema: &Schema<'a>,
2160        namespace: Option<&'a str>,
2161    ) -> Result<AvroDataType, ArrowError> {
2162        ensure_names_match(
2163            "Enum",
2164            writer_enum.name,
2165            writer_enum.namespace,
2166            &writer_enum.aliases,
2167            reader_enum.name,
2168            reader_enum.namespace,
2169            &reader_enum.aliases,
2170        )?;
2171        if writer_enum.symbols == reader_enum.symbols {
2172            return self.parse_type(reader_schema, namespace);
2173        }
2174        let reader_index: HashMap<&str, i32> = reader_enum
2175            .symbols
2176            .iter()
2177            .enumerate()
2178            .map(|(index, &symbol)| (symbol, index as i32))
2179            .collect();
2180        let default_index: i32 = match reader_enum.default {
2181            Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2182                ArrowError::SchemaError(format!(
2183                    "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2184                    reader_enum.name,
2185                ))
2186            })?,
2187            None => -1,
2188        };
2189        let mapping: Vec<i32> = writer_enum
2190            .symbols
2191            .iter()
2192            .map(|&write_symbol| {
2193                reader_index
2194                    .get(write_symbol)
2195                    .copied()
2196                    .unwrap_or(default_index)
2197            })
2198            .collect();
2199        if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2200            return Err(ArrowError::SchemaError(format!(
2201                "Reader enum '{}' does not cover all writer symbols and no default is provided",
2202                reader_enum.name
2203            )));
2204        }
2205        let mut dt = self.parse_type(reader_schema, namespace)?;
2206        dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2207            mapping: Arc::from(mapping),
2208            default_index,
2209        }));
2210        let reader_ns = reader_enum.namespace.or(namespace);
2211        self.resolver
2212            .register(reader_enum.name, reader_ns, dt.clone());
2213        Ok(dt)
2214    }
2215
2216    #[inline]
2217    fn build_writer_lookup(
2218        writer_record: &Record<'a>,
2219    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2220        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2221        for (idx, wf) in writer_record.fields.iter().enumerate() {
2222            // Avro field names are unique; last-in wins are acceptable and match previous behavior.
2223            map.insert(wf.name, idx);
2224        }
2225        // Track ambiguous writer aliases (alias used by multiple writer fields)
2226        let mut ambiguous: HashSet<&str> = HashSet::new();
2227        for (idx, wf) in writer_record.fields.iter().enumerate() {
2228            for &alias in &wf.aliases {
2229                match map.entry(alias) {
2230                    Entry::Occupied(e) if *e.get() != idx => {
2231                        ambiguous.insert(alias);
2232                    }
2233                    Entry::Vacant(e) => {
2234                        e.insert(idx);
2235                    }
2236                    _ => {}
2237                }
2238            }
2239        }
2240        (map, ambiguous)
2241    }
2242
2243    fn resolve_records(
2244        &mut self,
2245        writer_record: &Record<'a>,
2246        reader_record: &Record<'a>,
2247        namespace: Option<&'a str>,
2248    ) -> Result<AvroDataType, ArrowError> {
2249        ensure_names_match(
2250            "Record",
2251            writer_record.name,
2252            writer_record.namespace,
2253            &writer_record.aliases,
2254            reader_record.name,
2255            reader_record.namespace,
2256            &reader_record.aliases,
2257        )?;
2258        let writer_ns = writer_record.namespace.or(namespace);
2259        let reader_ns = reader_record.namespace.or(namespace);
2260        let mut reader_md = reader_record.attributes.field_metadata();
2261        reader_md.insert(
2262            AVRO_NAME_METADATA_KEY.to_string(),
2263            reader_record.name.to_string(),
2264        );
2265        if let Some(ns) = reader_ns {
2266            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
2267        }
2268        // Build writer lookup and ambiguous alias set.
2269        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
2270        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
2271        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
2272        // Capture default field indices during the main loop (one pass).
2273        let mut default_fields: Vec<usize> = Vec::new();
2274        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
2275            // Direct name match, then reader aliases (a writer alias map is pre-populated).
2276            let mut match_idx = writer_lookup.get(r_field.name).copied();
2277            let mut matched_via_alias: Option<&str> = None;
2278            if match_idx.is_none() {
2279                for &alias in &r_field.aliases {
2280                    if let Some(i) = writer_lookup.get(alias).copied() {
2281                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
2282                            return Err(ArrowError::SchemaError(format!(
2283                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
2284                                r_field.name
2285                            )));
2286                        }
2287                        match_idx = Some(i);
2288                        matched_via_alias = Some(alias);
2289                        break;
2290                    }
2291                }
2292            }
2293            if let Some(wi) = match_idx {
2294                if writer_to_reader[wi].is_none() {
2295                    let w_schema = &writer_record.fields[wi].r#type;
2296                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
2297                    writer_to_reader[wi] = Some(reader_idx);
2298                    reader_fields.push(AvroField {
2299                        name: r_field.name.to_owned(),
2300                        data_type: dt,
2301                    });
2302                    continue;
2303                } else if self.strict_mode {
2304                    // Writer field already mapped and strict_mode => error
2305                    let existing_reader = writer_to_reader[wi].unwrap();
2306                    let via = matched_via_alias
2307                        .map(|a| format!("alias '{a}'"))
2308                        .unwrap_or_else(|| "name match".to_string());
2309                    return Err(ArrowError::SchemaError(format!(
2310                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
2311                        writer_record.fields[wi].name
2312                    )));
2313                }
2314                // Non-strict and already mapped -> fall through to defaulting logic
2315            }
2316            // No match (or conflicted in non-strict mode): attach default per Avro spec.
2317            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
2318            if let Some(default_json) = r_field.default.as_ref() {
2319                dt.resolution = Some(ResolutionInfo::DefaultValue(
2320                    dt.parse_and_store_default(default_json)?,
2321                ));
2322                default_fields.push(reader_idx);
2323            } else if dt.nullability() == Some(Nullability::NullFirst) {
2324                // The only valid implicit default for a union is the first branch (null-first case).
2325                dt.resolution = Some(ResolutionInfo::DefaultValue(
2326                    dt.parse_and_store_default(&Value::Null)?,
2327                ));
2328                default_fields.push(reader_idx);
2329            } else {
2330                return Err(ArrowError::SchemaError(format!(
2331                    "Reader field '{}' not present in writer schema must have a default value",
2332                    r_field.name
2333                )));
2334            }
2335            reader_fields.push(AvroField {
2336                name: r_field.name.to_owned(),
2337                data_type: dt,
2338            });
2339        }
2340        // Build writer field map.
2341        let writer_fields = writer_record
2342            .fields
2343            .iter()
2344            .enumerate()
2345            .map(|(writer_index, writer_field)| {
2346                let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
2347                if let Some(reader_index) = writer_to_reader[writer_index] {
2348                    Ok(ResolvedField::ToReader(reader_index, dt))
2349                } else {
2350                    Ok(ResolvedField::Skip(dt))
2351                }
2352            })
2353            .collect::<Result<_, ArrowError>>()?;
2354        let resolved = AvroDataType::new_with_resolution(
2355            Codec::Struct(Arc::from(reader_fields)),
2356            reader_md,
2357            None,
2358            Some(ResolutionInfo::Record(ResolvedRecord {
2359                writer_fields,
2360                default_fields: Arc::from(default_fields),
2361            })),
2362        );
2363        // Register a resolved record by reader name+namespace for potential named type refs.
2364        self.resolver
2365            .register(reader_record.name, reader_ns, resolved.clone());
2366        Ok(resolved)
2367    }
2368}
2369
2370#[cfg(test)]
2371mod tests {
2372    use super::*;
2373    use crate::schema::{
2374        AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2375        Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2376    };
2377    use indexmap::IndexMap;
2378    use serde_json::{self, Value};
2379
2380    fn create_schema_with_logical_type(
2381        primitive_type: PrimitiveType,
2382        logical_type: &'static str,
2383    ) -> Schema<'static> {
2384        let attributes = Attributes {
2385            logical_type: Some(logical_type),
2386            additional: Default::default(),
2387        };
2388
2389        Schema::Type(Type {
2390            r#type: TypeName::Primitive(primitive_type),
2391            attributes,
2392        })
2393    }
2394
2395    fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2396        let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2397        let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2398        let mut maker = Maker::new(false, false, Tz::default());
2399        maker
2400            .make_data_type(&writer_schema, Some(&reader_schema), None)
2401            .expect("promotion should resolve")
2402    }
2403
2404    fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2405        Schema::TypeName(TypeName::Primitive(pt))
2406    }
2407    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2408        Schema::Union(branches)
2409    }
2410
2411    #[test]
2412    fn test_date_logical_type() {
2413        let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2414
2415        let mut maker = Maker::new(false, false, Tz::default());
2416        let result = maker.make_data_type(&schema, None, None).unwrap();
2417
2418        assert!(matches!(result.codec, Codec::Date32));
2419    }
2420
2421    #[test]
2422    fn test_time_millis_logical_type() {
2423        let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2424
2425        let mut maker = Maker::new(false, false, Tz::default());
2426        let result = maker.make_data_type(&schema, None, None).unwrap();
2427
2428        assert!(matches!(result.codec, Codec::TimeMillis));
2429    }
2430
2431    #[test]
2432    fn test_time_micros_logical_type() {
2433        let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2434
2435        let mut maker = Maker::new(false, false, Tz::default());
2436        let result = maker.make_data_type(&schema, None, None).unwrap();
2437
2438        assert!(matches!(result.codec, Codec::TimeMicros));
2439    }
2440
2441    #[test]
2442    fn test_timestamp_millis_logical_type() {
2443        for tz in [Tz::OffsetZero, Tz::Utc] {
2444            let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2445
2446            let mut maker = Maker::new(false, false, tz);
2447            let result = maker.make_data_type(&schema, None, None).unwrap();
2448
2449            let Codec::TimestampMillis(Some(actual_tz)) = result.codec else {
2450                panic!("Expected TimestampMillis codec");
2451            };
2452            assert_eq!(actual_tz, tz);
2453        }
2454    }
2455
2456    #[test]
2457    fn test_timestamp_micros_logical_type() {
2458        for tz in [Tz::OffsetZero, Tz::Utc] {
2459            let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2460
2461            let mut maker = Maker::new(false, false, tz);
2462            let result = maker.make_data_type(&schema, None, None).unwrap();
2463
2464            let Codec::TimestampMicros(Some(actual_tz)) = result.codec else {
2465                panic!("Expected TimestampMicros codec");
2466            };
2467            assert_eq!(actual_tz, tz);
2468        }
2469    }
2470
2471    #[test]
2472    fn test_timestamp_nanos_logical_type() {
2473        for tz in [Tz::OffsetZero, Tz::Utc] {
2474            let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-nanos");
2475
2476            let mut maker = Maker::new(false, false, tz);
2477            let result = maker.make_data_type(&schema, None, None).unwrap();
2478
2479            let Codec::TimestampNanos(Some(actual_tz)) = result.codec else {
2480                panic!("Expected TimestampNanos codec");
2481            };
2482            assert_eq!(actual_tz, tz);
2483        }
2484    }
2485
2486    #[test]
2487    fn test_local_timestamp_millis_logical_type() {
2488        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2489
2490        let mut maker = Maker::new(false, false, Tz::default());
2491        let result = maker.make_data_type(&schema, None, None).unwrap();
2492
2493        assert!(matches!(result.codec, Codec::TimestampMillis(None)));
2494    }
2495
2496    #[test]
2497    fn test_local_timestamp_micros_logical_type() {
2498        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2499
2500        let mut maker = Maker::new(false, false, Tz::default());
2501        let result = maker.make_data_type(&schema, None, None).unwrap();
2502
2503        assert!(matches!(result.codec, Codec::TimestampMicros(None)));
2504    }
2505
2506    #[test]
2507    fn test_local_timestamp_nanos_logical_type() {
2508        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-nanos");
2509
2510        let mut maker = Maker::new(false, false, Tz::default());
2511        let result = maker.make_data_type(&schema, None, None).unwrap();
2512
2513        assert!(matches!(result.codec, Codec::TimestampNanos(None)));
2514    }
2515
2516    #[test]
2517    fn test_uuid_type() {
2518        let mut codec = Codec::Fixed(16);
2519        if let c @ Codec::Fixed(16) = &mut codec {
2520            *c = Codec::Uuid;
2521        }
2522        assert!(matches!(codec, Codec::Uuid));
2523    }
2524
2525    #[test]
2526    fn test_duration_logical_type() {
2527        let mut codec = Codec::Fixed(12);
2528
2529        if let c @ Codec::Fixed(12) = &mut codec {
2530            *c = Codec::Interval;
2531        }
2532
2533        assert!(matches!(codec, Codec::Interval));
2534    }
2535
2536    #[test]
2537    fn test_decimal_logical_type_not_implemented() {
2538        let codec = Codec::Fixed(16);
2539
2540        let process_decimal = || -> Result<(), ArrowError> {
2541            if let Codec::Fixed(_) = codec {
2542                return Err(ArrowError::NotYetImplemented(
2543                    "Decimals are not currently supported".to_string(),
2544                ));
2545            }
2546            Ok(())
2547        };
2548
2549        let result = process_decimal();
2550
2551        assert!(result.is_err());
2552        if let Err(ArrowError::NotYetImplemented(msg)) = result {
2553            assert!(msg.contains("Decimals are not currently supported"));
2554        } else {
2555            panic!("Expected NotYetImplemented error");
2556        }
2557    }
2558    #[test]
2559    fn test_unknown_logical_type_added_to_metadata() {
2560        let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2561
2562        let mut maker = Maker::new(false, false, Tz::default());
2563        let result = maker.make_data_type(&schema, None, None).unwrap();
2564
2565        assert_eq!(
2566            result.metadata.get("logicalType"),
2567            Some(&"custom-type".to_string())
2568        );
2569    }
2570
2571    #[test]
2572    fn test_string_with_utf8view_enabled() {
2573        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2574
2575        let mut maker = Maker::new(true, false, Tz::default());
2576        let result = maker.make_data_type(&schema, None, None).unwrap();
2577
2578        assert!(matches!(result.codec, Codec::Utf8View));
2579    }
2580
2581    #[test]
2582    fn test_string_without_utf8view_enabled() {
2583        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2584
2585        let mut maker = Maker::new(false, false, Tz::default());
2586        let result = maker.make_data_type(&schema, None, None).unwrap();
2587
2588        assert!(matches!(result.codec, Codec::Utf8));
2589    }
2590
2591    #[test]
2592    fn test_record_with_string_and_utf8view_enabled() {
2593        let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2594
2595        let avro_field = crate::schema::Field {
2596            name: "string_field",
2597            r#type: field_schema,
2598            default: None,
2599            doc: None,
2600            aliases: vec![],
2601        };
2602
2603        let record = Record {
2604            name: "test_record",
2605            namespace: None,
2606            aliases: vec![],
2607            doc: None,
2608            fields: vec![avro_field],
2609            attributes: Attributes::default(),
2610        };
2611
2612        let schema = Schema::Complex(ComplexType::Record(record));
2613
2614        let mut maker = Maker::new(true, false, Tz::default());
2615        let result = maker.make_data_type(&schema, None, None).unwrap();
2616
2617        if let Codec::Struct(fields) = &result.codec {
2618            let first_field_codec = &fields[0].data_type().codec;
2619            assert!(matches!(first_field_codec, Codec::Utf8View));
2620        } else {
2621            panic!("Expected Struct codec");
2622        }
2623    }
2624
2625    #[test]
2626    fn test_union_with_strict_mode() {
2627        let schema = Schema::Union(vec![
2628            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2629            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2630        ]);
2631
2632        let mut maker = Maker::new(false, true, Tz::default());
2633        let result = maker.make_data_type(&schema, None, None);
2634
2635        assert!(result.is_err());
2636        match result {
2637            Err(ArrowError::SchemaError(msg)) => {
2638                assert!(msg.contains(
2639                    "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2640                ));
2641            }
2642            _ => panic!("Expected SchemaError"),
2643        }
2644    }
2645
2646    #[test]
2647    fn test_resolve_int_to_float_promotion() {
2648        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2649        assert!(matches!(result.codec, Codec::Float32));
2650        assert_eq!(
2651            result.resolution,
2652            Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2653        );
2654    }
2655
2656    #[test]
2657    fn test_resolve_int_to_double_promotion() {
2658        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2659        assert!(matches!(result.codec, Codec::Float64));
2660        assert_eq!(
2661            result.resolution,
2662            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2663        );
2664    }
2665
2666    #[test]
2667    fn test_resolve_long_to_float_promotion() {
2668        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2669        assert!(matches!(result.codec, Codec::Float32));
2670        assert_eq!(
2671            result.resolution,
2672            Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2673        );
2674    }
2675
2676    #[test]
2677    fn test_resolve_long_to_double_promotion() {
2678        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2679        assert!(matches!(result.codec, Codec::Float64));
2680        assert_eq!(
2681            result.resolution,
2682            Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2683        );
2684    }
2685
2686    #[test]
2687    fn test_resolve_float_to_double_promotion() {
2688        let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2689        assert!(matches!(result.codec, Codec::Float64));
2690        assert_eq!(
2691            result.resolution,
2692            Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2693        );
2694    }
2695
2696    #[test]
2697    fn test_resolve_string_to_bytes_promotion() {
2698        let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2699        assert!(matches!(result.codec, Codec::Binary));
2700        assert_eq!(
2701            result.resolution,
2702            Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2703        );
2704    }
2705
2706    #[test]
2707    fn test_resolve_bytes_to_string_promotion() {
2708        let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2709        assert!(matches!(result.codec, Codec::Utf8));
2710        assert_eq!(
2711            result.resolution,
2712            Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2713        );
2714    }
2715
2716    #[test]
2717    fn test_resolve_illegal_promotion_double_to_float_errors() {
2718        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2719        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2720        let mut maker = Maker::new(false, false, Tz::default());
2721        let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2722        assert!(result.is_err());
2723        match result {
2724            Err(ArrowError::ParseError(msg)) => {
2725                assert!(msg.contains("Illegal promotion"));
2726            }
2727            _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2728        }
2729    }
2730
2731    #[test]
2732    fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2733        let writer = Schema::Union(vec![
2734            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2735            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2736        ]);
2737        let reader = Schema::Union(vec![
2738            Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2739            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2740        ]);
2741        let mut maker = Maker::new(false, false, Tz::default());
2742        let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2743        assert!(matches!(result.codec, Codec::Float64));
2744        assert_eq!(
2745            result.resolution,
2746            Some(ResolutionInfo::Union(ResolvedUnion {
2747                writer_to_reader: [
2748                    None,
2749                    Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2750                ]
2751                .into(),
2752                writer_is_union: true,
2753                reader_is_union: true,
2754            }))
2755        );
2756        assert_eq!(result.nullability, Some(Nullability::NullFirst));
2757    }
2758
2759    #[test]
2760    fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2761        let writer = mk_union(vec![
2762            mk_primitive(PrimitiveType::String),
2763            mk_primitive(PrimitiveType::Long),
2764        ]);
2765        let reader = mk_primitive(PrimitiveType::Bytes);
2766        let mut maker = Maker::new(false, false, Tz::default());
2767        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2768        assert!(matches!(dt.codec(), Codec::Binary));
2769        let resolved = match dt.resolution {
2770            Some(ResolutionInfo::Union(u)) => u,
2771            other => panic!("expected union resolution info, got {other:?}"),
2772        };
2773        assert!(resolved.writer_is_union && !resolved.reader_is_union);
2774        assert_eq!(
2775            resolved.writer_to_reader.as_ref(),
2776            &[
2777                Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
2778                None
2779            ]
2780        );
2781    }
2782
2783    #[test]
2784    fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2785        let writer = mk_primitive(PrimitiveType::Long);
2786        let reader = mk_union(vec![
2787            mk_primitive(PrimitiveType::Long),
2788            mk_primitive(PrimitiveType::Double),
2789        ]);
2790        let mut maker = Maker::new(false, false, Tz::default());
2791        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2792        let resolved = match dt.resolution {
2793            Some(ResolutionInfo::Union(u)) => u,
2794            other => panic!("expected union resolution info, got {other:?}"),
2795        };
2796        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2797        assert_eq!(
2798            resolved.writer_to_reader.as_ref(),
2799            &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
2800        );
2801    }
2802
/// A writer `int` has no identical branch in the reader union `[null, long, string]`;
/// the test expects it to resolve to the `long` branch (index 1) via `IntToLong`.
#[test]
fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
    let reader_branches = vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::String),
    ];
    let reader = mk_union(reader_branches);
    let writer = mk_primitive(PrimitiveType::Int);
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    let union_info = match data_type.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    // One writer type, mapped to reader branch 1 with an int -> long promotion.
    let expected = [Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))];
    assert_eq!(union_info.writer_to_reader.as_ref(), &expected);
}
2822
/// Resolving a non-union writer record against a reader union must keep the field
/// defaults of the matching reader record.
///
/// * Writer: record `Inner { a: int }`
/// * Reader: union `[Inner { a: int, b: int default 42 }, string]`
///
/// The matching child (`Inner`) is expected to carry `DefaultValue(Int(42))` on field `b`.
#[test]
fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
    // Writer schema: a bare record with the single field `a`.
    let writer_schema_fields = vec![AvroFieldSchema {
        name: "a",
        doc: None,
        r#type: mk_primitive(PrimitiveType::Int),
        default: None,
        aliases: vec![],
    }];
    let writer = Schema::Complex(ComplexType::Record(Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: writer_schema_fields,
        attributes: Attributes::default(),
    }));

    // Reader schema: the same record plus field `b` (default 42), wrapped in a union.
    let reader_field_a = AvroFieldSchema {
        name: "a",
        doc: None,
        r#type: mk_primitive(PrimitiveType::Int),
        default: None,
        aliases: vec![],
    };
    let reader_field_b = AvroFieldSchema {
        name: "b",
        doc: None,
        r#type: mk_primitive(PrimitiveType::Int),
        default: Some(Value::Number(serde_json::Number::from(42))),
        aliases: vec![],
    };
    let reader_record = Schema::Complex(ComplexType::Record(Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![reader_field_a, reader_field_b],
        attributes: Attributes::default(),
    }));
    let reader = mk_union(vec![reader_record, mk_primitive(PrimitiveType::String)]);

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("resolution should succeed");

    // Verify the union resolution structure
    let union_info = match &dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(!union_info.writer_is_union && union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.len(),
        1,
        "expected the non-union record to resolve to a union variant"
    );
    // The length check above guarantees that entry 0 exists.
    let branch_resolution = match union_info.writer_to_reader[0].as_ref() {
        Some((0, resolution)) => resolution,
        other => panic!("unexpected writer-to-reader table value {other:?}"),
    };
    match branch_resolution {
        ResolutionInfo::Record(record) => {
            // Writer field `a` maps to reader field 0; reader field 1 (`b`) is defaulted.
            assert_eq!(record.writer_fields.len(), 1);
            assert!(matches!(
                record.writer_fields[0],
                ResolvedField::ToReader(0, _)
            ));
            assert_eq!(record.default_fields.len(), 1);
            assert_eq!(record.default_fields[0], 1);
        }
        other => panic!("unexpected resolution {other:?}"),
    }

    // The matching child (Inner at index 0) should have field b with DefaultValue
    let children = match dt.codec() {
        Codec::Union(children, _, _) => children,
        other => panic!("expected union codec, got {other:?}"),
    };
    let inner_fields = match children[0].codec() {
        Codec::Struct(f) => f,
        other => panic!("expected struct codec for Inner, got {other:?}"),
    };
    assert_eq!(inner_fields.len(), 2);
    let field_b = &inner_fields[1];
    assert_eq!(field_b.name(), "b");
    let expected_default = Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42)));
    assert_eq!(
        field_b.data_type().resolution,
        expected_default,
        "field b should have DefaultValue(Int(42)) from schema resolution"
    );
}
2916
/// Resolving a writer union against a reader union must keep the field defaults of
/// the matching reader record.
///
/// * Writer: union `[string, Inner { a: int }]`
/// * Reader: union `[Inner { a: int, b: int default 42 }, string]`
///
/// The matching child (`Inner`) is expected to carry `DefaultValue(Int(42))` on field `b`.
#[test]
fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
    // Writer schema: `Inner` is the second union branch.
    let writer_record = Schema::Complex(ComplexType::Record(Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![AvroFieldSchema {
            name: "a",
            doc: None,
            r#type: mk_primitive(PrimitiveType::Int),
            default: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    }));
    let writer = mk_union(vec![mk_primitive(PrimitiveType::String), writer_record]);

    // Reader schema: `Inner` is the first union branch and gains field `b` (default 42).
    let reader_field_a = AvroFieldSchema {
        name: "a",
        doc: None,
        r#type: mk_primitive(PrimitiveType::Int),
        default: None,
        aliases: vec![],
    };
    let reader_field_b = AvroFieldSchema {
        name: "b",
        doc: None,
        r#type: mk_primitive(PrimitiveType::Int),
        default: Some(Value::Number(serde_json::Number::from(42))),
        aliases: vec![],
    };
    let reader_record = Schema::Complex(ComplexType::Record(Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![reader_field_a, reader_field_b],
        attributes: Attributes::default(),
    }));
    let reader = mk_union(vec![reader_record, mk_primitive(PrimitiveType::String)]);

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("resolution should succeed");

    // Verify the union resolution structure
    let union_info = match &dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(union_info.writer_is_union && union_info.reader_is_union);
    assert_eq!(union_info.writer_to_reader.len(), 2);
    // Writer branch 1 (`Inner`) must map onto reader branch 0.
    let branch_resolution = match union_info.writer_to_reader[1].as_ref() {
        Some((0, resolution)) => resolution,
        other => panic!("unexpected writer-to-reader table value {other:?}"),
    };
    match branch_resolution {
        ResolutionInfo::Record(record) => {
            // Writer field `a` maps to reader field 0; reader field 1 (`b`) is defaulted.
            assert_eq!(record.writer_fields.len(), 1);
            assert!(matches!(
                record.writer_fields[0],
                ResolvedField::ToReader(0, _)
            ));
            assert_eq!(record.default_fields.len(), 1);
            assert_eq!(record.default_fields[0], 1);
        }
        other => panic!("unexpected resolution {other:?}"),
    }

    // The matching child (Inner at index 0) should have field b with DefaultValue
    let children = match dt.codec() {
        Codec::Union(children, _, _) => children,
        other => panic!("expected union codec, got {other:?}"),
    };
    let inner_fields = match children[0].codec() {
        Codec::Struct(f) => f,
        other => panic!("expected struct codec for Inner, got {other:?}"),
    };
    assert_eq!(inner_fields.len(), 2);
    let field_b = &inner_fields[1];
    assert_eq!(field_b.name(), "b");
    let expected_default = Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42)));
    assert_eq!(
        field_b.data_type().resolution,
        expected_default,
        "field b should have DefaultValue(Int(42)) from schema resolution"
    );
}
3009
/// Writer `[null, string]` against reader `[string, null]`: the test expects a `Utf8`
/// codec with `NullFirst` nullability, the writer's `null` branch left unmapped and
/// the writer's `string` branch mapped directly onto reader branch 0.
#[test]
fn test_resolve_both_nullable_unions_direct_match() {
    let writer_branches = vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::String),
    ];
    let reader_branches = vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Null),
    ];
    let writer = mk_union(writer_branches);
    let reader = mk_union(reader_branches);
    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    assert!(matches!(dt.codec(), Codec::Utf8));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));

    let expected = ResolutionInfo::Union(ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::Direct))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    });
    assert_eq!(dt.resolution, Some(expected));
}
3037
/// Writer `[null, int]` against reader `[double, null]`: the test expects a `Float64`
/// codec with `NullFirst` nullability, the writer's `null` branch left unmapped and
/// the writer's `int` branch mapped onto reader branch 0 via `IntToDouble`.
#[test]
fn test_resolve_both_nullable_unions_with_promotion() {
    let writer_branches = vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Int),
    ];
    let reader_branches = vec![
        mk_primitive(PrimitiveType::Double),
        mk_primitive(PrimitiveType::Null),
    ];
    let writer = mk_union(writer_branches);
    let reader = mk_union(reader_branches);
    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    assert!(matches!(dt.codec(), Codec::Float64));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));

    let expected = ResolutionInfo::Union(ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    });
    assert_eq!(dt.resolution, Some(expected));
}
3065
/// A writer `int` read with a reader `long` must yield an `Int64` codec annotated
/// with the `IntToLong` promotion.
#[test]
fn test_resolve_type_promotion() {
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let mut maker = Maker::new(false, false, Tz::default());
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    let expected_resolution = Some(ResolutionInfo::Promotion(Promotion::IntToLong));
    assert!(matches!(resolved.codec, Codec::Int64));
    assert_eq!(resolved.resolution, expected_resolution);
}
3080
/// A record defined inline without a namespace (`Nested`) is referenced again by name
/// as a plain field type, as array items and as map values; every reference must
/// produce the same Arrow data type as the inline definition.
#[test]
fn test_nested_record_type_reuse_without_namespace() {
    let schema_str = r#"
        {
          "type": "record",
          "name": "Record",
          "fields": [
            {
              "name": "nested",
              "type": {
                "type": "record",
                "name": "Nested",
                "fields": [
                  { "name": "nested_int", "type": "int" }
                ]
              }
            },
            { "name": "nestedRecord", "type": "Nested" },
            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
          ]
        }
        "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();

    let mut maker = Maker::new(false, false, Tz::default());
    let root = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = root.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // nested: the inline definition every later reference is compared against
    assert_eq!(fields[0].name(), "nested");
    let nested_data_type = fields[0].data_type();
    let Codec::Struct(nested_fields) = nested_data_type.codec() else {
        panic!(
            "'nested' field is not a struct but {:?}",
            nested_data_type.codec()
        );
    };
    assert_eq!(nested_fields.len(), 1);
    assert_eq!(nested_fields[0].name(), "nested_int");
    assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));

    // nestedRecord: direct reference by name
    assert_eq!(fields[1].name(), "nestedRecord");
    let by_name = fields[1].data_type();
    assert_eq!(
        by_name.codec().data_type(),
        nested_data_type.codec().data_type()
    );

    // nestedArray: reference used as the array item type
    assert_eq!(fields[2].name(), "nestedArray");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'nestedArray' field is not a list");
    };
    assert_eq!(
        item_type.codec().data_type(),
        nested_data_type.codec().data_type()
    );

    // nestedMap: reference used as the map value type
    assert_eq!(fields[3].name(), "nestedMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'nestedMap' field is not a map");
    };
    assert_eq!(
        value_type.codec().data_type(),
        nested_data_type.codec().data_type()
    );
}
3160
/// An enum defined inline in its own namespace (`enum_ns.Status`) is referenced again
/// by its full name as a plain field type, as array items and as map values; every
/// reference must produce the same Arrow data type as the inline definition.
#[test]
fn test_nested_enum_type_reuse_with_namespace() {
    let schema_str = r#"
        {
          "type": "record",
          "name": "Record",
          "namespace": "record_ns",
          "fields": [
            {
              "name": "status",
              "type": {
                "type": "enum",
                "name": "Status",
                "namespace": "enum_ns",
                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
              }
            },
            { "name": "backupStatus", "type": "enum_ns.Status" },
            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
          ]
        }
        "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();

    let mut maker = Maker::new(false, false, Tz::default());
    let root = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = root.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // status: the inline definition every later reference is compared against
    assert_eq!(fields[0].name(), "status");
    let status_data_type = fields[0].data_type();
    let Codec::Enum(symbols) = status_data_type.codec() else {
        panic!(
            "'status' field is not an enum but {:?}",
            status_data_type.codec()
        );
    };
    assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);

    // backupStatus: direct reference by full name
    assert_eq!(fields[1].name(), "backupStatus");
    let by_name = fields[1].data_type();
    assert_eq!(
        by_name.codec().data_type(),
        status_data_type.codec().data_type()
    );

    // statusHistory: reference used as the array item type
    assert_eq!(fields[2].name(), "statusHistory");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'statusHistory' field is not a list");
    };
    assert_eq!(
        item_type.codec().data_type(),
        status_data_type.codec().data_type()
    );

    // statusMap: reference used as the map value type
    assert_eq!(fields[3].name(), "statusMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'statusMap' field is not a map");
    };
    assert_eq!(
        value_type.codec().data_type(),
        status_data_type.codec().data_type()
    );
}
3238
/// Resolves a `string` writer against a `string` reader, wraps the result in an
/// `AvroField` named `AVRO_ROOT_RECORD_DEFAULT_NAME`, and checks the field's name and
/// `Utf8` codec.
///
/// NOTE(review): the field is assembled by hand here, so this exercises the accessors
/// rather than whatever production path assigns the default root name -- confirm that
/// this matches the intent expressed by the test name.
#[test]
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut maker = Maker::new(false, false, Tz::default());
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("resolution should succeed");
    let root_field = AvroField {
        name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
        data_type: resolved,
    };
    assert_eq!(root_field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
    assert!(matches!(root_field.data_type().codec(), Codec::Utf8));
}
3254
3255    fn json_string(s: &str) -> Value {
3256        Value::String(s.to_string())
3257    }
3258
3259    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3260        let stored = dt
3261            .metadata
3262            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3263            .cloned()
3264            .unwrap_or_default();
3265        let expected = serde_json::to_string(default_json).unwrap();
3266        assert_eq!(stored, expected, "stored default metadata should match");
3267    }
3268
/// Checks how `parse_and_store_default` treats a JSON `null` default:
/// accepted for the `null` codec and for an `int` marked `NullFirst`, rejected for a
/// plain `int` and for an `int` marked `NullSecond`.
#[test]
fn test_validate_and_store_default_null_and_nullability_rules() {
    // Fragment expected in the error text of both rejected cases.
    const NULL_ONLY_MSG: &str = "JSON null default is only valid for `null` type";

    // `null` codec: accepted and recorded in the metadata.
    let mut null_dt = AvroDataType::new(Codec::Null, HashMap::new(), None);
    let lit = null_dt.parse_and_store_default(&Value::Null).unwrap();
    assert_eq!(lit, AvroLiteral::Null);
    assert_default_stored(&null_dt, &Value::Null);

    // `int` without nullability: rejected.
    let mut plain_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
    let err = plain_int
        .parse_and_store_default(&Value::Null)
        .unwrap_err();
    assert!(
        err.to_string().contains(NULL_ONLY_MSG),
        "unexpected error: {err}"
    );

    // `int` with `NullFirst`: accepted and recorded in the metadata.
    let mut null_first_int =
        AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
    let lit2 = null_first_int
        .parse_and_store_default(&Value::Null)
        .unwrap();
    assert_eq!(lit2, AvroLiteral::Null);
    assert_default_stored(&null_first_int, &Value::Null);

    // `int` with `NullSecond`: rejected.
    let mut null_second_int =
        AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
    let err2 = null_second_int
        .parse_and_store_default(&Value::Null)
        .unwrap_err();
    assert!(
        err2.to_string().contains(NULL_ONLY_MSG),
        "unexpected error: {err2}"
    );
}
3296
/// Walks `parse_and_store_default` through the primitive, string-like, binary and
/// temporal codecs, checking the parsed literal for each one, the stored metadata for
/// the basic primitives and strings, and the error text for out-of-range or invalid
/// inputs.
#[test]
fn test_validate_and_store_default_primitives_and_temporal() {
    // boolean
    let bool_default = Value::Bool(true);
    let mut bool_dt = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
    let parsed = bool_dt.parse_and_store_default(&bool_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Boolean(true));
    assert_default_stored(&bool_dt, &bool_default);

    // int: in-range value is stored, one past i32::MAX is rejected
    let int_default = serde_json::json!(123);
    let mut int_dt = AvroDataType::new(Codec::Int32, HashMap::new(), None);
    let parsed = int_dt.parse_and_store_default(&int_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Int(123));
    assert_default_stored(&int_dt, &int_default);
    let err = int_dt
        .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
        .unwrap_err();
    assert!(err.to_string().contains("out of i32 range"));

    // long
    let long_default = serde_json::json!(1234567890);
    let mut long_dt = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let parsed = long_dt.parse_and_store_default(&long_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Long(1234567890));
    assert_default_stored(&long_dt, &long_default);

    // float: representable value is stored, 1e39 is rejected
    let float_default = serde_json::json!(1.25);
    let mut float_dt = AvroDataType::new(Codec::Float32, HashMap::new(), None);
    let parsed = float_dt.parse_and_store_default(&float_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Float(1.25));
    assert_default_stored(&float_dt, &float_default);
    let err = float_dt
        .parse_and_store_default(&serde_json::json!(1e39))
        .unwrap_err();
    assert!(err.to_string().contains("out of f32 range"));

    // double
    let double_default = serde_json::json!(std::f64::consts::PI);
    let mut double_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let parsed = double_dt.parse_and_store_default(&double_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Double(std::f64::consts::PI));
    assert_default_stored(&double_dt, &double_default);

    // string (Utf8)
    let str_default = json_string("hello");
    let mut str_dt = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
    let parsed = str_dt.parse_and_store_default(&str_default).unwrap();
    assert_eq!(parsed, AvroLiteral::String("hello".into()));
    assert_default_stored(&str_dt, &str_default);

    // string (Utf8View)
    let view_default = json_string("view");
    let mut view_dt = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
    let parsed = view_dt.parse_and_store_default(&view_default).unwrap();
    assert_eq!(parsed, AvroLiteral::String("view".into()));
    assert_default_stored(&view_dt, &view_default);

    // uuid: parsed as a string literal
    let mut uuid_dt = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
    let parsed = uuid_dt
        .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
        .unwrap();
    assert_eq!(
        parsed,
        AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
    );

    // bytes: "ABC" becomes its byte values, a code point above U+00FF is rejected
    let mut bytes_dt = AvroDataType::new(Codec::Binary, HashMap::new(), None);
    let parsed = bytes_dt
        .parse_and_store_default(&json_string("ABC"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"ABC".to_vec()));
    let err = bytes_dt
        .parse_and_store_default(&json_string("€")) // U+20AC
        .unwrap_err();
    assert!(err.to_string().contains("Invalid codepoint"));

    // date: int literal
    let mut date_dt = AvroDataType::new(Codec::Date32, HashMap::new(), None);
    let parsed = date_dt
        .parse_and_store_default(&serde_json::json!(1))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Int(1));

    // time-millis: int literal
    let mut time_millis_dt = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
    let parsed = time_millis_dt
        .parse_and_store_default(&serde_json::json!(86_400_000))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Int(86_400_000));

    // time-micros: long literal
    let mut time_micros_dt = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
    let parsed = time_micros_dt
        .parse_and_store_default(&serde_json::json!(1_000_000))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(1_000_000));

    // timestamp-millis: long literal
    let mut ts_millis_dt =
        AvroDataType::new(Codec::TimestampMillis(None), HashMap::new(), None);
    let parsed = ts_millis_dt
        .parse_and_store_default(&serde_json::json!(123))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(123));

    // timestamp-micros: long literal
    let mut ts_micros_dt =
        AvroDataType::new(Codec::TimestampMicros(None), HashMap::new(), None);
    let parsed = ts_micros_dt
        .parse_and_store_default(&serde_json::json!(456))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(456));
}
3388
/// Range checks of `parse_and_store_default` for the narrow and unsigned integer
/// codecs (`Int8`, `Int16`, `UInt8`, `UInt16`, `UInt32`): a boundary value is accepted
/// and values just outside the range are rejected with an "out of <type> range" error.
///
/// Compiled only when the `avro_custom_types` feature is enabled.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_validate_and_store_default_custom_integer_ranges() {
    // Int8: i8::MAX accepted, MAX + 1 and MIN - 1 rejected
    let mut int8_dt = AvroDataType::new(Codec::Int8, HashMap::new(), None);
    let accepted = int8_dt
        .parse_and_store_default(&serde_json::json!(i8::MAX))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(i8::MAX)));
    let too_high = int8_dt
        .parse_and_store_default(&serde_json::json!(i64::from(i8::MAX) + 1))
        .unwrap_err();
    assert!(too_high.to_string().contains("out of i8 range"));
    let too_low = int8_dt
        .parse_and_store_default(&serde_json::json!(i64::from(i8::MIN) - 1))
        .unwrap_err();
    assert!(too_low.to_string().contains("out of i8 range"));

    // Int16: i16::MIN accepted, MAX + 1 and MIN - 1 rejected
    let mut int16_dt = AvroDataType::new(Codec::Int16, HashMap::new(), None);
    let accepted = int16_dt
        .parse_and_store_default(&serde_json::json!(i16::MIN))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(i16::MIN)));
    let too_high = int16_dt
        .parse_and_store_default(&serde_json::json!(i64::from(i16::MAX) + 1))
        .unwrap_err();
    assert!(too_high.to_string().contains("out of i16 range"));
    let too_low = int16_dt
        .parse_and_store_default(&serde_json::json!(i64::from(i16::MIN) - 1))
        .unwrap_err();
    assert!(too_low.to_string().contains("out of i16 range"));

    // UInt8: u8::MAX accepted, -1 and MAX + 1 rejected
    let mut uint8_dt = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
    let accepted = uint8_dt
        .parse_and_store_default(&serde_json::json!(u8::MAX))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(u8::MAX)));
    let negative = uint8_dt
        .parse_and_store_default(&serde_json::json!(-1))
        .unwrap_err();
    assert!(negative.to_string().contains("out of u8 range"));
    let too_high = uint8_dt
        .parse_and_store_default(&serde_json::json!(i64::from(u8::MAX) + 1))
        .unwrap_err();
    assert!(too_high.to_string().contains("out of u8 range"));

    // UInt16: u16::MAX accepted, -1 and MAX + 1 rejected
    let mut uint16_dt = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
    let accepted = uint16_dt
        .parse_and_store_default(&serde_json::json!(u16::MAX))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(u16::MAX)));
    let negative = uint16_dt
        .parse_and_store_default(&serde_json::json!(-1))
        .unwrap_err();
    assert!(negative.to_string().contains("out of u16 range"));
    let too_high = uint16_dt
        .parse_and_store_default(&serde_json::json!(i64::from(u16::MAX) + 1))
        .unwrap_err();
    assert!(too_high.to_string().contains("out of u16 range"));

    // UInt32: u32::MAX accepted (as a long literal), -1 and MAX + 1 rejected
    let mut uint32_dt = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
    let accepted = uint32_dt
        .parse_and_store_default(&serde_json::json!(i64::from(u32::MAX)))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Long(i64::from(u32::MAX)));
    let negative = uint32_dt
        .parse_and_store_default(&serde_json::json!(-1))
        .unwrap_err();
    assert!(negative.to_string().contains("out of u32 range"));
    let too_high = uint32_dt
        .parse_and_store_default(&serde_json::json!(i64::from(u32::MAX) + 1))
        .unwrap_err();
    assert!(too_high.to_string().contains("out of u32 range"));
}
3462
/// Checks `parse_and_store_default` for byte-backed codecs: `Fixed(4)`, decimals with
/// and without a size component, and `Interval`. Defaults are given as JSON strings
/// and come back as byte literals; a string of the wrong length for a sized codec
/// must fail with a "Default length" error.
#[test]
fn test_validate_and_store_default_fixed_decimal_interval() {
    // fixed(4): exactly four bytes are accepted
    let mut fixed_dt = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
    let parsed = fixed_dt
        .parse_and_store_default(&json_string("WXYZ"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"WXYZ".to_vec()));
    let err = fixed_dt
        .parse_and_store_default(&json_string("TOO LONG"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));

    // decimal with a size component of 3: three bytes accepted, seven rejected
    let mut sized_decimal_dt =
        AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
    let parsed = sized_decimal_dt
        .parse_and_store_default(&json_string("abc"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"abc".to_vec()));
    let err = sized_decimal_dt
        .parse_and_store_default(&json_string("toolong"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));

    // decimal without a size component: an eight-byte default is accepted
    let mut unsized_decimal_dt =
        AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
    let parsed = unsized_decimal_dt
        .parse_and_store_default(&json_string("freeform"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"freeform".to_vec()));

    // interval: a twelve-byte default is accepted, a five-byte one is rejected
    let mut interval_dt = AvroDataType::new(Codec::Interval, HashMap::new(), None);
    let parsed = interval_dt
        .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"ABCDEFGHIJKL".to_vec()));
    let err = interval_dt
        .parse_and_store_default(&json_string("short"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));
}
3506
3507    #[test]
3508    fn test_validate_and_store_default_enum_list_map_struct() {
3509        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
3510            .into_iter()
3511            .collect();
3512        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
3513        let l = dt_enum
3514            .parse_and_store_default(&json_string("GREEN"))
3515            .unwrap();
3516        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
3517        let err = dt_enum
3518            .parse_and_store_default(&json_string("YELLOW"))
3519            .unwrap_err();
3520        assert!(err.to_string().contains("Default enum symbol"));
3521        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3522        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
3523        let val = serde_json::json!([1, 2, 3]);
3524        let l = dt_list.parse_and_store_default(&val).unwrap();
3525        assert_eq!(
3526            l,
3527            AvroLiteral::Array(vec![
3528                AvroLiteral::Long(1),
3529                AvroLiteral::Long(2),
3530                AvroLiteral::Long(3)
3531            ])
3532        );
3533        let err = dt_list
3534            .parse_and_store_default(&serde_json::json!({"not":"array"}))
3535            .unwrap_err();
3536        assert!(err.to_string().contains("JSON array"));
3537        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3538        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
3539        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
3540        let l = dt_map.parse_and_store_default(&mv).unwrap();
3541        let mut expected = IndexMap::new();
3542        expected.insert("x".into(), AvroLiteral::Double(1.5));
3543        expected.insert("y".into(), AvroLiteral::Double(2.5));
3544        assert_eq!(l, AvroLiteral::Map(expected));
3545        // Not object -> error
3546        let err = dt_map
3547            .parse_and_store_default(&serde_json::json!(123))
3548            .unwrap_err();
3549        assert!(err.to_string().contains("JSON object"));
3550        let mut field_a = AvroField {
3551            name: "a".into(),
3552            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
3553        };
3554        let field_b = AvroField {
3555            name: "b".into(),
3556            data_type: AvroDataType::new(
3557                Codec::Int64,
3558                HashMap::new(),
3559                Some(Nullability::NullFirst),
3560            ),
3561        };
3562        let mut c_md = HashMap::new();
3563        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
3564        let field_c = AvroField {
3565            name: "c".into(),
3566            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
3567        };
3568        field_a.data_type.metadata.insert("doc".into(), "na".into());
3569        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
3570        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
3571        let default_obj = serde_json::json!({"a": 7});
3572        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
3573        let mut expected = IndexMap::new();
3574        expected.insert("a".into(), AvroLiteral::Int(7));
3575        expected.insert("b".into(), AvroLiteral::Null);
3576        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
3577        assert_eq!(l, AvroLiteral::Map(expected));
3578        assert_default_stored(&dt_struct, &default_obj);
3579        let req_field = AvroField {
3580            name: "req".into(),
3581            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
3582        };
3583        let mut dt_bad = AvroDataType::new(
3584            Codec::Struct(Arc::from(vec![req_field])),
3585            HashMap::new(),
3586            None,
3587        );
3588        let err = dt_bad
3589            .parse_and_store_default(&serde_json::json!({}))
3590            .unwrap_err();
3591        assert!(
3592            err.to_string().contains("missing required subfield 'req'"),
3593            "unexpected error: {err}"
3594        );
3595        let err = dt_struct
3596            .parse_and_store_default(&serde_json::json!(10))
3597            .unwrap_err();
3598        err.to_string().contains("must be a JSON object");
3599    }
3600
3601    #[test]
3602    fn test_resolve_array_promotion_and_reader_metadata() {
3603        let mut w_add: HashMap<&str, Value> = HashMap::new();
3604        w_add.insert("who", json_string("writer"));
3605        let mut r_add: HashMap<&str, Value> = HashMap::new();
3606        r_add.insert("who", json_string("reader"));
3607        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3608            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3609            attributes: Attributes {
3610                logical_type: None,
3611                additional: w_add,
3612            },
3613        }));
3614        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3615            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
3616            attributes: Attributes {
3617                logical_type: None,
3618                additional: r_add,
3619            },
3620        }));
3621        let mut maker = Maker::new(false, false, Tz::default());
3622        let dt = maker
3623            .make_data_type(&writer_schema, Some(&reader_schema), None)
3624            .unwrap();
3625        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
3626        if let Codec::List(inner) = dt.codec() {
3627            assert!(matches!(inner.codec(), Codec::Int64));
3628            assert_eq!(
3629                inner.resolution,
3630                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3631            );
3632        } else {
3633            panic!("expected list codec");
3634        }
3635    }
3636
3637    #[test]
3638    fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
3639        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3640            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3641            attributes: Attributes::default(),
3642        }));
3643        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3644            items: Box::new(mk_union(vec![
3645                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3646                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3647            ])),
3648            attributes: Attributes::default(),
3649        }));
3650        let mut maker = Maker::new(false, false, Tz::default());
3651        let dt = maker
3652            .make_data_type(&writer_schema, Some(&reader_schema), None)
3653            .unwrap();
3654        if let Codec::List(inner) = dt.codec() {
3655            assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
3656            assert!(matches!(inner.codec(), Codec::Int32));
3657            match inner.resolution.as_ref() {
3658                Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
3659                other => panic!("expected Union resolution, got {other:?}"),
3660            }
3661        } else {
3662            panic!("expected List codec");
3663        }
3664    }
3665
3666    #[test]
3667    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3668        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3669            name: "MD5",
3670            namespace: None,
3671            aliases: vec!["Hash16"],
3672            size: 16,
3673            attributes: Attributes::default(),
3674        }));
3675        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3676            name: "Hash16",
3677            namespace: None,
3678            aliases: vec![],
3679            size: 16,
3680            attributes: Attributes::default(),
3681        }));
3682        let mut maker = Maker::new(false, false, Tz::default());
3683        let dt = maker
3684            .make_data_type(&writer_schema, Some(&reader_schema), None)
3685            .unwrap();
3686        assert!(matches!(dt.codec(), Codec::Fixed(16)));
3687    }
3688
3689    #[cfg(feature = "avro_custom_types")]
3690    #[test]
3691    fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3692        let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3693            name: "ArrowIntervalMDN",
3694            namespace: None,
3695            aliases: vec![],
3696            size: 16,
3697            attributes: Attributes {
3698                logical_type: Some("arrow.interval-month-day-nano"),
3699                additional: Default::default(),
3700            },
3701        }));
3702        let mut maker = Maker::new(false, false, Default::default());
3703        let dt = maker.make_data_type(&schema, None, None).unwrap();
3704        assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3705        assert_eq!(
3706            dt.codec.data_type(),
3707            DataType::Interval(IntervalUnit::MonthDayNano)
3708        );
3709    }
3710
3711    #[test]
3712    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3713        let writer = Schema::Complex(ComplexType::Record(Record {
3714            name: "R",
3715            namespace: None,
3716            doc: None,
3717            aliases: vec![],
3718            fields: vec![
3719                crate::schema::Field {
3720                    name: "a",
3721                    doc: None,
3722                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3723                    default: None,
3724                    aliases: vec![],
3725                },
3726                crate::schema::Field {
3727                    name: "skipme",
3728                    doc: None,
3729                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3730                    default: None,
3731                    aliases: vec![],
3732                },
3733                crate::schema::Field {
3734                    name: "b",
3735                    doc: None,
3736                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3737                    default: None,
3738                    aliases: vec![],
3739                },
3740            ],
3741            attributes: Attributes::default(),
3742        }));
3743        let reader = Schema::Complex(ComplexType::Record(Record {
3744            name: "R",
3745            namespace: None,
3746            doc: None,
3747            aliases: vec![],
3748            fields: vec![
3749                crate::schema::Field {
3750                    name: "b",
3751                    doc: None,
3752                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3753                    default: None,
3754                    aliases: vec![],
3755                },
3756                crate::schema::Field {
3757                    name: "a",
3758                    doc: None,
3759                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3760                    default: None,
3761                    aliases: vec![],
3762                },
3763                crate::schema::Field {
3764                    name: "name",
3765                    doc: None,
3766                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3767                    default: Some(json_string("anon")),
3768                    aliases: vec![],
3769                },
3770                crate::schema::Field {
3771                    name: "opt",
3772                    doc: None,
3773                    r#type: Schema::Union(vec![
3774                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3775                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3776                    ]),
3777                    default: None, // should default to null because NullFirst
3778                    aliases: vec![],
3779                },
3780            ],
3781            attributes: Attributes::default(),
3782        }));
3783        let mut maker = Maker::new(false, false, Tz::default());
3784        let dt = maker
3785            .make_data_type(&writer, Some(&reader), None)
3786            .expect("record resolution");
3787        let fields = match dt.codec() {
3788            Codec::Struct(f) => f,
3789            other => panic!("expected struct, got {other:?}"),
3790        };
3791        assert_eq!(fields.len(), 4);
3792        assert_eq!(fields[0].name(), "b");
3793        assert_eq!(fields[1].name(), "a");
3794        assert_eq!(fields[2].name(), "name");
3795        assert_eq!(fields[3].name(), "opt");
3796        assert!(matches!(
3797            fields[1].data_type().resolution,
3798            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3799        ));
3800        let rec = match dt.resolution {
3801            Some(ResolutionInfo::Record(ref r)) => r.clone(),
3802            other => panic!("expected record resolution, got {other:?}"),
3803        };
3804        assert!(matches!(
3805            &rec.writer_fields[..],
3806            &[
3807                ResolvedField::ToReader(1, _),
3808                ResolvedField::Skip(_),
3809                ResolvedField::ToReader(0, _),
3810            ]
3811        ));
3812        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3813        let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
3814            panic!("should skip field 1")
3815        };
3816        assert!(matches!(skip1.codec(), Codec::Utf8));
3817        let name_md = &fields[2].data_type().metadata;
3818        assert_eq!(
3819            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3820            Some(&"\"anon\"".to_string())
3821        );
3822        let opt_md = &fields[3].data_type().metadata;
3823        assert_eq!(
3824            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3825            Some(&"null".to_string())
3826        );
3827    }
3828
3829    #[test]
3830    fn test_named_type_alias_resolution_record_cross_namespace() {
3831        let writer_record = Record {
3832            name: "PersonV2",
3833            namespace: Some("com.example.v2"),
3834            doc: None,
3835            aliases: vec!["com.example.Person"],
3836            fields: vec![
3837                AvroFieldSchema {
3838                    name: "name",
3839                    doc: None,
3840                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3841                    default: None,
3842                    aliases: vec![],
3843                },
3844                AvroFieldSchema {
3845                    name: "age",
3846                    doc: None,
3847                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3848                    default: None,
3849                    aliases: vec![],
3850                },
3851            ],
3852            attributes: Attributes::default(),
3853        };
3854        let reader_record = Record {
3855            name: "Person",
3856            namespace: Some("com.example"),
3857            doc: None,
3858            aliases: vec![],
3859            fields: writer_record.fields.clone(),
3860            attributes: Attributes::default(),
3861        };
3862        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3863        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3864        let mut maker = Maker::new(false, false, Tz::default());
3865        let result = maker
3866            .make_data_type(&writer_schema, Some(&reader_schema), None)
3867            .expect("record alias resolution should succeed");
3868        match result.codec {
3869            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3870            other => panic!("expected struct, got {other:?}"),
3871        }
3872    }
3873
3874    #[test]
3875    fn test_named_type_alias_resolution_enum_cross_namespace() {
3876        let writer_enum = Enum {
3877            name: "ColorV2",
3878            namespace: Some("org.example.v2"),
3879            doc: None,
3880            aliases: vec!["org.example.Color"],
3881            symbols: vec!["RED", "GREEN", "BLUE"],
3882            default: None,
3883            attributes: Attributes::default(),
3884        };
3885        let reader_enum = Enum {
3886            name: "Color",
3887            namespace: Some("org.example"),
3888            doc: None,
3889            aliases: vec![],
3890            symbols: vec!["RED", "GREEN", "BLUE"],
3891            default: None,
3892            attributes: Attributes::default(),
3893        };
3894        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3895        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3896        let mut maker = Maker::new(false, false, Tz::default());
3897        maker
3898            .make_data_type(&writer_schema, Some(&reader_schema), None)
3899            .expect("enum alias resolution should succeed");
3900    }
3901
3902    #[test]
3903    fn test_named_type_alias_resolution_fixed_cross_namespace() {
3904        let writer_fixed = Fixed {
3905            name: "Fx10V2",
3906            namespace: Some("ns.v2"),
3907            aliases: vec!["ns.Fx10"],
3908            size: 10,
3909            attributes: Attributes::default(),
3910        };
3911        let reader_fixed = Fixed {
3912            name: "Fx10",
3913            namespace: Some("ns"),
3914            aliases: vec![],
3915            size: 10,
3916            attributes: Attributes::default(),
3917        };
3918        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3919        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3920        let mut maker = Maker::new(false, false, Tz::default());
3921        maker
3922            .make_data_type(&writer_schema, Some(&reader_schema), None)
3923            .expect("fixed alias resolution should succeed");
3924    }
3925}