arrow_avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_schema::{
19    ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
20    UnionMode,
21};
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Map as JsonMap, Value};
24#[cfg(feature = "sha256")]
25use sha2::{Digest, Sha256};
26use std::cmp::PartialEq;
27use std::collections::hash_map::Entry;
28use std::collections::{HashMap, HashSet};
29use strum_macros::AsRefStr;
30
/// The two-byte marker (`0xC3 0x01`) that starts an Avro single-object
/// encoded message; it is followed by the schema fingerprint.
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// The one-byte marker (`0x00`) that starts a Confluent wire-format message;
/// it is followed by the schema registry ID.
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// The maximum possible length, in bytes, of a message prefix.
///
/// The largest prefix is the single-object magic (2 bytes) followed by a
/// SHA-256 digest (32 bytes).
pub const MAX_PREFIX_LEN: usize = 34;

/// The metadata key used for storing the JSON encoded [`Schema`]
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Metadata key used to represent Avro enum symbols in an Arrow schema.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key used to store the default value of a field in an Avro schema.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Metadata key used to store the name of a type in an Avro schema.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";
52
/// Metadata key used to store the namespace of a type in an Avro schema.
54pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";
55
/// Metadata key used to store the documentation (`doc`) for a type in an Avro schema.
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Name given to the root record of a generated Avro schema when the Arrow
/// schema metadata does not provide one under [`AVRO_NAME_METADATA_KEY`].
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
61
62/// Compare two Avro schemas for equality (identical schemas).
63/// Returns true if the schemas have the same parsing canonical form (i.e., logically identical).
64pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result<bool, ArrowError> {
65    let canon_writer = AvroSchema::generate_canonical_form(writer)?;
66    let canon_reader = AvroSchema::generate_canonical_form(reader)?;
67    Ok(canon_writer == canon_reader)
68}
69
/// Position of the `null` variant within a nullable (two-variant) Avro union.
///
/// Avro types are not nullable, with nullability instead encoded as a union
/// where one of the variants is the null type.
///
/// To accommodate this, we specially case two-variant unions where one of the
/// variants is the null type, and use this to derive arrow's notion of nullability.
/// The default is [`Nullability::NullFirst`].
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub enum Nullability {
    /// The nulls are encoded as the first union variant, e.g. `["null", T]`
    #[default]
    NullFirst,
    /// The nulls are encoded as the second union variant, e.g. `[T, "null"]`
    NullSecond,
}
83
/// A type name in an Avro schema: either a [`PrimitiveType`] or a reference to
/// a previously defined named type.
///
/// This represents the different ways a type can be referenced in an Avro schema.
/// Because the enum is `untagged`, serde tries the variants in declaration
/// order, so a string that is not a primitive name falls through to
/// [`TypeName::Ref`].
///
/// <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TypeName<'a> {
    /// A primitive type like null, boolean, int, etc.
    Primitive(PrimitiveType),
    /// A reference to another named type, borrowed from the schema JSON
    Ref(&'a str),
}
98
/// A primitive type
///
/// The serde and strum attributes map each variant to its lowercase Avro
/// name (e.g. `Boolean` <-> `"boolean"`), both for (de)serialization and for
/// the derived `AsRef<str>` implementation.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub enum PrimitiveType {
    /// null: no value
    Null,
    /// boolean: a binary value
    Boolean,
    /// int: 32-bit signed integer
    Int,
    /// long: 64-bit signed integer
    Long,
    /// float: single precision (32-bit) IEEE 754 floating-point number
    Float,
    /// double: double precision (64-bit) IEEE 754 floating-point number
    Double,
    /// bytes: sequence of 8-bit unsigned bytes
    Bytes,
    /// string: Unicode character sequence
    String,
}
123
/// Additional attributes within a [`Schema`]
///
/// Holds the optional `logicalType` plus every other JSON key that the
/// enclosing type does not declare as an explicit field.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Attributes<'a> {
    /// A logical type name (JSON key `logicalType`); `None` when absent
    ///
    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
    #[serde(default)]
    pub logical_type: Option<&'a str>,

    /// Additional JSON attributes, keyed by attribute name; populated via
    /// `#[serde(flatten)]` with the keys not captured by other fields
    #[serde(flatten)]
    pub additional: HashMap<&'a str, Value>,
}
140
141impl Attributes<'_> {
142    /// Returns the field metadata for this [`Attributes`]
143    pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
144        self.additional
145            .iter()
146            .map(|(k, v)| (k.to_string(), v.to_string()))
147            .collect()
148    }
149}
150
/// A type definition that is not a variant of [`ComplexType`]
///
/// This is the JSON object form `{"type": <name>, ...attributes}` where the
/// `type` value is a [`TypeName`].
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Type<'a> {
    /// The type of this Avro data structure
    #[serde(borrow)]
    pub r#type: TypeName<'a>,
    /// Additional attributes associated with this type (flattened into the
    /// same JSON object)
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
162
/// An Avro schema
///
/// This represents the different shapes of Avro schemas as defined in the specification.
/// See <https://avro.apache.org/docs/1.11.1/specification/#schemas> for more details.
///
/// The enum is `untagged`: when deserializing, serde tries the variants in
/// declaration order and keeps the first one that matches, so the order of
/// the variants below is significant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Schema<'a> {
    /// A direct type name (primitive or reference), i.e. a bare JSON string
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A union of multiple schemas (e.g., ["null", "string"]), i.e. a JSON array
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A complex type such as record, array, map, etc.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// A type with attributes
    #[serde(borrow)]
    Type(Type<'a>),
}
183
/// A complex type
///
/// Internally tagged on the JSON `type` key, whose value (`record`, `enum`,
/// `array`, `map` or `fixed`) selects the variant.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ComplexType<'a> {
    /// Record type: a sequence of fields with names and types
    #[serde(borrow)]
    Record(Record<'a>),
    /// Enum type: a set of named values
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// Array type: a sequence of values of the same type
    #[serde(borrow)]
    Array(Array<'a>),
    /// Map type: a mapping from strings to values of the same type
    #[serde(borrow)]
    Map(Map<'a>),
    /// Fixed type: a fixed-size byte array
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
206
/// A record
///
/// All string data is borrowed from the schema JSON being deserialized.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Record<'a> {
    /// Name of the record (required)
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional namespace for the record, provides a way to organize names
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Optional documentation string for the record
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// Alternative names for this record; empty when the key is absent
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The fields contained in this record (required)
    #[serde(borrow)]
    pub fields: Vec<Field<'a>>,
    /// Additional attributes for this record (any remaining JSON keys)
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
231
/// A field within a [`Record`]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Field<'a> {
    /// Name of the field within the record (required)
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional documentation for this field
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// The field's type definition (JSON key `type`)
    #[serde(borrow)]
    pub r#type: Schema<'a>,
    /// Optional default value for this field, kept as raw JSON
    #[serde(default)]
    pub default: Option<Value>,
}
248
/// An enumeration
///
/// All string data is borrowed from the schema JSON being deserialized.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Enum<'a> {
    /// Name of the enum (required)
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional namespace for the enum, provides organizational structure
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Optional documentation string describing the enum
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// Alternative names for this enum; empty when the key is absent
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The symbols (values) that this enum can have (required)
    #[serde(borrow)]
    pub symbols: Vec<&'a str>,
    /// Optional default value for this enum
    #[serde(borrow, default)]
    pub default: Option<&'a str>,
    /// Additional attributes for this enum (any remaining JSON keys)
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
276
/// An array
///
/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Array<'a> {
    /// The schema for items in this array; boxed because [`Schema`] is recursive
    #[serde(borrow)]
    pub items: Box<Schema<'a>>,
    /// Additional attributes for this array (any remaining JSON keys)
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
289
/// A map
///
/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Map<'a> {
    /// The schema for values in this map; boxed because [`Schema`] is recursive
    #[serde(borrow)]
    pub values: Box<Schema<'a>>,
    /// Additional attributes for this map (any remaining JSON keys)
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
302
/// A fixed length binary array
///
/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Fixed<'a> {
    /// Name of the fixed type (required)
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional namespace for the fixed type
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Alternative names for this fixed type; empty when the key is absent
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The number of bytes in this fixed type (required)
    pub size: usize,
    /// Additional attributes for this fixed type (any remaining JSON keys)
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
323
/// A wrapper for an Avro schema in its JSON string representation.
///
/// The JSON is stored as given; it is parsed into a [`Schema`] on demand.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The Avro schema as a JSON string.
    pub json_string: String,
}
330
331impl TryFrom<&ArrowSchema> for AvroSchema {
332    type Error = ArrowError;
333
334    /// Converts an `ArrowSchema` to `AvroSchema`, delegating to
335    /// `AvroSchema::from_arrow_with_options` with `None` so that the
336    /// union null ordering is decided by `Nullability::default()`.
337    fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
338        AvroSchema::from_arrow_with_options(schema, None)
339    }
340}
341
342impl AvroSchema {
343    /// Creates a new `AvroSchema` from a JSON string.
344    pub fn new(json_string: String) -> Self {
345        Self { json_string }
346    }
347
348    pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
349        serde_json::from_str(self.json_string.as_str())
350            .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
351    }
352
353    /// Returns the fingerprint of the schema, computed using the specified [`FingerprintAlgorithm`].
354    ///
355    /// The fingerprint is computed over the schema's Parsed Canonical Form
356    /// as defined by the Avro specification. Depending on `hash_type`, this
357    /// will return one of the supported [`Fingerprint`] variants:
358    /// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`]
359    /// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5`
360    /// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256`
361    ///
362    /// Note: [`FingerprintAlgorithm::None`] cannot be used to generate a fingerprint
363    /// and will result in an error. If you intend to use a Schema Registry ID-based
364    /// wire format, load or set the [`Fingerprint::Id`] directly via [`Fingerprint::load_fingerprint_id`]
365    /// or [`SchemaStore::set`].
366    ///
367    /// See also: <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
368    ///
369    /// # Errors
370    /// Returns an error if deserializing the schema fails, if generating the
371    /// canonical form of the schema fails, or if `hash_type` is [`FingerprintAlgorithm::None`].
372    ///
373    /// # Examples
374    /// ```
375    /// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm};
376    ///
377    /// let avro = AvroSchema::new("\"string\"".to_string());
378    /// let fp = avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
379    /// ```
380    pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
381        Self::generate_fingerprint(&self.schema()?, hash_type)
382    }
383
384    pub(crate) fn generate_fingerprint(
385        schema: &Schema,
386        hash_type: FingerprintAlgorithm,
387    ) -> Result<Fingerprint, ArrowError> {
388        let canonical = Self::generate_canonical_form(schema).map_err(|e| {
389            ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
390        })?;
391        match hash_type {
392            FingerprintAlgorithm::Rabin => {
393                Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
394            }
395            FingerprintAlgorithm::None => Err(ArrowError::SchemaError(
396                "FingerprintAlgorithm of None cannot be used to generate a fingerprint; \
397                if using Fingerprint::Id, pass the registry ID in instead using the set method."
398                    .to_string(),
399            )),
400            #[cfg(feature = "md5")]
401            FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
402            #[cfg(feature = "sha256")]
403            FingerprintAlgorithm::SHA256 => {
404                Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
405            }
406        }
407    }
408
409    /// Generates the 64-bit Rabin fingerprint for the given `Schema`.
410    ///
411    /// The fingerprint is computed from the canonical form of the schema.
412    /// This is also known as `CRC-64-AVRO`.
413    ///
414    /// # Returns
415    /// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
416    pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> {
417        Self::generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
418    }
419
420    /// Generates the Parsed Canonical Form for the given [`Schema`].
421    ///
422    /// The canonical form is a standardized JSON representation of the schema,
423    /// primarily used for generating a schema fingerprint for equality checking.
424    ///
425    /// This form strips attributes that do not affect the schema's identity,
426    /// such as `doc` fields, `aliases`, and any properties not defined in the
427    /// Avro specification.
428    ///
429    /// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
430    pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
431        build_canonical(schema, None)
432    }
433
434    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order.
435    ///
436    /// If the input Arrow schema already contains Avro JSON in
437    /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
438    /// the exact header encoding alignment; otherwise, a new JSON is generated
439    /// honoring `null_union_order` at **all nullable sites**.
440    pub fn from_arrow_with_options(
441        schema: &ArrowSchema,
442        null_order: Option<Nullability>,
443    ) -> Result<AvroSchema, ArrowError> {
444        if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
445            return Ok(AvroSchema::new(json.clone()));
446        }
447        let order = null_order.unwrap_or_default();
448        let mut name_gen = NameGenerator::default();
449        let fields_json = schema
450            .fields()
451            .iter()
452            .map(|f| arrow_field_to_avro(f, &mut name_gen, order))
453            .collect::<Result<Vec<_>, _>>()?;
454        let record_name = schema
455            .metadata
456            .get(AVRO_NAME_METADATA_KEY)
457            .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
458        let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
459        record.insert("type".into(), Value::String("record".into()));
460        record.insert(
461            "name".into(),
462            Value::String(sanitise_avro_name(record_name)),
463        );
464        if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
465            record.insert("namespace".into(), Value::String(ns.clone()));
466        }
467        if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
468            record.insert("doc".into(), Value::String(doc.clone()));
469        }
470        record.insert("fields".into(), Value::Array(fields_json));
471        extend_with_passthrough_metadata(&mut record, &schema.metadata);
472        let json_string = serde_json::to_string(&Value::Object(record))
473            .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
474        Ok(AvroSchema::new(json_string))
475    }
476}
477
/// A stack-allocated, fixed-size buffer for the prefix.
///
/// Only the first `len` bytes of `buf` are meaningful; the remainder is padding.
#[derive(Debug, Copy, Clone)]
pub struct Prefix {
    // Backing storage, sized for the longest possible prefix.
    buf: [u8; MAX_PREFIX_LEN],
    // Number of valid bytes at the start of `buf`.
    len: u8,
}
484
485impl Prefix {
486    #[inline]
487    pub(crate) fn as_slice(&self) -> &[u8] {
488        &self.buf[..self.len as usize]
489    }
490}
491
/// Defines the strategy for generating the per-record prefix for an Avro binary stream.
///
/// Unlike [`Fingerprint`], the hash-based variants carry no value; only
/// [`FingerprintStrategy::Id`] holds its payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// Use the 64-bit Rabin fingerprint (default for single-object encoding).
    #[default]
    Rabin,
    /// Use a Confluent Schema Registry 32-bit ID.
    Id(u32),
    #[cfg(feature = "md5")]
    /// Use the 128-bit MD5 fingerprint (requires the `md5` feature).
    MD5,
    #[cfg(feature = "sha256")]
    /// Use the 256-bit SHA-256 fingerprint (requires the `sha256` feature).
    SHA256,
}
507
508impl From<Fingerprint> for FingerprintStrategy {
509    fn from(f: Fingerprint) -> Self {
510        Self::from(&f)
511    }
512}
513
514impl From<FingerprintAlgorithm> for FingerprintStrategy {
515    fn from(f: FingerprintAlgorithm) -> Self {
516        match f {
517            FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
518            FingerprintAlgorithm::None => FingerprintStrategy::Id(0),
519            #[cfg(feature = "md5")]
520            FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
521            #[cfg(feature = "sha256")]
522            FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
523        }
524    }
525}
526
527impl From<&Fingerprint> for FingerprintStrategy {
528    fn from(f: &Fingerprint) -> Self {
529        match f {
530            Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
531            Fingerprint::Id(id) => FingerprintStrategy::Id(*id),
532            #[cfg(feature = "md5")]
533            Fingerprint::MD5(_) => FingerprintStrategy::MD5,
534            #[cfg(feature = "sha256")]
535            Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
536        }
537    }
538}
539
/// Supported fingerprint algorithms for Avro schema identification.
/// For use with Confluent Schema Registry IDs, set to None.
///
/// The default is [`FingerprintAlgorithm::Rabin`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit CRC-64-AVRO Rabin fingerprint.
    #[default]
    Rabin,
    /// Represents a fingerprint not based on a hash algorithm (e.g., a 32-bit Schema Registry ID).
    None,
    #[cfg(feature = "md5")]
    /// 128-bit MD5 message digest (requires the `md5` feature).
    MD5,
    #[cfg(feature = "sha256")]
    /// 256-bit SHA-256 digest (requires the `sha256` feature).
    SHA256,
}
556
557/// Allow easy extraction of the algorithm used to create a fingerprint.
558impl From<&Fingerprint> for FingerprintAlgorithm {
559    fn from(fp: &Fingerprint) -> Self {
560        match fp {
561            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
562            Fingerprint::Id(_) => FingerprintAlgorithm::None,
563            #[cfg(feature = "md5")]
564            Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
565            #[cfg(feature = "sha256")]
566            Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
567        }
568    }
569}
570
571impl From<FingerprintStrategy> for FingerprintAlgorithm {
572    fn from(s: FingerprintStrategy) -> Self {
573        Self::from(&s)
574    }
575}
576
577impl From<&FingerprintStrategy> for FingerprintAlgorithm {
578    fn from(s: &FingerprintStrategy) -> Self {
579        match s {
580            FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
581            FingerprintStrategy::Id(_) => FingerprintAlgorithm::None,
582            #[cfg(feature = "md5")]
583            FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
584            #[cfg(feature = "sha256")]
585            FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
586        }
587    }
588}
589
/// A schema fingerprint in one of the supported formats.
///
/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
/// instance always stores only one variant, matching its configured
/// `FingerprintAlgorithm`, but the enum makes the API uniform.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
/// <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// A 64-bit Rabin fingerprint.
    Rabin(u64),
    /// A 32-bit Schema Registry ID.
    Id(u32),
    #[cfg(feature = "md5")]
    /// A 128-bit MD5 fingerprint (requires the `md5` feature).
    MD5([u8; 16]),
    #[cfg(feature = "sha256")]
    /// A 256-bit SHA-256 fingerprint (requires the `sha256` feature).
    SHA256([u8; 32]),
}
611
612impl From<FingerprintStrategy> for Fingerprint {
613    fn from(s: FingerprintStrategy) -> Self {
614        Self::from(&s)
615    }
616}
617
618impl From<&FingerprintStrategy> for Fingerprint {
619    fn from(s: &FingerprintStrategy) -> Self {
620        match s {
621            FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
622            FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
623            #[cfg(feature = "md5")]
624            FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
625            #[cfg(feature = "sha256")]
626            FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
627        }
628    }
629}
630
631impl From<FingerprintAlgorithm> for Fingerprint {
632    fn from(s: FingerprintAlgorithm) -> Self {
633        match s {
634            FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
635            FingerprintAlgorithm::None => Fingerprint::Id(0),
636            #[cfg(feature = "md5")]
637            FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
638            #[cfg(feature = "sha256")]
639            FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
640        }
641    }
642}
643
644impl Fingerprint {
645    /// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID).
646    ///
647    /// The provided `id` is in big-endian wire order; this converts it to host order
648    /// and returns `Fingerprint::Id`.
649    ///
650    /// # Returns
651    /// A `Fingerprint::Id` variant containing the 32-bit fingerprint.
652    pub fn load_fingerprint_id(id: u32) -> Self {
653        Fingerprint::Id(u32::from_be(id))
654    }
655
656    /// Constructs a serialized prefix represented as a `Vec<u8>` based on the variant of the enum.
657    ///
658    /// This method serializes data in different formats depending on the variant of `self`:
659    /// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
660    ///   followed by the big-endian byte representation of the `id`.
661    /// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header
662    ///   (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`.
663    /// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the
664    ///   `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
665    /// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is
666    ///   a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`.
667    ///
668    /// # Returns
669    ///
670    /// A `Prefix` containing the serialized prefix data.
671    ///
672    /// # Features
673    ///
674    /// - You can optionally enable the `md5` feature to include the `MD5` variant.
675    /// - You can optionally enable the `sha256` feature to include the `SHA256` variant.
676    ///
677    pub fn make_prefix(&self) -> Prefix {
678        let mut buf = [0u8; MAX_PREFIX_LEN];
679        let len = match self {
680            Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
681            Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
682            #[cfg(feature = "md5")]
683            Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
684            #[cfg(feature = "sha256")]
685            Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
686        };
687        Prefix { buf, len }
688    }
689}
690
691fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
692    buf: &mut [u8; MAX_PREFIX_LEN],
693    magic: &[u8; MAGIC_LEN],
694    payload: &[u8; PAYLOAD_LEN],
695) -> u8 {
696    debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
697    let total = MAGIC_LEN + PAYLOAD_LEN;
698    let prefix_slice = &mut buf[..total];
699    prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
700    prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
701    total as u8
702}
703
/// An in-memory cache of Avro schemas, indexed by their fingerprint.
///
/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
/// Each schema is associated with a unique [`Fingerprint`], which is generated based
/// on the schema's canonical form and a specific hashing algorithm.
///
/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin,
/// MD5 (requires the `md5` feature), or SHA256 (requires the `sha256` feature) for all its
/// operations. This ensures consistency when generating fingerprints and looking up schemas.
/// All schemas registered will have their fingerprint computed with this algorithm, and
/// lookups must use a matching fingerprint.
///
/// # Examples
///
/// ```no_run
/// // Create a new store with the default Rabin fingerprinting.
/// use arrow_avro::schema::{AvroSchema, SchemaStore};
///
/// let mut store = SchemaStore::new();
/// let schema = AvroSchema::new("\"string\"".to_string());
/// // Register the schema to get its fingerprint.
/// let fingerprint = store.register(schema.clone()).unwrap();
/// // Use the fingerprint to look up the schema.
/// let retrieved_schema = store.lookup(&fingerprint).cloned();
/// assert_eq!(retrieved_schema, Some(schema));
/// ```
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// The hashing algorithm used for generating fingerprints.
    fingerprint_algorithm: FingerprintAlgorithm,
    /// A map from a schema's fingerprint to the schema itself.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
737
738impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
739    type Error = ArrowError;
740
741    /// Creates a `SchemaStore` from a HashMap of schemas.
742    /// Each schema in the HashMap is registered with the new store.
743    fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
744        Ok(Self {
745            schemas,
746            ..Self::default()
747        })
748    }
749}
750
751impl SchemaStore {
752    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
753    pub fn new() -> Self {
754        Self::default()
755    }
756
757    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
758    pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
759        Self {
760            fingerprint_algorithm,
761            ..Self::default()
762        }
763    }
764
765    /// Registers a schema with the store and the provided fingerprint.
766    /// Note: Confluent wire format implementations should leverage this method.
767    ///
768    /// A schema is set in the store, using the provided fingerprint. If a schema
769    /// with the same fingerprint does not already exist in the store, the new schema
770    /// is inserted. If the fingerprint already exists, the existing schema is not overwritten.
771    ///
772    /// # Arguments
773    ///
774    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to register.
775    /// * `schema` - The `AvroSchema` to register.
776    ///
777    /// # Returns
778    ///
779    /// A `Result` returning the provided `Fingerprint` of the schema if successful,
780    /// or an `ArrowError` on failure.
781    pub fn set(
782        &mut self,
783        fingerprint: Fingerprint,
784        schema: AvroSchema,
785    ) -> Result<Fingerprint, ArrowError> {
786        match self.schemas.entry(fingerprint) {
787            Entry::Occupied(entry) => {
788                if entry.get() != &schema {
789                    return Err(ArrowError::ComputeError(format!(
790                        "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
791                    )));
792                }
793            }
794            Entry::Vacant(entry) => {
795                entry.insert(schema);
796            }
797        }
798        Ok(fingerprint)
799    }
800
801    /// Registers a schema with the store and returns its fingerprint.
802    ///
803    /// A fingerprint is calculated for the given schema using the store's configured
804    /// hash type. If a schema with the same fingerprint does not already exist in the
805    /// store, the new schema is inserted. If the fingerprint already exists, the
806    /// existing schema is not overwritten. If FingerprintAlgorithm is set to None, this
807    /// method will return an error. Confluent wire format implementations should leverage the
808    /// set method instead.
809    ///
810    /// # Arguments
811    ///
812    /// * `schema` - The `AvroSchema` to register.
813    ///
814    /// # Returns
815    ///
816    /// A `Result` containing the `Fingerprint` of the schema if successful,
817    /// or an `ArrowError` on failure.
818    pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
819        if self.fingerprint_algorithm == FingerprintAlgorithm::None {
820            return Err(ArrowError::SchemaError(
821                "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
822            Use the set method directly instead, providing a valid fingerprint"
823                    .to_string(),
824            ));
825        }
826        let fingerprint =
827            AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
828        self.set(fingerprint, schema)?;
829        Ok(fingerprint)
830    }
831
832    /// Looks up a schema by its `Fingerprint`.
833    ///
834    /// # Arguments
835    ///
836    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up.
837    ///
838    /// # Returns
839    ///
840    /// An `Option` containing a clone of the `AvroSchema` if found, otherwise `None`.
841    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
842        self.schemas.get(fingerprint)
843    }
844
845    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
846    /// held by this [`SchemaStore`].
847    ///
848    /// The order of the returned fingerprints is unspecified and should not be
849    /// relied upon.
850    pub fn fingerprints(&self) -> Vec<Fingerprint> {
851        self.schemas.keys().copied().collect()
852    }
853
854    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
855    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
856        self.fingerprint_algorithm
857    }
858}
859
860fn quote(s: &str) -> Result<String, ArrowError> {
861    serde_json::to_string(s)
862        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
863}
864
// Avro names are defined by a `name` and an optional `namespace`.
// The full name is composed of the namespace and the name, separated by a dot.
//
// Avro specification defines two ways to specify a full name:
// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
//    In this case, the `namespace` attribute is ignored.
// 2. The `name` attribute contains the simple name (e.g., "d") and the
//    `namespace` attribute contains the namespace (e.g., "a.b.c").
//
// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
// Complex paths with quotes or backticks like `a."hi".b` are not supported.
//
// This function constructs the full name and extracts the namespace,
// handling both ways of specifying the name. It prioritizes a namespace
// defined within the `name` attribute itself, then the explicit `namespace_attr`,
// and finally the `enclosing_ns`.
//
// An empty namespace string denotes the null namespace (per the Avro
// specification): it is never prepended to the name, and an explicit empty
// `namespace_attr` cancels the `enclosing_ns` instead of falling back to it.
//
// Returns `(full_name, namespace)`, where `namespace` is `None` when the name
// lives in the null namespace.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    // `name` already contains a dot then treat as full-name, ignore namespace.
    if let Some((ns, _)) = name.rsplit_once('.') {
        return (name.to_string(), Some(ns.to_string()));
    }
    // Pick the explicit namespace over the inherited one first, and only then
    // discard an empty result, so that `Some("")` overrides `enclosing_ns`.
    let namespace = namespace_attr.or(enclosing_ns).filter(|ns| !ns.is_empty());
    match namespace {
        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
        None => (name.to_string(), None),
    }
}
895
896fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
897    Ok(match schema {
898        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
899            TypeName::Primitive(pt) => quote(pt.as_ref())?,
900            TypeName::Ref(name) => {
901                let (full_name, _) = make_full_name(name, None, enclosing_ns);
902                quote(&full_name)?
903            }
904        },
905        Schema::Union(branches) => format!(
906            "[{}]",
907            branches
908                .iter()
909                .map(|b| build_canonical(b, enclosing_ns))
910                .collect::<Result<Vec<_>, _>>()?
911                .join(",")
912        ),
913        Schema::Complex(ct) => match ct {
914            ComplexType::Record(r) => {
915                let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
916                let fields = r
917                    .fields
918                    .iter()
919                    .map(|f| {
920                        let field_type =
921                            build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
922                        Ok(format!(
923                            r#"{{"name":{},"type":{}}}"#,
924                            quote(f.name)?,
925                            field_type
926                        ))
927                    })
928                    .collect::<Result<Vec<_>, ArrowError>>()?
929                    .join(",");
930                format!(
931                    r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
932                    quote(&full_name)?,
933                )
934            }
935            ComplexType::Enum(e) => {
936                let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
937                let symbols = e
938                    .symbols
939                    .iter()
940                    .map(|s| quote(s))
941                    .collect::<Result<Vec<_>, _>>()?
942                    .join(",");
943                format!(
944                    r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
945                    quote(&full_name)?
946                )
947            }
948            ComplexType::Array(arr) => format!(
949                r#"{{"type":"array","items":{}}}"#,
950                build_canonical(&arr.items, enclosing_ns)?
951            ),
952            ComplexType::Map(map) => format!(
953                r#"{{"type":"map","values":{}}}"#,
954                build_canonical(&map.values, enclosing_ns)?
955            ),
956            ComplexType::Fixed(f) => {
957                let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
958                format!(
959                    r#"{{"name":{},"type":"fixed","size":{}}}"#,
960                    quote(&full_name)?,
961                    f.size
962                )
963            }
964        },
965    })
966}
967
/// Seed of the 64‑bit Rabin fingerprint described in the Avro spec.
///
/// It is the fingerprint of the empty input and is also the value XOR-ed in
/// while building the lookup table.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Build one entry of the polynomial‑division table.
///
/// Starting from `i`, performs eight shift steps; a step whose low bit is set
/// additionally XORs in `EMPTY`.
///
/// We cannot yet write `for _ in 0..8` here: `for` loops rely on
/// `Iterator::next`, which is not `const` on stable Rust.  Until the
/// `const_for` feature (tracking issue #87575) is stabilized, a `while`
/// loop is the only option in a `const fn`
const fn one_entry(i: usize) -> u64 {
    let mut fp = i as u64;
    let mut remaining_bits = 8;
    while remaining_bits > 0 {
        let shifted = fp >> 1;
        fp = if fp & 1 == 1 { shifted ^ EMPTY } else { shifted };
        remaining_bits -= 1;
    }
    fp
}

/// Build the full 256‑entry table at compile time.
///
/// Entry `i` holds `one_entry(i)`. The fill order is irrelevant, so the table
/// is populated from the last slot down to the first.
///
/// We cannot yet write `for _ in 0..256` here: `for` loops rely on
/// `Iterator::next`, which is not `const` on stable Rust.  Until the
/// `const_for` feature (tracking issue #87575) is stabilized, a `while`
/// loop is the only option in a `const fn`
const fn build_table() -> [u64; 256] {
    let mut table = [0u64; 256];
    let mut remaining = 256;
    while remaining > 0 {
        remaining -= 1;
        table[remaining] = one_entry(remaining);
    }
    table
}

/// The pre‑computed table, indexed by the low byte of the running fingerprint
/// XOR-ed with the next input byte.
static FINGERPRINT_TABLE: [u64; 256] = build_table();

/// Computes the 64-bit Rabin fingerprint for a given canonical schema string.
/// This implementation is based on the Avro specification for schema fingerprinting.
///
/// The fingerprint starts at `EMPTY` and consumes the UTF-8 bytes of
/// `canonical_form` one at a time through the lookup table.
pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
    canonical_form.bytes().fold(EMPTY, |fp, byte| {
        let slot = usize::from((fp as u8) ^ byte);
        (fp >> 8) ^ FINGERPRINT_TABLE[slot]
    })
}
1016
#[cfg(feature = "md5")]
/// Compute the **128‑bit MD5** fingerprint of the canonical form.
///
/// Hashes the UTF-8 bytes of `canonical_form` and returns the full digest as
/// a 16‑byte array (`[u8; 16]`), exactly as required by the Avro specification.
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    md5::compute(canonical_form.as_bytes()).0
}
1027
#[cfg(feature = "sha256")]
/// Compute the **256‑bit SHA‑256** fingerprint of the canonical form.
///
/// Hashes the UTF-8 bytes of `canonical_form` in one shot and returns the
/// full digest as a 32‑byte array (`[u8; 32]`).
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    Sha256::digest(canonical_form.as_bytes()).into()
}
1039
1040#[inline]
1041fn is_internal_arrow_key(key: &str) -> bool {
1042    key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1043}
1044
1045/// Copies Arrow schema metadata entries to the provided JSON map,
1046/// skipping keys that are Avro-reserved, internal Arrow keys, or
1047/// nested under the `avro.schema.` namespace. Values that parse as
1048/// JSON are inserted as JSON; otherwise the raw string is preserved.
1049fn extend_with_passthrough_metadata(
1050    target: &mut JsonMap<String, Value>,
1051    metadata: &HashMap<String, String>,
1052) {
1053    for (meta_key, meta_val) in metadata {
1054        if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1055            continue;
1056        }
1057        let json_val =
1058            serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1059        target.insert(meta_key.clone(), json_val);
1060    }
1061}
1062
// Sanitize an arbitrary string so it is a valid Avro field or type name.
//
// Every character other than an ASCII letter, ASCII digit or `_` becomes `_`,
// a name starting with a digit gains a leading `_`, and the empty string
// becomes "_".
fn sanitise_avro_name(base_name: &str) -> String {
    let starts_with_digit = match base_name.chars().next() {
        None => return "_".to_owned(),
        Some(first) => first.is_ascii_digit(),
    };
    let mut out = String::with_capacity(base_name.len() + 1);
    if starts_with_digit {
        out.push('_');
    }
    for ch in base_name.chars() {
        let keep = ch == '_' || ch.is_ascii_alphanumeric();
        out.push(if keep { ch } else { '_' });
    }
    out
}
1083
1084#[derive(Default)]
1085struct NameGenerator {
1086    used: HashSet<String>,
1087    counters: HashMap<String, usize>,
1088}
1089
1090impl NameGenerator {
1091    fn make_unique(&mut self, field_name: &str) -> String {
1092        let field_name = sanitise_avro_name(field_name);
1093        if self.used.insert(field_name.clone()) {
1094            self.counters.insert(field_name.clone(), 1);
1095            return field_name;
1096        }
1097        let counter = self.counters.entry(field_name.clone()).or_insert(1);
1098        loop {
1099            let candidate = format!("{field_name}_{}", *counter);
1100            if self.used.insert(candidate.clone()) {
1101                return candidate;
1102            }
1103            *counter += 1;
1104        }
1105    }
1106}
1107
1108fn merge_extras(schema: Value, mut extras: JsonMap<String, Value>) -> Value {
1109    if extras.is_empty() {
1110        return schema;
1111    }
1112    match schema {
1113        Value::Object(mut map) => {
1114            map.extend(extras);
1115            Value::Object(map)
1116        }
1117        Value::Array(mut union) => {
1118            // For unions, we cannot attach attributes to the array itself (per Avro spec).
1119            // As a fallback for extension metadata, attach extras to the first non-null branch object.
1120            if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1121                let original = std::mem::take(non_null);
1122                *non_null = merge_extras(original, extras);
1123            }
1124            Value::Array(union)
1125        }
1126        primitive => {
1127            let mut map = JsonMap::with_capacity(extras.len() + 1);
1128            map.insert("type".into(), primitive);
1129            map.extend(extras);
1130            Value::Object(map)
1131        }
1132    }
1133}
1134
1135#[inline]
1136fn is_avro_json_null(v: &Value) -> bool {
1137    matches!(v, Value::String(s) if s == "null")
1138}
1139
1140fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1141    let null = Value::String("null".into());
1142    match inner {
1143        Value::Array(mut union) => {
1144            union.retain(|v| !is_avro_json_null(v));
1145            match null_order {
1146                Nullability::NullFirst => union.insert(0, null),
1147                Nullability::NullSecond => union.push(null),
1148            }
1149            Value::Array(union)
1150        }
1151        other => match null_order {
1152            Nullability::NullFirst => Value::Array(vec![null, other]),
1153            Nullability::NullSecond => Value::Array(vec![other, null]),
1154        },
1155    }
1156}
1157
1158fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1159    match branch {
1160        Value::String(t) => Ok(format!("P:{t}")),
1161        Value::Object(map) => {
1162            let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1163                ArrowError::SchemaError("Union branch object missing string 'type'".into())
1164            })?;
1165            match t {
1166                "record" | "enum" | "fixed" => {
1167                    let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1168                        ArrowError::SchemaError(format!(
1169                            "Union branch '{t}' missing required 'name'"
1170                        ))
1171                    })?;
1172                    Ok(format!("N:{t}:{name}"))
1173                }
1174                "array" | "map" => Ok(format!("C:{t}")),
1175                other => Ok(format!("P:{other}")),
1176            }
1177        }
1178        Value::Array(_) => Err(ArrowError::SchemaError(
1179            "Avro union may not immediately contain another union".into(),
1180        )),
1181        _ => Err(ArrowError::SchemaError(
1182            "Invalid JSON for Avro union branch".into(),
1183        )),
1184    }
1185}
1186
1187fn datatype_to_avro(
1188    dt: &DataType,
1189    field_name: &str,
1190    metadata: &HashMap<String, String>,
1191    name_gen: &mut NameGenerator,
1192    null_order: Nullability,
1193) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1194    let mut extras = JsonMap::new();
1195    let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1196        if *scale < 0 {
1197            return Err(ArrowError::SchemaError(format!(
1198                "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1199            )));
1200        }
1201        if (*scale as usize) > (*precision as usize) {
1202            return Err(ArrowError::SchemaError(format!(
1203                "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1204                 must be <= precision ({precision})"
1205            )));
1206        }
1207
1208        let mut meta = JsonMap::from_iter([
1209            ("logicalType".into(), json!("decimal")),
1210            ("precision".into(), json!(*precision)),
1211            ("scale".into(), json!(*scale)),
1212        ]);
1213        if let Some(size) = metadata
1214            .get("size")
1215            .and_then(|val| val.parse::<usize>().ok())
1216        {
1217            meta.insert("type".into(), json!("fixed"));
1218            meta.insert("size".into(), json!(size));
1219            meta.insert("name".into(), json!(name_gen.make_unique(field_name)));
1220        } else {
1221            meta.insert("type".into(), json!("bytes"));
1222        }
1223        Ok(Value::Object(meta))
1224    };
1225    let val = match dt {
1226        DataType::Null => Value::String("null".into()),
1227        DataType::Boolean => Value::String("boolean".into()),
1228        DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 | DataType::Int32 => {
1229            Value::String("int".into())
1230        }
1231        DataType::UInt32 | DataType::Int64 | DataType::UInt64 => Value::String("long".into()),
1232        DataType::Float16 | DataType::Float32 => Value::String("float".into()),
1233        DataType::Float64 => Value::String("double".into()),
1234        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1235        DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1236        DataType::BinaryView => {
1237            extras.insert("arrowBinaryView".into(), Value::Bool(true));
1238            Value::String("bytes".into())
1239        }
1240        DataType::FixedSizeBinary(len) => {
1241            let is_uuid = metadata
1242                .get("logicalType")
1243                .is_some_and(|value| value == "uuid")
1244                || (*len == 16
1245                    && metadata
1246                        .get("ARROW:extension:name")
1247                        .is_some_and(|value| value == "uuid"));
1248            if is_uuid {
1249                json!({ "type": "string", "logicalType": "uuid" })
1250            } else {
1251                json!({
1252                    "type": "fixed",
1253                    "name": name_gen.make_unique(field_name),
1254                    "size": len
1255                })
1256            }
1257        }
1258        #[cfg(feature = "small_decimals")]
1259        DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1260            handle_decimal(precision, scale)?
1261        }
1262        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1263            handle_decimal(precision, scale)?
1264        }
1265        DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1266        DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1267        DataType::Time32(unit) => match unit {
1268            TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1269            TimeUnit::Second => {
1270                extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1271                Value::String("int".into())
1272            }
1273            _ => Value::String("int".into()),
1274        },
1275        DataType::Time64(unit) => match unit {
1276            TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1277            TimeUnit::Nanosecond => {
1278                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1279                Value::String("long".into())
1280            }
1281            _ => Value::String("long".into()),
1282        },
1283        DataType::Timestamp(unit, tz) => {
1284            let logical_type = match (unit, tz.is_some()) {
1285                (TimeUnit::Millisecond, true) => "timestamp-millis",
1286                (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1287                (TimeUnit::Microsecond, true) => "timestamp-micros",
1288                (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1289                (TimeUnit::Second, _) => {
1290                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1291                    return Ok((Value::String("long".into()), extras));
1292                }
1293                (TimeUnit::Nanosecond, _) => {
1294                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1295                    return Ok((Value::String("long".into()), extras));
1296                }
1297            };
1298            json!({ "type": "long", "logicalType": logical_type })
1299        }
1300        DataType::Duration(unit) => {
1301            #[cfg(feature = "avro_custom_types")]
1302            {
1303                // When the feature is enabled, create an Avro schema object
1304                // with the correct `logicalType` annotation.
1305                let logical_type = match unit {
1306                    TimeUnit::Second => "arrow.duration-seconds",
1307                    TimeUnit::Millisecond => "arrow.duration-millis",
1308                    TimeUnit::Microsecond => "arrow.duration-micros",
1309                    TimeUnit::Nanosecond => "arrow.duration-nanos",
1310                };
1311                json!({ "type": "long", "logicalType": logical_type })
1312            }
1313            #[cfg(not(feature = "avro_custom_types"))]
1314            {
1315                Value::String("long".into())
1316            }
1317        }
1318        DataType::Interval(IntervalUnit::MonthDayNano) => json!({
1319            "type": "fixed",
1320            "name": name_gen.make_unique(&format!("{field_name}_duration")),
1321            "size": 12,
1322            "logicalType": "duration"
1323        }),
1324        DataType::Interval(IntervalUnit::YearMonth) => {
1325            extras.insert(
1326                "arrowIntervalUnit".into(),
1327                Value::String("yearmonth".into()),
1328            );
1329            Value::String("long".into())
1330        }
1331        DataType::Interval(IntervalUnit::DayTime) => {
1332            extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1333            Value::String("long".into())
1334        }
1335        DataType::List(child) | DataType::LargeList(child) => {
1336            if matches!(dt, DataType::LargeList(_)) {
1337                extras.insert("arrowLargeList".into(), Value::Bool(true));
1338            }
1339            let items_schema = process_datatype(
1340                child.data_type(),
1341                child.name(),
1342                child.metadata(),
1343                name_gen,
1344                null_order,
1345                child.is_nullable(),
1346            )?;
1347            json!({
1348                "type": "array",
1349                "items": items_schema
1350            })
1351        }
1352        DataType::ListView(child) | DataType::LargeListView(child) => {
1353            if matches!(dt, DataType::LargeListView(_)) {
1354                extras.insert("arrowLargeList".into(), Value::Bool(true));
1355            }
1356            extras.insert("arrowListView".into(), Value::Bool(true));
1357            let items_schema = process_datatype(
1358                child.data_type(),
1359                child.name(),
1360                child.metadata(),
1361                name_gen,
1362                null_order,
1363                child.is_nullable(),
1364            )?;
1365            json!({
1366                "type": "array",
1367                "items": items_schema
1368            })
1369        }
1370        DataType::FixedSizeList(child, len) => {
1371            extras.insert("arrowFixedSize".into(), json!(len));
1372            let items_schema = process_datatype(
1373                child.data_type(),
1374                child.name(),
1375                child.metadata(),
1376                name_gen,
1377                null_order,
1378                child.is_nullable(),
1379            )?;
1380            json!({
1381                "type": "array",
1382                "items": items_schema
1383            })
1384        }
1385        DataType::Map(entries, _) => {
1386            let value_field = match entries.data_type() {
1387                DataType::Struct(fs) => &fs[1],
1388                _ => {
1389                    return Err(ArrowError::SchemaError(
1390                        "Map 'entries' field must be Struct(key,value)".into(),
1391                    ))
1392                }
1393            };
1394            let values_schema = process_datatype(
1395                value_field.data_type(),
1396                value_field.name(),
1397                value_field.metadata(),
1398                name_gen,
1399                null_order,
1400                value_field.is_nullable(),
1401            )?;
1402            json!({
1403                "type": "map",
1404                "values": values_schema
1405            })
1406        }
1407        DataType::Struct(fields) => {
1408            let avro_fields = fields
1409                .iter()
1410                .map(|field| arrow_field_to_avro(field, name_gen, null_order))
1411                .collect::<Result<Vec<_>, _>>()?;
1412            json!({
1413                "type": "record",
1414                "name": name_gen.make_unique(field_name),
1415                "fields": avro_fields
1416            })
1417        }
1418        DataType::Dictionary(_, value) => {
1419            if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1420                let symbols: Vec<&str> =
1421                    serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1422                json!({
1423                    "type": "enum",
1424                    "name": name_gen.make_unique(field_name),
1425                    "symbols": symbols
1426                })
1427            } else {
1428                process_datatype(
1429                    value.as_ref(),
1430                    field_name,
1431                    metadata,
1432                    name_gen,
1433                    null_order,
1434                    false,
1435                )?
1436            }
1437        }
1438        DataType::RunEndEncoded(_, values) => process_datatype(
1439            values.data_type(),
1440            values.name(),
1441            values.metadata(),
1442            name_gen,
1443            null_order,
1444            false,
1445        )?,
1446        DataType::Union(fields, mode) => {
1447            let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1448            let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1449            for (type_id, field_ref) in fields.iter() {
1450                // NOTE: `process_datatype` would wrap nullability; force is_nullable=false here.
1451                let (branch_schema, _branch_extras) = datatype_to_avro(
1452                    field_ref.data_type(),
1453                    field_ref.name(),
1454                    field_ref.metadata(),
1455                    name_gen,
1456                    null_order,
1457                )?;
1458                // Avro unions cannot immediately contain another union
1459                if matches!(branch_schema, Value::Array(_)) {
1460                    return Err(ArrowError::SchemaError(
1461                        "Avro union may not immediately contain another union".into(),
1462                    ));
1463                }
1464                branches.push(branch_schema);
1465                type_ids.push(type_id as i32);
1466            }
1467            let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1468            for b in &branches {
1469                let sig = union_branch_signature(b)?;
1470                if !seen.insert(sig) {
1471                    return Err(ArrowError::SchemaError(
1472                        "Avro union contains duplicate branch types (disallowed by spec)".into(),
1473                    ));
1474                }
1475            }
1476            extras.insert(
1477                "arrowUnionMode".into(),
1478                Value::String(
1479                    match mode {
1480                        UnionMode::Sparse => "sparse",
1481                        UnionMode::Dense => "dense",
1482                    }
1483                    .to_string(),
1484                ),
1485            );
1486            extras.insert(
1487                "arrowUnionTypeIds".into(),
1488                Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1489            );
1490
1491            Value::Array(branches)
1492        }
1493        other => {
1494            return Err(ArrowError::NotYetImplemented(format!(
1495                "Arrow type {other:?} has no Avro representation"
1496            )))
1497        }
1498    };
1499    Ok((val, extras))
1500}
1501
1502fn process_datatype(
1503    dt: &DataType,
1504    field_name: &str,
1505    metadata: &HashMap<String, String>,
1506    name_gen: &mut NameGenerator,
1507    null_order: Nullability,
1508    is_nullable: bool,
1509) -> Result<Value, ArrowError> {
1510    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order)?;
1511    let mut merged = merge_extras(schema, extras);
1512    if is_nullable {
1513        merged = wrap_nullable(merged, null_order)
1514    }
1515    Ok(merged)
1516}
1517
1518fn arrow_field_to_avro(
1519    field: &ArrowField,
1520    name_gen: &mut NameGenerator,
1521    null_order: Nullability,
1522) -> Result<Value, ArrowError> {
1523    let avro_name = sanitise_avro_name(field.name());
1524    let schema_value = process_datatype(
1525        field.data_type(),
1526        &avro_name,
1527        field.metadata(),
1528        name_gen,
1529        null_order,
1530        field.is_nullable(),
1531    )?;
1532    // Build the field map
1533    let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1534    map.insert("name".into(), Value::String(avro_name));
1535    map.insert("type".into(), schema_value);
1536    // Transfer selected metadata
1537    for (meta_key, meta_val) in field.metadata() {
1538        if is_internal_arrow_key(meta_key) {
1539            continue;
1540        }
1541        match meta_key.as_str() {
1542            AVRO_DOC_METADATA_KEY => {
1543                map.insert("doc".into(), Value::String(meta_val.clone()));
1544            }
1545            AVRO_FIELD_DEFAULT_METADATA_KEY => {
1546                let default_value = serde_json::from_str(meta_val)
1547                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1548                map.insert("default".into(), default_value);
1549            }
1550            _ => {
1551                let json_val = serde_json::from_str(meta_val)
1552                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1553                map.insert(meta_key.clone(), json_val);
1554            }
1555        }
1556    }
1557    Ok(Value::Object(map))
1558}
1559
1560#[cfg(test)]
1561mod tests {
1562    use super::*;
1563    use crate::codec::{AvroDataType, AvroField};
1564    use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1565    use serde_json::json;
1566    use std::sync::Arc;
1567
1568    fn int_schema() -> Schema<'static> {
1569        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1570    }
1571
1572    fn record_schema() -> Schema<'static> {
1573        Schema::Complex(ComplexType::Record(Record {
1574            name: "record1",
1575            namespace: Some("test.namespace"),
1576            doc: Some("A test record"),
1577            aliases: vec![],
1578            fields: vec![
1579                Field {
1580                    name: "field1",
1581                    doc: Some("An integer field"),
1582                    r#type: int_schema(),
1583                    default: None,
1584                },
1585                Field {
1586                    name: "field2",
1587                    doc: None,
1588                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1589                    default: None,
1590                },
1591            ],
1592            attributes: Attributes::default(),
1593        }))
1594    }
1595
1596    fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
1597        let mut sb = SchemaBuilder::new();
1598        sb.push(field);
1599        sb.finish()
1600    }
1601
1602    fn assert_json_contains(avro_json: &str, needle: &str) {
1603        assert!(
1604            avro_json.contains(needle),
1605            "JSON did not contain `{needle}` : {avro_json}"
1606        )
1607    }
1608
1609    #[test]
1610    fn test_deserialize() {
1611        let t: Schema = serde_json::from_str("\"string\"").unwrap();
1612        assert_eq!(
1613            t,
1614            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
1615        );
1616
1617        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
1618        assert_eq!(
1619            t,
1620            Schema::Union(vec![
1621                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
1622                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1623            ])
1624        );
1625
1626        let t: Type = serde_json::from_str(
1627            r#"{
1628                   "type":"long",
1629                   "logicalType":"timestamp-micros"
1630                }"#,
1631        )
1632        .unwrap();
1633
1634        let timestamp = Type {
1635            r#type: TypeName::Primitive(PrimitiveType::Long),
1636            attributes: Attributes {
1637                logical_type: Some("timestamp-micros"),
1638                additional: Default::default(),
1639            },
1640        };
1641
1642        assert_eq!(t, timestamp);
1643
1644        let t: ComplexType = serde_json::from_str(
1645            r#"{
1646                   "type":"fixed",
1647                   "name":"fixed",
1648                   "namespace":"topLevelRecord.value",
1649                   "size":11,
1650                   "logicalType":"decimal",
1651                   "precision":25,
1652                   "scale":2
1653                }"#,
1654        )
1655        .unwrap();
1656
1657        let decimal = ComplexType::Fixed(Fixed {
1658            name: "fixed",
1659            namespace: Some("topLevelRecord.value"),
1660            aliases: vec![],
1661            size: 11,
1662            attributes: Attributes {
1663                logical_type: Some("decimal"),
1664                additional: vec![("precision", json!(25)), ("scale", json!(2))]
1665                    .into_iter()
1666                    .collect(),
1667            },
1668        });
1669
1670        assert_eq!(t, decimal);
1671
1672        let schema: Schema = serde_json::from_str(
1673            r#"{
1674               "type":"record",
1675               "name":"topLevelRecord",
1676               "fields":[
1677                  {
1678                     "name":"value",
1679                     "type":[
1680                        {
1681                           "type":"fixed",
1682                           "name":"fixed",
1683                           "namespace":"topLevelRecord.value",
1684                           "size":11,
1685                           "logicalType":"decimal",
1686                           "precision":25,
1687                           "scale":2
1688                        },
1689                        "null"
1690                     ]
1691                  }
1692               ]
1693            }"#,
1694        )
1695        .unwrap();
1696
1697        assert_eq!(
1698            schema,
1699            Schema::Complex(ComplexType::Record(Record {
1700                name: "topLevelRecord",
1701                namespace: None,
1702                doc: None,
1703                aliases: vec![],
1704                fields: vec![Field {
1705                    name: "value",
1706                    doc: None,
1707                    r#type: Schema::Union(vec![
1708                        Schema::Complex(decimal),
1709                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1710                    ]),
1711                    default: None,
1712                },],
1713                attributes: Default::default(),
1714            }))
1715        );
1716
1717        let schema: Schema = serde_json::from_str(
1718            r#"{
1719                  "type": "record",
1720                  "name": "LongList",
1721                  "aliases": ["LinkedLongs"],
1722                  "fields" : [
1723                    {"name": "value", "type": "long"},
1724                    {"name": "next", "type": ["null", "LongList"]}
1725                  ]
1726                }"#,
1727        )
1728        .unwrap();
1729
1730        assert_eq!(
1731            schema,
1732            Schema::Complex(ComplexType::Record(Record {
1733                name: "LongList",
1734                namespace: None,
1735                doc: None,
1736                aliases: vec!["LinkedLongs"],
1737                fields: vec![
1738                    Field {
1739                        name: "value",
1740                        doc: None,
1741                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
1742                        default: None,
1743                    },
1744                    Field {
1745                        name: "next",
1746                        doc: None,
1747                        r#type: Schema::Union(vec![
1748                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1749                            Schema::TypeName(TypeName::Ref("LongList")),
1750                        ]),
1751                        default: None,
1752                    }
1753                ],
1754                attributes: Attributes::default(),
1755            }))
1756        );
1757
1758        // Recursive schema are not supported
1759        let err = AvroField::try_from(&schema).unwrap_err().to_string();
1760        assert_eq!(err, "Parser error: Failed to resolve .LongList");
1761
1762        let schema: Schema = serde_json::from_str(
1763            r#"{
1764               "type":"record",
1765               "name":"topLevelRecord",
1766               "fields":[
1767                  {
1768                     "name":"id",
1769                     "type":[
1770                        "int",
1771                        "null"
1772                     ]
1773                  },
1774                  {
1775                     "name":"timestamp_col",
1776                     "type":[
1777                        {
1778                           "type":"long",
1779                           "logicalType":"timestamp-micros"
1780                        },
1781                        "null"
1782                     ]
1783                  }
1784               ]
1785            }"#,
1786        )
1787        .unwrap();
1788
1789        assert_eq!(
1790            schema,
1791            Schema::Complex(ComplexType::Record(Record {
1792                name: "topLevelRecord",
1793                namespace: None,
1794                doc: None,
1795                aliases: vec![],
1796                fields: vec![
1797                    Field {
1798                        name: "id",
1799                        doc: None,
1800                        r#type: Schema::Union(vec![
1801                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
1802                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1803                        ]),
1804                        default: None,
1805                    },
1806                    Field {
1807                        name: "timestamp_col",
1808                        doc: None,
1809                        r#type: Schema::Union(vec![
1810                            Schema::Type(timestamp),
1811                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1812                        ]),
1813                        default: None,
1814                    }
1815                ],
1816                attributes: Default::default(),
1817            }))
1818        );
1819        let codec = AvroField::try_from(&schema).unwrap();
1820        assert_eq!(
1821            codec.field(),
1822            arrow_schema::Field::new(
1823                "topLevelRecord",
1824                DataType::Struct(Fields::from(vec![
1825                    arrow_schema::Field::new("id", DataType::Int32, true),
1826                    arrow_schema::Field::new(
1827                        "timestamp_col",
1828                        DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1829                        true
1830                    ),
1831                ])),
1832                false
1833            )
1834        );
1835
1836        let schema: Schema = serde_json::from_str(
1837            r#"{
1838                  "type": "record",
1839                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
1840                  "fields": [
1841                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
1842                    {"name": "clientProtocol", "type": ["null", "string"]},
1843                    {"name": "serverHash", "type": "MD5"},
1844                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
1845                  ]
1846            }"#,
1847        )
1848        .unwrap();
1849
1850        assert_eq!(
1851            schema,
1852            Schema::Complex(ComplexType::Record(Record {
1853                name: "HandshakeRequest",
1854                namespace: Some("org.apache.avro.ipc"),
1855                doc: None,
1856                aliases: vec![],
1857                fields: vec![
1858                    Field {
1859                        name: "clientHash",
1860                        doc: None,
1861                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
1862                            name: "MD5",
1863                            namespace: None,
1864                            aliases: vec![],
1865                            size: 16,
1866                            attributes: Default::default(),
1867                        })),
1868                        default: None,
1869                    },
1870                    Field {
1871                        name: "clientProtocol",
1872                        doc: None,
1873                        r#type: Schema::Union(vec![
1874                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1875                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1876                        ]),
1877                        default: None,
1878                    },
1879                    Field {
1880                        name: "serverHash",
1881                        doc: None,
1882                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
1883                        default: None,
1884                    },
1885                    Field {
1886                        name: "meta",
1887                        doc: None,
1888                        r#type: Schema::Union(vec![
1889                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1890                            Schema::Complex(ComplexType::Map(Map {
1891                                values: Box::new(Schema::TypeName(TypeName::Primitive(
1892                                    PrimitiveType::Bytes
1893                                ))),
1894                                attributes: Default::default(),
1895                            })),
1896                        ]),
1897                        default: None,
1898                    }
1899                ],
1900                attributes: Default::default(),
1901            }))
1902        );
1903    }
1904
1905    #[test]
1906    fn test_new_schema_store() {
1907        let store = SchemaStore::new();
1908        assert!(store.schemas.is_empty());
1909    }
1910
1911    #[test]
1912    fn test_try_from_schemas_rabin() {
1913        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1914        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
1915        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
1916        schemas.insert(
1917            int_avro_schema
1918                .fingerprint(FingerprintAlgorithm::Rabin)
1919                .unwrap(),
1920            int_avro_schema.clone(),
1921        );
1922        schemas.insert(
1923            record_avro_schema
1924                .fingerprint(FingerprintAlgorithm::Rabin)
1925                .unwrap(),
1926            record_avro_schema.clone(),
1927        );
1928        let store = SchemaStore::try_from(schemas).unwrap();
1929        let int_fp = int_avro_schema
1930            .fingerprint(FingerprintAlgorithm::Rabin)
1931            .unwrap();
1932        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
1933        let rec_fp = record_avro_schema
1934            .fingerprint(FingerprintAlgorithm::Rabin)
1935            .unwrap();
1936        assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
1937    }
1938
1939    #[test]
1940    fn test_try_from_with_duplicates() {
1941        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1942        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
1943        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
1944        schemas.insert(
1945            int_avro_schema
1946                .fingerprint(FingerprintAlgorithm::Rabin)
1947                .unwrap(),
1948            int_avro_schema.clone(),
1949        );
1950        schemas.insert(
1951            record_avro_schema
1952                .fingerprint(FingerprintAlgorithm::Rabin)
1953                .unwrap(),
1954            record_avro_schema.clone(),
1955        );
1956        // Insert duplicate of int schema
1957        schemas.insert(
1958            int_avro_schema
1959                .fingerprint(FingerprintAlgorithm::Rabin)
1960                .unwrap(),
1961            int_avro_schema.clone(),
1962        );
1963        let store = SchemaStore::try_from(schemas).unwrap();
1964        assert_eq!(store.schemas.len(), 2);
1965        let int_fp = int_avro_schema
1966            .fingerprint(FingerprintAlgorithm::Rabin)
1967            .unwrap();
1968        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
1969    }
1970
1971    #[test]
1972    fn test_register_and_lookup_rabin() {
1973        let mut store = SchemaStore::new();
1974        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1975        let fp_enum = store.register(schema.clone()).unwrap();
1976        match fp_enum {
1977            Fingerprint::Rabin(fp_val) => {
1978                assert_eq!(
1979                    store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
1980                    Some(schema.clone())
1981                );
1982                assert!(store
1983                    .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
1984                    .is_none());
1985            }
1986            Fingerprint::Id(id) => {
1987                unreachable!("This test should only generate Rabin fingerprints")
1988            }
1989            #[cfg(feature = "md5")]
1990            Fingerprint::MD5(id) => {
1991                unreachable!("This test should only generate Rabin fingerprints")
1992            }
1993            #[cfg(feature = "sha256")]
1994            Fingerprint::SHA256(id) => {
1995                unreachable!("This test should only generate Rabin fingerprints")
1996            }
1997        }
1998    }
1999
2000    #[test]
2001    fn test_set_and_lookup_id() {
2002        let mut store = SchemaStore::new();
2003        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2004        let id = 42u32;
2005        let fp = Fingerprint::Id(id);
2006        let out_fp = store.set(fp, schema.clone()).unwrap();
2007        assert_eq!(out_fp, fp);
2008        assert_eq!(store.lookup(&fp).cloned(), Some(schema.clone()));
2009        assert!(store.lookup(&Fingerprint::Id(id.wrapping_add(1))).is_none());
2010    }
2011
2012    #[test]
2013    fn test_register_duplicate_schema() {
2014        let mut store = SchemaStore::new();
2015        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2016        let schema2 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2017        let fingerprint1 = store.register(schema1).unwrap();
2018        let fingerprint2 = store.register(schema2).unwrap();
2019        assert_eq!(fingerprint1, fingerprint2);
2020        assert_eq!(store.schemas.len(), 1);
2021    }
2022
2023    #[test]
2024    fn test_set_and_lookup_with_provided_fingerprint() {
2025        let mut store = SchemaStore::new();
2026        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2027        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2028        let out_fp = store.set(fp, schema.clone()).unwrap();
2029        assert_eq!(out_fp, fp);
2030        assert_eq!(store.lookup(&fp).cloned(), Some(schema));
2031    }
2032
2033    #[test]
2034    fn test_set_duplicate_same_schema_ok() {
2035        let mut store = SchemaStore::new();
2036        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2037        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2038        let _ = store.set(fp, schema.clone()).unwrap();
2039        let _ = store.set(fp, schema.clone()).unwrap();
2040        assert_eq!(store.schemas.len(), 1);
2041    }
2042
2043    #[test]
2044    fn test_set_duplicate_different_schema_collision_error() {
2045        let mut store = SchemaStore::new();
2046        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2047        let schema2 = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2048        // Use the same Fingerprint::Id to simulate a collision across different schemas
2049        let fp = Fingerprint::Id(123);
2050        let _ = store.set(fp, schema1).unwrap();
2051        let err = store.set(fp, schema2).unwrap_err();
2052        let msg = format!("{err}");
2053        assert!(msg.contains("Schema fingerprint collision"));
2054    }
2055
2056    #[test]
2057    fn test_canonical_form_generation_primitive() {
2058        let schema = int_schema();
2059        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2060        assert_eq!(canonical_form, r#""int""#);
2061    }
2062
2063    #[test]
2064    fn test_canonical_form_generation_record() {
2065        let schema = record_schema();
2066        let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2067        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2068        assert_eq!(canonical_form, expected_canonical_form);
2069    }
2070
2071    #[test]
2072    fn test_fingerprint_calculation() {
2073        let canonical_form = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
2074        let expected_fingerprint = 10505236152925314060;
2075        let fingerprint = compute_fingerprint_rabin(canonical_form);
2076        assert_eq!(fingerprint, expected_fingerprint);
2077    }
2078
2079    #[test]
2080    fn test_register_and_lookup_complex_schema() {
2081        let mut store = SchemaStore::new();
2082        let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2083        let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2084        let expected_fingerprint =
2085            Fingerprint::Rabin(super::compute_fingerprint_rabin(canonical_form));
2086        let fingerprint = store.register(schema.clone()).unwrap();
2087        assert_eq!(fingerprint, expected_fingerprint);
2088        let looked_up = store.lookup(&fingerprint).cloned();
2089        assert_eq!(looked_up, Some(schema));
2090    }
2091
2092    #[test]
2093    fn test_fingerprints_returns_all_keys() {
2094        let mut store = SchemaStore::new();
2095        let fp_int = store
2096            .register(AvroSchema::new(
2097                serde_json::to_string(&int_schema()).unwrap(),
2098            ))
2099            .unwrap();
2100        let fp_record = store
2101            .register(AvroSchema::new(
2102                serde_json::to_string(&record_schema()).unwrap(),
2103            ))
2104            .unwrap();
2105        let fps = store.fingerprints();
2106        assert_eq!(fps.len(), 2);
2107        assert!(fps.contains(&fp_int));
2108        assert!(fps.contains(&fp_record));
2109    }
2110
2111    #[test]
2112    fn test_canonical_form_strips_attributes() {
2113        let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
2114            name: "record_with_attrs",
2115            namespace: None,
2116            doc: Some("This doc should be stripped"),
2117            aliases: vec!["alias1", "alias2"],
2118            fields: vec![Field {
2119                name: "f1",
2120                doc: Some("field doc"),
2121                r#type: Schema::Type(Type {
2122                    r#type: TypeName::Primitive(PrimitiveType::Bytes),
2123                    attributes: Attributes {
2124                        logical_type: None,
2125                        additional: HashMap::from([("precision", json!(4))]),
2126                    },
2127                }),
2128                default: None,
2129            }],
2130            attributes: Attributes {
2131                logical_type: None,
2132                additional: HashMap::from([("custom_attr", json!("value"))]),
2133            },
2134        }));
2135        let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
2136        let canonical_form = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
2137        assert_eq!(canonical_form, expected_canonical_form);
2138    }
2139
2140    #[test]
2141    fn test_primitive_mappings() {
2142        let cases = vec![
2143            (DataType::Boolean, "\"boolean\""),
2144            (DataType::Int8, "\"int\""),
2145            (DataType::Int16, "\"int\""),
2146            (DataType::Int32, "\"int\""),
2147            (DataType::Int64, "\"long\""),
2148            (DataType::UInt8, "\"int\""),
2149            (DataType::UInt16, "\"int\""),
2150            (DataType::UInt32, "\"long\""),
2151            (DataType::UInt64, "\"long\""),
2152            (DataType::Float16, "\"float\""),
2153            (DataType::Float32, "\"float\""),
2154            (DataType::Float64, "\"double\""),
2155            (DataType::Utf8, "\"string\""),
2156            (DataType::Binary, "\"bytes\""),
2157        ];
2158        for (dt, avro_token) in cases {
2159            let field = ArrowField::new("col", dt.clone(), false);
2160            let arrow_schema = single_field_schema(field);
2161            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2162            assert_json_contains(&avro.json_string, avro_token);
2163        }
2164    }
2165
2166    #[test]
2167    fn test_temporal_mappings() {
2168        let cases = vec![
2169            (DataType::Date32, "\"logicalType\":\"date\""),
2170            (
2171                DataType::Time32(TimeUnit::Millisecond),
2172                "\"logicalType\":\"time-millis\"",
2173            ),
2174            (
2175                DataType::Time64(TimeUnit::Microsecond),
2176                "\"logicalType\":\"time-micros\"",
2177            ),
2178            (
2179                DataType::Timestamp(TimeUnit::Millisecond, None),
2180                "\"logicalType\":\"local-timestamp-millis\"",
2181            ),
2182            (
2183                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2184                "\"logicalType\":\"timestamp-micros\"",
2185            ),
2186        ];
2187        for (dt, needle) in cases {
2188            let field = ArrowField::new("ts", dt.clone(), true);
2189            let arrow_schema = single_field_schema(field);
2190            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2191            assert_json_contains(&avro.json_string, needle);
2192        }
2193    }
2194
2195    #[test]
2196    fn test_decimal_and_uuid() {
2197        let decimal_field = ArrowField::new("amount", DataType::Decimal128(25, 2), false);
2198        let dec_schema = single_field_schema(decimal_field);
2199        let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
2200        assert_json_contains(&avro_dec.json_string, "\"logicalType\":\"decimal\"");
2201        assert_json_contains(&avro_dec.json_string, "\"precision\":25");
2202        assert_json_contains(&avro_dec.json_string, "\"scale\":2");
2203        let mut md = HashMap::new();
2204        md.insert("logicalType".into(), "uuid".into());
2205        let uuid_field =
2206            ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(md);
2207        let uuid_schema = single_field_schema(uuid_field);
2208        let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
2209        assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
2210    }
2211
2212    #[test]
2213    fn test_interval_duration() {
2214        let interval_field = ArrowField::new(
2215            "span",
2216            DataType::Interval(IntervalUnit::MonthDayNano),
2217            false,
2218        );
2219        let s = single_field_schema(interval_field);
2220        let avro = AvroSchema::try_from(&s).unwrap();
2221        assert_json_contains(&avro.json_string, "\"logicalType\":\"duration\"");
2222        assert_json_contains(&avro.json_string, "\"size\":12");
2223        let dur_field = ArrowField::new("latency", DataType::Duration(TimeUnit::Nanosecond), false);
2224        let s2 = single_field_schema(dur_field);
2225        let avro2 = AvroSchema::try_from(&s2).unwrap();
2226        #[cfg(feature = "avro_custom_types")]
2227        assert_json_contains(
2228            &avro2.json_string,
2229            "\"logicalType\":\"arrow.duration-nanos\"",
2230        );
2231    }
2232
2233    #[test]
2234    fn test_complex_types() {
2235        let list_dt = DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true)));
2236        let list_schema = single_field_schema(ArrowField::new("numbers", list_dt, false));
2237        let avro_list = AvroSchema::try_from(&list_schema).unwrap();
2238        assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
2239        assert_json_contains(&avro_list.json_string, "\"items\"");
2240        let value_field = ArrowField::new("value", DataType::Boolean, true);
2241        let entries_struct = ArrowField::new(
2242            "entries",
2243            DataType::Struct(Fields::from(vec![
2244                ArrowField::new("key", DataType::Utf8, false),
2245                value_field.clone(),
2246            ])),
2247            false,
2248        );
2249        let map_dt = DataType::Map(Arc::new(entries_struct), false);
2250        let map_schema = single_field_schema(ArrowField::new("props", map_dt, false));
2251        let avro_map = AvroSchema::try_from(&map_schema).unwrap();
2252        assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
2253        assert_json_contains(&avro_map.json_string, "\"values\"");
2254        let struct_dt = DataType::Struct(Fields::from(vec![
2255            ArrowField::new("f1", DataType::Int64, false),
2256            ArrowField::new("f2", DataType::Utf8, true),
2257        ]));
2258        let struct_schema = single_field_schema(ArrowField::new("person", struct_dt, true));
2259        let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
2260        assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
2261        assert_json_contains(&avro_struct.json_string, "\"null\"");
2262    }
2263
2264    #[test]
2265    fn test_enum_dictionary() {
2266        let mut md = HashMap::new();
2267        md.insert(
2268            AVRO_ENUM_SYMBOLS_METADATA_KEY.into(),
2269            "[\"OPEN\",\"CLOSED\"]".into(),
2270        );
2271        let enum_dt = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
2272        let field = ArrowField::new("status", enum_dt, false).with_metadata(md);
2273        let schema = single_field_schema(field);
2274        let avro = AvroSchema::try_from(&schema).unwrap();
2275        assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
2276        assert_json_contains(&avro.json_string, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
2277    }
2278
2279    #[test]
2280    fn test_run_end_encoded() {
2281        let ree_dt = DataType::RunEndEncoded(
2282            Arc::new(ArrowField::new("run_ends", DataType::Int32, false)),
2283            Arc::new(ArrowField::new("values", DataType::Utf8, false)),
2284        );
2285        let s = single_field_schema(ArrowField::new("text", ree_dt, false));
2286        let avro = AvroSchema::try_from(&s).unwrap();
2287        assert_json_contains(&avro.json_string, "\"string\"");
2288    }
2289
2290    #[test]
2291    fn test_dense_union() {
2292        let uf: UnionFields = vec![
2293            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
2294            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
2295        ]
2296        .into_iter()
2297        .collect();
2298        let union_dt = DataType::Union(uf, UnionMode::Dense);
2299        let s = single_field_schema(ArrowField::new("u", union_dt, false));
2300        let avro =
2301            AvroSchema::try_from(&s).expect("Arrow Union -> Avro union conversion should succeed");
2302        let v: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
2303        let fields = v
2304            .get("fields")
2305            .and_then(|x| x.as_array())
2306            .expect("fields array");
2307        let u_field = fields
2308            .iter()
2309            .find(|f| f.get("name").and_then(|n| n.as_str()) == Some("u"))
2310            .expect("field 'u'");
2311        let union = u_field.get("type").expect("u.type");
2312        let arr = union.as_array().expect("u.type must be Avro union array");
2313        assert_eq!(arr.len(), 2, "expected two union branches");
2314        let first = &arr[0];
2315        let obj = first
2316            .as_object()
2317            .expect("first branch should be an object with metadata");
2318        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
2319        assert_eq!(
2320            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
2321            Some("dense")
2322        );
2323        let type_ids: Vec<i64> = obj
2324            .get("arrowUnionTypeIds")
2325            .and_then(|a| a.as_array())
2326            .expect("arrowUnionTypeIds array")
2327            .iter()
2328            .map(|n| n.as_i64().expect("i64"))
2329            .collect();
2330        assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");
2331        assert_eq!(arr[1], Value::String("string".into()));
2332    }
2333
2334    #[test]
2335    fn round_trip_primitive() {
2336        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", DataType::Int32, false)]);
2337        let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
2338        let decoded = avro_schema.schema().unwrap();
2339        assert!(matches!(decoded, Schema::Complex(_)));
2340    }
2341
2342    #[test]
2343    fn test_name_generator_sanitization_and_uniqueness() {
2344        let f1 = ArrowField::new("weird-name", DataType::FixedSizeBinary(8), false);
2345        let f2 = ArrowField::new("weird name", DataType::FixedSizeBinary(8), false);
2346        let f3 = ArrowField::new("123bad", DataType::FixedSizeBinary(8), false);
2347        let arrow_schema = ArrowSchema::new(vec![f1, f2, f3]);
2348        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2349        assert_json_contains(&avro.json_string, "\"name\":\"weird_name\"");
2350        assert_json_contains(&avro.json_string, "\"name\":\"weird_name_1\"");
2351        assert_json_contains(&avro.json_string, "\"name\":\"_123bad\"");
2352    }
2353
2354    #[test]
2355    fn test_date64_logical_type_mapping() {
2356        let field = ArrowField::new("d", DataType::Date64, true);
2357        let schema = single_field_schema(field);
2358        let avro = AvroSchema::try_from(&schema).unwrap();
2359        assert_json_contains(
2360            &avro.json_string,
2361            "\"logicalType\":\"local-timestamp-millis\"",
2362        );
2363    }
2364
2365    #[test]
2366    fn test_duration_list_extras_propagated() {
2367        let child = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
2368        let list_dt = DataType::List(Arc::new(child));
2369        let arrow_schema = single_field_schema(ArrowField::new("durations", list_dt, false));
2370        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2371        #[cfg(feature = "avro_custom_types")]
2372        assert_json_contains(
2373            &avro.json_string,
2374            "\"logicalType\":\"arrow.duration-micros\"",
2375        );
2376    }
2377
2378    #[test]
2379    fn test_interval_yearmonth_extra() {
2380        let field = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
2381        let schema = single_field_schema(field);
2382        let avro = AvroSchema::try_from(&schema).unwrap();
2383        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
2384    }
2385
2386    #[test]
2387    fn test_interval_daytime_extra() {
2388        let field = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
2389        let schema = single_field_schema(field);
2390        let avro = AvroSchema::try_from(&schema).unwrap();
2391        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"daytime\"");
2392    }
2393
2394    #[test]
2395    fn test_fixed_size_list_extra() {
2396        let child = ArrowField::new("item", DataType::Int32, false);
2397        let dt = DataType::FixedSizeList(Arc::new(child), 3);
2398        let schema = single_field_schema(ArrowField::new("triples", dt, false));
2399        let avro = AvroSchema::try_from(&schema).unwrap();
2400        assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
2401    }
2402
2403    #[test]
2404    fn test_map_duration_value_extra() {
2405        let val_field = ArrowField::new("value", DataType::Duration(TimeUnit::Second), true);
2406        let entries_struct = ArrowField::new(
2407            "entries",
2408            DataType::Struct(Fields::from(vec![
2409                ArrowField::new("key", DataType::Utf8, false),
2410                val_field,
2411            ])),
2412            false,
2413        );
2414        let map_dt = DataType::Map(Arc::new(entries_struct), false);
2415        let schema = single_field_schema(ArrowField::new("metrics", map_dt, false));
2416        let avro = AvroSchema::try_from(&schema).unwrap();
2417        #[cfg(feature = "avro_custom_types")]
2418        assert_json_contains(
2419            &avro.json_string,
2420            "\"logicalType\":\"arrow.duration-seconds\"",
2421        );
2422    }
2423
2424    #[test]
2425    fn test_schema_with_non_string_defaults_decodes_successfully() {
2426        let schema_json = r#"{
2427            "type": "record",
2428            "name": "R",
2429            "fields": [
2430                {"name": "a", "type": "int", "default": 0},
2431                {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
2432                {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
2433                {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
2434                    {"name": "flag", "type": "boolean", "default": true},
2435                    {"name": "name", "type": "string", "default": "hi"}
2436                ]}, "default": {"flag": false, "name": "d"}},
2437                {"name": "u", "type": ["int", "null"], "default": 42}
2438            ]
2439        }"#;
2440
2441        let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
2442        match &schema {
2443            Schema::Complex(ComplexType::Record(_)) => {}
2444            other => panic!("expected record schema, got: {:?}", other),
2445        }
2446        // Avro to Arrow conversion
2447        let field = crate::codec::AvroField::try_from(&schema)
2448            .expect("Avro->Arrow conversion should succeed");
2449        let arrow_field = field.field();
2450
2451        // Build expected Arrow field
2452        let expected_list_item = ArrowField::new(
2453            arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
2454            DataType::Int64,
2455            false,
2456        );
2457        let expected_b = ArrowField::new("b", DataType::List(Arc::new(expected_list_item)), false);
2458
2459        let expected_map_value = ArrowField::new("value", DataType::Float64, false);
2460        let expected_entries = ArrowField::new(
2461            "entries",
2462            DataType::Struct(Fields::from(vec![
2463                ArrowField::new("key", DataType::Utf8, false),
2464                expected_map_value,
2465            ])),
2466            false,
2467        );
2468        let expected_c =
2469            ArrowField::new("c", DataType::Map(Arc::new(expected_entries), false), false);
2470
2471        let expected_inner = ArrowField::new(
2472            "inner",
2473            DataType::Struct(Fields::from(vec![
2474                ArrowField::new("flag", DataType::Boolean, false),
2475                ArrowField::new("name", DataType::Utf8, false),
2476            ])),
2477            false,
2478        );
2479
2480        let expected = ArrowField::new(
2481            "R",
2482            DataType::Struct(Fields::from(vec![
2483                ArrowField::new("a", DataType::Int32, false),
2484                expected_b,
2485                expected_c,
2486                expected_inner,
2487                ArrowField::new("u", DataType::Int32, true),
2488            ])),
2489            false,
2490        );
2491
2492        assert_eq!(arrow_field, expected);
2493    }
2494
2495    #[test]
2496    fn default_order_is_consistent() {
2497        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
2498        let a = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
2499        let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
2500        assert_eq!(a, b.unwrap().json_string);
2501    }
2502
2503    #[test]
2504    fn test_union_branch_missing_name_errors() {
2505        for t in ["record", "enum", "fixed"] {
2506            let branch = json!({ "type": t });
2507            let err = union_branch_signature(&branch).unwrap_err().to_string();
2508            assert!(
2509                err.contains(&format!("Union branch '{t}' missing required 'name'")),
2510                "expected missing-name error for {t}, got: {err}"
2511            );
2512        }
2513    }
2514
2515    #[test]
2516    fn test_union_branch_named_type_signature_includes_name() {
2517        let rec = json!({ "type": "record", "name": "Foo" });
2518        assert_eq!(union_branch_signature(&rec).unwrap(), "N:record:Foo");
2519        let en = json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] });
2520        assert_eq!(union_branch_signature(&en).unwrap(), "N:enum:Color");
2521        let fx = json!({ "type": "fixed", "name": "Bytes16", "size": 16 });
2522        assert_eq!(union_branch_signature(&fx).unwrap(), "N:fixed:Bytes16");
2523    }
2524}