arrow_avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Schema representations for Arrow.
19
20#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23    ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24    UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
/// The Avro single-object encoding "magic" bytes (`0xC3 0x01`)
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// The Confluent "magic" byte (`0x00`)
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// The maximum possible length of a prefix, in bytes:
/// SHA256 digest (32) + single-object magic (2).
pub const MAX_PREFIX_LEN: usize = 34;

/// The metadata key used for storing the JSON encoded `Schema`
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Metadata key used to represent Avro enum symbols in an Arrow schema.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key used to store the default value of a field in an Avro schema.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Metadata key used to store the name of a type in an Avro schema.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";

/// Metadata key used to store the namespace of a type in an Avro schema.
pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";

/// Metadata key used to store the documentation for a type in an Avro schema.
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Default name for the root record in an Avro schema, used when the Arrow
/// schema metadata does not provide one under [`AVRO_NAME_METADATA_KEY`].
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
/// Position of the `null` branch inside a nullable Avro union.
///
/// Avro types are not nullable, with nullability instead encoded as a union
/// where one of the variants is the null type.
///
/// To accommodate this, two-variant unions where one of the variants is the
/// null type are special-cased and used to derive Arrow's notion of
/// nullability. This enum records which of the two positions holds `null`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) enum Nullability {
    /// The nulls are encoded as the first union variant.
    /// This is the `Default` ordering.
    #[default]
    NullFirst,
    /// The nulls are encoded as the second union variant
    NullSecond,
}
80
/// A type name in an Avro schema: either a [`PrimitiveType`] or a reference
/// to a previously defined named type.
///
/// This represents the different ways a type can be referenced by name in an
/// Avro schema. The enum is `#[serde(untagged)]`, so no wrapper tag is
/// written or expected around the value.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum TypeName<'a> {
    /// A primitive type like null, boolean, int, etc.
    Primitive(PrimitiveType),
    /// A reference to another named type, held as a `&'a str`
    Ref(&'a str),
}
95
/// A primitive type
///
/// Variant names are renamed with `camelCase` for serde and with
/// `lowercase` for the strum-derived `AsRef<str>`; for these single-word
/// variants both yield the lowercase Avro name (e.g. `Null` -> `"null"`).
///
/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub(crate) enum PrimitiveType {
    /// null: no value
    Null,
    /// boolean: a binary value
    Boolean,
    /// int: 32-bit signed integer
    Int,
    /// long: 64-bit signed integer
    Long,
    /// float: single precision (32-bit) IEEE 754 floating-point number
    Float,
    /// double: double precision (64-bit) IEEE 754 floating-point number
    Double,
    /// bytes: sequence of 8-bit unsigned bytes
    Bytes,
    /// string: Unicode character sequence
    String,
}
120
/// Additional attributes within a `Schema`
///
/// Captures the optional `logicalType` key plus every other key of the
/// enclosing JSON object that is not claimed by a sibling field.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Attributes<'a> {
    /// A logical type name (JSON key `logicalType`); `None` when absent
    ///
    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
    #[serde(default)]
    pub(crate) logical_type: Option<&'a str>,

    /// Additional JSON attributes, flattened into the enclosing object
    #[serde(flatten)]
    pub(crate) additional: HashMap<&'a str, Value>,
}
137
138impl Attributes<'_> {
139    /// Returns the field metadata for this [`Attributes`]
140    pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
141        self.additional
142            .iter()
143            .map(|(k, v)| (k.to_string(), v.to_string()))
144            .collect()
145    }
146}
147
/// A type definition that is not a variant of [`ComplexType`]
///
/// Pairs a `type` name with the extra attributes found in the same JSON
/// object.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Type<'a> {
    /// The type of this Avro data structure
    #[serde(borrow)]
    pub(crate) r#type: TypeName<'a>,
    /// Additional attributes associated with this type, flattened into the
    /// same JSON object
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
159
/// An Avro schema
///
/// This represents the different shapes of Avro schemas as defined in the specification.
/// See <https://avro.apache.org/docs/1.11.1/specification/#schemas> for more details.
///
/// The enum is `#[serde(untagged)]`: the JSON shape alone selects the
/// variant, so the declaration order of the variants is significant and must
/// not be changed casually.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum Schema<'a> {
    /// A direct type name (primitive or reference)
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A union of multiple schemas (e.g., ["null", "string"])
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A complex type such as record, array, map, etc.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// A type with attributes
    #[serde(borrow)]
    Type(Type<'a>),
}
180
/// A complex type
///
/// Internally tagged by the JSON `type` key; variant names are renamed with
/// `camelCase` (e.g. `Record` -> `"record"`).
///
/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub(crate) enum ComplexType<'a> {
    /// Record type: a sequence of fields with names and types
    #[serde(borrow)]
    Record(Record<'a>),
    /// Enum type: a set of named values
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// Array type: a sequence of values of the same type
    #[serde(borrow)]
    Array(Array<'a>),
    /// Map type: a mapping from strings to values of the same type
    #[serde(borrow)]
    Map(Map<'a>),
    /// Fixed type: a fixed-size byte array
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
203
/// A record
///
/// `name` and `fields` are required; `namespace`, `doc` and `aliases` fall
/// back to their `Default` when missing from the JSON.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Record<'a> {
    /// Name of the record
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the record, provides a way to organize names
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string for the record
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this record (empty when not specified)
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The fields contained in this record
    #[serde(borrow)]
    pub(crate) fields: Vec<Field<'a>>,
    /// Additional attributes for this record, flattened into the same JSON object
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
228
229fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
230where
231    D: serde::Deserializer<'de>,
232{
233    Value::deserialize(deserializer).map(Some)
234}
235
/// A field within a [`Record`]
///
/// `name` and `type` are required; the remaining keys are optional.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Field<'a> {
    /// Name of the field within the record
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional documentation for this field
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// The field's type definition
    #[serde(borrow)]
    pub(crate) r#type: Schema<'a>,
    /// Optional default value for this field.
    ///
    /// `None` when the key is absent; when the key is present the value is
    /// read through `deserialize_default`, which always yields `Some`.
    #[serde(deserialize_with = "deserialize_default", default)]
    pub(crate) default: Option<Value>,
    /// Alternative names (aliases) for this field (Avro spec: field-level aliases).
    /// Borrowed from input JSON where possible.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
}
256
/// An enumeration
///
/// `name` and `symbols` are required; the other keys fall back to their
/// `Default` when missing.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Enum<'a> {
    /// Name of the enum
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the enum, provides organizational structure
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string describing the enum
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this enum (empty when not specified)
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The symbols (values) that this enum can have
    #[serde(borrow)]
    pub(crate) symbols: Vec<&'a str>,
    /// Optional default symbol for this enum
    #[serde(borrow, default)]
    pub(crate) default: Option<&'a str>,
    /// Additional attributes for this enum, flattened into the same JSON object
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
284
/// An array
///
/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Array<'a> {
    /// The schema for items in this array (boxed because `Schema` is recursive)
    #[serde(borrow)]
    pub(crate) items: Box<Schema<'a>>,
    /// Additional attributes for this array, flattened into the same JSON object
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
297
/// A map
///
/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Map<'a> {
    /// The schema for values in this map (boxed because `Schema` is recursive)
    #[serde(borrow)]
    pub(crate) values: Box<Schema<'a>>,
    /// Additional attributes for this map, flattened into the same JSON object
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
310
/// A fixed length binary array
///
/// `name` and `size` are required; `namespace` and `aliases` are optional.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Fixed<'a> {
    /// Name of the fixed type
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the fixed type
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Alternative names for this fixed type (empty when not specified)
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The number of bytes in this fixed type
    pub(crate) size: usize,
    /// Additional attributes for this fixed type, flattened into the same JSON object
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
331
/// Options controlling how an Arrow schema is converted to Avro JSON by
/// `AvroSchema::from_arrow_with_options`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) struct AvroSchemaOptions {
    /// Where the `null` branch goes in generated nullable unions.
    /// `None` falls back to `Nullability::default()`.
    pub(crate) null_order: Option<Nullability>,
    /// When `false`, Avro JSON already stored in the Arrow schema metadata
    /// under `SCHEMA_METADATA_KEY` is returned verbatim instead of being
    /// regenerated. The flag is also forwarded to the per-field conversion.
    pub(crate) strip_metadata: bool,
}
337
/// A wrapper for an Avro schema in its JSON string representation.
///
/// The JSON is kept as an owned string; the structured form is produced on
/// demand by parsing `json_string`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The Avro schema as a JSON string.
    pub json_string: String,
}
344
345impl TryFrom<&ArrowSchema> for AvroSchema {
346    type Error = ArrowError;
347
348    /// Converts an `ArrowSchema` to `AvroSchema`, delegating to
349    /// `AvroSchema::from_arrow_with_options` with `None` so that the
350    /// union null ordering is decided by `Nullability::default()`.
351    fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
352        AvroSchema::from_arrow_with_options(schema, None)
353    }
354}
355
356impl AvroSchema {
357    /// Creates a new `AvroSchema` from a JSON string.
358    pub fn new(json_string: String) -> Self {
359        Self { json_string }
360    }
361
362    pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
363        serde_json::from_str(self.json_string.as_str())
364            .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
365    }
366
367    /// Returns the fingerprint of the schema, computed using the specified [`FingerprintAlgorithm`].
368    ///
369    /// The fingerprint is computed over the schema's Parsed Canonical Form
370    /// as defined by the Avro specification. Depending on `hash_type`, this
371    /// will return one of the supported [`Fingerprint`] variants:
372    /// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`]
373    /// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5`
374    /// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256`
375    ///
376    /// Note: [`FingerprintAlgorithm::Id`] or [`FingerprintAlgorithm::Id64`] cannot be used to generate a fingerprint
377    /// and will result in an error. If you intend to use a Schema Registry ID-based
378    /// wire format, either use [`SchemaStore::set`] or load the [`Fingerprint::Id`] directly via [`Fingerprint::load_fingerprint_id`] or for
379    /// [`Fingerprint::Id64`] via [`Fingerprint::load_fingerprint_id64`].
380    ///
381    /// See also: <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
382    ///
383    /// # Errors
384    /// Returns an error if deserializing the schema fails, if generating the
385    /// canonical form of the schema fails, or if `hash_type` is [`FingerprintAlgorithm::Id`].
386    ///
387    /// # Examples
388    /// ```
389    /// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm};
390    ///
391    /// let avro = AvroSchema::new("\"string\"".to_string());
392    /// let fp = avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
393    /// ```
394    pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
395        Self::generate_fingerprint(&self.schema()?, hash_type)
396    }
397
398    pub(crate) fn generate_fingerprint(
399        schema: &Schema,
400        hash_type: FingerprintAlgorithm,
401    ) -> Result<Fingerprint, ArrowError> {
402        let canonical = Self::generate_canonical_form(schema).map_err(|e| {
403            ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
404        })?;
405        match hash_type {
406            FingerprintAlgorithm::Rabin => {
407                Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
408            }
409            FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
410                "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
411                if using Fingerprint::Id, pass the registry ID in instead using the set method."
412                    .to_string(),
413            )),
414            #[cfg(feature = "md5")]
415            FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
416            #[cfg(feature = "sha256")]
417            FingerprintAlgorithm::SHA256 => {
418                Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
419            }
420        }
421    }
422
423    /// Generates the Parsed Canonical Form for the given `Schema`.
424    ///
425    /// The canonical form is a standardized JSON representation of the schema,
426    /// primarily used for generating a schema fingerprint for equality checking.
427    ///
428    /// This form strips attributes that do not affect the schema's identity,
429    /// such as `doc` fields, `aliases`, and any properties not defined in the
430    /// Avro specification.
431    ///
432    /// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
433    pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
434        build_canonical(schema, None)
435    }
436
437    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order and optionally stripping internal Arrow metadata.
438    ///
439    /// If the input Arrow schema already contains Avro JSON in
440    /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
441    /// the exact header encoding alignment; otherwise, a new JSON is generated
442    /// honoring `null_union_order` at **all nullable sites**.
443    pub(crate) fn from_arrow_with_options(
444        schema: &ArrowSchema,
445        options: Option<AvroSchemaOptions>,
446    ) -> Result<AvroSchema, ArrowError> {
447        let opts = options.unwrap_or_default();
448        let order = opts.null_order.unwrap_or_default();
449        let strip = opts.strip_metadata;
450        if !strip {
451            if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
452                return Ok(AvroSchema::new(json.clone()));
453            }
454        }
455        let mut name_gen = NameGenerator::default();
456        let fields_json = schema
457            .fields()
458            .iter()
459            .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
460            .collect::<Result<Vec<_>, _>>()?;
461        let record_name = schema
462            .metadata
463            .get(AVRO_NAME_METADATA_KEY)
464            .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
465        let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
466        record.insert("type".into(), Value::String("record".into()));
467        record.insert(
468            "name".into(),
469            Value::String(sanitise_avro_name(record_name)),
470        );
471        if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
472            record.insert("namespace".into(), Value::String(ns.clone()));
473        }
474        if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
475            record.insert("doc".into(), Value::String(doc.clone()));
476        }
477        record.insert("fields".into(), Value::Array(fields_json));
478        extend_with_passthrough_metadata(&mut record, &schema.metadata);
479        let json_string = serde_json::to_string(&Value::Object(record))
480            .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
481        Ok(AvroSchema::new(json_string))
482    }
483}
484
/// A stack-allocated, fixed-size buffer for the prefix.
///
/// Only the first `len` bytes of `buf` are meaningful; see `Prefix::as_slice`.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Prefix {
    /// Backing storage, sized for the longest possible prefix.
    buf: [u8; MAX_PREFIX_LEN],
    /// Number of leading bytes of `buf` that make up the prefix.
    len: u8,
}
491
492impl Prefix {
493    #[inline]
494    pub(crate) fn as_slice(&self) -> &[u8] {
495        &self.buf[..self.len as usize]
496    }
497}
498
/// Defines the strategy for generating the per-record prefix for an Avro binary stream.
///
/// Hash-based variants carry no payload; the registry variants carry the
/// ID itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// Use the 64-bit Rabin fingerprint (default for single-object encoding).
    #[default]
    Rabin,
    /// Use a Confluent Schema Registry 32-bit ID.
    Id(u32),
    /// Use an Apicurio Schema Registry 64-bit ID.
    Id64(u64),
    #[cfg(feature = "md5")]
    /// Use the 128-bit MD5 fingerprint (requires the `md5` feature).
    MD5,
    #[cfg(feature = "sha256")]
    /// Use the 256-bit SHA-256 fingerprint (requires the `sha256` feature).
    SHA256,
}
516
517impl From<Fingerprint> for FingerprintStrategy {
518    fn from(f: Fingerprint) -> Self {
519        Self::from(&f)
520    }
521}
522
523impl From<FingerprintAlgorithm> for FingerprintStrategy {
524    fn from(f: FingerprintAlgorithm) -> Self {
525        match f {
526            FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
527            FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
528            FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
529            #[cfg(feature = "md5")]
530            FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
531            #[cfg(feature = "sha256")]
532            FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
533        }
534    }
535}
536
537impl From<&Fingerprint> for FingerprintStrategy {
538    fn from(f: &Fingerprint) -> Self {
539        match f {
540            Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
541            Fingerprint::Id(_) => FingerprintStrategy::Id(0),
542            Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
543            #[cfg(feature = "md5")]
544            Fingerprint::MD5(_) => FingerprintStrategy::MD5,
545            #[cfg(feature = "sha256")]
546            Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
547        }
548    }
549}
550
/// Supported fingerprint algorithms for Avro schema identification.
///
/// `Id` and `Id64` do not hash anything: they mark fingerprints that are
/// externally assigned IDs (e.g. from a schema registry).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit CRC-64-AVRO Rabin fingerprint.
    #[default]
    Rabin,
    /// Represents a 32 bit fingerprint not based on a hash algorithm, (e.g., a 32-bit Schema Registry ID.)
    Id,
    /// Represents a 64 bit fingerprint not based on a hash algorithm, (e.g., a 64-bit Schema Registry ID.)
    Id64,
    #[cfg(feature = "md5")]
    /// 128-bit MD5 message digest (requires the `md5` feature).
    MD5,
    #[cfg(feature = "sha256")]
    /// 256-bit SHA-256 digest (requires the `sha256` feature).
    SHA256,
}
569
570/// Allow easy extraction of the algorithm used to create a fingerprint.
571impl From<&Fingerprint> for FingerprintAlgorithm {
572    fn from(fp: &Fingerprint) -> Self {
573        match fp {
574            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
575            Fingerprint::Id(_) => FingerprintAlgorithm::Id,
576            Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
577            #[cfg(feature = "md5")]
578            Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
579            #[cfg(feature = "sha256")]
580            Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
581        }
582    }
583}
584
585impl From<FingerprintStrategy> for FingerprintAlgorithm {
586    fn from(s: FingerprintStrategy) -> Self {
587        Self::from(&s)
588    }
589}
590
591impl From<&FingerprintStrategy> for FingerprintAlgorithm {
592    fn from(s: &FingerprintStrategy) -> Self {
593        match s {
594            FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
595            FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
596            FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
597            #[cfg(feature = "md5")]
598            FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
599            #[cfg(feature = "sha256")]
600            FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
601        }
602    }
603}
604
/// A schema fingerprint in one of the supported formats.
///
/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
/// instance always stores only one variant, matching its configured
/// `FingerprintAlgorithm`, but the enum makes the API uniform.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
/// <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// A 64-bit Rabin fingerprint.
    Rabin(u64),
    /// A 32-bit Schema Registry ID.
    Id(u32),
    /// A 64-bit Schema Registry ID.
    Id64(u64),
    #[cfg(feature = "md5")]
    /// A 128-bit MD5 fingerprint (requires the `md5` feature).
    MD5([u8; 16]),
    #[cfg(feature = "sha256")]
    /// A 256-bit SHA-256 fingerprint (requires the `sha256` feature).
    SHA256([u8; 32]),
}
628
629impl From<FingerprintStrategy> for Fingerprint {
630    fn from(s: FingerprintStrategy) -> Self {
631        Self::from(&s)
632    }
633}
634
635impl From<&FingerprintStrategy> for Fingerprint {
636    fn from(s: &FingerprintStrategy) -> Self {
637        match s {
638            FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
639            FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
640            FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
641            #[cfg(feature = "md5")]
642            FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
643            #[cfg(feature = "sha256")]
644            FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
645        }
646    }
647}
648
649impl From<FingerprintAlgorithm> for Fingerprint {
650    fn from(s: FingerprintAlgorithm) -> Self {
651        match s {
652            FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
653            FingerprintAlgorithm::Id => Fingerprint::Id(0),
654            FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
655            #[cfg(feature = "md5")]
656            FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
657            #[cfg(feature = "sha256")]
658            FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
659        }
660    }
661}
662
663impl Fingerprint {
664    /// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID).
665    ///
666    /// The provided `id` is in big-endian wire order; this converts it to host order
667    /// and returns `Fingerprint::Id`.
668    ///
669    /// # Returns
670    /// A `Fingerprint::Id` variant containing the 32-bit fingerprint.
671    pub fn load_fingerprint_id(id: u32) -> Self {
672        Fingerprint::Id(u32::from_be(id))
673    }
674
675    /// Loads the 64-bit Schema Registry fingerprint (Apicurio Schema Registry ID).
676    ///
677    /// The provided `id` is in big-endian wire order; this converts it to host order
678    /// and returns `Fingerprint::Id64`.
679    ///
680    /// # Returns
681    /// A `Fingerprint::Id64` variant containing the 64-bit fingerprint.
682    pub fn load_fingerprint_id64(id: u64) -> Self {
683        Fingerprint::Id64(u64::from_be(id))
684    }
685
686    /// Constructs a serialized prefix represented as a `Vec<u8>` based on the variant of the enum.
687    ///
688    /// This method serializes data in different formats depending on the variant of `self`:
689    /// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
690    ///   followed by the big-endian byte representation of the `id`.
691    /// - **`Id64(id)`**: Uses the Apicurio wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
692    ///   followed by the big-endian 8-byte representation of the `id`.
693    /// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header
694    ///   (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`.
695    /// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the
696    ///   `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
697    /// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is
698    ///   a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`.
699    ///
700    /// # Returns
701    ///
702    /// A `Prefix` containing the serialized prefix data.
703    ///
704    /// # Features
705    ///
706    /// - You can optionally enable the `md5` feature to include the `MD5` variant.
707    /// - You can optionally enable the `sha256` feature to include the `SHA256` variant.
708    ///
709    pub(crate) fn make_prefix(&self) -> Prefix {
710        let mut buf = [0u8; MAX_PREFIX_LEN];
711        let len = match self {
712            Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
713            Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
714            Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
715            #[cfg(feature = "md5")]
716            Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
717            #[cfg(feature = "sha256")]
718            Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
719        };
720        Prefix { buf, len }
721    }
722}
723
724fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
725    buf: &mut [u8; MAX_PREFIX_LEN],
726    magic: &[u8; MAGIC_LEN],
727    payload: &[u8; PAYLOAD_LEN],
728) -> u8 {
729    debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
730    let total = MAGIC_LEN + PAYLOAD_LEN;
731    let prefix_slice = &mut buf[..total];
732    prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
733    prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
734    total as u8
735}
736
/// An in-memory cache of Avro schemas, indexed by their fingerprint.
///
/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
/// Each schema is associated with a unique [`Fingerprint`], which is generated based
/// on the schema's canonical form and a specific hashing algorithm.
///
/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin,
/// MD5 (behind the `md5` feature), or SHA256 (behind the `sha256` feature) for all its operations.
/// This ensures consistency when generating fingerprints and looking up schemas.
/// All schemas registered will have their fingerprint computed with this algorithm, and
/// lookups must use a matching fingerprint.
///
/// # Examples
///
/// ```no_run
/// // Create a new store with the default Rabin fingerprinting.
/// use arrow_avro::schema::{AvroSchema, SchemaStore};
///
/// let mut store = SchemaStore::new();
/// let schema = AvroSchema::new("\"string\"".to_string());
/// // Register the schema to get its fingerprint.
/// let fingerprint = store.register(schema.clone()).unwrap();
/// // Use the fingerprint to look up the schema.
/// let retrieved_schema = store.lookup(&fingerprint).cloned();
/// assert_eq!(retrieved_schema, Some(schema));
/// ```
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// The hashing algorithm used for generating fingerprints.
    /// Defaults to `FingerprintAlgorithm::default()` (Rabin).
    fingerprint_algorithm: FingerprintAlgorithm,
    /// A map from a schema's fingerprint to the schema itself.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
770
771impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
772    type Error = ArrowError;
773
774    /// Creates a `SchemaStore` from a HashMap of schemas.
775    /// Each schema in the HashMap is registered with the new store.
776    fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
777        Ok(Self {
778            schemas,
779            ..Self::default()
780        })
781    }
782}
783
784impl SchemaStore {
785    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
786    pub fn new() -> Self {
787        Self::default()
788    }
789
790    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
791    pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
792        Self {
793            fingerprint_algorithm,
794            ..Self::default()
795        }
796    }
797
798    /// Registers a schema with the store and the provided fingerprint.
799    /// Note: Confluent wire format implementations should leverage this method.
800    ///
801    /// A schema is set in the store, using the provided fingerprint. If a schema
802    /// with the same fingerprint does not already exist in the store, the new schema
803    /// is inserted. If the fingerprint already exists, the existing schema is not overwritten.
804    ///
805    /// # Arguments
806    ///
807    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to register.
808    /// * `schema` - The `AvroSchema` to register.
809    ///
810    /// # Returns
811    ///
812    /// A `Result` returning the provided `Fingerprint` of the schema if successful,
813    /// or an `ArrowError` on failure.
814    pub fn set(
815        &mut self,
816        fingerprint: Fingerprint,
817        schema: AvroSchema,
818    ) -> Result<Fingerprint, ArrowError> {
819        match self.schemas.entry(fingerprint) {
820            Entry::Occupied(entry) => {
821                if entry.get() != &schema {
822                    return Err(ArrowError::ComputeError(format!(
823                        "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
824                    )));
825                }
826            }
827            Entry::Vacant(entry) => {
828                entry.insert(schema);
829            }
830        }
831        Ok(fingerprint)
832    }
833
834    /// Registers a schema with the store and returns its fingerprint.
835    ///
836    /// A fingerprint is calculated for the given schema using the store's configured
837    /// hash type. If a schema with the same fingerprint does not already exist in the
838    /// store, the new schema is inserted. If the fingerprint already exists, the
839    /// existing schema is not overwritten. If FingerprintAlgorithm is set to Id or Id64, this
840    /// method will return an error. Confluent wire format implementations should leverage the
841    /// set method instead.
842    ///
843    /// # Arguments
844    ///
845    /// * `schema` - The `AvroSchema` to register.
846    ///
847    /// # Returns
848    ///
849    /// A `Result` containing the `Fingerprint` of the schema if successful,
850    /// or an `ArrowError` on failure.
851    pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
852        if self.fingerprint_algorithm == FingerprintAlgorithm::Id
853            || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
854        {
855            return Err(ArrowError::SchemaError(
856                "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
857            Use the set method directly instead, providing a valid fingerprint"
858                    .to_string(),
859            ));
860        }
861        let fingerprint =
862            AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
863        self.set(fingerprint, schema)?;
864        Ok(fingerprint)
865    }
866
867    /// Looks up a schema by its `Fingerprint`.
868    ///
869    /// # Arguments
870    ///
871    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up.
872    ///
873    /// # Returns
874    ///
875    /// An `Option` containing a clone of the `AvroSchema` if found, otherwise `None`.
876    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
877        self.schemas.get(fingerprint)
878    }
879
880    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
881    /// held by this [`SchemaStore`].
882    ///
883    /// The order of the returned fingerprints is unspecified and should not be
884    /// relied upon.
885    pub fn fingerprints(&self) -> Vec<Fingerprint> {
886        self.schemas.keys().copied().collect()
887    }
888
889    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
890    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
891        self.fingerprint_algorithm
892    }
893}
894
895fn quote(s: &str) -> Result<String, ArrowError> {
896    serde_json::to_string(s)
897        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
898}
899
// Avro names are defined by a `name` and an optional `namespace`.
// The full name is composed of the namespace and the name, separated by a dot.
//
// Avro specification defines two ways to specify a full name:
// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
//    In this case, the `namespace` attribute is ignored.
// 2. The `name` attribute contains the simple name (e.g., "d") and the
//    `namespace` attribute contains the namespace (e.g., "a.b.c").
//
// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
// Complex paths with quotes or backticks like `a."hi".b` are not supported.
//
// This function constructs the full name and extracts the namespace,
// handling both ways of specifying the name. It prioritizes a namespace
// defined within the `name` attribute itself, then the explicit `namespace_attr`,
// and finally the `enclosing_ns`.
//
// An empty namespace (`""`) denotes the null namespace in Avro, so it must not
// contribute a leading dot: the full name is then just the simple name. The
// namespace itself is still returned unchanged (`Some("")`) in that case.
//
// Returns `(full_name, namespace)`; `namespace` is `None` only when no namespace
// could be determined from any of the three sources.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    // `name` already contains a dot then treat as full-name, ignore namespace.
    if let Some((ns, _)) = name.rsplit_once('.') {
        return (name.to_string(), Some(ns.to_string()));
    }
    match namespace_attr.or(enclosing_ns) {
        // Null namespace: the full name is the bare name, never ".name".
        Some("") => (name.to_string(), Some(String::new())),
        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
        None => (name.to_string(), None),
    }
}
930
931fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
932    Ok(match schema {
933        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
934            TypeName::Primitive(pt) => quote(pt.as_ref())?,
935            TypeName::Ref(name) => {
936                let (full_name, _) = make_full_name(name, None, enclosing_ns);
937                quote(&full_name)?
938            }
939        },
940        Schema::Union(branches) => format!(
941            "[{}]",
942            branches
943                .iter()
944                .map(|b| build_canonical(b, enclosing_ns))
945                .collect::<Result<Vec<_>, _>>()?
946                .join(",")
947        ),
948        Schema::Complex(ct) => match ct {
949            ComplexType::Record(r) => {
950                let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
951                let fields = r
952                    .fields
953                    .iter()
954                    .map(|f| {
955                        // PCF [STRIP] per Avro spec: keep only attributes relevant to parsing
956                        // ("name" and "type" for fields) and **strip others** such as doc,
957                        // default, order, and **aliases**. This preserves canonicalization. See:
958                        // https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas
959                        let field_type =
960                            build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
961                        Ok(format!(
962                            r#"{{"name":{},"type":{}}}"#,
963                            quote(f.name)?,
964                            field_type
965                        ))
966                    })
967                    .collect::<Result<Vec<_>, ArrowError>>()?
968                    .join(",");
969                format!(
970                    r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
971                    quote(&full_name)?,
972                )
973            }
974            ComplexType::Enum(e) => {
975                let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
976                let symbols = e
977                    .symbols
978                    .iter()
979                    .map(|s| quote(s))
980                    .collect::<Result<Vec<_>, _>>()?
981                    .join(",");
982                format!(
983                    r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
984                    quote(&full_name)?
985                )
986            }
987            ComplexType::Array(arr) => format!(
988                r#"{{"type":"array","items":{}}}"#,
989                build_canonical(&arr.items, enclosing_ns)?
990            ),
991            ComplexType::Map(map) => format!(
992                r#"{{"type":"map","values":{}}}"#,
993                build_canonical(&map.values, enclosing_ns)?
994            ),
995            ComplexType::Fixed(f) => {
996                let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
997                format!(
998                    r#"{{"name":{},"type":"fixed","size":{}}}"#,
999                    quote(&full_name)?,
1000                    f.size
1001                )
1002            }
1003        },
1004    })
1005}
1006
/// Seed value of the 64‑bit Rabin fingerprint described in the Avro spec.
///
/// It is the fingerprint of the empty input and is also the constant XOR-ed in
/// while building the lookup table.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Build one entry of the polynomial‑division table.
///
/// A `while` loop is used because `for` loops rely on `Iterator::next`, which
/// is not `const` on stable Rust (see the `const_for` feature, tracking issue
/// #87575).
const fn one_entry(i: usize) -> u64 {
    let mut fp = i as u64;
    let mut remaining_bits = 8;
    while remaining_bits > 0 {
        // Shift out the low bit; fold in EMPTY whenever that bit was set.
        fp = if fp & 1 == 1 {
            (fp >> 1) ^ EMPTY
        } else {
            fp >> 1
        };
        remaining_bits -= 1;
    }
    fp
}

/// Build the full 256‑entry table at compile time.
///
/// As in [`one_entry`], a `while` loop stands in for `for`, which is not
/// usable in a `const fn` on stable Rust.
const fn build_table() -> [u64; 256] {
    let mut entries = [0u64; 256];
    let mut byte_value = 0;
    while byte_value < 256 {
        entries[byte_value] = one_entry(byte_value);
        byte_value += 1;
    }
    entries
}

/// The pre‑computed table, indexed by byte value.
static FINGERPRINT_TABLE: [u64; 256] = build_table();

/// Computes the 64-bit Rabin fingerprint for a given canonical schema string.
/// This implementation is based on the Avro specification for schema fingerprinting.
///
/// The UTF-8 bytes of `canonical_form` are consumed in order; an empty string
/// yields [`EMPTY`].
pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
    canonical_form.bytes().fold(EMPTY, |fp, byte| {
        let idx = usize::from(fp as u8 ^ byte);
        (fp >> 8) ^ FINGERPRINT_TABLE[idx]
    })
}
1055
#[cfg(feature = "md5")]
/// Compute the **128‑bit MD5** fingerprint of the canonical form.
///
/// The digest is taken over the UTF-8 bytes of `canonical_form` and returned
/// in full as a 16‑byte array (`[u8; 16]`).
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    md5::compute(canonical_form.as_bytes()).0
}
1066
#[cfg(feature = "sha256")]
/// Compute the **256‑bit SHA‑256** fingerprint of the canonical form.
///
/// The digest is taken over the UTF-8 bytes of `canonical_form` and returned
/// in full as a 32‑byte array (`[u8; 32]`).
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    // One-shot hashing: equivalent to new() + update() + finalize().
    Sha256::digest(canonical_form.as_bytes()).into()
}
1078
1079#[inline]
1080fn is_internal_arrow_key(key: &str) -> bool {
1081    key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1082}
1083
1084/// Copies Arrow schema metadata entries to the provided JSON map,
1085/// skipping keys that are Avro-reserved, internal Arrow keys, or
1086/// nested under the `avro.schema.` namespace. Values that parse as
1087/// JSON are inserted as JSON; otherwise the raw string is preserved.
1088fn extend_with_passthrough_metadata(
1089    target: &mut JsonMap<String, Value>,
1090    metadata: &HashMap<String, String>,
1091) {
1092    for (meta_key, meta_val) in metadata {
1093        if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1094            continue;
1095        }
1096        let json_val =
1097            serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1098        target.insert(meta_key.clone(), json_val);
1099    }
1100}
1101
// Turn an arbitrary string into a valid Avro field or type name:
// - an empty input becomes "_",
// - every character other than ASCII letters, ASCII digits and '_' becomes '_',
// - a leading ASCII digit gets a '_' placed in front of it.
fn sanitise_avro_name(base_name: &str) -> String {
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    match base_name.chars().next() {
        None => return "_".to_owned(),
        // Names may not start with a digit, so prefix an underscore.
        Some(first) if first.is_ascii_digit() => sanitised.push('_'),
        Some(_) => {}
    }
    sanitised.extend(base_name.chars().map(|c| {
        if c.is_ascii_alphanumeric() || c == '_' {
            c
        } else {
            '_'
        }
    }));
    sanitised
}
1122
1123#[derive(Default)]
1124struct NameGenerator {
1125    used: HashSet<String>,
1126    counters: HashMap<String, usize>,
1127}
1128
1129impl NameGenerator {
1130    fn make_unique(&mut self, field_name: &str) -> String {
1131        let field_name = sanitise_avro_name(field_name);
1132        if self.used.insert(field_name.clone()) {
1133            self.counters.insert(field_name.clone(), 1);
1134            return field_name;
1135        }
1136        let counter = self.counters.entry(field_name.clone()).or_insert(1);
1137        loop {
1138            let candidate = format!("{field_name}_{}", *counter);
1139            if self.used.insert(candidate.clone()) {
1140                return candidate;
1141            }
1142            *counter += 1;
1143        }
1144    }
1145}
1146
1147fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1148    if extras.is_empty() {
1149        return schema;
1150    }
1151    match schema {
1152        Value::Object(mut map) => {
1153            map.extend(extras);
1154            Value::Object(map)
1155        }
1156        Value::Array(mut union) => {
1157            // For unions, we cannot attach attributes to the array itself (per Avro spec).
1158            // As a fallback for extension metadata, attach extras to the first non-null branch object.
1159            if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1160                let original = std::mem::take(non_null);
1161                *non_null = merge_extras(original, extras);
1162            }
1163            Value::Array(union)
1164        }
1165        primitive => {
1166            let mut map = JsonMap::with_capacity(extras.len() + 1);
1167            map.insert("type".into(), primitive);
1168            map.extend(extras);
1169            Value::Object(map)
1170        }
1171    }
1172}
1173
1174#[inline]
1175fn is_avro_json_null(v: &Value) -> bool {
1176    matches!(v, Value::String(s) if s == "null")
1177}
1178
1179fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1180    let null = Value::String("null".into());
1181    match inner {
1182        Value::Array(mut union) => {
1183            // If this site is already a union and already contains "null",
1184            // preserve the branch order exactly. Reordering "null" breaks
1185            // the correspondence between Arrow union child order (type_ids)
1186            // and the Avro branch index written on the wire.
1187            if union.iter().any(is_avro_json_null) {
1188                return Value::Array(union);
1189            }
1190            // Otherwise, inject "null" without reordering existing branches.
1191            match null_order {
1192                Nullability::NullFirst => union.insert(0, null),
1193                Nullability::NullSecond => union.push(null),
1194            }
1195            Value::Array(union)
1196        }
1197        other => match null_order {
1198            Nullability::NullFirst => Value::Array(vec![null, other]),
1199            Nullability::NullSecond => Value::Array(vec![other, null]),
1200        },
1201    }
1202}
1203
/// Returns the smallest fixed size in bytes (1..=32) whose maximum decimal
/// precision is at least `p`; precisions beyond the table saturate at 32.
fn min_fixed_bytes_for_precision(p: usize) -> usize {
    // From the spec: max precision for n=1..=32 bytes:
    // [2,4,6,9,11,14,16,18,21,23,26,28,31,33,35,38,40,43,45,47,50,52,55,57,59,62,64,67,69,71,74,76]
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    MAX_P
        .iter()
        .position(|&max_p| p <= max_p)
        // Index i corresponds to a width of i + 1 bytes; saturate at Decimal256.
        .map_or(MAX_P.len(), |idx| idx + 1)
}
1218
1219fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1220    match branch {
1221        Value::String(t) => Ok(format!("P:{t}")),
1222        Value::Object(map) => {
1223            let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1224                ArrowError::SchemaError("Union branch object missing string 'type'".into())
1225            })?;
1226            match t {
1227                "record" | "enum" | "fixed" => {
1228                    let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1229                        ArrowError::SchemaError(format!(
1230                            "Union branch '{t}' missing required 'name'"
1231                        ))
1232                    })?;
1233                    Ok(format!("N:{t}:{name}"))
1234                }
1235                "array" | "map" => Ok(format!("C:{t}")),
1236                other => Ok(format!("P:{other}")),
1237            }
1238        }
1239        Value::Array(_) => Err(ArrowError::SchemaError(
1240            "Avro union may not immediately contain another union".into(),
1241        )),
1242        _ => Err(ArrowError::SchemaError(
1243            "Invalid JSON for Avro union branch".into(),
1244        )),
1245    }
1246}
1247
1248fn datatype_to_avro(
1249    dt: &DataType,
1250    field_name: &str,
1251    metadata: &HashMap<String, String>,
1252    name_gen: &mut NameGenerator,
1253    null_order: Nullability,
1254    strip: bool,
1255) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1256    let mut extras = JsonMap::new();
1257    let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1258        if *scale < 0 {
1259            return Err(ArrowError::SchemaError(format!(
1260                "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1261            )));
1262        }
1263        if (*scale as usize) > (*precision as usize) {
1264            return Err(ArrowError::SchemaError(format!(
1265                "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1266                 must be <= precision ({precision})"
1267            )));
1268        }
1269        let mut meta = JsonMap::from_iter([
1270            ("logicalType".into(), json!("decimal")),
1271            ("precision".into(), json!(*precision)),
1272            ("scale".into(), json!(*scale)),
1273        ]);
1274        let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1275        let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1276            || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1277        if fixed_size.is_none() && carries_name {
1278            fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1279        }
1280        if let Some(size) = fixed_size {
1281            meta.insert("type".into(), json!("fixed"));
1282            meta.insert("size".into(), json!(size));
1283            let chosen_name = metadata
1284                .get(AVRO_NAME_METADATA_KEY)
1285                .map(|s| sanitise_avro_name(s))
1286                .unwrap_or_else(|| name_gen.make_unique(field_name));
1287            meta.insert("name".into(), json!(chosen_name));
1288            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1289                meta.insert("namespace".into(), json!(ns));
1290            }
1291        } else {
1292            // default to bytes-backed decimal
1293            meta.insert("type".into(), json!("bytes"));
1294        }
1295        Ok(Value::Object(meta))
1296    };
1297    let val = match dt {
1298        DataType::Null => Value::String("null".into()),
1299        DataType::Boolean => Value::String("boolean".into()),
1300        DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 | DataType::Int32 => {
1301            Value::String("int".into())
1302        }
1303        DataType::UInt32 | DataType::Int64 | DataType::UInt64 => Value::String("long".into()),
1304        DataType::Float16 | DataType::Float32 => Value::String("float".into()),
1305        DataType::Float64 => Value::String("double".into()),
1306        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1307        DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1308        DataType::BinaryView => {
1309            if !strip {
1310                extras.insert("arrowBinaryView".into(), Value::Bool(true));
1311            }
1312            Value::String("bytes".into())
1313        }
1314        DataType::FixedSizeBinary(len) => {
1315            let md_is_uuid = metadata
1316                .get("logicalType")
1317                .map(|s| s.trim_matches('"') == "uuid")
1318                .unwrap_or(false);
1319            #[cfg(feature = "canonical_extension_types")]
1320            let ext_is_uuid = metadata
1321                .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1322                .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1323                .unwrap_or(false);
1324            #[cfg(not(feature = "canonical_extension_types"))]
1325            let ext_is_uuid = false;
1326            let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1327            if is_uuid {
1328                json!({ "type": "string", "logicalType": "uuid" })
1329            } else {
1330                let chosen_name = metadata
1331                    .get(AVRO_NAME_METADATA_KEY)
1332                    .map(|s| sanitise_avro_name(s))
1333                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1334                let mut obj = JsonMap::from_iter([
1335                    ("type".into(), json!("fixed")),
1336                    ("name".into(), json!(chosen_name)),
1337                    ("size".into(), json!(len)),
1338                ]);
1339                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1340                    obj.insert("namespace".into(), json!(ns));
1341                }
1342                Value::Object(obj)
1343            }
1344        }
1345        #[cfg(feature = "small_decimals")]
1346        DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1347            handle_decimal(precision, scale)?
1348        }
1349        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1350            handle_decimal(precision, scale)?
1351        }
1352        DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1353        DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1354        DataType::Time32(unit) => match unit {
1355            TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1356            TimeUnit::Second => {
1357                if !strip {
1358                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1359                }
1360                Value::String("int".into())
1361            }
1362            _ => Value::String("int".into()),
1363        },
1364        DataType::Time64(unit) => match unit {
1365            TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1366            TimeUnit::Nanosecond => {
1367                if !strip {
1368                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1369                }
1370                Value::String("long".into())
1371            }
1372            _ => Value::String("long".into()),
1373        },
1374        DataType::Timestamp(unit, tz) => {
1375            let logical_type = match (unit, tz.is_some()) {
1376                (TimeUnit::Millisecond, true) => "timestamp-millis",
1377                (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1378                (TimeUnit::Microsecond, true) => "timestamp-micros",
1379                (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1380                (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1381                (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1382                (TimeUnit::Second, _) => {
1383                    if !strip {
1384                        extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1385                    }
1386                    return Ok((Value::String("long".into()), extras));
1387                }
1388            };
1389            if !strip && matches!(unit, TimeUnit::Nanosecond) {
1390                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1391            }
1392            json!({ "type": "long", "logicalType": logical_type })
1393        }
1394        #[cfg(not(feature = "avro_custom_types"))]
1395        DataType::Duration(_unit) => Value::String("long".into()),
1396        #[cfg(feature = "avro_custom_types")]
1397        DataType::Duration(unit) => {
1398            // When the feature is enabled, create an Avro schema object
1399            // with the correct `logicalType` annotation.
1400            let logical_type = match unit {
1401                TimeUnit::Second => "arrow.duration-seconds",
1402                TimeUnit::Millisecond => "arrow.duration-millis",
1403                TimeUnit::Microsecond => "arrow.duration-micros",
1404                TimeUnit::Nanosecond => "arrow.duration-nanos",
1405            };
1406            json!({ "type": "long", "logicalType": logical_type })
1407        }
1408        DataType::Interval(IntervalUnit::MonthDayNano) => {
1409            // Avro duration logical type: fixed(12) with months/days/millis per spec.
1410            let chosen_name = metadata
1411                .get(AVRO_NAME_METADATA_KEY)
1412                .map(|s| sanitise_avro_name(s))
1413                .unwrap_or_else(|| name_gen.make_unique(field_name));
1414            let mut obj = JsonMap::from_iter([
1415                ("type".into(), json!("fixed")),
1416                ("name".into(), json!(chosen_name)),
1417                ("size".into(), json!(12)),
1418                ("logicalType".into(), json!("duration")),
1419            ]);
1420            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1421                obj.insert("namespace".into(), json!(ns));
1422            }
1423            json!(obj)
1424        }
1425        DataType::Interval(IntervalUnit::YearMonth) => {
1426            if !strip {
1427                extras.insert(
1428                    "arrowIntervalUnit".into(),
1429                    Value::String("yearmonth".into()),
1430                );
1431            }
1432            Value::String("long".into())
1433        }
1434        DataType::Interval(IntervalUnit::DayTime) => {
1435            if !strip {
1436                extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1437            }
1438            Value::String("long".into())
1439        }
1440        DataType::List(child) | DataType::LargeList(child) => {
1441            if matches!(dt, DataType::LargeList(_)) && !strip {
1442                extras.insert("arrowLargeList".into(), Value::Bool(true));
1443            }
1444            let items_schema = process_datatype(
1445                child.data_type(),
1446                child.name(),
1447                child.metadata(),
1448                name_gen,
1449                null_order,
1450                child.is_nullable(),
1451                strip,
1452            )?;
1453            json!({
1454                "type": "array",
1455                "items": items_schema
1456            })
1457        }
1458        DataType::ListView(child) | DataType::LargeListView(child) => {
1459            if matches!(dt, DataType::LargeListView(_)) && !strip {
1460                extras.insert("arrowLargeList".into(), Value::Bool(true));
1461            }
1462            if !strip {
1463                extras.insert("arrowListView".into(), Value::Bool(true));
1464            }
1465            let items_schema = process_datatype(
1466                child.data_type(),
1467                child.name(),
1468                child.metadata(),
1469                name_gen,
1470                null_order,
1471                child.is_nullable(),
1472                strip,
1473            )?;
1474            json!({
1475                "type": "array",
1476                "items": items_schema
1477            })
1478        }
1479        DataType::FixedSizeList(child, len) => {
1480            if !strip {
1481                extras.insert("arrowFixedSize".into(), json!(len));
1482            }
1483            let items_schema = process_datatype(
1484                child.data_type(),
1485                child.name(),
1486                child.metadata(),
1487                name_gen,
1488                null_order,
1489                child.is_nullable(),
1490                strip,
1491            )?;
1492            json!({
1493                "type": "array",
1494                "items": items_schema
1495            })
1496        }
1497        DataType::Map(entries, _) => {
1498            let value_field = match entries.data_type() {
1499                DataType::Struct(fs) => &fs[1],
1500                _ => {
1501                    return Err(ArrowError::SchemaError(
1502                        "Map 'entries' field must be Struct(key,value)".into(),
1503                    ));
1504                }
1505            };
1506            let values_schema = process_datatype(
1507                value_field.data_type(),
1508                value_field.name(),
1509                value_field.metadata(),
1510                name_gen,
1511                null_order,
1512                value_field.is_nullable(),
1513                strip,
1514            )?;
1515            json!({
1516                "type": "map",
1517                "values": values_schema
1518            })
1519        }
1520        DataType::Struct(fields) => {
1521            let avro_fields = fields
1522                .iter()
1523                .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1524                .collect::<Result<Vec<_>, _>>()?;
1525            // Prefer avro.name/avro.namespace when provided on the struct field metadata
1526            let chosen_name = metadata
1527                .get(AVRO_NAME_METADATA_KEY)
1528                .map(|s| sanitise_avro_name(s))
1529                .unwrap_or_else(|| name_gen.make_unique(field_name));
1530            let mut obj = JsonMap::from_iter([
1531                ("type".into(), json!("record")),
1532                ("name".into(), json!(chosen_name)),
1533                ("fields".into(), Value::Array(avro_fields)),
1534            ]);
1535            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1536                obj.insert("namespace".into(), json!(ns));
1537            }
1538            Value::Object(obj)
1539        }
1540        DataType::Dictionary(_, value) => {
1541            if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1542                let symbols: Vec<&str> =
1543                    serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1544                // Prefer avro.name/namespace when provided for enums
1545                let chosen_name = metadata
1546                    .get(AVRO_NAME_METADATA_KEY)
1547                    .map(|s| sanitise_avro_name(s))
1548                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1549                let mut obj = JsonMap::from_iter([
1550                    ("type".into(), json!("enum")),
1551                    ("name".into(), json!(chosen_name)),
1552                    ("symbols".into(), json!(symbols)),
1553                ]);
1554                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1555                    obj.insert("namespace".into(), json!(ns));
1556                }
1557                Value::Object(obj)
1558            } else {
1559                process_datatype(
1560                    value.as_ref(),
1561                    field_name,
1562                    metadata,
1563                    name_gen,
1564                    null_order,
1565                    false,
1566                    strip,
1567                )?
1568            }
1569        }
1570        #[cfg(feature = "avro_custom_types")]
1571        DataType::RunEndEncoded(run_ends, values) => {
1572            let bits = match run_ends.data_type() {
1573                DataType::Int16 => 16,
1574                DataType::Int32 => 32,
1575                DataType::Int64 => 64,
1576                other => {
1577                    return Err(ArrowError::SchemaError(format!(
1578                        "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1579                    )));
1580                }
1581            };
1582            // Build the value site schema, preserving its own nullability
1583            let (value_schema, value_extras) = datatype_to_avro(
1584                values.data_type(),
1585                values.name(),
1586                values.metadata(),
1587                name_gen,
1588                null_order,
1589                strip,
1590            )?;
1591            let mut merged = merge_extras(value_schema, value_extras);
1592            if values.is_nullable() {
1593                merged = wrap_nullable(merged, null_order);
1594            }
1595            let mut extras = JsonMap::new();
1596            extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1597            extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1598            return Ok((merged, extras));
1599        }
1600        #[cfg(not(feature = "avro_custom_types"))]
1601        DataType::RunEndEncoded(_run_ends, values) => {
1602            let (value_schema, _extras) = datatype_to_avro(
1603                values.data_type(),
1604                values.name(),
1605                values.metadata(),
1606                name_gen,
1607                null_order,
1608                strip,
1609            )?;
1610            return Ok((value_schema, JsonMap::new()));
1611        }
1612        DataType::Union(fields, mode) => {
1613            let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1614            let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1615            for (type_id, field_ref) in fields.iter() {
1616                // NOTE: `process_datatype` would wrap nullability; force is_nullable=false here.
1617                let (branch_schema, _branch_extras) = datatype_to_avro(
1618                    field_ref.data_type(),
1619                    field_ref.name(),
1620                    field_ref.metadata(),
1621                    name_gen,
1622                    null_order,
1623                    strip,
1624                )?;
1625                // Avro unions cannot immediately contain another union
1626                if matches!(branch_schema, Value::Array(_)) {
1627                    return Err(ArrowError::SchemaError(
1628                        "Avro union may not immediately contain another union".into(),
1629                    ));
1630                }
1631                branches.push(branch_schema);
1632                type_ids.push(type_id as i32);
1633            }
1634            let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1635            for b in &branches {
1636                let sig = union_branch_signature(b)?;
1637                if !seen.insert(sig) {
1638                    return Err(ArrowError::SchemaError(
1639                        "Avro union contains duplicate branch types (disallowed by spec)".into(),
1640                    ));
1641                }
1642            }
1643            if !strip {
1644                extras.insert(
1645                    "arrowUnionMode".into(),
1646                    Value::String(
1647                        match mode {
1648                            UnionMode::Sparse => "sparse",
1649                            UnionMode::Dense => "dense",
1650                        }
1651                        .to_string(),
1652                    ),
1653                );
1654                extras.insert(
1655                    "arrowUnionTypeIds".into(),
1656                    Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1657                );
1658            }
1659            Value::Array(branches)
1660        }
1661        #[cfg(not(feature = "small_decimals"))]
1662        other => {
1663            return Err(ArrowError::NotYetImplemented(format!(
1664                "Arrow type {other:?} has no Avro representation"
1665            )));
1666        }
1667    };
1668    Ok((val, extras))
1669}
1670
1671fn process_datatype(
1672    dt: &DataType,
1673    field_name: &str,
1674    metadata: &HashMap<String, String>,
1675    name_gen: &mut NameGenerator,
1676    null_order: Nullability,
1677    is_nullable: bool,
1678    strip: bool,
1679) -> Result<Value, ArrowError> {
1680    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1681    let mut merged = merge_extras(schema, extras);
1682    if is_nullable {
1683        merged = wrap_nullable(merged, null_order)
1684    }
1685    Ok(merged)
1686}
1687
1688fn arrow_field_to_avro(
1689    field: &ArrowField,
1690    name_gen: &mut NameGenerator,
1691    null_order: Nullability,
1692    strip: bool,
1693) -> Result<Value, ArrowError> {
1694    let avro_name = sanitise_avro_name(field.name());
1695    let schema_value = process_datatype(
1696        field.data_type(),
1697        &avro_name,
1698        field.metadata(),
1699        name_gen,
1700        null_order,
1701        field.is_nullable(),
1702        strip,
1703    )?;
1704    // Build the field map
1705    let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1706    map.insert("name".into(), Value::String(avro_name));
1707    map.insert("type".into(), schema_value);
1708    // Transfer selected metadata
1709    for (meta_key, meta_val) in field.metadata() {
1710        if is_internal_arrow_key(meta_key) {
1711            continue;
1712        }
1713        match meta_key.as_str() {
1714            AVRO_DOC_METADATA_KEY => {
1715                map.insert("doc".into(), Value::String(meta_val.clone()));
1716            }
1717            AVRO_FIELD_DEFAULT_METADATA_KEY => {
1718                let default_value = serde_json::from_str(meta_val)
1719                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1720                map.insert("default".into(), default_value);
1721            }
1722            _ => {
1723                let json_val = serde_json::from_str(meta_val)
1724                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1725                map.insert(meta_key.clone(), json_val);
1726            }
1727        }
1728    }
1729    Ok(Value::Object(map))
1730}
1731
1732#[cfg(test)]
1733mod tests {
1734    use super::*;
1735    use crate::codec::{AvroField, AvroFieldBuilder};
1736    use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1737    use serde_json::json;
1738    use std::sync::Arc;
1739
1740    fn int_schema() -> Schema<'static> {
1741        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1742    }
1743
1744    fn record_schema() -> Schema<'static> {
1745        Schema::Complex(ComplexType::Record(Record {
1746            name: "record1",
1747            namespace: Some("test.namespace"),
1748            doc: Some(Cow::from("A test record")),
1749            aliases: vec![],
1750            fields: vec![
1751                Field {
1752                    name: "field1",
1753                    doc: Some(Cow::from("An integer field")),
1754                    r#type: int_schema(),
1755                    default: None,
1756                    aliases: vec![],
1757                },
1758                Field {
1759                    name: "field2",
1760                    doc: None,
1761                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1762                    default: None,
1763                    aliases: vec![],
1764                },
1765            ],
1766            attributes: Attributes::default(),
1767        }))
1768    }
1769
1770    fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
1771        let mut sb = SchemaBuilder::new();
1772        sb.push(field);
1773        sb.finish()
1774    }
1775
/// Asserts that `avro_json` contains `needle` as a substring.
///
/// # Panics
///
/// Panics when `needle` does not occur in `avro_json`; the message shows both
/// the missing needle and the full JSON text. `#[track_caller]` makes the
/// reported panic location the calling test rather than this helper, so a
/// failure points at the assertion that actually failed.
#[track_caller]
fn assert_json_contains(avro_json: &str, needle: &str) {
    assert!(
        avro_json.contains(needle),
        "JSON did not contain `{needle}` : {avro_json}"
    )
}
1782
/// Deserializes a series of Avro schema JSON documents and compares each
/// result with a hand-built `Schema`/`Type`/`ComplexType` value: a bare
/// primitive, a two-branch union, a logical-type annotated primitive, a
/// decimal `fixed`, records with nullable fields, a self-referencing record,
/// and a record mixing `fixed`, a named reference and a `map`.
///
/// Also checks that converting the self-referencing record to an `AvroField`
/// fails with a resolution error, and that the timestamp record converts to
/// the expected Arrow field.
#[test]
fn test_deserialize() {
    // Shorthand for a schema that is just a primitive type name.
    fn prim(kind: PrimitiveType) -> Schema<'static> {
        Schema::TypeName(TypeName::Primitive(kind))
    }

    // A bare JSON string is a primitive type name.
    let bare_string: Schema = serde_json::from_str("\"string\"").unwrap();
    assert_eq!(bare_string, prim(PrimitiveType::String));

    // A JSON array is a union; branch order is kept as written.
    let int_or_null: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
    let expected_union = Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::Null)]);
    assert_eq!(int_or_null, expected_union);

    // A primitive annotated with a logical type.
    let timestamp_json = r#"{
                   "type":"long",
                   "logicalType":"timestamp-micros"
                }"#;
    let parsed_timestamp: Type = serde_json::from_str(timestamp_json).unwrap();
    let timestamp = Type {
        r#type: TypeName::Primitive(PrimitiveType::Long),
        attributes: Attributes {
            logical_type: Some("timestamp-micros"),
            additional: Default::default(),
        },
    };
    assert_eq!(parsed_timestamp, timestamp);

    // A decimal stored on a fixed; precision and scale land in `additional`.
    let decimal_json = r#"{
                   "type":"fixed",
                   "name":"fixed",
                   "namespace":"topLevelRecord.value",
                   "size":11,
                   "logicalType":"decimal",
                   "precision":25,
                   "scale":2
                }"#;
    let parsed_decimal: ComplexType = serde_json::from_str(decimal_json).unwrap();
    let decimal = ComplexType::Fixed(Fixed {
        name: "fixed",
        namespace: Some("topLevelRecord.value"),
        aliases: Vec::new(),
        size: 11,
        attributes: Attributes {
            logical_type: Some("decimal"),
            additional: vec![("precision", json!(25)), ("scale", json!(2))]
                .into_iter()
                .collect(),
        },
    });
    assert_eq!(parsed_decimal, decimal);

    // A record whose only field is a nullable decimal.
    let decimal_record_json = r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"value",
                     "type":[
                        {
                           "type":"fixed",
                           "name":"fixed",
                           "namespace":"topLevelRecord.value",
                           "size":11,
                           "logicalType":"decimal",
                           "precision":25,
                           "scale":2
                        },
                        "null"
                     ]
                  }
               ]
            }"#;
    let decimal_record: Schema = serde_json::from_str(decimal_record_json).unwrap();
    let expected_decimal_record = Schema::Complex(ComplexType::Record(Record {
        name: "topLevelRecord",
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        fields: vec![Field {
            name: "value",
            doc: None,
            r#type: Schema::Union(vec![Schema::Complex(decimal), prim(PrimitiveType::Null)]),
            default: None,
            aliases: Vec::new(),
        }],
        attributes: Default::default(),
    }));
    assert_eq!(decimal_record, expected_decimal_record);

    // A record that refers to itself by name and declares an alias.
    let linked_list_json = r#"{
                  "type": "record",
                  "name": "LongList",
                  "aliases": ["LinkedLongs"],
                  "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                  ]
                }"#;
    let linked_list: Schema = serde_json::from_str(linked_list_json).unwrap();
    let value_field = Field {
        name: "value",
        doc: None,
        r#type: prim(PrimitiveType::Long),
        default: None,
        aliases: Vec::new(),
    };
    let next_field = Field {
        name: "next",
        doc: None,
        r#type: Schema::Union(vec![
            prim(PrimitiveType::Null),
            Schema::TypeName(TypeName::Ref("LongList")),
        ]),
        default: None,
        aliases: Vec::new(),
    };
    let expected_linked_list = Schema::Complex(ComplexType::Record(Record {
        name: "LongList",
        namespace: None,
        doc: None,
        aliases: vec!["LinkedLongs"],
        fields: vec![value_field, next_field],
        attributes: Attributes::default(),
    }));
    assert_eq!(linked_list, expected_linked_list);

    // Recursive schemas are not supported
    let resolve_error = AvroField::try_from(&linked_list).unwrap_err().to_string();
    assert_eq!(resolve_error, "Parser error: Failed to resolve .LongList");

    // A record with a nullable int and a nullable timestamp.
    let timestamp_record_json = r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"id",
                     "type":[
                        "int",
                        "null"
                     ]
                  },
                  {
                     "name":"timestamp_col",
                     "type":[
                        {
                           "type":"long",
                           "logicalType":"timestamp-micros"
                        },
                        "null"
                     ]
                  }
               ]
            }"#;
    let timestamp_record: Schema = serde_json::from_str(timestamp_record_json).unwrap();
    let id_field = Field {
        name: "id",
        doc: None,
        r#type: Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::Null)]),
        default: None,
        aliases: Vec::new(),
    };
    let timestamp_field = Field {
        name: "timestamp_col",
        doc: None,
        r#type: Schema::Union(vec![Schema::Type(timestamp), prim(PrimitiveType::Null)]),
        default: None,
        aliases: Vec::new(),
    };
    let expected_timestamp_record = Schema::Complex(ComplexType::Record(Record {
        name: "topLevelRecord",
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        fields: vec![id_field, timestamp_field],
        attributes: Default::default(),
    }));
    assert_eq!(timestamp_record, expected_timestamp_record);

    // The same record converted to its Arrow representation.
    let codec = AvroField::try_from(&timestamp_record).unwrap();
    let struct_children = vec![
        arrow_schema::Field::new("id", DataType::Int32, true),
        arrow_schema::Field::new(
            "timestamp_col",
            DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
            true,
        ),
    ];
    let name_metadata = std::collections::HashMap::from([(
        AVRO_NAME_METADATA_KEY.to_string(),
        "topLevelRecord".to_string(),
    )]);
    let expected_arrow_field = arrow_schema::Field::new(
        "topLevelRecord",
        DataType::Struct(Fields::from(struct_children)),
        false,
    )
    .with_metadata(name_metadata);
    assert_eq!(codec.field(), expected_arrow_field);

    // A namespaced record mixing a fixed, a reference to it, and a map.
    let handshake_json = r#"{
                  "type": "record",
                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                  "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                  ]
            }"#;
    let handshake: Schema = serde_json::from_str(handshake_json).unwrap();
    let client_hash = Field {
        name: "clientHash",
        doc: None,
        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
            name: "MD5",
            namespace: None,
            aliases: Vec::new(),
            size: 16,
            attributes: Default::default(),
        })),
        default: None,
        aliases: Vec::new(),
    };
    let client_protocol = Field {
        name: "clientProtocol",
        doc: None,
        r#type: Schema::Union(vec![prim(PrimitiveType::Null), prim(PrimitiveType::String)]),
        default: None,
        aliases: Vec::new(),
    };
    let server_hash = Field {
        name: "serverHash",
        doc: None,
        r#type: Schema::TypeName(TypeName::Ref("MD5")),
        default: None,
        aliases: Vec::new(),
    };
    let bytes_map = Schema::Complex(ComplexType::Map(Map {
        values: Box::new(prim(PrimitiveType::Bytes)),
        attributes: Default::default(),
    }));
    let meta = Field {
        name: "meta",
        doc: None,
        r#type: Schema::Union(vec![prim(PrimitiveType::Null), bytes_map]),
        default: None,
        aliases: Vec::new(),
    };
    let expected_handshake = Schema::Complex(ComplexType::Record(Record {
        name: "HandshakeRequest",
        namespace: Some("org.apache.avro.ipc"),
        doc: None,
        aliases: Vec::new(),
        fields: vec![client_hash, client_protocol, server_hash, meta],
        attributes: Default::default(),
    }));
    assert_eq!(handshake, expected_handshake);
}
2090
/// Parses a large record schema covering primitives, logical types, named
/// types, unions, arrays, maps and nested records, then checks that
/// `AvroSchema::generate_canonical_form` yields the expected canonical text.
///
/// The expected text keeps only `name`, `type`, `fields`, `symbols`, `items`,
/// `values` and `size`, with named types given by their full names. It must
/// stay on a single line: a line break inside the raw string would become
/// part of the literal and the comparison could never succeed.
#[test]
fn test_canonical_form_generation_comprehensive_record() {
    // NOTE: This schema is identical to the one used in test_deserialize_comprehensive.
    let json_str = r#"{
          "type": "record",
          "name": "E2eComprehensive",
          "namespace": "org.apache.arrow.avrotests.v1",
          "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
          "fields": [
            {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
            {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
            {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
            {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
            {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
            {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
            {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
            {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
            {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
            {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
            {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
            {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
            {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
            {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
            {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
            {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
            {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
            {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
            {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
            {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
            {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
            {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
            {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
            {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
            {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
            {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
            {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
                {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
                {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
                {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
            ]}, "doc": "Embedded Address record"},
            {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
                {"name": "user", "type": "string", "doc": "Username"},
                {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
            ]}},
            {"name": "union_enum_record_array_map", "type": [
                {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
                {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
                {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
                {"type": "array", "items": "long"},
                {"type": "map", "values": "string"}
            ], "doc": "Union of enum, two records, array, and map"},
            {"name": "union_date_or_fixed4", "type": [
                {"type": "int", "logicalType": "date"},
                {"type": "fixed", "name": "Fx4", "size": 4}
            ], "doc": "Union of date(int) or fixed(4)"},
            {"name": "union_interval_or_string", "type": [
                {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
                "string"
            ], "doc": "Union of duration(fixed12) or string"},
            {"name": "union_uuid_or_fixed10", "type": [
                {"type": "string", "logicalType": "uuid"},
                {"type": "fixed", "name": "Fx10", "size": 10}
            ], "doc": "Union of UUID string or fixed(10)"},
            {"name": "array_records_with_union", "type": {"type": "array", "items": {
                "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
                "fields": [
                    {"name": "key", "type": "string"},
                    {"name": "val", "type": ["null", "int", "long"], "default": null}
                ]
            }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
            {"name": "union_map_or_array_int", "type": [
                {"type": "map", "values": "int"},
                {"type": "array", "items": "int"}
            ], "doc": "Union[map<string,int>, array<int>]"},
            {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
            {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int", "default": 0}
            ]}, "doc": "Record using type alias for schema evolution tests"}
          ]
        }"#;
    let avro = AvroSchema::new(json_str.to_string());
    let parsed = avro.schema().expect("schema should deserialize");
    // Kept on one physical line on purpose: canonical form has no whitespace.
    let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrotests.v1.types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
    let canonical_form =
        AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
    assert_eq!(
        canonical_form, expected_canonical_form,
        "Canonical form must match Avro spec PCF exactly"
    );
}
2182
/// A freshly constructed `SchemaStore` must not hold any schemas.
#[test]
fn test_new_schema_store() {
    assert!(SchemaStore::new().schemas.is_empty());
}
2188
/// Builds a `SchemaStore` from a map keyed by Rabin fingerprints and checks that each
/// schema can be found again through a freshly computed fingerprint.
#[test]
fn test_try_from_schemas_rabin() {
    let rabin_of = |schema: &AvroSchema| schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());

    let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
    for schema in [&int_avro_schema, &record_avro_schema] {
        schemas.insert(rabin_of(schema), schema.clone());
    }
    let store = SchemaStore::try_from(schemas).unwrap();

    // The lookup keys are recomputed instead of reusing the keys inserted above.
    let int_fp = rabin_of(&int_avro_schema);
    assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
    let rec_fp = rabin_of(&record_avro_schema);
    assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
}
2216
/// Inserting the int schema twice under the same fingerprint must still yield a store with
/// exactly two schemas.
#[test]
fn test_try_from_with_duplicates() {
    let rabin_of = |schema: &AvroSchema| schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());

    let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
    // The trailing entry is a duplicate of the int schema.
    for schema in [&int_avro_schema, &record_avro_schema, &int_avro_schema] {
        schemas.insert(rabin_of(schema), schema.clone());
    }

    let store = SchemaStore::try_from(schemas).unwrap();
    assert_eq!(store.schemas.len(), 2);
    let int_fp = rabin_of(&int_avro_schema);
    assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
}
2248
/// Registering a schema in a default store is expected to return a Rabin fingerprint that
/// resolves back to the schema, while a neighbouring fingerprint value resolves to nothing.
#[test]
fn test_register_and_lookup_rabin() {
    let mut store = SchemaStore::new();
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());

    // Any non-Rabin variant (Id, Id64, MD5, SHA256) is a test failure.
    let Fingerprint::Rabin(fp_val) = store.register(schema.clone()).unwrap() else {
        unreachable!("This test should only generate Rabin fingerprints")
    };

    let registered = Fingerprint::Rabin(fp_val);
    assert_eq!(store.lookup(&registered).cloned(), Some(schema.clone()));

    let neighbour = Fingerprint::Rabin(fp_val.wrapping_add(1));
    assert!(store.lookup(&neighbour).is_none());
}
2282
/// `set` with a `Fingerprint::Id` must echo the fingerprint back and make the schema
/// retrievable under exactly that id.
#[test]
fn test_set_and_lookup_id() {
    const ID: u32 = 42;
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();

    let key = Fingerprint::Id(ID);
    let returned = store.set(key, schema.clone()).unwrap();
    assert_eq!(returned, key);
    assert_eq!(store.lookup(&key).cloned(), Some(schema.clone()));

    let unknown_key = Fingerprint::Id(ID.wrapping_add(1));
    assert!(store.lookup(&unknown_key).is_none());
}
2294
/// Same round trip as the 32-bit id test, but with a `Fingerprint::Id64` key.
#[test]
fn test_set_and_lookup_id64() {
    const ID64: u64 = 0xDEAD_BEEF_DEAD_BEEF;
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();

    let key = Fingerprint::Id64(ID64);
    let returned = store.set(key, schema.clone()).unwrap();
    assert_eq!(returned, key, "set should return the same Id64 fingerprint");

    let found = store.lookup(&key).cloned();
    assert_eq!(
        found,
        Some(schema.clone()),
        "lookup should find the schema by Id64"
    );

    let unknown_key = Fingerprint::Id64(ID64.wrapping_add(1));
    assert!(
        store.lookup(&unknown_key).is_none(),
        "lookup with a different Id64 must return None"
    );
}
2315
/// Exercises the `From` conversions between `Fingerprint`, `FingerprintAlgorithm` and
/// `FingerprintStrategy` for the `Id64` variant.
#[test]
fn test_fingerprint_id64_conversions() {
    // Fingerprint (by reference) -> algorithm.
    let source_fp = Fingerprint::Id64(123);
    assert_eq!(
        FingerprintAlgorithm::from(&source_fp),
        FingerprintAlgorithm::Id64
    );

    // Algorithm -> fingerprint: the test expects a zero payload.
    let fp_from_algo = Fingerprint::from(FingerprintAlgorithm::Id64);
    assert!(matches!(fp_from_algo, Fingerprint::Id64(0)));

    // Fingerprint -> strategy: the test expects the payload (5) to be replaced by zero.
    let strategy_from_fp = FingerprintStrategy::from(Fingerprint::Id64(5));
    assert!(matches!(strategy_from_fp, FingerprintStrategy::Id64(0)));

    // Strategy -> algorithm.
    assert_eq!(
        FingerprintAlgorithm::from(strategy_from_fp),
        FingerprintAlgorithm::Id64
    );
}
2327
/// Registering two equal schemas must return the same fingerprint and store a single entry.
#[test]
fn test_register_duplicate_schema() {
    let make_int_schema = || AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();

    let first = store.register(make_int_schema()).unwrap();
    let second = store.register(make_int_schema()).unwrap();

    assert_eq!(first, second);
    assert_eq!(store.schemas.len(), 1);
}
2338
/// `set` with a caller-computed Rabin fingerprint must return that fingerprint and make the
/// schema retrievable through it.
#[test]
fn test_set_and_lookup_with_provided_fingerprint() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let provided = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();

    let mut store = SchemaStore::new();
    let returned = store.set(provided, schema.clone()).unwrap();

    assert_eq!(returned, provided);
    assert_eq!(store.lookup(&provided).cloned(), Some(schema));
}
2348
/// Setting the same schema twice under the same fingerprint must succeed and keep one entry.
#[test]
fn test_set_duplicate_same_schema_ok() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();

    let mut store = SchemaStore::new();
    for _ in 0..2 {
        let _ = store.set(fp, schema.clone()).unwrap();
    }
    assert_eq!(store.schemas.len(), 1);
}
2358
/// Setting two different schemas under one fingerprint must fail with a collision error.
#[test]
fn test_set_duplicate_different_schema_collision_error() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    // A shared Fingerprint::Id simulates a collision between the two distinct schemas.
    let shared_fp = Fingerprint::Id(123);

    let mut store = SchemaStore::new();
    let _ = store.set(shared_fp, int_avro).unwrap();
    let msg = store.set(shared_fp, record_avro).unwrap_err().to_string();

    assert!(msg.contains("Schema fingerprint collision"));
}
2371
/// The canonical form of the primitive int schema is the JSON string `"int"`.
#[test]
fn test_canonical_form_generation_primitive() {
    let canonical_form = AvroSchema::generate_canonical_form(&int_schema()).unwrap();
    assert_eq!(canonical_form, r#""int""#);
}
2378
/// The canonical form of the shared record fixture must match the expected JSON exactly.
#[test]
fn test_canonical_form_generation_record() {
    let canonical_form = AvroSchema::generate_canonical_form(&record_schema()).unwrap();
    assert_eq!(
        canonical_form,
        r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#
    );
}
2386
/// Pins the Rabin fingerprint computed for a fixed schema string to a known value.
#[test]
fn test_fingerprint_calculation() {
    let canonical_form = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
    assert_eq!(
        compute_fingerprint_rabin(canonical_form),
        10_505_236_152_925_314_060
    );
}
2394
/// Registering the record fixture must yield the Rabin fingerprint of its expected canonical
/// form, and looking that fingerprint up must return the schema.
#[test]
fn test_register_and_lookup_complex_schema() {
    let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
    let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let mut store = SchemaStore::new();

    let expected = Fingerprint::Rabin(compute_fingerprint_rabin(canonical_form));
    let actual = store.register(schema.clone()).unwrap();

    assert_eq!(actual, expected);
    assert_eq!(store.lookup(&actual).cloned(), Some(schema));
}
2406
/// `fingerprints` must report every fingerprint returned by `register`.
#[test]
fn test_fingerprints_returns_all_keys() {
    let mut store = SchemaStore::new();

    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let fp_int = store.register(int_avro).unwrap();
    let record_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let fp_record = store.register(record_avro).unwrap();

    let fps = store.fingerprints();
    assert_eq!(fps.len(), 2);
    for registered in [fp_int, fp_record] {
        assert!(fps.contains(&registered));
    }
}
2425
/// Docs, aliases and custom attributes set on the record and its field must not appear in
/// the generated canonical form.
#[test]
fn test_canonical_form_strips_attributes() {
    // Field type: bytes carrying an extra "precision" attribute.
    let bytes_with_precision = Schema::Type(Type {
        r#type: TypeName::Primitive(PrimitiveType::Bytes),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("precision", json!(4))]),
        },
    });
    let documented_field = Field {
        name: "f1",
        doc: Some(Cow::from("field doc")),
        r#type: bytes_with_precision,
        default: None,
        aliases: vec![],
    };
    let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
        name: "record_with_attrs",
        namespace: None,
        doc: Some(Cow::from("This doc should be stripped")),
        aliases: vec!["alias1", "alias2"],
        fields: vec![documented_field],
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("custom_attr", json!("value"))]),
        },
    }));

    let canonical_form = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
    assert_eq!(
        canonical_form,
        r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#
    );
}
2455
/// Each Arrow primitive type must produce the listed Avro primitive token in the schema JSON.
#[test]
fn test_primitive_mappings() {
    // (Arrow type, Avro token expected somewhere in the generated JSON)
    let expectations = [
        (DataType::Boolean, "\"boolean\""),
        (DataType::Int8, "\"int\""),
        (DataType::Int16, "\"int\""),
        (DataType::Int32, "\"int\""),
        (DataType::Int64, "\"long\""),
        (DataType::UInt8, "\"int\""),
        (DataType::UInt16, "\"int\""),
        (DataType::UInt32, "\"long\""),
        (DataType::UInt64, "\"long\""),
        (DataType::Float16, "\"float\""),
        (DataType::Float32, "\"float\""),
        (DataType::Float64, "\"double\""),
        (DataType::Utf8, "\"string\""),
        (DataType::Binary, "\"bytes\""),
    ];
    for (arrow_type, avro_token) in expectations {
        let arrow_schema = single_field_schema(ArrowField::new("col", arrow_type, false));
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, avro_token);
    }
}
2481
/// Each Arrow temporal type must produce the listed Avro `logicalType` in the schema JSON.
#[test]
fn test_temporal_mappings() {
    let millis_time = DataType::Time32(TimeUnit::Millisecond);
    let micros_time = DataType::Time64(TimeUnit::Microsecond);
    // No time zone: expected to map to the local-timestamp flavour.
    let local_millis = DataType::Timestamp(TimeUnit::Millisecond, None);
    // Explicit "+00:00" zone: expected to map to the plain timestamp flavour.
    let zoned_micros = DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into()));

    let expectations = [
        (DataType::Date32, "\"logicalType\":\"date\""),
        (millis_time, "\"logicalType\":\"time-millis\""),
        (micros_time, "\"logicalType\":\"time-micros\""),
        (local_millis, "\"logicalType\":\"local-timestamp-millis\""),
        (zoned_micros, "\"logicalType\":\"timestamp-micros\""),
    ];
    for (arrow_type, needle) in expectations {
        let arrow_schema = single_field_schema(ArrowField::new("ts", arrow_type, true));
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, needle);
    }
}
2510
/// Checks the decimal attributes emitted for `Decimal128(25, 2)` and the `uuid` logical type
/// emitted for a 16-byte fixed field tagged with `logicalType = uuid` metadata.
#[test]
fn test_decimal_and_uuid() {
    // Decimal: logical type plus precision and scale must all be present.
    let dec_schema =
        single_field_schema(ArrowField::new("amount", DataType::Decimal128(25, 2), false));
    let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
    for needle in [
        "\"logicalType\":\"decimal\"",
        "\"precision\":25",
        "\"scale\":2",
    ] {
        assert_json_contains(&avro_dec.json_string, needle);
    }

    // UUID: requested through field metadata on a FixedSizeBinary(16).
    let uuid_md = HashMap::from([("logicalType".to_string(), "uuid".to_string())]);
    let uuid_schema = single_field_schema(
        ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(uuid_md),
    );
    let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
    assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
}
2527
/// With `avro_custom_types` enabled, a month-day-nano interval must be written as a 12-byte
/// `duration` and a nanosecond duration as `arrow.duration-nanos`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_duration() {
    // Interval(MonthDayNano)
    let interval_type = DataType::Interval(IntervalUnit::MonthDayNano);
    let interval_schema = single_field_schema(ArrowField::new("span", interval_type, false));
    let interval_avro = AvroSchema::try_from(&interval_schema).unwrap();
    assert_json_contains(&interval_avro.json_string, "\"logicalType\":\"duration\"");
    assert_json_contains(&interval_avro.json_string, "\"size\":12");

    // Duration(Nanosecond)
    let duration_type = DataType::Duration(TimeUnit::Nanosecond);
    let duration_schema = single_field_schema(ArrowField::new("latency", duration_type, false));
    let duration_avro = AvroSchema::try_from(&duration_schema).unwrap();
    assert_json_contains(
        &duration_avro.json_string,
        "\"logicalType\":\"arrow.duration-nanos\"",
    );
}
2548
/// Arrow list, map and nullable struct fields must be written as Avro array, map and
/// record (with a `null` branch) respectively.
#[test]
fn test_complex_types() {
    // List -> array
    let list_item = Arc::new(ArrowField::new("item", DataType::Int32, true));
    let list_schema =
        single_field_schema(ArrowField::new("numbers", DataType::List(list_item), false));
    let avro_list = AvroSchema::try_from(&list_schema).unwrap();
    assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
    assert_json_contains(&avro_list.json_string, "\"items\"");

    // Map -> map
    let entry_fields = Fields::from(vec![
        ArrowField::new("key", DataType::Utf8, false),
        ArrowField::new("value", DataType::Boolean, true),
    ]);
    let entries = ArrowField::new("entries", DataType::Struct(entry_fields), false);
    let map_schema = single_field_schema(ArrowField::new(
        "props",
        DataType::Map(Arc::new(entries), false),
        false,
    ));
    let avro_map = AvroSchema::try_from(&map_schema).unwrap();
    assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
    assert_json_contains(&avro_map.json_string, "\"values\"");

    // Nullable struct -> record plus "null"
    let person_fields = Fields::from(vec![
        ArrowField::new("f1", DataType::Int64, false),
        ArrowField::new("f2", DataType::Utf8, true),
    ]);
    let struct_schema =
        single_field_schema(ArrowField::new("person", DataType::Struct(person_fields), true));
    let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
    assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
    assert_json_contains(&avro_struct.json_string, "\"null\"");
}
2579
/// A dictionary field carrying enum-symbol metadata must be written as an Avro enum with
/// those symbols.
#[test]
fn test_enum_dictionary() {
    let symbols_md = HashMap::from([(
        AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
        "[\"OPEN\",\"CLOSED\"]".to_string(),
    )]);
    let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let status = ArrowField::new("status", dictionary, false).with_metadata(symbols_md);

    let avro = AvroSchema::try_from(&single_field_schema(status)).unwrap();
    assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
    assert_json_contains(&avro.json_string, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
}
2594
/// A run-end-encoded field with Utf8 values must yield a schema containing `"string"`.
#[test]
fn test_run_end_encoded() {
    let run_ends = Arc::new(ArrowField::new("run_ends", DataType::Int32, false));
    let values = Arc::new(ArrowField::new("values", DataType::Utf8, false));
    let text = ArrowField::new("text", DataType::RunEndEncoded(run_ends, values), false);

    let avro = AvroSchema::try_from(&single_field_schema(text)).unwrap();
    assert_json_contains(&avro.json_string, "\"string\"");
}
2605
/// Converts a dense Arrow union with type ids 2 and 7 and inspects the resulting Avro union:
/// the first branch must be an object carrying the union mode and type ids, the second a
/// plain `"string"`.
#[test]
fn test_dense_union() {
    let branches: UnionFields = vec![
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let arrow_schema = single_field_schema(ArrowField::new(
        "u",
        DataType::Union(branches, UnionMode::Dense),
        false,
    ));
    let avro = AvroSchema::try_from(&arrow_schema)
        .expect("Arrow Union -> Avro union conversion should succeed");

    // Navigate the generated JSON: fields -> field named "u" -> its "type" array.
    let root: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
    let fields = root
        .get("fields")
        .and_then(Value::as_array)
        .expect("fields array");
    let u_field = fields
        .iter()
        .find(|candidate| candidate.get("name").and_then(Value::as_str) == Some("u"))
        .expect("field 'u'");
    let arr = u_field
        .get("type")
        .expect("u.type")
        .as_array()
        .expect("u.type must be Avro union array");
    assert_eq!(arr.len(), 2, "expected two union branches");

    // First branch: an object that also carries the Arrow union metadata.
    let obj = arr[0]
        .as_object()
        .expect("first branch should be an object with metadata");
    assert_eq!(obj.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        obj.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let type_ids: Vec<i64> = obj
        .get("arrowUnionTypeIds")
        .and_then(Value::as_array)
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|id| id.as_i64().expect("i64"))
        .collect();
    assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");

    // Second branch: a bare primitive name.
    assert_eq!(arr[1], Value::String("string".to_owned()));
}
2649
/// An Arrow schema with one Int32 field must parse back from its Avro JSON as a complex
/// schema.
#[test]
fn round_trip_primitive() {
    let int_field = ArrowField::new("f1", DataType::Int32, false);
    let avro_schema = AvroSchema::try_from(&ArrowSchema::new(vec![int_field])).unwrap();
    let decoded = avro_schema.schema().unwrap();
    assert!(matches!(decoded, Schema::Complex(_)));
}
2657
/// Field names that are not valid Avro names must show up sanitized and de-duplicated:
/// the test expects `weird_name`, `weird_name_1` and `_123bad` in the output.
#[test]
fn test_name_generator_sanitization_and_uniqueness() {
    let fixed8 = |name: &str| ArrowField::new(name, DataType::FixedSizeBinary(8), false);
    let arrow_schema = ArrowSchema::new(vec![
        fixed8("weird-name"),
        fixed8("weird name"),
        fixed8("123bad"),
    ]);

    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    for needle in [
        "\"name\":\"weird_name\"",
        "\"name\":\"weird_name_1\"",
        "\"name\":\"_123bad\"",
    ] {
        assert_json_contains(&avro.json_string, needle);
    }
}
2669
/// A nullable Date64 field must be written with the `local-timestamp-millis` logical type.
#[test]
fn test_date64_logical_type_mapping() {
    let arrow_schema = single_field_schema(ArrowField::new("d", DataType::Date64, true));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(
        &avro.json_string,
        "\"logicalType\":\"local-timestamp-millis\"",
    );
}
2680
/// With `avro_custom_types` enabled, the duration logical type of a list's item field must
/// appear in the generated schema.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_list_extras_propagated() {
    let item = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
    let durations = ArrowField::new("durations", DataType::List(Arc::new(item)), false);

    let avro = AvroSchema::try_from(&single_field_schema(durations)).unwrap();
    assert_json_contains(
        &avro.json_string,
        "\"logicalType\":\"arrow.duration-micros\"",
    );
}
2693
/// A year-month interval field must be tagged with `arrowIntervalUnit = yearmonth`.
#[test]
fn test_interval_yearmonth_extra() {
    let interval_type = DataType::Interval(IntervalUnit::YearMonth);
    let arrow_schema = single_field_schema(ArrowField::new("iv", interval_type, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
}
2701
/// A day-time interval field must be tagged with `arrowIntervalUnit = daytime`.
#[test]
fn test_interval_daytime_extra() {
    let interval_type = DataType::Interval(IntervalUnit::DayTime);
    let arrow_schema = single_field_schema(ArrowField::new("iv_dt", interval_type, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"daytime\"");
}
2709
/// A fixed-size list of length 3 must record its length as `arrowFixedSize`.
#[test]
fn test_fixed_size_list_extra() {
    let item = Arc::new(ArrowField::new("item", DataType::Int32, false));
    let triples = ArrowField::new("triples", DataType::FixedSizeList(item, 3), false);

    let avro = AvroSchema::try_from(&single_field_schema(triples)).unwrap();
    assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
}
2718
/// With `avro_custom_types` enabled, the duration logical type of a map's value field must
/// appear in the generated schema.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_map_duration_value_extra() {
    let entry_fields = Fields::from(vec![
        ArrowField::new("key", DataType::Utf8, false),
        ArrowField::new("value", DataType::Duration(TimeUnit::Second), true),
    ]);
    let entries = ArrowField::new("entries", DataType::Struct(entry_fields), false);
    let metrics = ArrowField::new("metrics", DataType::Map(Arc::new(entries), false), false);

    let avro = AvroSchema::try_from(&single_field_schema(metrics)).unwrap();
    assert_json_contains(
        &avro.json_string,
        "\"logicalType\":\"arrow.duration-seconds\"",
    );
}
2739
/// Parses a record schema whose field defaults are numbers, arrays, objects and booleans,
/// converts it to Arrow, and compares the result with a hand-built expected field.
#[test]
fn test_schema_with_non_string_defaults_decodes_successfully() {
    let schema_json = r#"{
            "type": "record",
            "name": "R",
            "fields": [
                {"name": "a", "type": "int", "default": 0},
                {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
                {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
                {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
                    {"name": "flag", "type": "boolean", "default": true},
                    {"name": "name", "type": "string", "default": "hi"}
                ]}, "default": {"flag": false, "name": "d"}},
                {"name": "u", "type": ["int", "null"], "default": 42}
            ]
        }"#;
    let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
    if !matches!(&schema, Schema::Complex(ComplexType::Record(_))) {
        panic!("expected record schema, got: {:?}", &schema);
    }

    // Avro to Arrow conversion
    let converted = crate::codec::AvroField::try_from(&schema)
        .expect("Avro->Arrow conversion should succeed");
    let arrow_field = converted.field();

    // Metadata expected on every struct produced from a named Avro record.
    let avro_name_md = |name: &str| {
        std::collections::HashMap::from([(AVRO_NAME_METADATA_KEY.to_string(), name.to_string())])
    };

    // Field "b": list of non-null longs.
    let list_item = ArrowField::new(
        arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
        DataType::Int64,
        false,
    );
    let expected_b = ArrowField::new("b", DataType::List(Arc::new(list_item)), false);

    // Field "c": map from string to non-null double.
    let map_entries = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("key", DataType::Utf8, false),
            ArrowField::new("value", DataType::Float64, false),
        ])),
        false,
    );
    let expected_c = ArrowField::new("c", DataType::Map(Arc::new(map_entries), false), false);

    // Field "inner": nested record tagged with its Avro name.
    let inner_fields = Fields::from(vec![
        ArrowField::new("flag", DataType::Boolean, false),
        ArrowField::new("name", DataType::Utf8, false),
    ]);
    let expected_inner = ArrowField::new("inner", DataType::Struct(inner_fields), false)
        .with_metadata(avro_name_md("Inner"));

    // Root record; the ["int", "null"] union "u" is expected as a nullable Int32.
    let root_fields = Fields::from(vec![
        ArrowField::new("a", DataType::Int32, false),
        expected_b,
        expected_c,
        expected_inner,
        ArrowField::new("u", DataType::Int32, true),
    ]);
    let expected =
        ArrowField::new("R", DataType::Struct(root_fields), false).with_metadata(avro_name_md("R"));

    assert_eq!(arrow_field, expected);
}
2811
/// `try_from` and `from_arrow_with_options(.., None)` must generate the same schema JSON.
#[test]
fn default_order_is_consistent() {
    let nullable_string = ArrowField::new("s", DataType::Utf8, true);
    let arrow_schema = ArrowSchema::new(vec![nullable_string]);

    let via_try_from = AvroSchema::try_from(&arrow_schema).unwrap();
    let via_options = AvroSchema::from_arrow_with_options(&arrow_schema, None).unwrap();

    assert_eq!(via_try_from.json_string, via_options.json_string);
}
2819
/// A record, enum or fixed union branch without a `name` must produce the missing-name
/// error.
#[test]
fn test_union_branch_missing_name_errors() {
    for t in ["record", "enum", "fixed"] {
        let expected = format!("Union branch '{t}' missing required 'name'");
        let nameless_branch = json!({ "type": t });
        let err = union_branch_signature(&nameless_branch)
            .unwrap_err()
            .to_string();
        assert!(
            err.contains(&expected),
            "expected missing-name error for {t}, got: {err}"
        );
    }
}
2831
/// For named branch types the test expects a signature of the form `N:<type>:<name>`.
#[test]
fn test_union_branch_named_type_signature_includes_name() {
    let cases = [
        (json!({ "type": "record", "name": "Foo" }), "N:record:Foo"),
        (
            json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] }),
            "N:enum:Color",
        ),
        (
            json!({ "type": "fixed", "name": "Bytes16", "size": 16 }),
            "N:fixed:Bytes16",
        ),
    ];
    for (branch, expected_signature) in cases {
        assert_eq!(union_branch_signature(&branch).unwrap(), expected_signature);
    }
}
2841
2842    #[test]
2843    fn test_record_field_alias_resolution_without_default() {
2844        let writer_json = r#"{
2845          "type":"record",
2846          "name":"R",
2847          "fields":[{"name":"old","type":"int"}]
2848        }"#;
2849        let reader_json = r#"{
2850          "type":"record",
2851          "name":"R",
2852          "fields":[{"name":"new","aliases":["old"],"type":"int"}]
2853        }"#;
2854        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2855        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2856        let resolved = AvroFieldBuilder::new(&writer)
2857            .with_reader_schema(&reader)
2858            .with_utf8view(false)
2859            .with_strict_mode(false)
2860            .build()
2861            .unwrap();
2862        let expected = ArrowField::new(
2863            "R",
2864            DataType::Struct(Fields::from(vec![ArrowField::new(
2865                "new",
2866                DataType::Int32,
2867                false,
2868            )])),
2869            false,
2870        );
2871        assert_eq!(resolved.field(), expected);
2872    }
2873
2874    #[test]
2875    fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
2876        let writer_json = r#"{
2877          "type":"record",
2878          "name":"R",
2879          "fields":[
2880            {"name":"a","type":"int","aliases":["old"]},
2881            {"name":"b","type":"int","aliases":["old"]}
2882          ]
2883        }"#;
2884        let reader_json = r#"{
2885          "type":"record",
2886          "name":"R",
2887          "fields":[{"name":"target","type":"int","aliases":["old"]}]
2888        }"#;
2889        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2890        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2891        let err = AvroFieldBuilder::new(&writer)
2892            .with_reader_schema(&reader)
2893            .with_utf8view(false)
2894            .with_strict_mode(true)
2895            .build()
2896            .unwrap_err()
2897            .to_string();
2898        assert!(
2899            err.contains("Ambiguous alias 'old'"),
2900            "expected ambiguous-alias error, got: {err}"
2901        );
2902    }
2903
2904    #[test]
2905    fn test_pragmatic_writer_field_alias_mapping_non_strict() {
2906        let writer_json = r#"{
2907          "type":"record",
2908          "name":"R",
2909          "fields":[{"name":"before","type":"int","aliases":["now"]}]
2910        }"#;
2911        let reader_json = r#"{
2912          "type":"record",
2913          "name":"R",
2914          "fields":[{"name":"now","type":"int"}]
2915        }"#;
2916        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2917        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2918        let resolved = AvroFieldBuilder::new(&writer)
2919            .with_reader_schema(&reader)
2920            .with_utf8view(false)
2921            .with_strict_mode(false)
2922            .build()
2923            .unwrap();
2924        let expected = ArrowField::new(
2925            "R",
2926            DataType::Struct(Fields::from(vec![ArrowField::new(
2927                "now",
2928                DataType::Int32,
2929                false,
2930            )])),
2931            false,
2932        );
2933        assert_eq!(resolved.field(), expected);
2934    }
2935
2936    #[test]
2937    fn test_missing_reader_field_null_first_no_default_is_ok() {
2938        let writer_json = r#"{
2939          "type":"record",
2940          "name":"R",
2941          "fields":[{"name":"a","type":"int"}]
2942        }"#;
2943        let reader_json = r#"{
2944          "type":"record",
2945          "name":"R",
2946          "fields":[
2947            {"name":"a","type":"int"},
2948            {"name":"b","type":["null","int"]}
2949          ]
2950        }"#;
2951        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2952        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2953        let resolved = AvroFieldBuilder::new(&writer)
2954            .with_reader_schema(&reader)
2955            .with_utf8view(false)
2956            .with_strict_mode(false)
2957            .build()
2958            .unwrap();
2959        let expected = ArrowField::new(
2960            "R",
2961            DataType::Struct(Fields::from(vec![
2962                ArrowField::new("a", DataType::Int32, false),
2963                ArrowField::new("b", DataType::Int32, true).with_metadata(HashMap::from([(
2964                    AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
2965                    "null".to_string(),
2966                )])),
2967            ])),
2968            false,
2969        );
2970        assert_eq!(resolved.field(), expected);
2971    }
2972
2973    #[test]
2974    fn test_missing_reader_field_null_second_without_default_errors() {
2975        let writer_json = r#"{
2976          "type":"record",
2977          "name":"R",
2978          "fields":[{"name":"a","type":"int"}]
2979        }"#;
2980        let reader_json = r#"{
2981          "type":"record",
2982          "name":"R",
2983          "fields":[
2984            {"name":"a","type":"int"},
2985            {"name":"b","type":["int","null"]}
2986          ]
2987        }"#;
2988        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2989        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2990        let err = AvroFieldBuilder::new(&writer)
2991            .with_reader_schema(&reader)
2992            .with_utf8view(false)
2993            .with_strict_mode(false)
2994            .build()
2995            .unwrap_err()
2996            .to_string();
2997        assert!(
2998            err.contains("must have a default value"),
2999            "expected missing-default error, got: {err}"
3000        );
3001    }
3002
3003    #[test]
3004    fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
3005        let field = ArrowField::new("x", DataType::Int32, true);
3006        let injected_json =
3007            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3008                .to_string();
3009        let mut md = HashMap::new();
3010        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json.clone());
3011        md.insert("custom".to_string(), "123".to_string());
3012        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3013        let opts = AvroSchemaOptions {
3014            null_order: Some(Nullability::NullSecond),
3015            strip_metadata: false,
3016        };
3017        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3018        assert_eq!(
3019            out.json_string, injected_json,
3020            "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
3021        );
3022        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3023        assert_eq!(v.get("type").and_then(|t| t.as_str()), Some("record"));
3024        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("Injected"));
3025    }
3026
3027    #[test]
3028    fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
3029        let field = ArrowField::new("x", DataType::Int32, true);
3030        let injected_json =
3031            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3032                .to_string();
3033        let mut md = HashMap::new();
3034        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json);
3035        md.insert("custom_meta".to_string(), "7".to_string());
3036        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3037        let opts = AvroSchemaOptions {
3038            null_order: Some(Nullability::NullFirst),
3039            strip_metadata: true,
3040        };
3041        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3042        assert_json_contains(&out.json_string, "\"type\":\"record\"");
3043        assert_json_contains(&out.json_string, "\"name\":\"topLevelRecord\"");
3044        assert_json_contains(&out.json_string, "\"custom_meta\":7");
3045    }
3046
3047    #[test]
3048    fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
3049        let field = ArrowField::new("s", DataType::Utf8, true);
3050        let arrow_schema = single_field_schema(field);
3051        let opts = AvroSchemaOptions {
3052            null_order: Some(Nullability::NullFirst),
3053            strip_metadata: true,
3054        };
3055        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3056        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3057        let arr = v["fields"][0]["type"]
3058            .as_array()
3059            .expect("nullable primitive should be Avro union array");
3060        assert_eq!(arr[0], Value::String("null".into()));
3061        assert_eq!(arr[1], Value::String("string".into()));
3062    }
3063
3064    #[test]
3065    fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
3066        let field = ArrowField::new("s", DataType::Utf8, true);
3067        let arrow_schema = single_field_schema(field);
3068        let opts = AvroSchemaOptions {
3069            null_order: Some(Nullability::NullSecond),
3070            strip_metadata: true,
3071        };
3072        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3073        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3074        let arr = v["fields"][0]["type"]
3075            .as_array()
3076            .expect("nullable primitive should be Avro union array");
3077        assert_eq!(arr[0], Value::String("string".into()));
3078        assert_eq!(arr[1], Value::String("null".into()));
3079    }
3080
3081    #[test]
3082    fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
3083        let uf: UnionFields = vec![
3084            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
3085            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
3086        ]
3087        .into_iter()
3088        .collect();
3089        let union_dt = DataType::Union(uf, UnionMode::Dense);
3090        let arrow_schema = single_field_schema(ArrowField::new("u", union_dt, true));
3091        let with_extras = AvroSchema::from_arrow_with_options(
3092            &arrow_schema,
3093            Some(AvroSchemaOptions {
3094                null_order: Some(Nullability::NullFirst),
3095                strip_metadata: false,
3096            }),
3097        )
3098        .unwrap();
3099        let v_with: Value = serde_json::from_str(&with_extras.json_string).unwrap();
3100        let union_arr = v_with["fields"][0]["type"].as_array().expect("union array");
3101        let first_obj = union_arr
3102            .iter()
3103            .find(|b| b.is_object())
3104            .expect("expected an object branch with extras");
3105        let obj = first_obj.as_object().unwrap();
3106        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
3107        assert_eq!(
3108            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
3109            Some("dense")
3110        );
3111        let type_ids: Vec<i64> = obj["arrowUnionTypeIds"]
3112            .as_array()
3113            .expect("arrowUnionTypeIds array")
3114            .iter()
3115            .map(|n| n.as_i64().expect("i64"))
3116            .collect();
3117        assert_eq!(type_ids, vec![2, 7]);
3118        let stripped = AvroSchema::from_arrow_with_options(
3119            &arrow_schema,
3120            Some(AvroSchemaOptions {
3121                null_order: Some(Nullability::NullFirst),
3122                strip_metadata: true,
3123            }),
3124        )
3125        .unwrap();
3126        let v_stripped: Value = serde_json::from_str(&stripped.json_string).unwrap();
3127        let union_arr2 = v_stripped["fields"][0]["type"]
3128            .as_array()
3129            .expect("union array");
3130        assert!(
3131            !union_arr2.iter().any(|b| b
3132                .as_object()
3133                .is_some_and(|m| m.contains_key("arrowUnionMode"))),
3134            "extras must be removed when strip_metadata=true"
3135        );
3136        assert_eq!(union_arr2[0], Value::String("null".into()));
3137        assert_eq!(union_arr2[1], Value::String("int".into()));
3138        assert_eq!(union_arr2[2], Value::String("string".into()));
3139    }
3140}