Skip to main content

arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Codec for Mapping Avro and Arrow types.
19
20use crate::schema::{
21    AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22    AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23    PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26    ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27    IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40/// Contains information about how to resolve differences between a writer's and a reader's schema.
41#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43    /// Indicates that the writer's type should be promoted to the reader's type.
44    Promotion(Promotion),
45    /// Indicates that a default value should be used for a field.
46    DefaultValue(AvroLiteral),
47    /// Provides mapping information for resolving enums.
48    EnumMapping(EnumMapping),
49    /// Provides resolution information for record fields.
50    Record(ResolvedRecord),
51    /// Provides mapping and shape info for resolving unions.
52    Union(ResolvedUnion),
53}
54
55/// Represents a literal Avro value.
56///
57/// This is used to represent default values in an Avro schema.
58#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60    /// Represents a null value.
61    Null,
62    /// Represents a boolean value.
63    Boolean(bool),
64    /// Represents an integer value.
65    Int(i32),
66    /// Represents a long value.
67    Long(i64),
68    /// Represents a float value.
69    Float(f32),
70    /// Represents a double value.
71    Double(f64),
72    /// Represents a bytes value.
73    Bytes(Vec<u8>),
74    /// Represents a string value.
75    String(String),
76    /// Represents an enum symbol.
77    Enum(String),
78    /// Represents a JSON array default for an Avro array, containing element literals.
79    Array(Vec<AvroLiteral>),
80    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
81    Map(IndexMap<String, AvroLiteral>),
82}
83
84/// Contains the necessary information to resolve a writer's record against a reader's record schema.
85#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87    /// Maps a writer's field index to the corresponding reader's field index.
88    /// `None` if the writer's field is not present in the reader's schema.
89    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
90    /// A list of indices in the reader's schema for fields that have a default value.
91    pub(crate) default_fields: Arc<[usize]>,
92    /// For fields present in the writer's schema but not the reader's, this stores their data type.
93    /// This is needed to correctly skip over these fields during deserialization.
94    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
95}
96
97/// Defines the type of promotion to be applied during schema resolution.
98///
99/// Schema resolution may require promoting a writer's data type to a reader's data type.
100/// For example, an `int` can be promoted to a `long`, `float`, or `double`.
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102pub(crate) enum Promotion {
103    /// Direct read with no data type promotion.
104    Direct,
105    /// Promotes an `int` to a `long`.
106    IntToLong,
107    /// Promotes an `int` to a `float`.
108    IntToFloat,
109    /// Promotes an `int` to a `double`.
110    IntToDouble,
111    /// Promotes a `long` to a `float`.
112    LongToFloat,
113    /// Promotes a `long` to a `double`.
114    LongToDouble,
115    /// Promotes a `float` to a `double`.
116    FloatToDouble,
117    /// Promotes a `string` to `bytes`.
118    StringToBytes,
119    /// Promotes `bytes` to a `string`.
120    BytesToString,
121}
122
123impl Display for Promotion {
124    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
125        match self {
126            Self::Direct => write!(formatter, "Direct"),
127            Self::IntToLong => write!(formatter, "Int->Long"),
128            Self::IntToFloat => write!(formatter, "Int->Float"),
129            Self::IntToDouble => write!(formatter, "Int->Double"),
130            Self::LongToFloat => write!(formatter, "Long->Float"),
131            Self::LongToDouble => write!(formatter, "Long->Double"),
132            Self::FloatToDouble => write!(formatter, "Float->Double"),
133            Self::StringToBytes => write!(formatter, "String->Bytes"),
134            Self::BytesToString => write!(formatter, "Bytes->String"),
135        }
136    }
137}
138
139/// Information required to resolve a writer union against a reader union (or single type).
140#[derive(Debug, Clone, PartialEq)]
141pub(crate) struct ResolvedUnion {
142    /// For each writer branch index, the reader branch index and how to read it.
143    /// `None` means the writer branch doesn't resolve against the reader.
144    pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
145    /// Whether the writer schema at this site is a union
146    pub(crate) writer_is_union: bool,
147    /// Whether the reader schema at this site is a union
148    pub(crate) reader_is_union: bool,
149}
150
151/// Holds the mapping information for resolving Avro enums.
152///
153/// When resolving schemas, the writer's enum symbols must be mapped to the reader's symbols.
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub(crate) struct EnumMapping {
156    /// A mapping from the writer's symbol index to the reader's symbol index.
157    pub(crate) mapping: Arc<[i32]>,
158    /// The index to use for a writer's symbol that is not present in the reader's enum
159    /// and a default value is specified in the reader's schema.
160    pub(crate) default_index: i32,
161}
162
163#[cfg(feature = "canonical_extension_types")]
164fn with_extension_type(codec: &Codec, field: Field) -> Field {
165    match codec {
166        Codec::Uuid => field.with_extension_type(arrow_schema::extension::Uuid),
167        _ => field,
168    }
169}
170
171/// An Avro datatype mapped to the arrow data model
172#[derive(Debug, Clone, PartialEq)]
173pub(crate) struct AvroDataType {
174    nullability: Option<Nullability>,
175    metadata: HashMap<String, String>,
176    codec: Codec,
177    pub(crate) resolution: Option<ResolutionInfo>,
178}
179
180impl AvroDataType {
181    /// Create a new [`AvroDataType`] with the given parts.
182    pub(crate) fn new(
183        codec: Codec,
184        metadata: HashMap<String, String>,
185        nullability: Option<Nullability>,
186    ) -> Self {
187        AvroDataType {
188            codec,
189            metadata,
190            nullability,
191            resolution: None,
192        }
193    }
194
195    #[inline]
196    fn new_with_resolution(
197        codec: Codec,
198        metadata: HashMap<String, String>,
199        nullability: Option<Nullability>,
200        resolution: Option<ResolutionInfo>,
201    ) -> Self {
202        Self {
203            codec,
204            metadata,
205            nullability,
206            resolution,
207        }
208    }
209
210    /// Returns an arrow [`Field`] with the given name
211    pub(crate) fn field_with_name(&self, name: &str) -> Field {
212        let mut nullable = self.nullability.is_some();
213        if !nullable {
214            if let Codec::Union(children, _, _) = self.codec() {
215                // If any encoded branch is `null`, mark field as nullable
216                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
217                    nullable = true;
218                }
219            }
220        }
221        let data_type = self.codec.data_type();
222        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
223        #[cfg(feature = "canonical_extension_types")]
224        return with_extension_type(&self.codec, field);
225        #[cfg(not(feature = "canonical_extension_types"))]
226        field
227    }
228
229    /// Returns a reference to the codec used by this data type
230    ///
231    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
232    /// This is useful when we need to inspect or use the specific encoding of a field.
233    pub(crate) fn codec(&self) -> &Codec {
234        &self.codec
235    }
236
237    /// Returns the nullability status of this data type
238    ///
239    /// In Avro, nullability is represented through unions with null types.
240    /// The returned value indicates how nulls are encoded in the Avro format:
241    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
242    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
243    /// - `None` - The type is not nullable
244    pub(crate) fn nullability(&self) -> Option<Nullability> {
245        self.nullability
246    }
247
248    #[inline]
249    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
250        fn expect_string<'v>(
251            default_json: &'v Value,
252            data_type: &str,
253        ) -> Result<&'v str, ArrowError> {
254            match default_json {
255                Value::String(s) => Ok(s.as_str()),
256                _ => Err(ArrowError::SchemaError(format!(
257                    "Default value must be a JSON string for {data_type}"
258                ))),
259            }
260        }
261
262        fn parse_bytes_default(
263            default_json: &Value,
264            expected_len: Option<usize>,
265        ) -> Result<Vec<u8>, ArrowError> {
266            let s = expect_string(default_json, "bytes/fixed logical types")?;
267            let mut out = Vec::with_capacity(s.len());
268            for ch in s.chars() {
269                let cp = ch as u32;
270                if cp > 0xFF {
271                    return Err(ArrowError::SchemaError(format!(
272                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
273                    )));
274                }
275                out.push(cp as u8);
276            }
277            if let Some(len) = expected_len {
278                if out.len() != len {
279                    return Err(ArrowError::SchemaError(format!(
280                        "Default length {} does not match expected fixed size {len}",
281                        out.len(),
282                    )));
283                }
284            }
285            Ok(out)
286        }
287
288        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
289            match default_json {
290                Value::Number(n) => n.as_i64().ok_or_else(|| {
291                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
292                }),
293                _ => Err(ArrowError::SchemaError(format!(
294                    "Default {data_type} must be a JSON integer"
295                ))),
296            }
297        }
298
299        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
300            match default_json {
301                Value::Number(n) => n.as_f64().ok_or_else(|| {
302                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
303                }),
304                _ => Err(ArrowError::SchemaError(format!(
305                    "Default {data_type} must be a JSON number"
306                ))),
307            }
308        }
309
310        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
311        if default_json.is_null() {
312            return match self.codec() {
313                Codec::Null => Ok(AvroLiteral::Null),
314                Codec::Union(encodings, _, _) if !encodings.is_empty()
315                    && matches!(encodings[0].codec(), Codec::Null) =>
316                    {
317                        Ok(AvroLiteral::Null)
318                    }
319                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
320                _ => Err(ArrowError::SchemaError(
321                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
322                        .to_string(),
323                )),
324            };
325        }
326        let lit = match self.codec() {
327            Codec::Null => {
328                return Err(ArrowError::SchemaError(
329                    "Default for `null` type must be JSON null".to_string(),
330                ));
331            }
332            Codec::Boolean => match default_json {
333                Value::Bool(b) => AvroLiteral::Boolean(*b),
334                _ => {
335                    return Err(ArrowError::SchemaError(
336                        "Boolean default must be a JSON boolean".to_string(),
337                    ));
338                }
339            },
340            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
341                let i = parse_json_i64(default_json, "int")?;
342                if i < i32::MIN as i64 || i > i32::MAX as i64 {
343                    return Err(ArrowError::SchemaError(format!(
344                        "Default int {i} out of i32 range"
345                    )));
346                }
347                AvroLiteral::Int(i as i32)
348            }
349            Codec::Int64
350            | Codec::TimeMicros
351            | Codec::TimestampMillis(_)
352            | Codec::TimestampMicros(_)
353            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
354            #[cfg(feature = "avro_custom_types")]
355            Codec::DurationNanos
356            | Codec::DurationMicros
357            | Codec::DurationMillis
358            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
359            #[cfg(feature = "avro_custom_types")]
360            Codec::Int8 => {
361                let i = parse_json_i64(default_json, "int")?;
362                if i < i8::MIN as i64 || i > i8::MAX as i64 {
363                    return Err(ArrowError::SchemaError(format!(
364                        "Default int8 {i} out of i8 range"
365                    )));
366                }
367                AvroLiteral::Int(i as i32)
368            }
369            #[cfg(feature = "avro_custom_types")]
370            Codec::Int16 => {
371                let i = parse_json_i64(default_json, "int")?;
372                if i < i16::MIN as i64 || i > i16::MAX as i64 {
373                    return Err(ArrowError::SchemaError(format!(
374                        "Default int16 {i} out of i16 range"
375                    )));
376                }
377                AvroLiteral::Int(i as i32)
378            }
379            #[cfg(feature = "avro_custom_types")]
380            Codec::UInt8 => {
381                let i = parse_json_i64(default_json, "int")?;
382                if i < 0 || i > u8::MAX as i64 {
383                    return Err(ArrowError::SchemaError(format!(
384                        "Default uint8 {i} out of u8 range"
385                    )));
386                }
387                AvroLiteral::Int(i as i32)
388            }
389            #[cfg(feature = "avro_custom_types")]
390            Codec::UInt16 => {
391                let i = parse_json_i64(default_json, "int")?;
392                if i < 0 || i > u16::MAX as i64 {
393                    return Err(ArrowError::SchemaError(format!(
394                        "Default uint16 {i} out of u16 range"
395                    )));
396                }
397                AvroLiteral::Int(i as i32)
398            }
399            #[cfg(feature = "avro_custom_types")]
400            Codec::UInt32 => {
401                let i = parse_json_i64(default_json, "long")?;
402                if i < 0 || i > u32::MAX as i64 {
403                    return Err(ArrowError::SchemaError(format!(
404                        "Default uint32 {i} out of u32 range"
405                    )));
406                }
407                AvroLiteral::Long(i)
408            }
409            #[cfg(feature = "avro_custom_types")]
410            Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
411                AvroLiteral::Long(parse_json_i64(default_json, "long")?)
412            }
413            #[cfg(feature = "avro_custom_types")]
414            Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
415            #[cfg(feature = "avro_custom_types")]
416            Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
417            #[cfg(feature = "avro_custom_types")]
418            Codec::Time32Secs => {
419                let i = parse_json_i64(default_json, "int")?;
420                if i < i32::MIN as i64 || i > i32::MAX as i64 {
421                    return Err(ArrowError::SchemaError(format!(
422                        "Default time32-secs {i} out of i32 range"
423                    )));
424                }
425                AvroLiteral::Int(i as i32)
426            }
427            #[cfg(feature = "avro_custom_types")]
428            Codec::IntervalYearMonth => {
429                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
430            }
431            #[cfg(feature = "avro_custom_types")]
432            Codec::IntervalMonthDayNano => {
433                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
434            }
435            #[cfg(feature = "avro_custom_types")]
436            Codec::IntervalDayTime => {
437                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
438            }
439            Codec::Float32 => {
440                let f = parse_json_f64(default_json, "float")?;
441                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
442                    return Err(ArrowError::SchemaError(format!(
443                        "Default float {f} out of f32 range or not finite"
444                    )));
445                }
446                AvroLiteral::Float(f as f32)
447            }
448            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
449            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
450                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
451            }
452            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
453            Codec::Fixed(sz) => {
454                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
455            }
456            Codec::Decimal(_, _, fixed_size) => {
457                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
458            }
459            Codec::Enum(symbols) => {
460                let s = expect_string(default_json, "enum")?;
461                if symbols.iter().any(|sym| sym == s) {
462                    AvroLiteral::Enum(s.to_string())
463                } else {
464                    return Err(ArrowError::SchemaError(format!(
465                        "Default enum symbol {s:?} not found in reader enum symbols"
466                    )));
467                }
468            }
469            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
470            Codec::List(item_dt) => match default_json {
471                Value::Array(items) => AvroLiteral::Array(
472                    items
473                        .iter()
474                        .map(|v| item_dt.parse_default_literal(v))
475                        .collect::<Result<_, _>>()?,
476                ),
477                _ => {
478                    return Err(ArrowError::SchemaError(
479                        "Default value must be a JSON array for Avro array type".to_string(),
480                    ));
481                }
482            },
483            Codec::Map(val_dt) => match default_json {
484                Value::Object(map) => {
485                    let mut out = IndexMap::with_capacity(map.len());
486                    for (k, v) in map {
487                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
488                    }
489                    AvroLiteral::Map(out)
490                }
491                _ => {
492                    return Err(ArrowError::SchemaError(
493                        "Default value must be a JSON object for Avro map type".to_string(),
494                    ));
495                }
496            },
497            Codec::Struct(fields) => match default_json {
498                Value::Object(obj) => {
499                    let mut out: IndexMap<String, AvroLiteral> =
500                        IndexMap::with_capacity(fields.len());
501                    for f in fields.as_ref() {
502                        let name = f.name().to_string();
503                        if let Some(sub) = obj.get(&name) {
504                            out.insert(name, f.data_type().parse_default_literal(sub)?);
505                        } else {
506                            // Cache metadata lookup once
507                            let stored_default =
508                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
509                            if stored_default.is_none()
510                                && f.data_type().nullability() == Some(Nullability::default())
511                            {
512                                out.insert(name, AvroLiteral::Null);
513                            } else if let Some(default_json) = stored_default {
514                                let v: Value =
515                                    serde_json::from_str(default_json).map_err(|e| {
516                                        ArrowError::SchemaError(format!(
517                                            "Failed to parse stored subfield default JSON for '{}': {e}",
518                                            f.name(),
519                                        ))
520                                    })?;
521                                out.insert(name, f.data_type().parse_default_literal(&v)?);
522                            } else {
523                                return Err(ArrowError::SchemaError(format!(
524                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
525                                    f.name(),
526                                    f.data_type().codec()
527                                )));
528                            }
529                        }
530                    }
531                    AvroLiteral::Map(out)
532                }
533                _ => {
534                    return Err(ArrowError::SchemaError(
535                        "Default value for record/struct must be a JSON object".to_string(),
536                    ));
537                }
538            },
539            Codec::Union(encodings, _, _) => {
540                let Some(default_encoding) = encodings.first() else {
541                    return Err(ArrowError::SchemaError(
542                        "Union with no branches cannot have a default".to_string(),
543                    ));
544                };
545                default_encoding.parse_default_literal(default_json)?
546            }
547            #[cfg(feature = "avro_custom_types")]
548            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
549        };
550        Ok(lit)
551    }
552
553    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
554        let json_text = serde_json::to_string(default_json).map_err(|e| {
555            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
556        })?;
557        self.metadata
558            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
559        Ok(())
560    }
561
562    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
563        let lit = self.parse_default_literal(default_json)?;
564        self.store_default(default_json)?;
565        Ok(lit)
566    }
567}
568
569/// A named [`AvroDataType`]
570#[derive(Debug, Clone, PartialEq)]
571pub(crate) struct AvroField {
572    name: String,
573    data_type: AvroDataType,
574}
575
576impl AvroField {
577    /// Returns the arrow [`Field`]
578    pub(crate) fn field(&self) -> Field {
579        self.data_type.field_with_name(&self.name)
580    }
581
582    /// Returns the [`AvroDataType`]
583    pub(crate) fn data_type(&self) -> &AvroDataType {
584        &self.data_type
585    }
586
587    /// Returns a new [`AvroField`] with Utf8View support enabled
588    ///
589    /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to
590    /// enable potential performance optimizations in string-heavy workloads by using
591    /// Arrow's StringViewArray data structure.
592    ///
593    /// Returns a new `AvroField` with the same structure, but with string types
594    /// converted to use `Utf8View` instead of `Utf8`.
595    pub(crate) fn with_utf8view(&self) -> Self {
596        let mut field = self.clone();
597        if let Codec::Utf8 = field.data_type.codec {
598            field.data_type.codec = Codec::Utf8View;
599        }
600        field
601    }
602
603    /// Returns the name of this Avro field
604    ///
605    /// This is the field name as defined in the Avro schema.
606    /// It's used to identify fields within a record structure.
607    pub(crate) fn name(&self) -> &str {
608        &self.name
609    }
610}
611
612impl<'a> TryFrom<&Schema<'a>> for AvroField {
613    type Error = ArrowError;
614
615    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
616        match schema {
617            Schema::Complex(ComplexType::Record(r)) => {
618                let mut resolver = Maker::new(false, false);
619                let data_type = resolver.make_data_type(schema, None, None)?;
620                Ok(AvroField {
621                    data_type,
622                    name: r.name.to_string(),
623                })
624            }
625            _ => Err(ArrowError::ParseError(format!(
626                "Expected record got {schema:?}"
627            ))),
628        }
629    }
630}
631
632/// Builder for an [`AvroField`]
633#[derive(Debug)]
634pub(crate) struct AvroFieldBuilder<'a> {
635    writer_schema: &'a Schema<'a>,
636    reader_schema: Option<&'a Schema<'a>>,
637    use_utf8view: bool,
638    strict_mode: bool,
639}
640
641impl<'a> AvroFieldBuilder<'a> {
642    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
643    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
644        Self {
645            writer_schema,
646            reader_schema: None,
647            use_utf8view: false,
648            strict_mode: false,
649        }
650    }
651
652    /// Sets the reader schema for schema resolution.
653    ///
654    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
655    /// that can handle differences between the writer's and reader's schemas.
656    #[inline]
657    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
658        self.reader_schema = Some(reader_schema);
659        self
660    }
661
662    /// Enable or disable Utf8View support
663    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
664        self.use_utf8view = use_utf8view;
665        self
666    }
667
668    /// Enable or disable strict mode.
669    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
670        self.strict_mode = strict_mode;
671        self
672    }
673
674    /// Build an [`AvroField`] from the builder
675    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
676        match self.writer_schema {
677            Schema::Complex(ComplexType::Record(r)) => {
678                let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
679                let data_type =
680                    resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
681                Ok(AvroField {
682                    name: r.name.to_string(),
683                    data_type,
684                })
685            }
686            _ => Err(ArrowError::ParseError(format!(
687                "Expected a Record schema to build an AvroField, but got {:?}",
688                self.writer_schema
689            ))),
690        }
691    }
692}
693
694/// An Avro encoding
695///
696/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
697#[derive(Debug, Clone, PartialEq)]
698pub(crate) enum Codec {
699    /// Represents Avro null type, maps to Arrow's Null data type
700    Null,
701    /// Represents Avro boolean type, maps to Arrow's Boolean data type
702    Boolean,
703    /// Represents Avro int type, maps to Arrow's Int32 data type
704    Int32,
705    /// Represents Avro long type, maps to Arrow's Int64 data type
706    Int64,
707    /// Represents Avro float type, maps to Arrow's Float32 data type
708    Float32,
709    /// Represents Avro double type, maps to Arrow's Float64 data type
710    Float64,
711    /// Represents Avro bytes type, maps to Arrow's Binary data type
712    Binary,
713    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
714    Utf8,
715    /// String data represented as UTF-8 encoded bytes with an optimized view representation,
716    /// corresponding to Arrow's StringViewArray which provides better performance for string operations
717    ///
718    /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`.
719    Utf8View,
720    /// Represents Avro date logical type, maps to Arrow's Date32 data type
721    Date32,
722    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
723    TimeMillis,
724    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
725    TimeMicros,
726    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
727    ///
728    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type
729    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
730    TimestampMillis(bool),
731    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
732    ///
733    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
734    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
735    TimestampMicros(bool),
736    /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
737    ///
738    /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
739    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
740    TimestampNanos(bool),
741    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
742    /// The i32 parameter indicates the fixed binary size
743    Fixed(i32),
744    /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
745    ///
746    /// The fields are `(precision, scale, fixed_size)`.
747    /// - `precision` (`usize`): Total number of digits.
748    /// - `scale` (`Option<usize>`): Number of fractional digits.
749    /// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
750    Decimal(usize, Option<usize>, Option<usize>),
751    /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
752    Uuid,
753    /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
754    ///
755    /// The enclosed value contains the enum's symbols.
756    Enum(Arc<[String]>),
757    /// Represents Avro array type, maps to Arrow's List data type
758    List(Arc<AvroDataType>),
759    /// Represents Avro record type, maps to Arrow's Struct data type
760    Struct(Arc<[AvroField]>),
761    /// Represents Avro map type, maps to Arrow's Map data type
762    Map(Arc<AvroDataType>),
763    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
764    Interval,
765    /// Represents Avro union type, maps to Arrow's Union data type
766    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
767    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
768    #[cfg(feature = "avro_custom_types")]
769    DurationNanos,
770    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
771    #[cfg(feature = "avro_custom_types")]
772    DurationMicros,
773    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
774    #[cfg(feature = "avro_custom_types")]
775    DurationMillis,
776    /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
777    #[cfg(feature = "avro_custom_types")]
778    DurationSeconds,
779    #[cfg(feature = "avro_custom_types")]
780    RunEndEncoded(Arc<AvroDataType>, u8),
781    /// Arrow Int8 custom logical type (arrow.int8)
782    #[cfg(feature = "avro_custom_types")]
783    Int8,
784    /// Arrow Int16 custom logical type (arrow.int16)
785    #[cfg(feature = "avro_custom_types")]
786    Int16,
787    /// Arrow UInt8 custom logical type (arrow.uint8)
788    #[cfg(feature = "avro_custom_types")]
789    UInt8,
790    /// Arrow UInt16 custom logical type (arrow.uint16)
791    #[cfg(feature = "avro_custom_types")]
792    UInt16,
793    /// Arrow UInt32 custom logical type (arrow.uint32)
794    #[cfg(feature = "avro_custom_types")]
795    UInt32,
796    /// Arrow UInt64 custom logical type (arrow.uint64) - stored as fixed(8)
797    #[cfg(feature = "avro_custom_types")]
798    UInt64,
799    /// Arrow Float16 custom logical type (arrow.float16) - stored as fixed(2)
800    #[cfg(feature = "avro_custom_types")]
801    Float16,
802    /// Arrow Date64 custom logical type (arrow.date64)
803    #[cfg(feature = "avro_custom_types")]
804    Date64,
805    /// Arrow Time64(Nanosecond) custom logical type (arrow.time64-nanosecond)
806    #[cfg(feature = "avro_custom_types")]
807    TimeNanos,
808    /// Arrow Time32(Second) custom logical type (arrow.time32-second)
809    #[cfg(feature = "avro_custom_types")]
810    Time32Secs,
811    /// Arrow Timestamp(Second) custom logical type (arrow.timestamp-second)
812    /// The bool indicates UTC (true) or local (false)
813    #[cfg(feature = "avro_custom_types")]
814    TimestampSecs(bool),
815    /// Arrow Interval(YearMonth) custom logical type (arrow.interval-year-month)
816    #[cfg(feature = "avro_custom_types")]
817    IntervalYearMonth,
818    /// Arrow Interval(MonthDayNano) custom logical type (arrow.interval-month-day-nano)
819    #[cfg(feature = "avro_custom_types")]
820    IntervalMonthDayNano,
821    /// Arrow Interval(DayTime) custom logical type (arrow.interval-day-time)
822    #[cfg(feature = "avro_custom_types")]
823    IntervalDayTime,
824}
825
826impl Codec {
827    fn data_type(&self) -> DataType {
828        match self {
829            Self::Null => DataType::Null,
830            Self::Boolean => DataType::Boolean,
831            Self::Int32 => DataType::Int32,
832            Self::Int64 => DataType::Int64,
833            Self::Float32 => DataType::Float32,
834            Self::Float64 => DataType::Float64,
835            Self::Binary => DataType::Binary,
836            Self::Utf8 => DataType::Utf8,
837            Self::Utf8View => DataType::Utf8View,
838            Self::Date32 => DataType::Date32,
839            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
840            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
841            Self::TimestampMillis(is_utc) => {
842                DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
843            }
844            Self::TimestampMicros(is_utc) => {
845                DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
846            }
847            Self::TimestampNanos(is_utc) => {
848                DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
849            }
850            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
851            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
852            Self::Decimal(precision, scale, _size) => {
853                let p = *precision as u8;
854                let s = scale.unwrap_or(0) as i8;
855                #[cfg(feature = "small_decimals")]
856                {
857                    if *precision <= DECIMAL32_MAX_PRECISION as usize {
858                        DataType::Decimal32(p, s)
859                    } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
860                        DataType::Decimal64(p, s)
861                    } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
862                        DataType::Decimal128(p, s)
863                    } else {
864                        DataType::Decimal256(p, s)
865                    }
866                }
867                #[cfg(not(feature = "small_decimals"))]
868                {
869                    if *precision <= DECIMAL128_MAX_PRECISION as usize {
870                        DataType::Decimal128(p, s)
871                    } else {
872                        DataType::Decimal256(p, s)
873                    }
874                }
875            }
876            Self::Uuid => DataType::FixedSizeBinary(16),
877            Self::Enum(_) => {
878                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
879            }
880            Self::List(f) => {
881                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
882            }
883            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
884            Self::Map(value_type) => {
885                let val_field = value_type.field_with_name("value");
886                DataType::Map(
887                    Arc::new(Field::new(
888                        "entries",
889                        DataType::Struct(Fields::from(vec![
890                            Field::new("key", DataType::Utf8, false),
891                            val_field,
892                        ])),
893                        false,
894                    )),
895                    false,
896                )
897            }
898            Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
899            #[cfg(feature = "avro_custom_types")]
900            Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
901            #[cfg(feature = "avro_custom_types")]
902            Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
903            #[cfg(feature = "avro_custom_types")]
904            Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
905            #[cfg(feature = "avro_custom_types")]
906            Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
907            #[cfg(feature = "avro_custom_types")]
908            Self::RunEndEncoded(values, bits) => {
909                let run_ends_dt = match *bits {
910                    16 => DataType::Int16,
911                    32 => DataType::Int32,
912                    64 => DataType::Int64,
913                    _ => unreachable!(),
914                };
915                DataType::RunEndEncoded(
916                    Arc::new(Field::new("run_ends", run_ends_dt, false)),
917                    Arc::new(Field::new("values", values.codec().data_type(), true)),
918                )
919            }
920            #[cfg(feature = "avro_custom_types")]
921            Self::Int8 => DataType::Int8,
922            #[cfg(feature = "avro_custom_types")]
923            Self::Int16 => DataType::Int16,
924            #[cfg(feature = "avro_custom_types")]
925            Self::UInt8 => DataType::UInt8,
926            #[cfg(feature = "avro_custom_types")]
927            Self::UInt16 => DataType::UInt16,
928            #[cfg(feature = "avro_custom_types")]
929            Self::UInt32 => DataType::UInt32,
930            #[cfg(feature = "avro_custom_types")]
931            Self::UInt64 => DataType::UInt64,
932            #[cfg(feature = "avro_custom_types")]
933            Self::Float16 => DataType::Float16,
934            #[cfg(feature = "avro_custom_types")]
935            Self::Date64 => DataType::Date64,
936            #[cfg(feature = "avro_custom_types")]
937            Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
938            #[cfg(feature = "avro_custom_types")]
939            Self::Time32Secs => DataType::Time32(TimeUnit::Second),
940            #[cfg(feature = "avro_custom_types")]
941            Self::TimestampSecs(is_utc) => {
942                DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
943            }
944            #[cfg(feature = "avro_custom_types")]
945            Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
946            #[cfg(feature = "avro_custom_types")]
947            Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
948            #[cfg(feature = "avro_custom_types")]
949            Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
950        }
951    }
952
953    /// Converts a string codec to use Utf8View if requested
954    ///
955    /// The conversion only happens if both:
956    /// 1. `use_utf8view` is true
957    /// 2. The codec is currently `Utf8`
958    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
959        if use_utf8view && matches!(self, Self::Utf8) {
960            Self::Utf8View
961        } else {
962            self
963        }
964    }
965
966    #[inline]
967    fn union_field_name(&self) -> String {
968        UnionFieldKind::from(self).as_ref().to_owned()
969    }
970}
971
972impl From<PrimitiveType> for Codec {
973    fn from(value: PrimitiveType) -> Self {
974        match value {
975            PrimitiveType::Null => Self::Null,
976            PrimitiveType::Boolean => Self::Boolean,
977            PrimitiveType::Int => Self::Int32,
978            PrimitiveType::Long => Self::Int64,
979            PrimitiveType::Float => Self::Float32,
980            PrimitiveType::Double => Self::Float64,
981            PrimitiveType::Bytes => Self::Binary,
982            PrimitiveType::String => Self::Utf8,
983        }
984    }
985}
986
987/// Compute the exact maximum base‑10 precision that fits in `n` bytes for Avro
988/// `fixed` decimals stored as two's‑complement unscaled integers (big‑endian).
989///
990/// Per Avro spec (Decimal logical type), for a fixed length `n`:
991/// max precision = ⌊log₁₀(2^(8n − 1) − 1)⌋.
992///
993/// This function returns `None` if `n` is 0 or greater than 32 (Arrow supports
994/// Decimal256, which is 32 bytes and has max precision 76).
995const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
996    // Precomputed exact table for n = 1..=32
997    // 1:2, 2:4, 3:6, 4:9, 5:11, 6:14, 7:16, 8:18, 9:21, 10:23, 11:26, 12:28,
998    // 13:31, 14:33, 15:35, 16:38, 17:40, 18:43, 19:45, 20:47, 21:50, 22:52,
999    // 23:55, 24:57, 25:59, 26:62, 27:64, 28:67, 29:69, 30:71, 31:74, 32:76
1000    const MAX_P: [usize; 32] = [
1001        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
1002        59, 62, 64, 67, 69, 71, 74, 76,
1003    ];
1004    match n {
1005        1..=32 => Some(MAX_P[n - 1]),
1006        _ => None,
1007    }
1008}
1009
1010fn parse_decimal_attributes(
1011    attributes: &Attributes,
1012    fallback_size: Option<usize>,
1013    precision_required: bool,
1014) -> Result<(usize, usize, Option<usize>), ArrowError> {
1015    let precision = attributes
1016        .additional
1017        .get("precision")
1018        .and_then(|v| v.as_u64())
1019        .or(if precision_required { None } else { Some(10) })
1020        .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1021        as usize;
1022    let scale = attributes
1023        .additional
1024        .get("scale")
1025        .and_then(|v| v.as_u64())
1026        .unwrap_or(0) as usize;
1027    let size = attributes
1028        .additional
1029        .get("size")
1030        .and_then(|v| v.as_u64())
1031        .map(|s| s as usize)
1032        .or(fallback_size);
1033    if precision == 0 {
1034        return Err(ArrowError::ParseError(
1035            "Decimal requires precision > 0".to_string(),
1036        ));
1037    }
1038    if scale > precision {
1039        return Err(ArrowError::ParseError(format!(
1040            "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1041        )));
1042    }
1043    if precision > DECIMAL256_MAX_PRECISION as usize {
1044        return Err(ArrowError::ParseError(format!(
1045            "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1046            DECIMAL256_MAX_PRECISION
1047        )));
1048    }
1049    if let Some(sz) = size {
1050        let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1051            ArrowError::ParseError(format!(
1052                "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1053            ))
1054        })?;
1055        if precision > max_p {
1056            return Err(ArrowError::ParseError(format!(
1057                "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1058            )));
1059        }
1060    }
1061    Ok((precision, scale, size))
1062}
1063
1064#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
1065#[strum(serialize_all = "snake_case")]
1066enum UnionFieldKind {
1067    Null,
1068    Boolean,
1069    Int,
1070    Long,
1071    Float,
1072    Double,
1073    Bytes,
1074    String,
1075    Date,
1076    TimeMillis,
1077    TimeMicros,
1078    TimestampMillisUtc,
1079    TimestampMillisLocal,
1080    TimestampMicrosUtc,
1081    TimestampMicrosLocal,
1082    TimestampNanosUtc,
1083    TimestampNanosLocal,
1084    Duration,
1085    Fixed,
1086    Decimal,
1087    Enum,
1088    Array,
1089    Record,
1090    Map,
1091    Uuid,
1092    Union,
1093}
1094
1095impl From<&Codec> for UnionFieldKind {
1096    fn from(c: &Codec) -> Self {
1097        match c {
1098            Codec::Null => Self::Null,
1099            Codec::Boolean => Self::Boolean,
1100            Codec::Int32 => Self::Int,
1101            Codec::Int64 => Self::Long,
1102            Codec::Float32 => Self::Float,
1103            Codec::Float64 => Self::Double,
1104            Codec::Binary => Self::Bytes,
1105            Codec::Utf8 | Codec::Utf8View => Self::String,
1106            Codec::Date32 => Self::Date,
1107            Codec::TimeMillis => Self::TimeMillis,
1108            Codec::TimeMicros => Self::TimeMicros,
1109            Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
1110            Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
1111            Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
1112            Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
1113            Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
1114            Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
1115            Codec::Interval => Self::Duration,
1116            Codec::Fixed(_) => Self::Fixed,
1117            Codec::Decimal(..) => Self::Decimal,
1118            Codec::Enum(_) => Self::Enum,
1119            Codec::List(_) => Self::Array,
1120            Codec::Struct(_) => Self::Record,
1121            Codec::Map(_) => Self::Map,
1122            Codec::Uuid => Self::Uuid,
1123            Codec::Union(..) => Self::Union,
1124            #[cfg(feature = "avro_custom_types")]
1125            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
1126            #[cfg(feature = "avro_custom_types")]
1127            Codec::DurationNanos
1128            | Codec::DurationMicros
1129            | Codec::DurationMillis
1130            | Codec::DurationSeconds => Self::Duration,
1131            #[cfg(feature = "avro_custom_types")]
1132            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
1133            #[cfg(feature = "avro_custom_types")]
1134            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
1135                Self::Long
1136            }
1137            #[cfg(feature = "avro_custom_types")]
1138            Codec::Time32Secs => Self::TimeMillis, // Closest standard type
1139            #[cfg(feature = "avro_custom_types")]
1140            Codec::UInt64
1141            | Codec::Float16
1142            | Codec::IntervalYearMonth
1143            | Codec::IntervalMonthDayNano
1144            | Codec::IntervalDayTime => Self::Fixed,
1145        }
1146    }
1147}
1148
1149fn union_branch_name(dt: &AvroDataType) -> String {
1150    if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1151        if name.contains(".") {
1152            // Full name
1153            return name.to_string();
1154        }
1155        if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1156            return format!("{ns}.{name}");
1157        }
1158        return name.to_string();
1159    }
1160    dt.codec.union_field_name()
1161}
1162
1163fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1164    let arrow_fields: Vec<Field> = encodings
1165        .iter()
1166        .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1167        .collect();
1168    let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1169    UnionFields::try_new(type_ids, arrow_fields)
1170}
1171
1172/// Resolves Avro type names to [`AvroDataType`]
1173///
1174/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1175#[derive(Debug, Default)]
1176struct Resolver<'a> {
1177    map: HashMap<(&'a str, &'a str), AvroDataType>,
1178}
1179
1180impl<'a> Resolver<'a> {
1181    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1182        self.map.insert((namespace.unwrap_or(""), name), schema);
1183    }
1184
1185    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1186        let (namespace, name) = name
1187            .rsplit_once('.')
1188            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1189        self.map
1190            .get(&(namespace, name))
1191            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1192            .cloned()
1193    }
1194}
1195
1196fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1197    let mut out = HashSet::with_capacity(1 + aliases.len());
1198    let (full, _) = make_full_name(name, ns, None);
1199    out.insert(full);
1200    for a in aliases {
1201        let (fa, _) = make_full_name(a, None, ns);
1202        out.insert(fa);
1203    }
1204    out
1205}
1206
1207fn names_match(
1208    writer_name: &str,
1209    writer_namespace: Option<&str>,
1210    writer_aliases: &[&str],
1211    reader_name: &str,
1212    reader_namespace: Option<&str>,
1213    reader_aliases: &[&str],
1214) -> bool {
1215    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1216    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1217    // If the canonical full names match, or any alias matches cross-wise.
1218    !writer_set.is_disjoint(&reader_set)
1219}
1220
1221fn ensure_names_match(
1222    data_type: &str,
1223    writer_name: &str,
1224    writer_namespace: Option<&str>,
1225    writer_aliases: &[&str],
1226    reader_name: &str,
1227    reader_namespace: Option<&str>,
1228    reader_aliases: &[&str],
1229) -> Result<(), ArrowError> {
1230    if names_match(
1231        writer_name,
1232        writer_namespace,
1233        writer_aliases,
1234        reader_name,
1235        reader_namespace,
1236        reader_aliases,
1237    ) {
1238        Ok(())
1239    } else {
1240        Err(ArrowError::ParseError(format!(
1241            "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1242        )))
1243    }
1244}
1245
1246fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1247    match schema {
1248        Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1249        Schema::Type(Type {
1250            r#type: TypeName::Primitive(primitive),
1251            ..
1252        }) => Some(*primitive),
1253        _ => None,
1254    }
1255}
1256
1257fn nullable_union_variants<'x, 'y>(
1258    variant: &'y [Schema<'x>],
1259) -> Option<(Nullability, &'y Schema<'x>)> {
1260    if variant.len() != 2 {
1261        return None;
1262    }
1263    let is_null = |schema: &Schema<'x>| {
1264        matches!(
1265            schema,
1266            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1267        )
1268    };
1269    match (is_null(&variant[0]), is_null(&variant[1])) {
1270        (true, false) => Some((Nullability::NullFirst, &variant[1])),
1271        (false, true) => Some((Nullability::NullSecond, &variant[0])),
1272        _ => None,
1273    }
1274}
1275
1276#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1277enum UnionBranchKey {
1278    Named(String),
1279    Primitive(PrimitiveType),
1280    Array,
1281    Map,
1282}
1283
1284fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1285    let (name, namespace) = match s {
1286        Schema::TypeName(TypeName::Primitive(p))
1287        | Schema::Type(Type {
1288            r#type: TypeName::Primitive(p),
1289            ..
1290        }) => return Some(UnionBranchKey::Primitive(*p)),
1291        Schema::TypeName(TypeName::Ref(name))
1292        | Schema::Type(Type {
1293            r#type: TypeName::Ref(name),
1294            ..
1295        }) => (name, None),
1296        Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1297        Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1298        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1299        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1300        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1301        Schema::Union(_) => return None,
1302    };
1303    let (full, _) = make_full_name(name, namespace, enclosing_ns);
1304    Some(UnionBranchKey::Named(full))
1305}
1306
1307fn union_first_duplicate<'a>(
1308    branches: &'a [Schema<'a>],
1309    enclosing_ns: Option<&'a str>,
1310) -> Option<String> {
1311    let mut seen = HashSet::with_capacity(branches.len());
1312    for schema in branches {
1313        if let Some(key) = branch_key_of(schema, enclosing_ns) {
1314            if !seen.insert(key.clone()) {
1315                let msg = match key {
1316                    UnionBranchKey::Named(full) => format!("named type {full}"),
1317                    UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1318                    UnionBranchKey::Array => "array".to_string(),
1319                    UnionBranchKey::Map => "map".to_string(),
1320                };
1321                return Some(msg);
1322            }
1323        }
1324    }
1325    None
1326}
1327
1328/// Resolves Avro type names to [`AvroDataType`]
1329///
1330/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
1331struct Maker<'a> {
1332    resolver: Resolver<'a>,
1333    use_utf8view: bool,
1334    strict_mode: bool,
1335}
1336
1337impl<'a> Maker<'a> {
1338    fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1339        Self {
1340            resolver: Default::default(),
1341            use_utf8view,
1342            strict_mode,
1343        }
1344    }
1345
1346    #[cfg(feature = "avro_custom_types")]
1347    #[inline]
1348    fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1349        if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1350            let mut inner = (*values).clone();
1351            inner.nullability = Some(nb);
1352            dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1353        }
1354    }
1355
1356    fn make_data_type<'s>(
1357        &mut self,
1358        writer_schema: &'s Schema<'a>,
1359        reader_schema: Option<&'s Schema<'a>>,
1360        namespace: Option<&'a str>,
1361    ) -> Result<AvroDataType, ArrowError> {
1362        match reader_schema {
1363            Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1364            None => self.parse_type(writer_schema, namespace),
1365        }
1366    }
1367
1368    /// Parses a [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`
1369    ///
1370    /// `name`: is the name used to refer to `schema` in its parent
1371    /// `namespace`: an optional qualifier used as part of a type hierarchy
1372    /// If the data type is a string, convert to use Utf8View if requested
1373    ///
1374    /// This function is used during the schema conversion process to determine whether
1375    /// string data should be represented as StringArray (default) or StringViewArray.
1376    ///
1377    /// `use_utf8view`: if true, use Utf8View instead of Utf8 for string types
1378    ///
1379    /// See [`Resolver`] for more information
1380    fn parse_type<'s>(
1381        &mut self,
1382        schema: &'s Schema<'a>,
1383        namespace: Option<&'a str>,
1384    ) -> Result<AvroDataType, ArrowError> {
1385        match schema {
1386            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1387                Codec::from(*p).with_utf8view(self.use_utf8view),
1388                Default::default(),
1389                None,
1390            )),
1391            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1392            Schema::Union(f) => {
1393                let null = f
1394                    .iter()
1395                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1396                match (f.len() == 2, null) {
1397                    (true, Some(0)) => {
1398                        let mut field = self.parse_type(&f[1], namespace)?;
1399                        field.nullability = Some(Nullability::NullFirst);
1400                        #[cfg(feature = "avro_custom_types")]
1401                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1402                        return Ok(field);
1403                    }
1404                    (true, Some(1)) => {
1405                        if self.strict_mode {
1406                            return Err(ArrowError::SchemaError(
1407                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1408                                    .to_string(),
1409                            ));
1410                        }
1411                        let mut field = self.parse_type(&f[0], namespace)?;
1412                        field.nullability = Some(Nullability::NullSecond);
1413                        #[cfg(feature = "avro_custom_types")]
1414                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1415                        return Ok(field);
1416                    }
1417                    _ => {}
1418                }
1419                // Validate: unions may not immediately contain unions
1420                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1421                    return Err(ArrowError::SchemaError(
1422                        "Avro unions may not immediately contain other unions".to_string(),
1423                    ));
1424                }
1425                // Validate: duplicates (named by full name; non-named by kind)
1426                if let Some(dup) = union_first_duplicate(f, namespace) {
1427                    return Err(ArrowError::SchemaError(format!(
1428                        "Avro union contains duplicate branch type: {dup}"
1429                    )));
1430                }
1431                // Parse all branches
1432                let children: Vec<AvroDataType> = f
1433                    .iter()
1434                    .map(|s| self.parse_type(s, namespace))
1435                    .collect::<Result<_, _>>()?;
1436                // Build Arrow layout once here
1437                let union_fields = build_union_fields(&children)?;
1438                Ok(AvroDataType::new(
1439                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1440                    Default::default(),
1441                    None,
1442                ))
1443            }
1444            Schema::Complex(c) => match c {
1445                ComplexType::Record(r) => {
1446                    let namespace = r.namespace.or(namespace);
1447                    let mut metadata = r.attributes.field_metadata();
1448                    let fields = r
1449                        .fields
1450                        .iter()
1451                        .map(|field| {
1452                            Ok(AvroField {
1453                                name: field.name.to_string(),
1454                                data_type: self.parse_type(&field.r#type, namespace)?,
1455                            })
1456                        })
1457                        .collect::<Result<_, ArrowError>>()?;
1458                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1459                    if let Some(ns) = namespace {
1460                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1461                    }
1462                    let field = AvroDataType {
1463                        nullability: None,
1464                        codec: Codec::Struct(fields),
1465                        metadata,
1466                        resolution: None,
1467                    };
1468                    self.resolver.register(r.name, namespace, field.clone());
1469                    Ok(field)
1470                }
1471                ComplexType::Array(a) => {
1472                    let field = self.parse_type(a.items.as_ref(), namespace)?;
1473                    Ok(AvroDataType {
1474                        nullability: None,
1475                        metadata: a.attributes.field_metadata(),
1476                        codec: Codec::List(Arc::new(field)),
1477                        resolution: None,
1478                    })
1479                }
1480                ComplexType::Fixed(f) => {
1481                    let size = f.size.try_into().map_err(|e| {
1482                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1483                    })?;
1484                    let namespace = f.namespace.or(namespace);
1485                    let mut metadata = f.attributes.field_metadata();
1486                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1487                    if let Some(ns) = namespace {
1488                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1489                    }
1490                    let field = match f.attributes.logical_type {
1491                        Some("decimal") => {
1492                            let (precision, scale, _) =
1493                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1494                            AvroDataType {
1495                                nullability: None,
1496                                metadata,
1497                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1498                                resolution: None,
1499                            }
1500                        }
1501                        Some("duration") => {
1502                            if size != 12 {
1503                                return Err(ArrowError::ParseError(format!(
1504                                    "Invalid fixed size for Duration: {size}, must be 12"
1505                                )));
1506                            };
1507                            AvroDataType {
1508                                nullability: None,
1509                                metadata,
1510                                codec: Codec::Interval,
1511                                resolution: None,
1512                            }
1513                        }
1514                        #[cfg(feature = "avro_custom_types")]
1515                        Some("arrow.uint64") if size == 8 => AvroDataType {
1516                            nullability: None,
1517                            metadata,
1518                            codec: Codec::UInt64,
1519                            resolution: None,
1520                        },
1521                        #[cfg(feature = "avro_custom_types")]
1522                        Some("arrow.float16") if size == 2 => AvroDataType {
1523                            nullability: None,
1524                            metadata,
1525                            codec: Codec::Float16,
1526                            resolution: None,
1527                        },
1528                        #[cfg(feature = "avro_custom_types")]
1529                        Some("arrow.interval-year-month") if size == 4 => AvroDataType {
1530                            nullability: None,
1531                            metadata,
1532                            codec: Codec::IntervalYearMonth,
1533                            resolution: None,
1534                        },
1535                        #[cfg(feature = "avro_custom_types")]
1536                        Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
1537                            nullability: None,
1538                            metadata,
1539                            codec: Codec::IntervalMonthDayNano,
1540                            resolution: None,
1541                        },
1542                        #[cfg(feature = "avro_custom_types")]
1543                        Some("arrow.interval-day-time") if size == 8 => AvroDataType {
1544                            nullability: None,
1545                            metadata,
1546                            codec: Codec::IntervalDayTime,
1547                            resolution: None,
1548                        },
1549                        _ => AvroDataType {
1550                            nullability: None,
1551                            metadata,
1552                            codec: Codec::Fixed(size),
1553                            resolution: None,
1554                        },
1555                    };
1556                    self.resolver.register(f.name, namespace, field.clone());
1557                    Ok(field)
1558                }
1559                ComplexType::Enum(e) => {
1560                    let namespace = e.namespace.or(namespace);
1561                    let symbols = e
1562                        .symbols
1563                        .iter()
1564                        .map(|s| s.to_string())
1565                        .collect::<Arc<[String]>>();
1566                    let mut metadata = e.attributes.field_metadata();
1567                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1568                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1569                    })?;
1570                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1571                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1572                    if let Some(ns) = namespace {
1573                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1574                    }
1575                    let field = AvroDataType {
1576                        nullability: None,
1577                        metadata,
1578                        codec: Codec::Enum(symbols),
1579                        resolution: None,
1580                    };
1581                    self.resolver.register(e.name, namespace, field.clone());
1582                    Ok(field)
1583                }
1584                ComplexType::Map(m) => {
1585                    let val = self.parse_type(&m.values, namespace)?;
1586                    Ok(AvroDataType {
1587                        nullability: None,
1588                        metadata: m.attributes.field_metadata(),
1589                        codec: Codec::Map(Arc::new(val)),
1590                        resolution: None,
1591                    })
1592                }
1593            },
1594            Schema::Type(t) => {
1595                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1596                // https://avro.apache.org/docs/1.11.1/specification/#logical-types
1597                match (t.attributes.logical_type, &mut field.codec) {
1598                    (Some("decimal"), c @ Codec::Binary) => {
1599                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1600                        *c = Codec::Decimal(prec, Some(sc), None);
1601                    }
1602                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1603                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1604                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1605                    (Some("timestamp-millis"), c @ Codec::Int64) => {
1606                        *c = Codec::TimestampMillis(true)
1607                    }
1608                    (Some("timestamp-micros"), c @ Codec::Int64) => {
1609                        *c = Codec::TimestampMicros(true)
1610                    }
1611                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1612                        *c = Codec::TimestampMillis(false)
1613                    }
1614                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1615                        *c = Codec::TimestampMicros(false)
1616                    }
1617                    (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
1618                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1619                        *c = Codec::TimestampNanos(false)
1620                    }
1621                    (Some("uuid"), c @ Codec::Utf8) => {
1622                        // Map Avro string+logicalType=uuid into the UUID Codec,
1623                        // and preserve the logicalType in Arrow field metadata
1624                        // so writers can round-trip it correctly.
1625                        *c = Codec::Uuid;
1626                        field.metadata.insert("logicalType".into(), "uuid".into());
1627                    }
1628                    #[cfg(feature = "avro_custom_types")]
1629                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1630                    #[cfg(feature = "avro_custom_types")]
1631                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1632                    #[cfg(feature = "avro_custom_types")]
1633                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1634                    #[cfg(feature = "avro_custom_types")]
1635                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1636                        *c = Codec::DurationSeconds
1637                    }
1638                    #[cfg(feature = "avro_custom_types")]
1639                    (Some("arrow.run-end-encoded"), _) => {
1640                        let bits_u8: u8 = t
1641                            .attributes
1642                            .additional
1643                            .get("arrow.runEndIndexBits")
1644                            .and_then(|v| v.as_u64())
1645                            .and_then(|n| u8::try_from(n).ok())
1646                            .ok_or_else(|| ArrowError::ParseError(
1647                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1648                                    .to_string(),
1649                            ))?;
1650                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1651                            return Err(ArrowError::ParseError(format!(
1652                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1653                            )));
1654                        }
1655                        // Wrap the parsed underlying site as REE
1656                        let values_site = field.clone();
1657                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1658                    }
1659                    // Arrow-specific integer width types
1660                    #[cfg(feature = "avro_custom_types")]
1661                    (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
1662                    #[cfg(feature = "avro_custom_types")]
1663                    (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
1664                    #[cfg(feature = "avro_custom_types")]
1665                    (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
1666                    #[cfg(feature = "avro_custom_types")]
1667                    (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
1668                    #[cfg(feature = "avro_custom_types")]
1669                    (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
1670                    #[cfg(feature = "avro_custom_types")]
1671                    (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
1672                    // Arrow Float16 stored as fixed(2)
1673                    #[cfg(feature = "avro_custom_types")]
1674                    (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
1675                    // Arrow Date64 custom type
1676                    #[cfg(feature = "avro_custom_types")]
1677                    (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
1678                    // Arrow time/timestamp types with second precision
1679                    #[cfg(feature = "avro_custom_types")]
1680                    (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
1681                    #[cfg(feature = "avro_custom_types")]
1682                    (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
1683                    #[cfg(feature = "avro_custom_types")]
1684                    (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
1685                        *c = Codec::TimestampSecs(true)
1686                    }
1687                    #[cfg(feature = "avro_custom_types")]
1688                    (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
1689                        *c = Codec::TimestampSecs(false)
1690                    }
1691                    // Arrow interval types
1692                    #[cfg(feature = "avro_custom_types")]
1693                    (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
1694                        *c = Codec::IntervalYearMonth
1695                    }
1696                    #[cfg(feature = "avro_custom_types")]
1697                    (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
1698                        *c = Codec::IntervalMonthDayNano
1699                    }
1700                    #[cfg(feature = "avro_custom_types")]
1701                    (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
1702                        *c = Codec::IntervalDayTime
1703                    }
1704                    (Some(logical), _) => {
1705                        // Insert unrecognized logical type into metadata map
1706                        field.metadata.insert("logicalType".into(), logical.into());
1707                    }
1708                    (None, _) => {}
1709                }
1710                if matches!(field.codec, Codec::Int64) {
1711                    if let Some(unit) = t
1712                        .attributes
1713                        .additional
1714                        .get("arrowTimeUnit")
1715                        .and_then(|v| v.as_str())
1716                    {
1717                        if unit == "nanosecond" {
1718                            field.codec = Codec::TimestampNanos(false);
1719                        }
1720                    }
1721                }
1722                if !t.attributes.additional.is_empty() {
1723                    for (k, v) in &t.attributes.additional {
1724                        field.metadata.insert(k.to_string(), v.to_string());
1725                    }
1726                }
1727                Ok(field)
1728            }
1729        }
1730    }
1731
1732    fn resolve_type<'s>(
1733        &mut self,
1734        writer_schema: &'s Schema<'a>,
1735        reader_schema: &'s Schema<'a>,
1736        namespace: Option<&'a str>,
1737    ) -> Result<AvroDataType, ArrowError> {
1738        if let (Some(write_primitive), Some(read_primitive)) =
1739            (primitive_of(writer_schema), primitive_of(reader_schema))
1740        {
1741            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1742        }
1743        match (writer_schema, reader_schema) {
1744            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1745                let writer_variants = writer_variants.as_slice();
1746                let reader_variants = reader_variants.as_slice();
1747                match (
1748                    nullable_union_variants(writer_variants),
1749                    nullable_union_variants(reader_variants),
1750                ) {
1751                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
1752                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
1753                        let mut writer_to_reader = vec![None, None];
1754                        writer_to_reader[w_nb.non_null_index()] = Some((
1755                            r_nb.non_null_index(),
1756                            dt.resolution
1757                                .take()
1758                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
1759                        ));
1760                        dt.nullability = Some(w_nb);
1761                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1762                            writer_to_reader: Arc::from(writer_to_reader),
1763                            writer_is_union: true,
1764                            reader_is_union: true,
1765                        }));
1766                        #[cfg(feature = "avro_custom_types")]
1767                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
1768                        Ok(dt)
1769                    }
1770                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1771                }
1772            }
1773            (Schema::Union(writer_variants), reader_non_union) => {
1774                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1775                    .iter()
1776                    .map(|writer| {
1777                        self.resolve_type(writer, reader_non_union, namespace)
1778                            .ok()
1779                            .map(|tmp| {
1780                                let resolution = tmp
1781                                    .resolution
1782                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1783                                (0usize, resolution)
1784                            })
1785                    })
1786                    .collect();
1787                let mut dt = self.parse_type(reader_non_union, namespace)?;
1788                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1789                    writer_to_reader: Arc::from(writer_to_reader),
1790                    writer_is_union: true,
1791                    reader_is_union: false,
1792                }));
1793                Ok(dt)
1794            }
1795            (writer_non_union, Schema::Union(reader_variants)) => {
1796                if let Some((nullability, non_null_branch)) =
1797                    nullable_union_variants(reader_variants)
1798                {
1799                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
1800                    #[cfg(feature = "avro_custom_types")]
1801                    Self::propagate_nullability_into_ree(&mut dt, nullability);
1802                    dt.nullability = Some(nullability);
1803                    // Ensure resolution is set to a non-Union variant to suppress
1804                    // reading the union tag which is the default behavior.
1805                    if dt.resolution.is_none() {
1806                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
1807                    }
1808                    Ok(dt)
1809                } else {
1810                    let Some((match_idx, mut match_dt)) =
1811                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
1812                    else {
1813                        return Err(ArrowError::SchemaError(
1814                            "Writer schema does not match any reader union branch".to_string(),
1815                        ));
1816                    };
1817                    // Steal the resolution info from the matching reader branch
1818                    // for the Union resolution, but preserve possible resolution
1819                    // information on its inner types.
1820                    // For other branches, resolution is irrelevant,
1821                    // so just parse them.
1822                    let resolution = match_dt
1823                        .resolution
1824                        .take()
1825                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1826                    let mut match_dt = Some(match_dt);
1827                    let children = reader_variants
1828                        .iter()
1829                        .enumerate()
1830                        .map(|(idx, variant)| {
1831                            if idx == match_idx {
1832                                Ok(match_dt.take().unwrap())
1833                            } else {
1834                                self.parse_type(variant, namespace)
1835                            }
1836                        })
1837                        .collect::<Result<Vec<_>, _>>()?;
1838                    let union_fields = build_union_fields(&children)?;
1839                    let mut dt = AvroDataType::new(
1840                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
1841                        Default::default(),
1842                        None,
1843                    );
1844                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1845                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
1846                        writer_is_union: false,
1847                        reader_is_union: true,
1848                    }));
1849                    Ok(dt)
1850                }
1851            }
1852            (
1853                Schema::Complex(ComplexType::Array(writer_array)),
1854                Schema::Complex(ComplexType::Array(reader_array)),
1855            ) => self.resolve_array(writer_array, reader_array, namespace),
1856            (
1857                Schema::Complex(ComplexType::Map(writer_map)),
1858                Schema::Complex(ComplexType::Map(reader_map)),
1859            ) => self.resolve_map(writer_map, reader_map, namespace),
1860            (
1861                Schema::Complex(ComplexType::Fixed(writer_fixed)),
1862                Schema::Complex(ComplexType::Fixed(reader_fixed)),
1863            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1864            (
1865                Schema::Complex(ComplexType::Record(writer_record)),
1866                Schema::Complex(ComplexType::Record(reader_record)),
1867            ) => self.resolve_records(writer_record, reader_record, namespace),
1868            (
1869                Schema::Complex(ComplexType::Enum(writer_enum)),
1870                Schema::Complex(ComplexType::Enum(reader_enum)),
1871            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1872            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1873            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1874            _ => Err(ArrowError::NotYetImplemented(
1875                "Other resolutions not yet implemented".to_string(),
1876            )),
1877        }
1878    }
1879
1880    fn find_best_union_match(
1881        &mut self,
1882        writer: &Schema<'a>,
1883        reader_variants: &[Schema<'a>],
1884        namespace: Option<&'a str>,
1885    ) -> Option<(usize, AvroDataType)> {
1886        let mut first_resolution = None;
1887        for (reader_index, reader) in reader_variants.iter().enumerate() {
1888            if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1889                match &dt.resolution {
1890                    None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1891                        // An exact match is best, return immediately.
1892                        return Some((reader_index, dt));
1893                    }
1894                    Some(_) => {
1895                        if first_resolution.is_none() {
1896                            // Store the first valid promotion but keep searching for a direct match.
1897                            first_resolution = Some((reader_index, dt));
1898                        }
1899                    }
1900                };
1901            }
1902        }
1903        first_resolution
1904    }
1905
1906    fn resolve_unions<'s>(
1907        &mut self,
1908        writer_variants: &'s [Schema<'a>],
1909        reader_variants: &'s [Schema<'a>],
1910        namespace: Option<&'a str>,
1911    ) -> Result<AvroDataType, ArrowError> {
1912        let mut resolved_reader_encodings = HashMap::new();
1913        let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1914            .iter()
1915            .map(|writer| {
1916                self.find_best_union_match(writer, reader_variants, namespace)
1917                    .map(|(match_idx, mut match_dt)| {
1918                        let resolution = match_dt
1919                            .resolution
1920                            .take()
1921                            .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1922                        // TODO: check for overlapping reader variants?
1923                        // They should not be possible in a valid schema.
1924                        resolved_reader_encodings.insert(match_idx, match_dt);
1925                        (match_idx, resolution)
1926                    })
1927            })
1928            .collect();
1929        let reader_encodings: Vec<AvroDataType> = reader_variants
1930            .iter()
1931            .enumerate()
1932            .map(|(reader_idx, reader_schema)| {
1933                if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1934                    Ok(resolved)
1935                } else {
1936                    self.parse_type(reader_schema, namespace)
1937                }
1938            })
1939            .collect::<Result<_, _>>()?;
1940        let union_fields = build_union_fields(&reader_encodings)?;
1941        let mut dt = AvroDataType::new(
1942            Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1943            Default::default(),
1944            None,
1945        );
1946        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1947            writer_to_reader: Arc::from(writer_to_reader),
1948            writer_is_union: true,
1949            reader_is_union: true,
1950        }));
1951        Ok(dt)
1952    }
1953
1954    fn resolve_array(
1955        &mut self,
1956        writer_array: &Array<'a>,
1957        reader_array: &Array<'a>,
1958        namespace: Option<&'a str>,
1959    ) -> Result<AvroDataType, ArrowError> {
1960        Ok(AvroDataType {
1961            nullability: None,
1962            metadata: reader_array.attributes.field_metadata(),
1963            codec: Codec::List(Arc::new(self.make_data_type(
1964                writer_array.items.as_ref(),
1965                Some(reader_array.items.as_ref()),
1966                namespace,
1967            )?)),
1968            resolution: None,
1969        })
1970    }
1971
1972    fn resolve_map(
1973        &mut self,
1974        writer_map: &Map<'a>,
1975        reader_map: &Map<'a>,
1976        namespace: Option<&'a str>,
1977    ) -> Result<AvroDataType, ArrowError> {
1978        Ok(AvroDataType {
1979            nullability: None,
1980            metadata: reader_map.attributes.field_metadata(),
1981            codec: Codec::Map(Arc::new(self.make_data_type(
1982                &writer_map.values,
1983                Some(&reader_map.values),
1984                namespace,
1985            )?)),
1986            resolution: None,
1987        })
1988    }
1989
1990    fn resolve_fixed<'s>(
1991        &mut self,
1992        writer_fixed: &Fixed<'a>,
1993        reader_fixed: &Fixed<'a>,
1994        reader_schema: &'s Schema<'a>,
1995        namespace: Option<&'a str>,
1996    ) -> Result<AvroDataType, ArrowError> {
1997        ensure_names_match(
1998            "Fixed",
1999            writer_fixed.name,
2000            writer_fixed.namespace,
2001            &writer_fixed.aliases,
2002            reader_fixed.name,
2003            reader_fixed.namespace,
2004            &reader_fixed.aliases,
2005        )?;
2006        if writer_fixed.size != reader_fixed.size {
2007            return Err(ArrowError::SchemaError(format!(
2008                "Fixed size mismatch for {}: writer={}, reader={}",
2009                reader_fixed.name, writer_fixed.size, reader_fixed.size
2010            )));
2011        }
2012        self.parse_type(reader_schema, namespace)
2013    }
2014
2015    fn resolve_primitives(
2016        &mut self,
2017        write_primitive: PrimitiveType,
2018        read_primitive: PrimitiveType,
2019        reader_schema: &Schema<'a>,
2020    ) -> Result<AvroDataType, ArrowError> {
2021        if write_primitive == read_primitive {
2022            return self.parse_type(reader_schema, None);
2023        }
2024        let promotion = match (write_primitive, read_primitive) {
2025            (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2026            (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2027            (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2028            (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2029            (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2030            (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2031            (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2032            (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2033            _ => {
2034                return Err(ArrowError::ParseError(format!(
2035                    "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2036                )));
2037            }
2038        };
2039        let mut datatype = self.parse_type(reader_schema, None)?;
2040        datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2041        Ok(datatype)
2042    }
2043
2044    // Resolve writer vs. reader enum schemas according to Avro 1.11.1.
2045    //
2046    // # How enums resolve (writer to reader)
2047    // Per “Schema Resolution”:
2048    // * The two schemas must refer to the same (unqualified) enum name (or match
2049    //   via alias rewriting).
2050    // * If the writer’s symbol is not present in the reader’s enum and the reader
2051    //   enum has a `default`, that `default` symbol must be used; otherwise,
2052    //   error.
2053    //   https://avro.apache.org/docs/1.11.1/specification/#schema-resolution
2054    // * Avro “Aliases” are applied from the reader side to rewrite the writer’s
2055    //   names during resolution. For robustness across ecosystems, we also accept
2056    //   symmetry here (see note below).
2057    //   https://avro.apache.org/docs/1.11.1/specification/#aliases
2058    //
2059    // # Rationale for this code path
2060    // 1. Do the work once at schema‑resolution time. Avro serializes an enum as a
2061    //    writer‑side position. Mapping positions on the hot decoder path is expensive
2062    //    if done with string lookups. This method builds a `writer_index to reader_index`
2063    //    vector once, so decoding just does an O(1) table lookup.
2064    // 2. Adopt the reader’s symbol set and order. We return an Arrow
2065    //    `Dictionary(Int32, Utf8)` whose dictionary values are the reader enum
2066    //    symbols. This makes downstream semantics match the reader schema, including
2067    //    Avro’s sort order rule that orders enums by symbol position in the schema.
2068    //    https://avro.apache.org/docs/1.11.1/specification/#sort-order
2069    // 3. Honor Avro’s `default` for enums. Avro 1.9+ allows a type‑level default
2070    //    on the enum. When the writer emits a symbol unknown to the reader, we map it
2071    //    to the reader’s validated `default` symbol if present; otherwise we signal an
2072    //    error at decoding time.
2073    //    https://avro.apache.org/docs/1.11.1/specification/#enums
2074    //
2075    // # Implementation notes
2076    // * We first check that enum names match or are*alias‑equivalent. The Avro
2077    //   spec describes alias rewriting using reader aliases; this implementation
2078    //   additionally treats writer aliases as acceptable for name matching to be
2079    //   resilient with schemas produced by different tooling.
2080    // * We build `EnumMapping`:
2081    //   - `mapping[i]` = reader index of the writer symbol at writer index `i`.
2082    //   - If the writer symbol is absent and the reader has a default, we store the
2083    //     reader index of that default.
2084    //   - Otherwise we store `-1` as a sentinel meaning unresolvable; the decoder
2085    //     must treat encountering such a value as an error, per the spec.
2086    // * We persist the reader symbol list in field metadata under
2087    //   `AVRO_ENUM_SYMBOLS_METADATA_KEY`, so consumers can inspect the dictionary
2088    //   without needing the original Avro schema.
2089    // * The Arrow representation is `Dictionary(Int32, Utf8)`, which aligns with
2090    //   Avro’s integer index encoding for enums.
2091    //
2092    // # Examples
2093    // * Writer `["A","B","C"]`, Reader `["A","B"]`, Reader default `"A"`
2094    //     `mapping = [0, 1, 0]`, `default_index = 0`.
2095    // * Writer `["A","B"]`, Reader `["B","A"]` (no default)
2096    //     `mapping = [1, 0]`, `default_index = -1`.
2097    // * Writer `["A","B","C"]`, Reader `["A","B"]` (no default)
2098    //     `mapping = [0, 1, -1]` (decode must error on `"C"`).
2099    fn resolve_enums(
2100        &mut self,
2101        writer_enum: &Enum<'a>,
2102        reader_enum: &Enum<'a>,
2103        reader_schema: &Schema<'a>,
2104        namespace: Option<&'a str>,
2105    ) -> Result<AvroDataType, ArrowError> {
2106        ensure_names_match(
2107            "Enum",
2108            writer_enum.name,
2109            writer_enum.namespace,
2110            &writer_enum.aliases,
2111            reader_enum.name,
2112            reader_enum.namespace,
2113            &reader_enum.aliases,
2114        )?;
2115        if writer_enum.symbols == reader_enum.symbols {
2116            return self.parse_type(reader_schema, namespace);
2117        }
2118        let reader_index: HashMap<&str, i32> = reader_enum
2119            .symbols
2120            .iter()
2121            .enumerate()
2122            .map(|(index, &symbol)| (symbol, index as i32))
2123            .collect();
2124        let default_index: i32 = match reader_enum.default {
2125            Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2126                ArrowError::SchemaError(format!(
2127                    "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2128                    reader_enum.name,
2129                ))
2130            })?,
2131            None => -1,
2132        };
2133        let mapping: Vec<i32> = writer_enum
2134            .symbols
2135            .iter()
2136            .map(|&write_symbol| {
2137                reader_index
2138                    .get(write_symbol)
2139                    .copied()
2140                    .unwrap_or(default_index)
2141            })
2142            .collect();
2143        if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2144            return Err(ArrowError::SchemaError(format!(
2145                "Reader enum '{}' does not cover all writer symbols and no default is provided",
2146                reader_enum.name
2147            )));
2148        }
2149        let mut dt = self.parse_type(reader_schema, namespace)?;
2150        dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2151            mapping: Arc::from(mapping),
2152            default_index,
2153        }));
2154        let reader_ns = reader_enum.namespace.or(namespace);
2155        self.resolver
2156            .register(reader_enum.name, reader_ns, dt.clone());
2157        Ok(dt)
2158    }
2159
2160    #[inline]
2161    fn build_writer_lookup(
2162        writer_record: &Record<'a>,
2163    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2164        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2165        for (idx, wf) in writer_record.fields.iter().enumerate() {
2166            // Avro field names are unique; last-in wins are acceptable and match previous behavior.
2167            map.insert(wf.name, idx);
2168        }
2169        // Track ambiguous writer aliases (alias used by multiple writer fields)
2170        let mut ambiguous: HashSet<&str> = HashSet::new();
2171        for (idx, wf) in writer_record.fields.iter().enumerate() {
2172            for &alias in &wf.aliases {
2173                match map.entry(alias) {
2174                    Entry::Occupied(e) if *e.get() != idx => {
2175                        ambiguous.insert(alias);
2176                    }
2177                    Entry::Vacant(e) => {
2178                        e.insert(idx);
2179                    }
2180                    _ => {}
2181                }
2182            }
2183        }
2184        (map, ambiguous)
2185    }
2186
2187    fn resolve_records(
2188        &mut self,
2189        writer_record: &Record<'a>,
2190        reader_record: &Record<'a>,
2191        namespace: Option<&'a str>,
2192    ) -> Result<AvroDataType, ArrowError> {
2193        ensure_names_match(
2194            "Record",
2195            writer_record.name,
2196            writer_record.namespace,
2197            &writer_record.aliases,
2198            reader_record.name,
2199            reader_record.namespace,
2200            &reader_record.aliases,
2201        )?;
2202        let writer_ns = writer_record.namespace.or(namespace);
2203        let reader_ns = reader_record.namespace.or(namespace);
2204        let mut reader_md = reader_record.attributes.field_metadata();
2205        reader_md.insert(
2206            AVRO_NAME_METADATA_KEY.to_string(),
2207            reader_record.name.to_string(),
2208        );
2209        if let Some(ns) = reader_ns {
2210            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
2211        }
2212        // Build writer lookup and ambiguous alias set.
2213        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
2214        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
2215        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
2216        // Capture default field indices during the main loop (one pass).
2217        let mut default_fields: Vec<usize> = Vec::new();
2218        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
2219            // Direct name match, then reader aliases (a writer alias map is pre-populated).
2220            let mut match_idx = writer_lookup.get(r_field.name).copied();
2221            let mut matched_via_alias: Option<&str> = None;
2222            if match_idx.is_none() {
2223                for &alias in &r_field.aliases {
2224                    if let Some(i) = writer_lookup.get(alias).copied() {
2225                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
2226                            return Err(ArrowError::SchemaError(format!(
2227                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
2228                                r_field.name
2229                            )));
2230                        }
2231                        match_idx = Some(i);
2232                        matched_via_alias = Some(alias);
2233                        break;
2234                    }
2235                }
2236            }
2237            if let Some(wi) = match_idx {
2238                if writer_to_reader[wi].is_none() {
2239                    let w_schema = &writer_record.fields[wi].r#type;
2240                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
2241                    writer_to_reader[wi] = Some(reader_idx);
2242                    reader_fields.push(AvroField {
2243                        name: r_field.name.to_owned(),
2244                        data_type: dt,
2245                    });
2246                    continue;
2247                } else if self.strict_mode {
2248                    // Writer field already mapped and strict_mode => error
2249                    let existing_reader = writer_to_reader[wi].unwrap();
2250                    let via = matched_via_alias
2251                        .map(|a| format!("alias '{a}'"))
2252                        .unwrap_or_else(|| "name match".to_string());
2253                    return Err(ArrowError::SchemaError(format!(
2254                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
2255                        writer_record.fields[wi].name
2256                    )));
2257                }
2258                // Non-strict and already mapped -> fall through to defaulting logic
2259            }
2260            // No match (or conflicted in non-strict mode): attach default per Avro spec.
2261            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
2262            if let Some(default_json) = r_field.default.as_ref() {
2263                dt.resolution = Some(ResolutionInfo::DefaultValue(
2264                    dt.parse_and_store_default(default_json)?,
2265                ));
2266                default_fields.push(reader_idx);
2267            } else if dt.nullability() == Some(Nullability::NullFirst) {
2268                // The only valid implicit default for a union is the first branch (null-first case).
2269                dt.resolution = Some(ResolutionInfo::DefaultValue(
2270                    dt.parse_and_store_default(&Value::Null)?,
2271                ));
2272                default_fields.push(reader_idx);
2273            } else {
2274                return Err(ArrowError::SchemaError(format!(
2275                    "Reader field '{}' not present in writer schema must have a default value",
2276                    r_field.name
2277                )));
2278            }
2279            reader_fields.push(AvroField {
2280                name: r_field.name.to_owned(),
2281                data_type: dt,
2282            });
2283        }
2284        // Build skip_fields in writer order; pre-size and push.
2285        let mut skip_fields: Vec<Option<AvroDataType>> =
2286            Vec::with_capacity(writer_record.fields.len());
2287        for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
2288            if writer_to_reader[writer_index].is_some() {
2289                skip_fields.push(None);
2290            } else {
2291                skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
2292            }
2293        }
2294        let resolved = AvroDataType::new_with_resolution(
2295            Codec::Struct(Arc::from(reader_fields)),
2296            reader_md,
2297            None,
2298            Some(ResolutionInfo::Record(ResolvedRecord {
2299                writer_to_reader: Arc::from(writer_to_reader),
2300                default_fields: Arc::from(default_fields),
2301                skip_fields: Arc::from(skip_fields),
2302            })),
2303        );
2304        // Register a resolved record by reader name+namespace for potential named type refs.
2305        self.resolver
2306            .register(reader_record.name, reader_ns, resolved.clone());
2307        Ok(resolved)
2308    }
2309}
2310
2311#[cfg(test)]
2312mod tests {
2313    use super::*;
2314    use crate::schema::{
2315        AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2316        Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2317    };
2318    use indexmap::IndexMap;
2319    use serde_json::{self, Value};
2320
2321    fn create_schema_with_logical_type(
2322        primitive_type: PrimitiveType,
2323        logical_type: &'static str,
2324    ) -> Schema<'static> {
2325        let attributes = Attributes {
2326            logical_type: Some(logical_type),
2327            additional: Default::default(),
2328        };
2329
2330        Schema::Type(Type {
2331            r#type: TypeName::Primitive(primitive_type),
2332            attributes,
2333        })
2334    }
2335
2336    fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2337        let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2338        let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2339        let mut maker = Maker::new(false, false);
2340        maker
2341            .make_data_type(&writer_schema, Some(&reader_schema), None)
2342            .expect("promotion should resolve")
2343    }
2344
2345    fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2346        Schema::TypeName(TypeName::Primitive(pt))
2347    }
2348    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2349        Schema::Union(branches)
2350    }
2351
2352    #[test]
2353    fn test_date_logical_type() {
2354        let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2355
2356        let mut maker = Maker::new(false, false);
2357        let result = maker.make_data_type(&schema, None, None).unwrap();
2358
2359        assert!(matches!(result.codec, Codec::Date32));
2360    }
2361
2362    #[test]
2363    fn test_time_millis_logical_type() {
2364        let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2365
2366        let mut maker = Maker::new(false, false);
2367        let result = maker.make_data_type(&schema, None, None).unwrap();
2368
2369        assert!(matches!(result.codec, Codec::TimeMillis));
2370    }
2371
2372    #[test]
2373    fn test_time_micros_logical_type() {
2374        let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2375
2376        let mut maker = Maker::new(false, false);
2377        let result = maker.make_data_type(&schema, None, None).unwrap();
2378
2379        assert!(matches!(result.codec, Codec::TimeMicros));
2380    }
2381
2382    #[test]
2383    fn test_timestamp_millis_logical_type() {
2384        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2385
2386        let mut maker = Maker::new(false, false);
2387        let result = maker.make_data_type(&schema, None, None).unwrap();
2388
2389        assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2390    }
2391
2392    #[test]
2393    fn test_timestamp_micros_logical_type() {
2394        let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2395
2396        let mut maker = Maker::new(false, false);
2397        let result = maker.make_data_type(&schema, None, None).unwrap();
2398
2399        assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2400    }
2401
2402    #[test]
2403    fn test_local_timestamp_millis_logical_type() {
2404        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2405
2406        let mut maker = Maker::new(false, false);
2407        let result = maker.make_data_type(&schema, None, None).unwrap();
2408
2409        assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2410    }
2411
2412    #[test]
2413    fn test_local_timestamp_micros_logical_type() {
2414        let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2415
2416        let mut maker = Maker::new(false, false);
2417        let result = maker.make_data_type(&schema, None, None).unwrap();
2418
2419        assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2420    }
2421
2422    #[test]
2423    fn test_uuid_type() {
2424        let mut codec = Codec::Fixed(16);
2425        if let c @ Codec::Fixed(16) = &mut codec {
2426            *c = Codec::Uuid;
2427        }
2428        assert!(matches!(codec, Codec::Uuid));
2429    }
2430
2431    #[test]
2432    fn test_duration_logical_type() {
2433        let mut codec = Codec::Fixed(12);
2434
2435        if let c @ Codec::Fixed(12) = &mut codec {
2436            *c = Codec::Interval;
2437        }
2438
2439        assert!(matches!(codec, Codec::Interval));
2440    }
2441
2442    #[test]
2443    fn test_decimal_logical_type_not_implemented() {
2444        let codec = Codec::Fixed(16);
2445
2446        let process_decimal = || -> Result<(), ArrowError> {
2447            if let Codec::Fixed(_) = codec {
2448                return Err(ArrowError::NotYetImplemented(
2449                    "Decimals are not currently supported".to_string(),
2450                ));
2451            }
2452            Ok(())
2453        };
2454
2455        let result = process_decimal();
2456
2457        assert!(result.is_err());
2458        if let Err(ArrowError::NotYetImplemented(msg)) = result {
2459            assert!(msg.contains("Decimals are not currently supported"));
2460        } else {
2461            panic!("Expected NotYetImplemented error");
2462        }
2463    }
2464    #[test]
2465    fn test_unknown_logical_type_added_to_metadata() {
2466        let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2467
2468        let mut maker = Maker::new(false, false);
2469        let result = maker.make_data_type(&schema, None, None).unwrap();
2470
2471        assert_eq!(
2472            result.metadata.get("logicalType"),
2473            Some(&"custom-type".to_string())
2474        );
2475    }
2476
2477    #[test]
2478    fn test_string_with_utf8view_enabled() {
2479        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2480
2481        let mut maker = Maker::new(true, false);
2482        let result = maker.make_data_type(&schema, None, None).unwrap();
2483
2484        assert!(matches!(result.codec, Codec::Utf8View));
2485    }
2486
2487    #[test]
2488    fn test_string_without_utf8view_enabled() {
2489        let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2490
2491        let mut maker = Maker::new(false, false);
2492        let result = maker.make_data_type(&schema, None, None).unwrap();
2493
2494        assert!(matches!(result.codec, Codec::Utf8));
2495    }
2496
2497    #[test]
2498    fn test_record_with_string_and_utf8view_enabled() {
2499        let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2500
2501        let avro_field = crate::schema::Field {
2502            name: "string_field",
2503            r#type: field_schema,
2504            default: None,
2505            doc: None,
2506            aliases: vec![],
2507        };
2508
2509        let record = Record {
2510            name: "test_record",
2511            namespace: None,
2512            aliases: vec![],
2513            doc: None,
2514            fields: vec![avro_field],
2515            attributes: Attributes::default(),
2516        };
2517
2518        let schema = Schema::Complex(ComplexType::Record(record));
2519
2520        let mut maker = Maker::new(true, false);
2521        let result = maker.make_data_type(&schema, None, None).unwrap();
2522
2523        if let Codec::Struct(fields) = &result.codec {
2524            let first_field_codec = &fields[0].data_type().codec;
2525            assert!(matches!(first_field_codec, Codec::Utf8View));
2526        } else {
2527            panic!("Expected Struct codec");
2528        }
2529    }
2530
2531    #[test]
2532    fn test_union_with_strict_mode() {
2533        let schema = Schema::Union(vec![
2534            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2535            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2536        ]);
2537
2538        let mut maker = Maker::new(false, true);
2539        let result = maker.make_data_type(&schema, None, None);
2540
2541        assert!(result.is_err());
2542        match result {
2543            Err(ArrowError::SchemaError(msg)) => {
2544                assert!(msg.contains(
2545                    "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2546                ));
2547            }
2548            _ => panic!("Expected SchemaError"),
2549        }
2550    }
2551
2552    #[test]
2553    fn test_resolve_int_to_float_promotion() {
2554        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2555        assert!(matches!(result.codec, Codec::Float32));
2556        assert_eq!(
2557            result.resolution,
2558            Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2559        );
2560    }
2561
2562    #[test]
2563    fn test_resolve_int_to_double_promotion() {
2564        let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2565        assert!(matches!(result.codec, Codec::Float64));
2566        assert_eq!(
2567            result.resolution,
2568            Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2569        );
2570    }
2571
2572    #[test]
2573    fn test_resolve_long_to_float_promotion() {
2574        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2575        assert!(matches!(result.codec, Codec::Float32));
2576        assert_eq!(
2577            result.resolution,
2578            Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2579        );
2580    }
2581
2582    #[test]
2583    fn test_resolve_long_to_double_promotion() {
2584        let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2585        assert!(matches!(result.codec, Codec::Float64));
2586        assert_eq!(
2587            result.resolution,
2588            Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2589        );
2590    }
2591
2592    #[test]
2593    fn test_resolve_float_to_double_promotion() {
2594        let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2595        assert!(matches!(result.codec, Codec::Float64));
2596        assert_eq!(
2597            result.resolution,
2598            Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2599        );
2600    }
2601
2602    #[test]
2603    fn test_resolve_string_to_bytes_promotion() {
2604        let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2605        assert!(matches!(result.codec, Codec::Binary));
2606        assert_eq!(
2607            result.resolution,
2608            Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2609        );
2610    }
2611
2612    #[test]
2613    fn test_resolve_bytes_to_string_promotion() {
2614        let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2615        assert!(matches!(result.codec, Codec::Utf8));
2616        assert_eq!(
2617            result.resolution,
2618            Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2619        );
2620    }
2621
2622    #[test]
2623    fn test_resolve_illegal_promotion_double_to_float_errors() {
2624        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2625        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2626        let mut maker = Maker::new(false, false);
2627        let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2628        assert!(result.is_err());
2629        match result {
2630            Err(ArrowError::ParseError(msg)) => {
2631                assert!(msg.contains("Illegal promotion"));
2632            }
2633            _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2634        }
2635    }
2636
2637    #[test]
2638    fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2639        let writer = Schema::Union(vec![
2640            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2641            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2642        ]);
2643        let reader = Schema::Union(vec![
2644            Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2645            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2646        ]);
2647        let mut maker = Maker::new(false, false);
2648        let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2649        assert!(matches!(result.codec, Codec::Float64));
2650        assert_eq!(
2651            result.resolution,
2652            Some(ResolutionInfo::Union(ResolvedUnion {
2653                writer_to_reader: [
2654                    None,
2655                    Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2656                ]
2657                .into(),
2658                writer_is_union: true,
2659                reader_is_union: true,
2660            }))
2661        );
2662        assert_eq!(result.nullability, Some(Nullability::NullFirst));
2663    }
2664
2665    #[test]
2666    fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2667        let writer = mk_union(vec![
2668            mk_primitive(PrimitiveType::String),
2669            mk_primitive(PrimitiveType::Long),
2670        ]);
2671        let reader = mk_primitive(PrimitiveType::Bytes);
2672        let mut maker = Maker::new(false, false);
2673        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2674        assert!(matches!(dt.codec(), Codec::Binary));
2675        let resolved = match dt.resolution {
2676            Some(ResolutionInfo::Union(u)) => u,
2677            other => panic!("expected union resolution info, got {other:?}"),
2678        };
2679        assert!(resolved.writer_is_union && !resolved.reader_is_union);
2680        assert_eq!(
2681            resolved.writer_to_reader.as_ref(),
2682            &[
2683                Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
2684                None
2685            ]
2686        );
2687    }
2688
2689    #[test]
2690    fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2691        let writer = mk_primitive(PrimitiveType::Long);
2692        let reader = mk_union(vec![
2693            mk_primitive(PrimitiveType::Long),
2694            mk_primitive(PrimitiveType::Double),
2695        ]);
2696        let mut maker = Maker::new(false, false);
2697        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2698        let resolved = match dt.resolution {
2699            Some(ResolutionInfo::Union(u)) => u,
2700            other => panic!("expected union resolution info, got {other:?}"),
2701        };
2702        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2703        assert_eq!(
2704            resolved.writer_to_reader.as_ref(),
2705            &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
2706        );
2707    }
2708
2709    #[test]
2710    fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2711        let writer = mk_primitive(PrimitiveType::Int);
2712        let reader = mk_union(vec![
2713            mk_primitive(PrimitiveType::Null),
2714            mk_primitive(PrimitiveType::Long),
2715            mk_primitive(PrimitiveType::String),
2716        ]);
2717        let mut maker = Maker::new(false, false);
2718        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2719        let resolved = match dt.resolution {
2720            Some(ResolutionInfo::Union(u)) => u,
2721            other => panic!("expected union resolution info, got {other:?}"),
2722        };
2723        assert_eq!(
2724            resolved.writer_to_reader.as_ref(),
2725            &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
2726        );
2727    }
2728
2729    #[test]
2730    fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
2731        // Writer: record Inner{a: int}
2732        // Reader: union [Inner{a: int, b: int default 42}, string]
2733        // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
2734        let writer = Schema::Complex(ComplexType::Record(Record {
2735            name: "Inner",
2736            namespace: None,
2737            doc: None,
2738            aliases: vec![],
2739            fields: vec![AvroFieldSchema {
2740                name: "a",
2741                doc: None,
2742                r#type: mk_primitive(PrimitiveType::Int),
2743                default: None,
2744                aliases: vec![],
2745            }],
2746            attributes: Attributes::default(),
2747        }));
2748        let reader = mk_union(vec![
2749            Schema::Complex(ComplexType::Record(Record {
2750                name: "Inner",
2751                namespace: None,
2752                doc: None,
2753                aliases: vec![],
2754                fields: vec![
2755                    AvroFieldSchema {
2756                        name: "a",
2757                        doc: None,
2758                        r#type: mk_primitive(PrimitiveType::Int),
2759                        default: None,
2760                        aliases: vec![],
2761                    },
2762                    AvroFieldSchema {
2763                        name: "b",
2764                        doc: None,
2765                        r#type: mk_primitive(PrimitiveType::Int),
2766                        default: Some(Value::Number(serde_json::Number::from(42))),
2767                        aliases: vec![],
2768                    },
2769                ],
2770                attributes: Attributes::default(),
2771            })),
2772            mk_primitive(PrimitiveType::String),
2773        ]);
2774        let mut maker = Maker::new(false, false);
2775        let dt = maker
2776            .make_data_type(&writer, Some(&reader), None)
2777            .expect("resolution should succeed");
2778        // Verify the union resolution structure
2779        let resolved = match dt.resolution.as_ref() {
2780            Some(ResolutionInfo::Union(u)) => u,
2781            other => panic!("expected union resolution info, got {other:?}"),
2782        };
2783        assert!(!resolved.writer_is_union && resolved.reader_is_union);
2784        assert_eq!(
2785            resolved.writer_to_reader.len(),
2786            1,
2787            "expected the non-union record to resolve to a union variant"
2788        );
2789        let resolution = match resolved.writer_to_reader.first().unwrap() {
2790            Some((0, resolution)) => resolution,
2791            other => panic!("unexpected writer-to-reader table value {other:?}"),
2792        };
2793        match resolution {
2794            ResolutionInfo::Record(ResolvedRecord {
2795                writer_to_reader,
2796                default_fields,
2797                skip_fields,
2798            }) => {
2799                assert_eq!(writer_to_reader.len(), 1);
2800                assert_eq!(writer_to_reader[0], Some(0));
2801                assert_eq!(default_fields.len(), 1);
2802                assert_eq!(default_fields[0], 1);
2803                assert_eq!(skip_fields.len(), 1);
2804                assert_eq!(skip_fields[0], None);
2805            }
2806            other => panic!("unexpected resolution {other:?}"),
2807        }
2808        // The matching child (Inner at index 0) should have field b with DefaultValue
2809        let children = match dt.codec() {
2810            Codec::Union(children, _, _) => children,
2811            other => panic!("expected union codec, got {other:?}"),
2812        };
2813        let inner_fields = match children[0].codec() {
2814            Codec::Struct(f) => f,
2815            other => panic!("expected struct codec for Inner, got {other:?}"),
2816        };
2817        assert_eq!(inner_fields.len(), 2);
2818        assert_eq!(inner_fields[1].name(), "b");
2819        assert_eq!(
2820            inner_fields[1].data_type().resolution,
2821            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
2822            "field b should have DefaultValue(Int(42)) from schema resolution"
2823        );
2824    }
2825
2826    #[test]
2827    fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
2828        // Writer: record [string, Inner{a: int}]
2829        // Reader: union [Inner{a: int, b: int default 42}, string]
2830        // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
2831        let writer = mk_union(vec![
2832            mk_primitive(PrimitiveType::String),
2833            Schema::Complex(ComplexType::Record(Record {
2834                name: "Inner",
2835                namespace: None,
2836                doc: None,
2837                aliases: vec![],
2838                fields: vec![AvroFieldSchema {
2839                    name: "a",
2840                    doc: None,
2841                    r#type: mk_primitive(PrimitiveType::Int),
2842                    default: None,
2843                    aliases: vec![],
2844                }],
2845                attributes: Attributes::default(),
2846            })),
2847        ]);
2848        let reader = mk_union(vec![
2849            Schema::Complex(ComplexType::Record(Record {
2850                name: "Inner",
2851                namespace: None,
2852                doc: None,
2853                aliases: vec![],
2854                fields: vec![
2855                    AvroFieldSchema {
2856                        name: "a",
2857                        doc: None,
2858                        r#type: mk_primitive(PrimitiveType::Int),
2859                        default: None,
2860                        aliases: vec![],
2861                    },
2862                    AvroFieldSchema {
2863                        name: "b",
2864                        doc: None,
2865                        r#type: mk_primitive(PrimitiveType::Int),
2866                        default: Some(Value::Number(serde_json::Number::from(42))),
2867                        aliases: vec![],
2868                    },
2869                ],
2870                attributes: Attributes::default(),
2871            })),
2872            mk_primitive(PrimitiveType::String),
2873        ]);
2874        let mut maker = Maker::new(false, false);
2875        let dt = maker
2876            .make_data_type(&writer, Some(&reader), None)
2877            .expect("resolution should succeed");
2878        // Verify the union resolution structure
2879        let resolved = match dt.resolution.as_ref() {
2880            Some(ResolutionInfo::Union(u)) => u,
2881            other => panic!("expected union resolution info, got {other:?}"),
2882        };
2883        assert!(resolved.writer_is_union && resolved.reader_is_union);
2884        assert_eq!(resolved.writer_to_reader.len(), 2);
2885        let resolution = match resolved.writer_to_reader[1].as_ref() {
2886            Some((0, resolution)) => resolution,
2887            other => panic!("unexpected writer-to-reader table value {other:?}"),
2888        };
2889        match resolution {
2890            ResolutionInfo::Record(ResolvedRecord {
2891                writer_to_reader,
2892                default_fields,
2893                skip_fields,
2894            }) => {
2895                assert_eq!(writer_to_reader.len(), 1);
2896                assert_eq!(writer_to_reader[0], Some(0));
2897                assert_eq!(default_fields.len(), 1);
2898                assert_eq!(default_fields[0], 1);
2899                assert_eq!(skip_fields.len(), 1);
2900                assert_eq!(skip_fields[0], None);
2901            }
2902            other => panic!("unexpected resolution {other:?}"),
2903        }
2904        // The matching child (Inner at index 0) should have field b with DefaultValue
2905        let children = match dt.codec() {
2906            Codec::Union(children, _, _) => children,
2907            other => panic!("expected union codec, got {other:?}"),
2908        };
2909        let inner_fields = match children[0].codec() {
2910            Codec::Struct(f) => f,
2911            other => panic!("expected struct codec for Inner, got {other:?}"),
2912        };
2913        assert_eq!(inner_fields.len(), 2);
2914        assert_eq!(inner_fields[1].name(), "b");
2915        assert_eq!(
2916            inner_fields[1].data_type().resolution,
2917            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
2918            "field b should have DefaultValue(Int(42)) from schema resolution"
2919        );
2920    }
2921
2922    #[test]
2923    fn test_resolve_both_nullable_unions_direct_match() {
2924        let writer = mk_union(vec![
2925            mk_primitive(PrimitiveType::Null),
2926            mk_primitive(PrimitiveType::String),
2927        ]);
2928        let reader = mk_union(vec![
2929            mk_primitive(PrimitiveType::String),
2930            mk_primitive(PrimitiveType::Null),
2931        ]);
2932        let mut maker = Maker::new(false, false);
2933        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2934        assert!(matches!(dt.codec(), Codec::Utf8));
2935        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2936        assert_eq!(
2937            dt.resolution,
2938            Some(ResolutionInfo::Union(ResolvedUnion {
2939                writer_to_reader: [
2940                    None,
2941                    Some((0, ResolutionInfo::Promotion(Promotion::Direct)))
2942                ]
2943                .into(),
2944                writer_is_union: true,
2945                reader_is_union: true
2946            }))
2947        );
2948    }
2949
2950    #[test]
2951    fn test_resolve_both_nullable_unions_with_promotion() {
2952        let writer = mk_union(vec![
2953            mk_primitive(PrimitiveType::Null),
2954            mk_primitive(PrimitiveType::Int),
2955        ]);
2956        let reader = mk_union(vec![
2957            mk_primitive(PrimitiveType::Double),
2958            mk_primitive(PrimitiveType::Null),
2959        ]);
2960        let mut maker = Maker::new(false, false);
2961        let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2962        assert!(matches!(dt.codec(), Codec::Float64));
2963        assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2964        assert_eq!(
2965            dt.resolution,
2966            Some(ResolutionInfo::Union(ResolvedUnion {
2967                writer_to_reader: [
2968                    None,
2969                    Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2970                ]
2971                .into(),
2972                writer_is_union: true,
2973                reader_is_union: true
2974            }))
2975        );
2976    }
2977
2978    #[test]
2979    fn test_resolve_type_promotion() {
2980        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2981        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2982        let mut maker = Maker::new(false, false);
2983        let result = maker
2984            .make_data_type(&writer_schema, Some(&reader_schema), None)
2985            .unwrap();
2986        assert!(matches!(result.codec, Codec::Int64));
2987        assert_eq!(
2988            result.resolution,
2989            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2990        );
2991    }
2992
2993    #[test]
2994    fn test_nested_record_type_reuse_without_namespace() {
2995        let schema_str = r#"
2996        {
2997          "type": "record",
2998          "name": "Record",
2999          "fields": [
3000            {
3001              "name": "nested",
3002              "type": {
3003                "type": "record",
3004                "name": "Nested",
3005                "fields": [
3006                  { "name": "nested_int", "type": "int" }
3007                ]
3008              }
3009            },
3010            { "name": "nestedRecord", "type": "Nested" },
3011            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
3012            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
3013          ]
3014        }
3015        "#;
3016
3017        let schema: Schema = serde_json::from_str(schema_str).unwrap();
3018
3019        let mut maker = Maker::new(false, false);
3020        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3021
3022        if let Codec::Struct(fields) = avro_data_type.codec() {
3023            assert_eq!(fields.len(), 4);
3024
3025            // nested
3026            assert_eq!(fields[0].name(), "nested");
3027            let nested_data_type = fields[0].data_type();
3028            if let Codec::Struct(nested_fields) = nested_data_type.codec() {
3029                assert_eq!(nested_fields.len(), 1);
3030                assert_eq!(nested_fields[0].name(), "nested_int");
3031                assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
3032            } else {
3033                panic!(
3034                    "'nested' field is not a struct but {:?}",
3035                    nested_data_type.codec()
3036                );
3037            }
3038
3039            // nestedRecord
3040            assert_eq!(fields[1].name(), "nestedRecord");
3041            let nested_record_data_type = fields[1].data_type();
3042            assert_eq!(
3043                nested_record_data_type.codec().data_type(),
3044                nested_data_type.codec().data_type()
3045            );
3046
3047            // nestedArray
3048            assert_eq!(fields[2].name(), "nestedArray");
3049            if let Codec::List(item_type) = fields[2].data_type().codec() {
3050                assert_eq!(
3051                    item_type.codec().data_type(),
3052                    nested_data_type.codec().data_type()
3053                );
3054            } else {
3055                panic!("'nestedArray' field is not a list");
3056            }
3057
3058            // nestedMap
3059            assert_eq!(fields[3].name(), "nestedMap");
3060            if let Codec::Map(value_type) = fields[3].data_type().codec() {
3061                assert_eq!(
3062                    value_type.codec().data_type(),
3063                    nested_data_type.codec().data_type()
3064                );
3065            } else {
3066                panic!("'nestedMap' field is not a map");
3067            }
3068        } else {
3069            panic!("Top-level schema is not a struct");
3070        }
3071    }
3072
3073    #[test]
3074    fn test_nested_enum_type_reuse_with_namespace() {
3075        let schema_str = r#"
3076        {
3077          "type": "record",
3078          "name": "Record",
3079          "namespace": "record_ns",
3080          "fields": [
3081            {
3082              "name": "status",
3083              "type": {
3084                "type": "enum",
3085                "name": "Status",
3086                "namespace": "enum_ns",
3087                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
3088              }
3089            },
3090            { "name": "backupStatus", "type": "enum_ns.Status" },
3091            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
3092            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
3093          ]
3094        }
3095        "#;
3096
3097        let schema: Schema = serde_json::from_str(schema_str).unwrap();
3098
3099        let mut maker = Maker::new(false, false);
3100        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3101
3102        if let Codec::Struct(fields) = avro_data_type.codec() {
3103            assert_eq!(fields.len(), 4);
3104
3105            // status
3106            assert_eq!(fields[0].name(), "status");
3107            let status_data_type = fields[0].data_type();
3108            if let Codec::Enum(symbols) = status_data_type.codec() {
3109                assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
3110            } else {
3111                panic!(
3112                    "'status' field is not an enum but {:?}",
3113                    status_data_type.codec()
3114                );
3115            }
3116
3117            // backupStatus
3118            assert_eq!(fields[1].name(), "backupStatus");
3119            let backup_status_data_type = fields[1].data_type();
3120            assert_eq!(
3121                backup_status_data_type.codec().data_type(),
3122                status_data_type.codec().data_type()
3123            );
3124
3125            // statusHistory
3126            assert_eq!(fields[2].name(), "statusHistory");
3127            if let Codec::List(item_type) = fields[2].data_type().codec() {
3128                assert_eq!(
3129                    item_type.codec().data_type(),
3130                    status_data_type.codec().data_type()
3131                );
3132            } else {
3133                panic!("'statusHistory' field is not a list");
3134            }
3135
3136            // statusMap
3137            assert_eq!(fields[3].name(), "statusMap");
3138            if let Codec::Map(value_type) = fields[3].data_type().codec() {
3139                assert_eq!(
3140                    value_type.codec().data_type(),
3141                    status_data_type.codec().data_type()
3142                );
3143            } else {
3144                panic!("'statusMap' field is not a map");
3145            }
3146        } else {
3147            panic!("Top-level schema is not a struct");
3148        }
3149    }
3150
3151    #[test]
3152    fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
3153        let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3154        let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3155        let mut maker = Maker::new(false, false);
3156        let data_type = maker
3157            .make_data_type(&writer_schema, Some(&reader_schema), None)
3158            .expect("resolution should succeed");
3159        let field = AvroField {
3160            name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
3161            data_type,
3162        };
3163        assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
3164        assert!(matches!(field.data_type().codec(), Codec::Utf8));
3165    }
3166
3167    fn json_string(s: &str) -> Value {
3168        Value::String(s.to_string())
3169    }
3170
3171    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3172        let stored = dt
3173            .metadata
3174            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3175            .cloned()
3176            .unwrap_or_default();
3177        let expected = serde_json::to_string(default_json).unwrap();
3178        assert_eq!(stored, expected, "stored default metadata should match");
3179    }
3180
3181    #[test]
3182    fn test_validate_and_store_default_null_and_nullability_rules() {
3183        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
3184        let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
3185        assert_eq!(lit, AvroLiteral::Null);
3186        assert_default_stored(&dt_null, &Value::Null);
3187        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3188        let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
3189        assert!(
3190            err.to_string()
3191                .contains("JSON null default is only valid for `null` type"),
3192            "unexpected error: {err}"
3193        );
3194        let mut dt_int_nf =
3195            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
3196        let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
3197        assert_eq!(lit2, AvroLiteral::Null);
3198        assert_default_stored(&dt_int_nf, &Value::Null);
3199        let mut dt_int_ns =
3200            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
3201        let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
3202        assert!(
3203            err2.to_string()
3204                .contains("JSON null default is only valid for `null` type"),
3205            "unexpected error: {err2}"
3206        );
3207    }
3208
3209    #[test]
3210    fn test_validate_and_store_default_primitives_and_temporal() {
3211        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
3212        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
3213        assert_eq!(lit, AvroLiteral::Boolean(true));
3214        assert_default_stored(&dt_bool, &Value::Bool(true));
3215        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3216        let lit = dt_i32
3217            .parse_and_store_default(&serde_json::json!(123))
3218            .unwrap();
3219        assert_eq!(lit, AvroLiteral::Int(123));
3220        assert_default_stored(&dt_i32, &serde_json::json!(123));
3221        let err = dt_i32
3222            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
3223            .unwrap_err();
3224        assert!(format!("{err}").contains("out of i32 range"));
3225        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3226        let lit = dt_i64
3227            .parse_and_store_default(&serde_json::json!(1234567890))
3228            .unwrap();
3229        assert_eq!(lit, AvroLiteral::Long(1234567890));
3230        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
3231        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
3232        let lit = dt_f32
3233            .parse_and_store_default(&serde_json::json!(1.25))
3234            .unwrap();
3235        assert_eq!(lit, AvroLiteral::Float(1.25));
3236        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
3237        let err = dt_f32
3238            .parse_and_store_default(&serde_json::json!(1e39))
3239            .unwrap_err();
3240        assert!(format!("{err}").contains("out of f32 range"));
3241        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3242        let lit = dt_f64
3243            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
3244            .unwrap();
3245        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
3246        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
3247        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
3248        let l = dt_str
3249            .parse_and_store_default(&json_string("hello"))
3250            .unwrap();
3251        assert_eq!(l, AvroLiteral::String("hello".into()));
3252        assert_default_stored(&dt_str, &json_string("hello"));
3253        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
3254        let l = dt_strv
3255            .parse_and_store_default(&json_string("view"))
3256            .unwrap();
3257        assert_eq!(l, AvroLiteral::String("view".into()));
3258        assert_default_stored(&dt_strv, &json_string("view"));
3259        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
3260        let l = dt_uuid
3261            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
3262            .unwrap();
3263        assert_eq!(
3264            l,
3265            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
3266        );
3267        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
3268        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
3269        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
3270        let err = dt_bin
3271            .parse_and_store_default(&json_string("€")) // U+20AC
3272            .unwrap_err();
3273        assert!(format!("{err}").contains("Invalid codepoint"));
3274        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
3275        let ld = dt_date
3276            .parse_and_store_default(&serde_json::json!(1))
3277            .unwrap();
3278        assert_eq!(ld, AvroLiteral::Int(1));
3279        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
3280        let lt = dt_tmill
3281            .parse_and_store_default(&serde_json::json!(86_400_000))
3282            .unwrap();
3283        assert_eq!(lt, AvroLiteral::Int(86_400_000));
3284        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
3285        let ltm = dt_tmicros
3286            .parse_and_store_default(&serde_json::json!(1_000_000))
3287            .unwrap();
3288        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
3289        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
3290        let l1 = dt_ts_milli
3291            .parse_and_store_default(&serde_json::json!(123))
3292            .unwrap();
3293        assert_eq!(l1, AvroLiteral::Long(123));
3294        let mut dt_ts_micro =
3295            AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
3296        let l2 = dt_ts_micro
3297            .parse_and_store_default(&serde_json::json!(456))
3298            .unwrap();
3299        assert_eq!(l2, AvroLiteral::Long(456));
3300    }
3301
3302    #[cfg(feature = "avro_custom_types")]
3303    #[test]
3304    fn test_validate_and_store_default_custom_integer_ranges() {
3305        let mut dt_i8 = AvroDataType::new(Codec::Int8, HashMap::new(), None);
3306        let lit_i8 = dt_i8
3307            .parse_and_store_default(&serde_json::json!(i8::MAX))
3308            .unwrap();
3309        assert_eq!(lit_i8, AvroLiteral::Int(i8::MAX as i32));
3310        let err_i8_high = dt_i8
3311            .parse_and_store_default(&serde_json::json!(i8::MAX as i64 + 1))
3312            .unwrap_err();
3313        assert!(err_i8_high.to_string().contains("out of i8 range"));
3314        let err_i8_low = dt_i8
3315            .parse_and_store_default(&serde_json::json!(i8::MIN as i64 - 1))
3316            .unwrap_err();
3317        assert!(err_i8_low.to_string().contains("out of i8 range"));
3318
3319        let mut dt_i16 = AvroDataType::new(Codec::Int16, HashMap::new(), None);
3320        let lit_i16 = dt_i16
3321            .parse_and_store_default(&serde_json::json!(i16::MIN))
3322            .unwrap();
3323        assert_eq!(lit_i16, AvroLiteral::Int(i16::MIN as i32));
3324        let err_i16_high = dt_i16
3325            .parse_and_store_default(&serde_json::json!(i16::MAX as i64 + 1))
3326            .unwrap_err();
3327        assert!(err_i16_high.to_string().contains("out of i16 range"));
3328        let err_i16_low = dt_i16
3329            .parse_and_store_default(&serde_json::json!(i16::MIN as i64 - 1))
3330            .unwrap_err();
3331        assert!(err_i16_low.to_string().contains("out of i16 range"));
3332
3333        let mut dt_u8 = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
3334        let lit_u8 = dt_u8
3335            .parse_and_store_default(&serde_json::json!(u8::MAX))
3336            .unwrap();
3337        assert_eq!(lit_u8, AvroLiteral::Int(u8::MAX as i32));
3338        let err_u8_neg = dt_u8
3339            .parse_and_store_default(&serde_json::json!(-1))
3340            .unwrap_err();
3341        assert!(err_u8_neg.to_string().contains("out of u8 range"));
3342        let err_u8_high = dt_u8
3343            .parse_and_store_default(&serde_json::json!(u8::MAX as i64 + 1))
3344            .unwrap_err();
3345        assert!(err_u8_high.to_string().contains("out of u8 range"));
3346
3347        let mut dt_u16 = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
3348        let lit_u16 = dt_u16
3349            .parse_and_store_default(&serde_json::json!(u16::MAX))
3350            .unwrap();
3351        assert_eq!(lit_u16, AvroLiteral::Int(u16::MAX as i32));
3352        let err_u16_neg = dt_u16
3353            .parse_and_store_default(&serde_json::json!(-1))
3354            .unwrap_err();
3355        assert!(err_u16_neg.to_string().contains("out of u16 range"));
3356        let err_u16_high = dt_u16
3357            .parse_and_store_default(&serde_json::json!(u16::MAX as i64 + 1))
3358            .unwrap_err();
3359        assert!(err_u16_high.to_string().contains("out of u16 range"));
3360
3361        let mut dt_u32 = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
3362        let lit_u32 = dt_u32
3363            .parse_and_store_default(&serde_json::json!(u32::MAX as i64))
3364            .unwrap();
3365        assert_eq!(lit_u32, AvroLiteral::Long(u32::MAX as i64));
3366        let err_u32_neg = dt_u32
3367            .parse_and_store_default(&serde_json::json!(-1))
3368            .unwrap_err();
3369        assert!(err_u32_neg.to_string().contains("out of u32 range"));
3370        let err_u32_high = dt_u32
3371            .parse_and_store_default(&serde_json::json!(u32::MAX as i64 + 1))
3372            .unwrap_err();
3373        assert!(err_u32_high.to_string().contains("out of u32 range"));
3374    }
3375
3376    #[test]
3377    fn test_validate_and_store_default_fixed_decimal_interval() {
3378        let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
3379        let l = dt_fixed
3380            .parse_and_store_default(&json_string("WXYZ"))
3381            .unwrap();
3382        assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
3383        let err = dt_fixed
3384            .parse_and_store_default(&json_string("TOO LONG"))
3385            .unwrap_err();
3386        assert!(err.to_string().contains("Default length"));
3387        let mut dt_dec_fixed =
3388            AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
3389        let l = dt_dec_fixed
3390            .parse_and_store_default(&json_string("abc"))
3391            .unwrap();
3392        assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
3393        let err = dt_dec_fixed
3394            .parse_and_store_default(&json_string("toolong"))
3395            .unwrap_err();
3396        assert!(err.to_string().contains("Default length"));
3397        let mut dt_dec_bytes =
3398            AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
3399        let l = dt_dec_bytes
3400            .parse_and_store_default(&json_string("freeform"))
3401            .unwrap();
3402        assert_eq!(
3403            l,
3404            AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
3405        );
3406        let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
3407        let l = dt_interval
3408            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
3409            .unwrap();
3410        assert_eq!(
3411            l,
3412            AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
3413        );
3414        let err = dt_interval
3415            .parse_and_store_default(&json_string("short"))
3416            .unwrap_err();
3417        assert!(err.to_string().contains("Default length"));
3418    }
3419
3420    #[test]
3421    fn test_validate_and_store_default_enum_list_map_struct() {
3422        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
3423            .into_iter()
3424            .collect();
3425        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
3426        let l = dt_enum
3427            .parse_and_store_default(&json_string("GREEN"))
3428            .unwrap();
3429        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
3430        let err = dt_enum
3431            .parse_and_store_default(&json_string("YELLOW"))
3432            .unwrap_err();
3433        assert!(err.to_string().contains("Default enum symbol"));
3434        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3435        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
3436        let val = serde_json::json!([1, 2, 3]);
3437        let l = dt_list.parse_and_store_default(&val).unwrap();
3438        assert_eq!(
3439            l,
3440            AvroLiteral::Array(vec![
3441                AvroLiteral::Long(1),
3442                AvroLiteral::Long(2),
3443                AvroLiteral::Long(3)
3444            ])
3445        );
3446        let err = dt_list
3447            .parse_and_store_default(&serde_json::json!({"not":"array"}))
3448            .unwrap_err();
3449        assert!(err.to_string().contains("JSON array"));
3450        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3451        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
3452        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
3453        let l = dt_map.parse_and_store_default(&mv).unwrap();
3454        let mut expected = IndexMap::new();
3455        expected.insert("x".into(), AvroLiteral::Double(1.5));
3456        expected.insert("y".into(), AvroLiteral::Double(2.5));
3457        assert_eq!(l, AvroLiteral::Map(expected));
3458        // Not object -> error
3459        let err = dt_map
3460            .parse_and_store_default(&serde_json::json!(123))
3461            .unwrap_err();
3462        assert!(err.to_string().contains("JSON object"));
3463        let mut field_a = AvroField {
3464            name: "a".into(),
3465            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
3466        };
3467        let field_b = AvroField {
3468            name: "b".into(),
3469            data_type: AvroDataType::new(
3470                Codec::Int64,
3471                HashMap::new(),
3472                Some(Nullability::NullFirst),
3473            ),
3474        };
3475        let mut c_md = HashMap::new();
3476        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
3477        let field_c = AvroField {
3478            name: "c".into(),
3479            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
3480        };
3481        field_a.data_type.metadata.insert("doc".into(), "na".into());
3482        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
3483        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
3484        let default_obj = serde_json::json!({"a": 7});
3485        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
3486        let mut expected = IndexMap::new();
3487        expected.insert("a".into(), AvroLiteral::Int(7));
3488        expected.insert("b".into(), AvroLiteral::Null);
3489        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
3490        assert_eq!(l, AvroLiteral::Map(expected));
3491        assert_default_stored(&dt_struct, &default_obj);
3492        let req_field = AvroField {
3493            name: "req".into(),
3494            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
3495        };
3496        let mut dt_bad = AvroDataType::new(
3497            Codec::Struct(Arc::from(vec![req_field])),
3498            HashMap::new(),
3499            None,
3500        );
3501        let err = dt_bad
3502            .parse_and_store_default(&serde_json::json!({}))
3503            .unwrap_err();
3504        assert!(
3505            err.to_string().contains("missing required subfield 'req'"),
3506            "unexpected error: {err}"
3507        );
3508        let err = dt_struct
3509            .parse_and_store_default(&serde_json::json!(10))
3510            .unwrap_err();
3511        err.to_string().contains("must be a JSON object");
3512    }
3513
3514    #[test]
3515    fn test_resolve_array_promotion_and_reader_metadata() {
3516        let mut w_add: HashMap<&str, Value> = HashMap::new();
3517        w_add.insert("who", json_string("writer"));
3518        let mut r_add: HashMap<&str, Value> = HashMap::new();
3519        r_add.insert("who", json_string("reader"));
3520        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3521            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3522            attributes: Attributes {
3523                logical_type: None,
3524                additional: w_add,
3525            },
3526        }));
3527        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3528            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
3529            attributes: Attributes {
3530                logical_type: None,
3531                additional: r_add,
3532            },
3533        }));
3534        let mut maker = Maker::new(false, false);
3535        let dt = maker
3536            .make_data_type(&writer_schema, Some(&reader_schema), None)
3537            .unwrap();
3538        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
3539        if let Codec::List(inner) = dt.codec() {
3540            assert!(matches!(inner.codec(), Codec::Int64));
3541            assert_eq!(
3542                inner.resolution,
3543                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3544            );
3545        } else {
3546            panic!("expected list codec");
3547        }
3548    }
3549
3550    #[test]
3551    fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
3552        let writer_schema = Schema::Complex(ComplexType::Array(Array {
3553            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3554            attributes: Attributes::default(),
3555        }));
3556        let reader_schema = Schema::Complex(ComplexType::Array(Array {
3557            items: Box::new(mk_union(vec![
3558                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3559                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3560            ])),
3561            attributes: Attributes::default(),
3562        }));
3563        let mut maker = Maker::new(false, false);
3564        let dt = maker
3565            .make_data_type(&writer_schema, Some(&reader_schema), None)
3566            .unwrap();
3567        if let Codec::List(inner) = dt.codec() {
3568            assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
3569            assert!(matches!(inner.codec(), Codec::Int32));
3570            match inner.resolution.as_ref() {
3571                Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
3572                other => panic!("expected Union resolution, got {other:?}"),
3573            }
3574        } else {
3575            panic!("expected List codec");
3576        }
3577    }
3578
3579    #[test]
3580    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3581        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3582            name: "MD5",
3583            namespace: None,
3584            aliases: vec!["Hash16"],
3585            size: 16,
3586            attributes: Attributes::default(),
3587        }));
3588        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3589            name: "Hash16",
3590            namespace: None,
3591            aliases: vec![],
3592            size: 16,
3593            attributes: Attributes::default(),
3594        }));
3595        let mut maker = Maker::new(false, false);
3596        let dt = maker
3597            .make_data_type(&writer_schema, Some(&reader_schema), None)
3598            .unwrap();
3599        assert!(matches!(dt.codec(), Codec::Fixed(16)));
3600    }
3601
3602    #[cfg(feature = "avro_custom_types")]
3603    #[test]
3604    fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3605        let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3606            name: "ArrowIntervalMDN",
3607            namespace: None,
3608            aliases: vec![],
3609            size: 16,
3610            attributes: Attributes {
3611                logical_type: Some("arrow.interval-month-day-nano"),
3612                additional: Default::default(),
3613            },
3614        }));
3615        let mut maker = Maker::new(false, false);
3616        let dt = maker.make_data_type(&schema, None, None).unwrap();
3617        assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3618        assert_eq!(
3619            dt.codec.data_type(),
3620            DataType::Interval(IntervalUnit::MonthDayNano)
3621        );
3622    }
3623
3624    #[test]
3625    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3626        let writer = Schema::Complex(ComplexType::Record(Record {
3627            name: "R",
3628            namespace: None,
3629            doc: None,
3630            aliases: vec![],
3631            fields: vec![
3632                crate::schema::Field {
3633                    name: "a",
3634                    doc: None,
3635                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3636                    default: None,
3637                    aliases: vec![],
3638                },
3639                crate::schema::Field {
3640                    name: "skipme",
3641                    doc: None,
3642                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3643                    default: None,
3644                    aliases: vec![],
3645                },
3646                crate::schema::Field {
3647                    name: "b",
3648                    doc: None,
3649                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3650                    default: None,
3651                    aliases: vec![],
3652                },
3653            ],
3654            attributes: Attributes::default(),
3655        }));
3656        let reader = Schema::Complex(ComplexType::Record(Record {
3657            name: "R",
3658            namespace: None,
3659            doc: None,
3660            aliases: vec![],
3661            fields: vec![
3662                crate::schema::Field {
3663                    name: "b",
3664                    doc: None,
3665                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3666                    default: None,
3667                    aliases: vec![],
3668                },
3669                crate::schema::Field {
3670                    name: "a",
3671                    doc: None,
3672                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3673                    default: None,
3674                    aliases: vec![],
3675                },
3676                crate::schema::Field {
3677                    name: "name",
3678                    doc: None,
3679                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3680                    default: Some(json_string("anon")),
3681                    aliases: vec![],
3682                },
3683                crate::schema::Field {
3684                    name: "opt",
3685                    doc: None,
3686                    r#type: Schema::Union(vec![
3687                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3688                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3689                    ]),
3690                    default: None, // should default to null because NullFirst
3691                    aliases: vec![],
3692                },
3693            ],
3694            attributes: Attributes::default(),
3695        }));
3696        let mut maker = Maker::new(false, false);
3697        let dt = maker
3698            .make_data_type(&writer, Some(&reader), None)
3699            .expect("record resolution");
3700        let fields = match dt.codec() {
3701            Codec::Struct(f) => f,
3702            other => panic!("expected struct, got {other:?}"),
3703        };
3704        assert_eq!(fields.len(), 4);
3705        assert_eq!(fields[0].name(), "b");
3706        assert_eq!(fields[1].name(), "a");
3707        assert_eq!(fields[2].name(), "name");
3708        assert_eq!(fields[3].name(), "opt");
3709        assert!(matches!(
3710            fields[1].data_type().resolution,
3711            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3712        ));
3713        let rec = match dt.resolution {
3714            Some(ResolutionInfo::Record(ref r)) => r.clone(),
3715            other => panic!("expected record resolution, got {other:?}"),
3716        };
3717        assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
3718        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3719        assert!(rec.skip_fields[0].is_none());
3720        assert!(rec.skip_fields[2].is_none());
3721        let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
3722        assert!(matches!(skip1.codec(), Codec::Utf8));
3723        let name_md = &fields[2].data_type().metadata;
3724        assert_eq!(
3725            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3726            Some(&"\"anon\"".to_string())
3727        );
3728        let opt_md = &fields[3].data_type().metadata;
3729        assert_eq!(
3730            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3731            Some(&"null".to_string())
3732        );
3733    }
3734
3735    #[test]
3736    fn test_named_type_alias_resolution_record_cross_namespace() {
3737        let writer_record = Record {
3738            name: "PersonV2",
3739            namespace: Some("com.example.v2"),
3740            doc: None,
3741            aliases: vec!["com.example.Person"],
3742            fields: vec![
3743                AvroFieldSchema {
3744                    name: "name",
3745                    doc: None,
3746                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3747                    default: None,
3748                    aliases: vec![],
3749                },
3750                AvroFieldSchema {
3751                    name: "age",
3752                    doc: None,
3753                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3754                    default: None,
3755                    aliases: vec![],
3756                },
3757            ],
3758            attributes: Attributes::default(),
3759        };
3760        let reader_record = Record {
3761            name: "Person",
3762            namespace: Some("com.example"),
3763            doc: None,
3764            aliases: vec![],
3765            fields: writer_record.fields.clone(),
3766            attributes: Attributes::default(),
3767        };
3768        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3769        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3770        let mut maker = Maker::new(false, false);
3771        let result = maker
3772            .make_data_type(&writer_schema, Some(&reader_schema), None)
3773            .expect("record alias resolution should succeed");
3774        match result.codec {
3775            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3776            other => panic!("expected struct, got {other:?}"),
3777        }
3778    }
3779
3780    #[test]
3781    fn test_named_type_alias_resolution_enum_cross_namespace() {
3782        let writer_enum = Enum {
3783            name: "ColorV2",
3784            namespace: Some("org.example.v2"),
3785            doc: None,
3786            aliases: vec!["org.example.Color"],
3787            symbols: vec!["RED", "GREEN", "BLUE"],
3788            default: None,
3789            attributes: Attributes::default(),
3790        };
3791        let reader_enum = Enum {
3792            name: "Color",
3793            namespace: Some("org.example"),
3794            doc: None,
3795            aliases: vec![],
3796            symbols: vec!["RED", "GREEN", "BLUE"],
3797            default: None,
3798            attributes: Attributes::default(),
3799        };
3800        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3801        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3802        let mut maker = Maker::new(false, false);
3803        maker
3804            .make_data_type(&writer_schema, Some(&reader_schema), None)
3805            .expect("enum alias resolution should succeed");
3806    }
3807
3808    #[test]
3809    fn test_named_type_alias_resolution_fixed_cross_namespace() {
3810        let writer_fixed = Fixed {
3811            name: "Fx10V2",
3812            namespace: Some("ns.v2"),
3813            aliases: vec!["ns.Fx10"],
3814            size: 10,
3815            attributes: Attributes::default(),
3816        };
3817        let reader_fixed = Fixed {
3818            name: "Fx10",
3819            namespace: Some("ns"),
3820            aliases: vec![],
3821            size: 10,
3822            attributes: Attributes::default(),
3823        };
3824        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3825        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3826        let mut maker = Maker::new(false, false);
3827        maker
3828            .make_data_type(&writer_schema, Some(&reader_schema), None)
3829            .expect("fixed alias resolution should succeed");
3830    }
3831}