arrow_avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Schema representations for Arrow.
19
20#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23    ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24    UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
36/// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
37pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
38
39/// The Confluent "magic" byte (`0x00`)
40pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];
41
42/// The maximum possible length of a prefix.
43/// SHA256 (32) + single-object magic (2)
44pub const MAX_PREFIX_LEN: usize = 34;
45
46/// The metadata key used for storing the JSON encoded `Schema`
47pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
48
49/// Metadata key used to represent Avro enum symbols in an Arrow schema.
50pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";
51
52/// Metadata key used to store the default value of a field in an Avro schema.
53pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";
54
55/// Metadata key used to store the name of a type in an Avro schema.
56pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";
57
58/// Metadata key used to store the name of a type in an Avro schema.
59pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";
60
61/// Metadata key used to store the documentation for a type in an Avro schema.
62pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";
63
64/// Default name for the root record in an Avro schema.
65pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
67/// Avro types are not nullable, with nullability instead encoded as a union
68/// where one of the variants is the null type.
69///
70/// To accommodate this, we specially case two-variant unions where one of the
71/// variants is the null type, and use this to derive arrow's notion of nullability
72#[derive(Debug, Copy, Clone, PartialEq, Default)]
73pub(crate) enum Nullability {
74    /// The nulls are encoded as the first union variant
75    #[default]
76    NullFirst,
77    /// The nulls are encoded as the second union variant
78    NullSecond,
79}
80
81/// Either a [`PrimitiveType`] or a reference to a previously defined named type
82///
83/// <https://avro.apache.org/docs/1.11.1/specification/#names>
84#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
85#[serde(untagged)]
86/// A type name in an Avro schema
87///
88/// This represents the different ways a type can be referenced in an Avro schema.
89pub(crate) enum TypeName<'a> {
90    /// A primitive type like null, boolean, int, etc.
91    Primitive(PrimitiveType),
92    /// A reference to another named type
93    Ref(&'a str),
94}
95
96/// A primitive type
97///
98/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
99#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
100#[serde(rename_all = "camelCase")]
101#[strum(serialize_all = "lowercase")]
102pub(crate) enum PrimitiveType {
103    /// null: no value
104    Null,
105    /// boolean: a binary value
106    Boolean,
107    /// int: 32-bit signed integer
108    Int,
109    /// long: 64-bit signed integer
110    Long,
111    /// float: single precision (32-bit) IEEE 754 floating-point number
112    Float,
113    /// double: double precision (64-bit) IEEE 754 floating-point number
114    Double,
115    /// bytes: sequence of 8-bit unsigned bytes
116    Bytes,
117    /// string: Unicode character sequence
118    String,
119}
120
121/// Additional attributes within a `Schema`
122///
123/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
124#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
125#[serde(rename_all = "camelCase")]
126pub(crate) struct Attributes<'a> {
127    /// A logical type name
128    ///
129    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
130    #[serde(default)]
131    pub(crate) logical_type: Option<&'a str>,
132
133    /// Additional JSON attributes
134    #[serde(flatten)]
135    pub(crate) additional: HashMap<&'a str, Value>,
136}
137
138impl Attributes<'_> {
139    /// Returns the field metadata for this [`Attributes`]
140    pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
141        self.additional
142            .iter()
143            .map(|(k, v)| (k.to_string(), v.to_string()))
144            .collect()
145    }
146}
147
148/// A type definition that is not a variant of [`ComplexType`]
149#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
150#[serde(rename_all = "camelCase")]
151pub(crate) struct Type<'a> {
152    /// The type of this Avro data structure
153    #[serde(borrow)]
154    pub(crate) r#type: TypeName<'a>,
155    /// Additional attributes associated with this type
156    #[serde(flatten)]
157    pub(crate) attributes: Attributes<'a>,
158}
159
160/// An Avro schema
161///
162/// This represents the different shapes of Avro schemas as defined in the specification.
163/// See <https://avro.apache.org/docs/1.11.1/specification/#schemas> for more details.
164#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
165#[serde(untagged)]
166pub(crate) enum Schema<'a> {
167    /// A direct type name (primitive or reference)
168    #[serde(borrow)]
169    TypeName(TypeName<'a>),
170    /// A union of multiple schemas (e.g., ["null", "string"])
171    #[serde(borrow)]
172    Union(Vec<Schema<'a>>),
173    /// A complex type such as record, array, map, etc.
174    #[serde(borrow)]
175    Complex(ComplexType<'a>),
176    /// A type with attributes
177    #[serde(borrow)]
178    Type(Type<'a>),
179}
180
181/// A complex type
182///
183/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185#[serde(tag = "type", rename_all = "camelCase")]
186pub(crate) enum ComplexType<'a> {
187    /// Record type: a sequence of fields with names and types
188    #[serde(borrow)]
189    Record(Record<'a>),
190    /// Enum type: a set of named values
191    #[serde(borrow)]
192    Enum(Enum<'a>),
193    /// Array type: a sequence of values of the same type
194    #[serde(borrow)]
195    Array(Array<'a>),
196    /// Map type: a mapping from strings to values of the same type
197    #[serde(borrow)]
198    Map(Map<'a>),
199    /// Fixed type: a fixed-size byte array
200    #[serde(borrow)]
201    Fixed(Fixed<'a>),
202}
203
204/// A record
205///
206/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
207#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
208pub(crate) struct Record<'a> {
209    /// Name of the record
210    #[serde(borrow)]
211    pub(crate) name: &'a str,
212    /// Optional namespace for the record, provides a way to organize names
213    #[serde(borrow, default)]
214    pub(crate) namespace: Option<&'a str>,
215    /// Optional documentation string for the record
216    #[serde(borrow, default)]
217    pub(crate) doc: Option<Cow<'a, str>>,
218    /// Alternative names for this record
219    #[serde(borrow, default)]
220    pub(crate) aliases: Vec<&'a str>,
221    /// The fields contained in this record
222    #[serde(borrow)]
223    pub(crate) fields: Vec<Field<'a>>,
224    /// Additional attributes for this record
225    #[serde(flatten)]
226    pub(crate) attributes: Attributes<'a>,
227}
228
229fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
230where
231    D: serde::Deserializer<'de>,
232{
233    Value::deserialize(deserializer).map(Some)
234}
235
236/// A field within a [`Record`]
237#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
238pub(crate) struct Field<'a> {
239    /// Name of the field within the record
240    #[serde(borrow)]
241    pub(crate) name: &'a str,
242    /// Optional documentation for this field
243    #[serde(borrow, default)]
244    pub(crate) doc: Option<Cow<'a, str>>,
245    /// The field's type definition
246    #[serde(borrow)]
247    pub(crate) r#type: Schema<'a>,
248    /// Optional default value for this field
249    #[serde(deserialize_with = "deserialize_default", default)]
250    pub(crate) default: Option<Value>,
251    /// Alternative names (aliases) for this field (Avro spec: field-level aliases).
252    /// Borrowed from input JSON where possible.
253    #[serde(borrow, default)]
254    pub(crate) aliases: Vec<&'a str>,
255}
256
257/// An enumeration
258///
259/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
260#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
261pub(crate) struct Enum<'a> {
262    /// Name of the enum
263    #[serde(borrow)]
264    pub(crate) name: &'a str,
265    /// Optional namespace for the enum, provides organizational structure
266    #[serde(borrow, default)]
267    pub(crate) namespace: Option<&'a str>,
268    /// Optional documentation string describing the enum
269    #[serde(borrow, default)]
270    pub(crate) doc: Option<Cow<'a, str>>,
271    /// Alternative names for this enum
272    #[serde(borrow, default)]
273    pub(crate) aliases: Vec<&'a str>,
274    /// The symbols (values) that this enum can have
275    #[serde(borrow)]
276    pub(crate) symbols: Vec<&'a str>,
277    /// Optional default value for this enum
278    #[serde(borrow, default)]
279    pub(crate) default: Option<&'a str>,
280    /// Additional attributes for this enum
281    #[serde(flatten)]
282    pub(crate) attributes: Attributes<'a>,
283}
284
285/// An array
286///
287/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
288#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
289pub(crate) struct Array<'a> {
290    /// The schema for items in this array
291    #[serde(borrow)]
292    pub(crate) items: Box<Schema<'a>>,
293    /// Additional attributes for this array
294    #[serde(flatten)]
295    pub(crate) attributes: Attributes<'a>,
296}
297
298/// A map
299///
300/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
301#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
302pub(crate) struct Map<'a> {
303    /// The schema for values in this map
304    #[serde(borrow)]
305    pub(crate) values: Box<Schema<'a>>,
306    /// Additional attributes for this map
307    #[serde(flatten)]
308    pub(crate) attributes: Attributes<'a>,
309}
310
311/// A fixed length binary array
312///
313/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
314#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
315pub(crate) struct Fixed<'a> {
316    /// Name of the fixed type
317    #[serde(borrow)]
318    pub(crate) name: &'a str,
319    /// Optional namespace for the fixed type
320    #[serde(borrow, default)]
321    pub(crate) namespace: Option<&'a str>,
322    /// Alternative names for this fixed type
323    #[serde(borrow, default)]
324    pub(crate) aliases: Vec<&'a str>,
325    /// The number of bytes in this fixed type
326    pub(crate) size: usize,
327    /// Additional attributes for this fixed type
328    #[serde(flatten)]
329    pub(crate) attributes: Attributes<'a>,
330}
331
332#[derive(Debug, Copy, Clone, PartialEq, Default)]
333pub(crate) struct AvroSchemaOptions {
334    pub(crate) null_order: Option<Nullability>,
335    pub(crate) strip_metadata: bool,
336}
337
338/// A wrapper for an Avro schema in its JSON string representation.
339#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
340pub struct AvroSchema {
341    /// The Avro schema as a JSON string.
342    pub json_string: String,
343}
344
345impl TryFrom<&ArrowSchema> for AvroSchema {
346    type Error = ArrowError;
347
348    /// Converts an `ArrowSchema` to `AvroSchema`, delegating to
349    /// `AvroSchema::from_arrow_with_options` with `None` so that the
350    /// union null ordering is decided by `Nullability::default()`.
351    fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
352        AvroSchema::from_arrow_with_options(schema, None)
353    }
354}
355
356impl AvroSchema {
357    /// Creates a new `AvroSchema` from a JSON string.
358    pub fn new(json_string: String) -> Self {
359        Self { json_string }
360    }
361
362    pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
363        serde_json::from_str(self.json_string.as_str())
364            .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
365    }
366
367    /// Returns the fingerprint of the schema, computed using the specified [`FingerprintAlgorithm`].
368    ///
369    /// The fingerprint is computed over the schema's Parsed Canonical Form
370    /// as defined by the Avro specification. Depending on `hash_type`, this
371    /// will return one of the supported [`Fingerprint`] variants:
372    /// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`]
373    /// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5`
374    /// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256`
375    ///
376    /// Note: [`FingerprintAlgorithm::Id`] or [`FingerprintAlgorithm::Id64`] cannot be used to generate a fingerprint
377    /// and will result in an error. If you intend to use a Schema Registry ID-based
378    /// wire format, either use [`SchemaStore::set`] or load the [`Fingerprint::Id`] directly via [`Fingerprint::load_fingerprint_id`] or for
379    /// [`Fingerprint::Id64`] via [`Fingerprint::load_fingerprint_id64`].
380    ///
381    /// See also: <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
382    ///
383    /// # Errors
384    /// Returns an error if deserializing the schema fails, if generating the
385    /// canonical form of the schema fails, or if `hash_type` is [`FingerprintAlgorithm::Id`].
386    ///
387    /// # Examples
388    /// ```
389    /// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm};
390    ///
391    /// let avro = AvroSchema::new("\"string\"".to_string());
392    /// let fp = avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
393    /// ```
394    pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
395        Self::generate_fingerprint(&self.schema()?, hash_type)
396    }
397
398    pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
399        let mut value: Value = serde_json::from_str(&self.json_string)
400            .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
401        let obj = value.as_object_mut().ok_or_else(|| {
402            ArrowError::AvroError(
403                "Projected schema must be a JSON object Avro record schema".to_string(),
404            )
405        })?;
406        match obj.get("type").and_then(|v| v.as_str()) {
407            Some("record") => {}
408            Some(other) => {
409                return Err(ArrowError::AvroError(format!(
410                    "Projected schema must be an Avro record, found type '{other}'"
411                )));
412            }
413            None => {
414                return Err(ArrowError::AvroError(
415                    "Projected schema missing required 'type' field".to_string(),
416                ));
417            }
418        }
419        let fields_val = obj.get_mut("fields").ok_or_else(|| {
420            ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
421        })?;
422        let projected_fields = {
423            let mut original_fields = match fields_val {
424                Value::Array(arr) => std::mem::take(arr),
425                _ => {
426                    return Err(ArrowError::AvroError(
427                        "Avro record schema 'fields' must be an array".to_string(),
428                    ));
429                }
430            };
431            let len = original_fields.len();
432            let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
433            let mut out: Vec<Value> = Vec::with_capacity(projection.len());
434            for &i in projection {
435                if i >= len {
436                    return Err(ArrowError::AvroError(format!(
437                        "Projection index {i} out of bounds for record with {len} fields"
438                    )));
439                }
440                if !seen.insert(i) {
441                    return Err(ArrowError::AvroError(format!(
442                        "Duplicate projection index {i}"
443                    )));
444                }
445                out.push(std::mem::replace(&mut original_fields[i], Value::Null));
446            }
447            out
448        };
449        *fields_val = Value::Array(projected_fields);
450        let json_string = serde_json::to_string(&value).map_err(|e| {
451            ArrowError::AvroError(format!(
452                "Failed to serialize projected Avro schema JSON: {e}"
453            ))
454        })?;
455        Ok(Self::new(json_string))
456    }
457
458    pub(crate) fn generate_fingerprint(
459        schema: &Schema,
460        hash_type: FingerprintAlgorithm,
461    ) -> Result<Fingerprint, ArrowError> {
462        let canonical = Self::generate_canonical_form(schema).map_err(|e| {
463            ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
464        })?;
465        match hash_type {
466            FingerprintAlgorithm::Rabin => {
467                Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
468            }
469            FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
470                "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
471                if using Fingerprint::Id, pass the registry ID in instead using the set method."
472                    .to_string(),
473            )),
474            #[cfg(feature = "md5")]
475            FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
476            #[cfg(feature = "sha256")]
477            FingerprintAlgorithm::SHA256 => {
478                Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
479            }
480        }
481    }
482
483    /// Generates the Parsed Canonical Form for the given `Schema`.
484    ///
485    /// The canonical form is a standardized JSON representation of the schema,
486    /// primarily used for generating a schema fingerprint for equality checking.
487    ///
488    /// This form strips attributes that do not affect the schema's identity,
489    /// such as `doc` fields, `aliases`, and any properties not defined in the
490    /// Avro specification.
491    ///
492    /// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
493    pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
494        build_canonical(schema, None)
495    }
496
497    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order and optionally stripping internal Arrow metadata.
498    ///
499    /// If the input Arrow schema already contains Avro JSON in
500    /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
501    /// the exact header encoding alignment; otherwise, a new JSON is generated
502    /// honoring `null_union_order` at **all nullable sites**.
503    pub(crate) fn from_arrow_with_options(
504        schema: &ArrowSchema,
505        options: Option<AvroSchemaOptions>,
506    ) -> Result<AvroSchema, ArrowError> {
507        let opts = options.unwrap_or_default();
508        let order = opts.null_order.unwrap_or_default();
509        let strip = opts.strip_metadata;
510        if !strip {
511            if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
512                return Ok(AvroSchema::new(json.clone()));
513            }
514        }
515        let mut name_gen = NameGenerator::default();
516        let fields_json = schema
517            .fields()
518            .iter()
519            .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
520            .collect::<Result<Vec<_>, _>>()?;
521        let record_name = schema
522            .metadata
523            .get(AVRO_NAME_METADATA_KEY)
524            .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
525        let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
526        record.insert("type".into(), Value::String("record".into()));
527        record.insert(
528            "name".into(),
529            Value::String(sanitise_avro_name(record_name)),
530        );
531        if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
532            record.insert("namespace".into(), Value::String(ns.clone()));
533        }
534        if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
535            record.insert("doc".into(), Value::String(doc.clone()));
536        }
537        record.insert("fields".into(), Value::Array(fields_json));
538        extend_with_passthrough_metadata(&mut record, &schema.metadata);
539        let json_string = serde_json::to_string(&Value::Object(record))
540            .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
541        Ok(AvroSchema::new(json_string))
542    }
543}
544
545/// A stack-allocated, fixed-size buffer for the prefix.
546#[derive(Debug, Copy, Clone)]
547pub(crate) struct Prefix {
548    buf: [u8; MAX_PREFIX_LEN],
549    len: u8,
550}
551
552impl Prefix {
553    #[inline]
554    pub(crate) fn as_slice(&self) -> &[u8] {
555        &self.buf[..self.len as usize]
556    }
557}
558
559/// Defines the strategy for generating the per-record prefix for an Avro binary stream.
560#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
561pub enum FingerprintStrategy {
562    /// Use the 64-bit Rabin fingerprint (default for single-object encoding).
563    #[default]
564    Rabin,
565    /// Use a Confluent Schema Registry 32-bit ID.
566    Id(u32),
567    /// Use an Apicurio Schema Registry 64-bit ID.
568    Id64(u64),
569    #[cfg(feature = "md5")]
570    /// Use the 128-bit MD5 fingerprint.
571    MD5,
572    #[cfg(feature = "sha256")]
573    /// Use the 256-bit SHA-256 fingerprint.
574    SHA256,
575}
576
577impl From<Fingerprint> for FingerprintStrategy {
578    fn from(f: Fingerprint) -> Self {
579        Self::from(&f)
580    }
581}
582
583impl From<FingerprintAlgorithm> for FingerprintStrategy {
584    fn from(f: FingerprintAlgorithm) -> Self {
585        match f {
586            FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
587            FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
588            FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
589            #[cfg(feature = "md5")]
590            FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
591            #[cfg(feature = "sha256")]
592            FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
593        }
594    }
595}
596
597impl From<&Fingerprint> for FingerprintStrategy {
598    fn from(f: &Fingerprint) -> Self {
599        match f {
600            Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
601            Fingerprint::Id(_) => FingerprintStrategy::Id(0),
602            Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
603            #[cfg(feature = "md5")]
604            Fingerprint::MD5(_) => FingerprintStrategy::MD5,
605            #[cfg(feature = "sha256")]
606            Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
607        }
608    }
609}
610
611/// Supported fingerprint algorithms for Avro schema identification.
612/// For use with Confluent Schema Registry IDs, set to None.
613#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
614pub enum FingerprintAlgorithm {
615    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
616    #[default]
617    Rabin,
618    /// Represents a 32 bit fingerprint not based on a hash algorithm, (e.g., a 32-bit Schema Registry ID.)
619    Id,
620    /// Represents a 64 bit fingerprint not based on a hash algorithm, (e.g., a 64-bit Schema Registry ID.)
621    Id64,
622    #[cfg(feature = "md5")]
623    /// 128-bit MD5 message digest.
624    MD5,
625    #[cfg(feature = "sha256")]
626    /// 256-bit SHA-256 digest.
627    SHA256,
628}
629
630/// Allow easy extraction of the algorithm used to create a fingerprint.
631impl From<&Fingerprint> for FingerprintAlgorithm {
632    fn from(fp: &Fingerprint) -> Self {
633        match fp {
634            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
635            Fingerprint::Id(_) => FingerprintAlgorithm::Id,
636            Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
637            #[cfg(feature = "md5")]
638            Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
639            #[cfg(feature = "sha256")]
640            Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
641        }
642    }
643}
644
645impl From<FingerprintStrategy> for FingerprintAlgorithm {
646    fn from(s: FingerprintStrategy) -> Self {
647        Self::from(&s)
648    }
649}
650
651impl From<&FingerprintStrategy> for FingerprintAlgorithm {
652    fn from(s: &FingerprintStrategy) -> Self {
653        match s {
654            FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
655            FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
656            FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
657            #[cfg(feature = "md5")]
658            FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
659            #[cfg(feature = "sha256")]
660            FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
661        }
662    }
663}
664
665/// A schema fingerprint in one of the supported formats.
666///
667/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
668/// instance always stores only one variant, matching its configured
669/// `FingerprintAlgorithm`, but the enum makes the API uniform.
670///
671/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
672/// <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
673#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
674pub enum Fingerprint {
675    /// A 64-bit Rabin fingerprint.
676    Rabin(u64),
677    /// A 32-bit Schema Registry ID.
678    Id(u32),
679    /// A 64-bit Schema Registry ID.
680    Id64(u64),
681    #[cfg(feature = "md5")]
682    /// A 128-bit MD5 fingerprint.
683    MD5([u8; 16]),
684    #[cfg(feature = "sha256")]
685    /// A 256-bit SHA-256 fingerprint.
686    SHA256([u8; 32]),
687}
688
689impl From<FingerprintStrategy> for Fingerprint {
690    fn from(s: FingerprintStrategy) -> Self {
691        Self::from(&s)
692    }
693}
694
695impl From<&FingerprintStrategy> for Fingerprint {
696    fn from(s: &FingerprintStrategy) -> Self {
697        match s {
698            FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
699            FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
700            FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
701            #[cfg(feature = "md5")]
702            FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
703            #[cfg(feature = "sha256")]
704            FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
705        }
706    }
707}
708
709impl From<FingerprintAlgorithm> for Fingerprint {
710    fn from(s: FingerprintAlgorithm) -> Self {
711        match s {
712            FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
713            FingerprintAlgorithm::Id => Fingerprint::Id(0),
714            FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
715            #[cfg(feature = "md5")]
716            FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
717            #[cfg(feature = "sha256")]
718            FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
719        }
720    }
721}
722
723impl Fingerprint {
724    /// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID).
725    ///
726    /// The provided `id` is in big-endian wire order; this converts it to host order
727    /// and returns `Fingerprint::Id`.
728    ///
729    /// # Returns
730    /// A `Fingerprint::Id` variant containing the 32-bit fingerprint.
731    pub fn load_fingerprint_id(id: u32) -> Self {
732        Fingerprint::Id(u32::from_be(id))
733    }
734
735    /// Loads the 64-bit Schema Registry fingerprint (Apicurio Schema Registry ID).
736    ///
737    /// The provided `id` is in big-endian wire order; this converts it to host order
738    /// and returns `Fingerprint::Id64`.
739    ///
740    /// # Returns
741    /// A `Fingerprint::Id64` variant containing the 64-bit fingerprint.
742    pub fn load_fingerprint_id64(id: u64) -> Self {
743        Fingerprint::Id64(u64::from_be(id))
744    }
745
746    /// Constructs a serialized prefix represented as a `Vec<u8>` based on the variant of the enum.
747    ///
748    /// This method serializes data in different formats depending on the variant of `self`:
749    /// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
750    ///   followed by the big-endian byte representation of the `id`.
751    /// - **`Id64(id)`**: Uses the Apicurio wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
752    ///   followed by the big-endian 8-byte representation of the `id`.
753    /// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header
754    ///   (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`.
755    /// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the
756    ///   `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
757    /// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is
758    ///   a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`.
759    ///
760    /// # Returns
761    ///
762    /// A `Prefix` containing the serialized prefix data.
763    ///
764    /// # Features
765    ///
766    /// - You can optionally enable the `md5` feature to include the `MD5` variant.
767    /// - You can optionally enable the `sha256` feature to include the `SHA256` variant.
768    ///
769    pub(crate) fn make_prefix(&self) -> Prefix {
770        let mut buf = [0u8; MAX_PREFIX_LEN];
771        let len = match self {
772            Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
773            Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
774            Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
775            #[cfg(feature = "md5")]
776            Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
777            #[cfg(feature = "sha256")]
778            Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
779        };
780        Prefix { buf, len }
781    }
782}
783
784fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
785    buf: &mut [u8; MAX_PREFIX_LEN],
786    magic: &[u8; MAGIC_LEN],
787    payload: &[u8; PAYLOAD_LEN],
788) -> u8 {
789    debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
790    let total = MAGIC_LEN + PAYLOAD_LEN;
791    let prefix_slice = &mut buf[..total];
792    prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
793    prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
794    total as u8
795}
796
797/// An in-memory cache of Avro schemas, indexed by their fingerprint.
798///
799/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
800/// Each schema is associated with a unique [`Fingerprint`], which is generated based
801/// on the schema's canonical form and a specific hashing algorithm.
802///
803/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin,
804/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations.
805/// This ensures consistency when generating fingerprints and looking up schemas.
806/// All schemas registered will have their fingerprint computed with this algorithm, and
807/// lookups must use a matching fingerprint.
808///
809/// # Examples
810///
811/// ```no_run
812/// // Create a new store with the default Rabin fingerprinting.
813/// use arrow_avro::schema::{AvroSchema, SchemaStore};
814///
815/// let mut store = SchemaStore::new();
816/// let schema = AvroSchema::new("\"string\"".to_string());
817/// // Register the schema to get its fingerprint.
818/// let fingerprint = store.register(schema.clone()).unwrap();
819/// // Use the fingerprint to look up the schema.
820/// let retrieved_schema = store.lookup(&fingerprint).cloned();
821/// assert_eq!(retrieved_schema, Some(schema));
822/// ```
823#[derive(Debug, Clone, Default)]
824pub struct SchemaStore {
825    /// The hashing algorithm used for generating fingerprints.
826    fingerprint_algorithm: FingerprintAlgorithm,
827    /// A map from a schema's fingerprint to the schema itself.
828    schemas: HashMap<Fingerprint, AvroSchema>,
829}
830
831impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
832    type Error = ArrowError;
833
834    /// Creates a `SchemaStore` from a HashMap of schemas.
835    /// Each schema in the HashMap is registered with the new store.
836    fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
837        Ok(Self {
838            schemas,
839            ..Self::default()
840        })
841    }
842}
843
844impl SchemaStore {
845    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
846    pub fn new() -> Self {
847        Self::default()
848    }
849
850    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
851    pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
852        Self {
853            fingerprint_algorithm,
854            ..Self::default()
855        }
856    }
857
858    /// Registers a schema with the store and the provided fingerprint.
859    /// Note: Confluent wire format implementations should leverage this method.
860    ///
861    /// A schema is set in the store, using the provided fingerprint. If a schema
862    /// with the same fingerprint does not already exist in the store, the new schema
863    /// is inserted. If the fingerprint already exists, the existing schema is not overwritten.
864    ///
865    /// # Arguments
866    ///
867    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to register.
868    /// * `schema` - The `AvroSchema` to register.
869    ///
870    /// # Returns
871    ///
872    /// A `Result` returning the provided `Fingerprint` of the schema if successful,
873    /// or an `ArrowError` on failure.
874    pub fn set(
875        &mut self,
876        fingerprint: Fingerprint,
877        schema: AvroSchema,
878    ) -> Result<Fingerprint, ArrowError> {
879        match self.schemas.entry(fingerprint) {
880            Entry::Occupied(entry) => {
881                if entry.get() != &schema {
882                    return Err(ArrowError::ComputeError(format!(
883                        "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
884                    )));
885                }
886            }
887            Entry::Vacant(entry) => {
888                entry.insert(schema);
889            }
890        }
891        Ok(fingerprint)
892    }
893
894    /// Registers a schema with the store and returns its fingerprint.
895    ///
896    /// A fingerprint is calculated for the given schema using the store's configured
897    /// hash type. If a schema with the same fingerprint does not already exist in the
898    /// store, the new schema is inserted. If the fingerprint already exists, the
899    /// existing schema is not overwritten. If FingerprintAlgorithm is set to Id or Id64, this
900    /// method will return an error. Confluent wire format implementations should leverage the
901    /// set method instead.
902    ///
903    /// # Arguments
904    ///
905    /// * `schema` - The `AvroSchema` to register.
906    ///
907    /// # Returns
908    ///
909    /// A `Result` containing the `Fingerprint` of the schema if successful,
910    /// or an `ArrowError` on failure.
911    pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
912        if self.fingerprint_algorithm == FingerprintAlgorithm::Id
913            || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
914        {
915            return Err(ArrowError::SchemaError(
916                "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
917            Use the set method directly instead, providing a valid fingerprint"
918                    .to_string(),
919            ));
920        }
921        let fingerprint =
922            AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
923        self.set(fingerprint, schema)?;
924        Ok(fingerprint)
925    }
926
927    /// Looks up a schema by its `Fingerprint`.
928    ///
929    /// # Arguments
930    ///
931    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up.
932    ///
933    /// # Returns
934    ///
935    /// An `Option` containing a clone of the `AvroSchema` if found, otherwise `None`.
936    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
937        self.schemas.get(fingerprint)
938    }
939
940    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
941    /// held by this [`SchemaStore`].
942    ///
943    /// The order of the returned fingerprints is unspecified and should not be
944    /// relied upon.
945    pub fn fingerprints(&self) -> Vec<Fingerprint> {
946        self.schemas.keys().copied().collect()
947    }
948
949    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
950    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
951        self.fingerprint_algorithm
952    }
953}
954
955fn quote(s: &str) -> Result<String, ArrowError> {
956    serde_json::to_string(s)
957        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
958}
959
960// Avro names are defined by a `name` and an optional `namespace`.
961// The full name is composed of the namespace and the name, separated by a dot.
962//
963// Avro specification defines two ways to specify a full name:
964// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
965//    In this case, the `namespace` attribute is ignored.
966// 2. The `name` attribute contains the simple name (e.g., "d") and the
967//    `namespace` attribute contains the namespace (e.g., "a.b.c").
968//
969// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
970// Complex paths with quotes or backticks like `a."hi".b` are not supported.
971//
972// This function constructs the full name and extracts the namespace,
973// handling both ways of specifying the name. It prioritizes a namespace
974// defined within the `name` attribute itself, then the explicit `namespace_attr`,
975// and finally the `enclosing_ns`.
976pub(crate) fn make_full_name(
977    name: &str,
978    namespace_attr: Option<&str>,
979    enclosing_ns: Option<&str>,
980) -> (String, Option<String>) {
981    // `name` already contains a dot then treat as full-name, ignore namespace.
982    if let Some((ns, _)) = name.rsplit_once('.') {
983        return (name.to_string(), Some(ns.to_string()));
984    }
985    match namespace_attr.or(enclosing_ns) {
986        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
987        None => (name.to_string(), None),
988    }
989}
990
991fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
992    Ok(match schema {
993        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
994            TypeName::Primitive(pt) => quote(pt.as_ref())?,
995            TypeName::Ref(name) => {
996                let (full_name, _) = make_full_name(name, None, enclosing_ns);
997                quote(&full_name)?
998            }
999        },
1000        Schema::Union(branches) => format!(
1001            "[{}]",
1002            branches
1003                .iter()
1004                .map(|b| build_canonical(b, enclosing_ns))
1005                .collect::<Result<Vec<_>, _>>()?
1006                .join(",")
1007        ),
1008        Schema::Complex(ct) => match ct {
1009            ComplexType::Record(r) => {
1010                let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
1011                let fields = r
1012                    .fields
1013                    .iter()
1014                    .map(|f| {
1015                        // PCF [STRIP] per Avro spec: keep only attributes relevant to parsing
1016                        // ("name" and "type" for fields) and **strip others** such as doc,
1017                        // default, order, and **aliases**. This preserves canonicalization. See:
1018                        // https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas
1019                        let field_type =
1020                            build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
1021                        Ok(format!(
1022                            r#"{{"name":{},"type":{}}}"#,
1023                            quote(f.name)?,
1024                            field_type
1025                        ))
1026                    })
1027                    .collect::<Result<Vec<_>, ArrowError>>()?
1028                    .join(",");
1029                format!(
1030                    r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
1031                    quote(&full_name)?,
1032                )
1033            }
1034            ComplexType::Enum(e) => {
1035                let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
1036                let symbols = e
1037                    .symbols
1038                    .iter()
1039                    .map(|s| quote(s))
1040                    .collect::<Result<Vec<_>, _>>()?
1041                    .join(",");
1042                format!(
1043                    r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
1044                    quote(&full_name)?
1045                )
1046            }
1047            ComplexType::Array(arr) => format!(
1048                r#"{{"type":"array","items":{}}}"#,
1049                build_canonical(&arr.items, enclosing_ns)?
1050            ),
1051            ComplexType::Map(map) => format!(
1052                r#"{{"type":"map","values":{}}}"#,
1053                build_canonical(&map.values, enclosing_ns)?
1054            ),
1055            ComplexType::Fixed(f) => {
1056                let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
1057                format!(
1058                    r#"{{"name":{},"type":"fixed","size":{}}}"#,
1059                    quote(&full_name)?,
1060                    f.size
1061                )
1062            }
1063        },
1064    })
1065}
1066
1067/// 64‑bit Rabin fingerprint as described in the Avro spec.
1068const EMPTY: u64 = 0xc15d_213a_a4d7_a795;
1069
1070/// Build one entry of the polynomial‑division table.
1071///
1072/// We cannot yet write `for _ in 0..8` here: `for` loops rely on
1073/// `Iterator::next`, which is not `const` on stable Rust.  Until the
1074/// `const_for` feature (tracking issue #87575) is stabilized, a `while`
1075/// loop is the only option in a `const fn`
1076const fn one_entry(i: usize) -> u64 {
1077    let mut fp = i as u64;
1078    let mut j = 0;
1079    while j < 8 {
1080        fp = (fp >> 1) ^ (EMPTY & (0u64.wrapping_sub(fp & 1)));
1081        j += 1;
1082    }
1083    fp
1084}
1085
1086/// Build the full 256‑entry table at compile time.
1087///
1088/// We cannot yet write `for _ in 0..256` here: `for` loops rely on
1089/// `Iterator::next`, which is not `const` on stable Rust.  Until the
1090/// `const_for` feature (tracking issue #87575) is stabilized, a `while`
1091/// loop is the only option in a `const fn`
1092const fn build_table() -> [u64; 256] {
1093    let mut table = [0u64; 256];
1094    let mut i = 0;
1095    while i < 256 {
1096        table[i] = one_entry(i);
1097        i += 1;
1098    }
1099    table
1100}
1101
1102/// The pre‑computed table.
1103static FINGERPRINT_TABLE: [u64; 256] = build_table();
1104
1105/// Computes the 64-bit Rabin fingerprint for a given canonical schema string.
1106/// This implementation is based on the Avro specification for schema fingerprinting.
1107pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
1108    let mut fp = EMPTY;
1109    for &byte in canonical_form.as_bytes() {
1110        let idx = ((fp as u8) ^ byte) as usize;
1111        fp = (fp >> 8) ^ FINGERPRINT_TABLE[idx];
1112    }
1113    fp
1114}
1115
1116#[cfg(feature = "md5")]
1117/// Compute the **128‑bit MD5** fingerprint of the canonical form.
1118///
1119/// Returns a 16‑byte array (`[u8; 16]`) containing the full MD5 digest,
1120/// exactly as required by the Avro specification.
1121#[inline]
1122pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
1123    let digest = md5::compute(canonical_form.as_bytes());
1124    digest.0
1125}
1126
1127#[cfg(feature = "sha256")]
1128/// Compute the **256‑bit SHA‑256** fingerprint of the canonical form.
1129///
1130/// Returns a 32‑byte array (`[u8; 32]`) containing the full SHA‑256 digest.
1131#[inline]
1132pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
1133    let mut hasher = Sha256::new();
1134    hasher.update(canonical_form.as_bytes());
1135    let digest = hasher.finalize();
1136    digest.into()
1137}
1138
1139#[inline]
1140fn is_internal_arrow_key(key: &str) -> bool {
1141    key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1142}
1143
1144/// Copies Arrow schema metadata entries to the provided JSON map,
1145/// skipping keys that are Avro-reserved, internal Arrow keys, or
1146/// nested under the `avro.schema.` namespace. Values that parse as
1147/// JSON are inserted as JSON; otherwise the raw string is preserved.
1148fn extend_with_passthrough_metadata(
1149    target: &mut JsonMap<String, Value>,
1150    metadata: &HashMap<String, String>,
1151) {
1152    for (meta_key, meta_val) in metadata {
1153        if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1154            continue;
1155        }
1156        let json_val =
1157            serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1158        target.insert(meta_key.clone(), json_val);
1159    }
1160}
1161
1162// Sanitize an arbitrary string so it is a valid Avro field or type name
1163fn sanitise_avro_name(base_name: &str) -> String {
1164    if base_name.is_empty() {
1165        return "_".to_owned();
1166    }
1167    let mut out: String = base_name
1168        .chars()
1169        .map(|char| {
1170            if char.is_ascii_alphanumeric() || char == '_' {
1171                char
1172            } else {
1173                '_'
1174            }
1175        })
1176        .collect();
1177    if out.as_bytes()[0].is_ascii_digit() {
1178        out.insert(0, '_');
1179    }
1180    out
1181}
1182
1183#[derive(Default)]
1184struct NameGenerator {
1185    used: HashSet<String>,
1186    counters: HashMap<String, usize>,
1187}
1188
1189impl NameGenerator {
1190    fn make_unique(&mut self, field_name: &str) -> String {
1191        let field_name = sanitise_avro_name(field_name);
1192        if self.used.insert(field_name.clone()) {
1193            self.counters.insert(field_name.clone(), 1);
1194            return field_name;
1195        }
1196        let counter = self.counters.entry(field_name.clone()).or_insert(1);
1197        loop {
1198            let candidate = format!("{field_name}_{}", *counter);
1199            if self.used.insert(candidate.clone()) {
1200                return candidate;
1201            }
1202            *counter += 1;
1203        }
1204    }
1205}
1206
1207fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1208    if extras.is_empty() {
1209        return schema;
1210    }
1211    match schema {
1212        Value::Object(mut map) => {
1213            map.extend(extras);
1214            Value::Object(map)
1215        }
1216        Value::Array(mut union) => {
1217            // For unions, we cannot attach attributes to the array itself (per Avro spec).
1218            // As a fallback for extension metadata, attach extras to the first non-null branch object.
1219            if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1220                let original = std::mem::take(non_null);
1221                *non_null = merge_extras(original, extras);
1222            }
1223            Value::Array(union)
1224        }
1225        primitive => {
1226            let mut map = JsonMap::with_capacity(extras.len() + 1);
1227            map.insert("type".into(), primitive);
1228            map.extend(extras);
1229            Value::Object(map)
1230        }
1231    }
1232}
1233
1234#[inline]
1235fn is_avro_json_null(v: &Value) -> bool {
1236    matches!(v, Value::String(s) if s == "null")
1237}
1238
1239fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1240    let null = Value::String("null".into());
1241    match inner {
1242        Value::Array(mut union) => {
1243            // If this site is already a union and already contains "null",
1244            // preserve the branch order exactly. Reordering "null" breaks
1245            // the correspondence between Arrow union child order (type_ids)
1246            // and the Avro branch index written on the wire.
1247            if union.iter().any(is_avro_json_null) {
1248                return Value::Array(union);
1249            }
1250            // Otherwise, inject "null" without reordering existing branches.
1251            match null_order {
1252                Nullability::NullFirst => union.insert(0, null),
1253                Nullability::NullSecond => union.push(null),
1254            }
1255            Value::Array(union)
1256        }
1257        other => match null_order {
1258            Nullability::NullFirst => Value::Array(vec![null, other]),
1259            Nullability::NullSecond => Value::Array(vec![other, null]),
1260        },
1261    }
1262}
1263
1264fn min_fixed_bytes_for_precision(p: usize) -> usize {
1265    // From the spec: max precision for n=1..=32 bytes:
1266    // [2,4,6,9,11,14,16,18,21,23,26,28,31,33,35,38,40,43,45,47,50,52,55,57,59,62,64,67,69,71,74,76]
1267    const MAX_P: [usize; 32] = [
1268        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
1269        59, 62, 64, 67, 69, 71, 74, 76,
1270    ];
1271    for (i, &max_p) in MAX_P.iter().enumerate() {
1272        if p <= max_p {
1273            return i + 1;
1274        }
1275    }
1276    32 // saturate at Decimal256
1277}
1278
1279fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1280    match branch {
1281        Value::String(t) => Ok(format!("P:{t}")),
1282        Value::Object(map) => {
1283            let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1284                ArrowError::SchemaError("Union branch object missing string 'type'".into())
1285            })?;
1286            match t {
1287                "record" | "enum" | "fixed" => {
1288                    let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1289                        ArrowError::SchemaError(format!(
1290                            "Union branch '{t}' missing required 'name'"
1291                        ))
1292                    })?;
1293                    Ok(format!("N:{t}:{name}"))
1294                }
1295                "array" | "map" => Ok(format!("C:{t}")),
1296                other => Ok(format!("P:{other}")),
1297            }
1298        }
1299        Value::Array(_) => Err(ArrowError::SchemaError(
1300            "Avro union may not immediately contain another union".into(),
1301        )),
1302        _ => Err(ArrowError::SchemaError(
1303            "Invalid JSON for Avro union branch".into(),
1304        )),
1305    }
1306}
1307
1308fn datatype_to_avro(
1309    dt: &DataType,
1310    field_name: &str,
1311    metadata: &HashMap<String, String>,
1312    name_gen: &mut NameGenerator,
1313    null_order: Nullability,
1314    strip: bool,
1315) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1316    let mut extras = JsonMap::new();
1317    let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1318        if *scale < 0 {
1319            return Err(ArrowError::SchemaError(format!(
1320                "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1321            )));
1322        }
1323        if (*scale as usize) > (*precision as usize) {
1324            return Err(ArrowError::SchemaError(format!(
1325                "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1326                 must be <= precision ({precision})"
1327            )));
1328        }
1329        let mut meta = JsonMap::from_iter([
1330            ("logicalType".into(), json!("decimal")),
1331            ("precision".into(), json!(*precision)),
1332            ("scale".into(), json!(*scale)),
1333        ]);
1334        let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1335        let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1336            || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1337        if fixed_size.is_none() && carries_name {
1338            fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1339        }
1340        if let Some(size) = fixed_size {
1341            meta.insert("type".into(), json!("fixed"));
1342            meta.insert("size".into(), json!(size));
1343            let chosen_name = metadata
1344                .get(AVRO_NAME_METADATA_KEY)
1345                .map(|s| sanitise_avro_name(s))
1346                .unwrap_or_else(|| name_gen.make_unique(field_name));
1347            meta.insert("name".into(), json!(chosen_name));
1348            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1349                meta.insert("namespace".into(), json!(ns));
1350            }
1351        } else {
1352            // default to bytes-backed decimal
1353            meta.insert("type".into(), json!("bytes"));
1354        }
1355        Ok(Value::Object(meta))
1356    };
1357    let val = match dt {
1358        DataType::Null => Value::String("null".into()),
1359        DataType::Boolean => Value::String("boolean".into()),
1360        DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 | DataType::Int32 => {
1361            Value::String("int".into())
1362        }
1363        DataType::UInt32 | DataType::Int64 | DataType::UInt64 => Value::String("long".into()),
1364        DataType::Float16 | DataType::Float32 => Value::String("float".into()),
1365        DataType::Float64 => Value::String("double".into()),
1366        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1367        DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1368        DataType::BinaryView => {
1369            if !strip {
1370                extras.insert("arrowBinaryView".into(), Value::Bool(true));
1371            }
1372            Value::String("bytes".into())
1373        }
1374        DataType::FixedSizeBinary(len) => {
1375            let md_is_uuid = metadata
1376                .get("logicalType")
1377                .map(|s| s.trim_matches('"') == "uuid")
1378                .unwrap_or(false);
1379            #[cfg(feature = "canonical_extension_types")]
1380            let ext_is_uuid = metadata
1381                .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1382                .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1383                .unwrap_or(false);
1384            #[cfg(not(feature = "canonical_extension_types"))]
1385            let ext_is_uuid = false;
1386            let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1387            if is_uuid {
1388                json!({ "type": "string", "logicalType": "uuid" })
1389            } else {
1390                let chosen_name = metadata
1391                    .get(AVRO_NAME_METADATA_KEY)
1392                    .map(|s| sanitise_avro_name(s))
1393                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1394                let mut obj = JsonMap::from_iter([
1395                    ("type".into(), json!("fixed")),
1396                    ("name".into(), json!(chosen_name)),
1397                    ("size".into(), json!(len)),
1398                ]);
1399                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1400                    obj.insert("namespace".into(), json!(ns));
1401                }
1402                Value::Object(obj)
1403            }
1404        }
1405        #[cfg(feature = "small_decimals")]
1406        DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1407            handle_decimal(precision, scale)?
1408        }
1409        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1410            handle_decimal(precision, scale)?
1411        }
1412        DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1413        DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1414        DataType::Time32(unit) => match unit {
1415            TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1416            TimeUnit::Second => {
1417                if !strip {
1418                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1419                }
1420                Value::String("int".into())
1421            }
1422            _ => Value::String("int".into()),
1423        },
1424        DataType::Time64(unit) => match unit {
1425            TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1426            TimeUnit::Nanosecond => {
1427                if !strip {
1428                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1429                }
1430                Value::String("long".into())
1431            }
1432            _ => Value::String("long".into()),
1433        },
1434        DataType::Timestamp(unit, tz) => {
1435            let logical_type = match (unit, tz.is_some()) {
1436                (TimeUnit::Millisecond, true) => "timestamp-millis",
1437                (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1438                (TimeUnit::Microsecond, true) => "timestamp-micros",
1439                (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1440                (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1441                (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1442                (TimeUnit::Second, _) => {
1443                    if !strip {
1444                        extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1445                    }
1446                    return Ok((Value::String("long".into()), extras));
1447                }
1448            };
1449            if !strip && matches!(unit, TimeUnit::Nanosecond) {
1450                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1451            }
1452            json!({ "type": "long", "logicalType": logical_type })
1453        }
1454        #[cfg(not(feature = "avro_custom_types"))]
1455        DataType::Duration(_unit) => Value::String("long".into()),
1456        #[cfg(feature = "avro_custom_types")]
1457        DataType::Duration(unit) => {
1458            // When the feature is enabled, create an Avro schema object
1459            // with the correct `logicalType` annotation.
1460            let logical_type = match unit {
1461                TimeUnit::Second => "arrow.duration-seconds",
1462                TimeUnit::Millisecond => "arrow.duration-millis",
1463                TimeUnit::Microsecond => "arrow.duration-micros",
1464                TimeUnit::Nanosecond => "arrow.duration-nanos",
1465            };
1466            json!({ "type": "long", "logicalType": logical_type })
1467        }
1468        DataType::Interval(IntervalUnit::MonthDayNano) => {
1469            // Avro duration logical type: fixed(12) with months/days/millis per spec.
1470            let chosen_name = metadata
1471                .get(AVRO_NAME_METADATA_KEY)
1472                .map(|s| sanitise_avro_name(s))
1473                .unwrap_or_else(|| name_gen.make_unique(field_name));
1474            let mut obj = JsonMap::from_iter([
1475                ("type".into(), json!("fixed")),
1476                ("name".into(), json!(chosen_name)),
1477                ("size".into(), json!(12)),
1478                ("logicalType".into(), json!("duration")),
1479            ]);
1480            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1481                obj.insert("namespace".into(), json!(ns));
1482            }
1483            json!(obj)
1484        }
1485        DataType::Interval(IntervalUnit::YearMonth) => {
1486            if !strip {
1487                extras.insert(
1488                    "arrowIntervalUnit".into(),
1489                    Value::String("yearmonth".into()),
1490                );
1491            }
1492            Value::String("long".into())
1493        }
1494        DataType::Interval(IntervalUnit::DayTime) => {
1495            if !strip {
1496                extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1497            }
1498            Value::String("long".into())
1499        }
1500        DataType::List(child) | DataType::LargeList(child) => {
1501            if matches!(dt, DataType::LargeList(_)) && !strip {
1502                extras.insert("arrowLargeList".into(), Value::Bool(true));
1503            }
1504            let items_schema = process_datatype(
1505                child.data_type(),
1506                child.name(),
1507                child.metadata(),
1508                name_gen,
1509                null_order,
1510                child.is_nullable(),
1511                strip,
1512            )?;
1513            json!({
1514                "type": "array",
1515                "items": items_schema
1516            })
1517        }
1518        DataType::ListView(child) | DataType::LargeListView(child) => {
1519            if matches!(dt, DataType::LargeListView(_)) && !strip {
1520                extras.insert("arrowLargeList".into(), Value::Bool(true));
1521            }
1522            if !strip {
1523                extras.insert("arrowListView".into(), Value::Bool(true));
1524            }
1525            let items_schema = process_datatype(
1526                child.data_type(),
1527                child.name(),
1528                child.metadata(),
1529                name_gen,
1530                null_order,
1531                child.is_nullable(),
1532                strip,
1533            )?;
1534            json!({
1535                "type": "array",
1536                "items": items_schema
1537            })
1538        }
1539        DataType::FixedSizeList(child, len) => {
1540            if !strip {
1541                extras.insert("arrowFixedSize".into(), json!(len));
1542            }
1543            let items_schema = process_datatype(
1544                child.data_type(),
1545                child.name(),
1546                child.metadata(),
1547                name_gen,
1548                null_order,
1549                child.is_nullable(),
1550                strip,
1551            )?;
1552            json!({
1553                "type": "array",
1554                "items": items_schema
1555            })
1556        }
1557        DataType::Map(entries, _) => {
1558            let value_field = match entries.data_type() {
1559                DataType::Struct(fs) => &fs[1],
1560                _ => {
1561                    return Err(ArrowError::SchemaError(
1562                        "Map 'entries' field must be Struct(key,value)".into(),
1563                    ));
1564                }
1565            };
1566            let values_schema = process_datatype(
1567                value_field.data_type(),
1568                value_field.name(),
1569                value_field.metadata(),
1570                name_gen,
1571                null_order,
1572                value_field.is_nullable(),
1573                strip,
1574            )?;
1575            json!({
1576                "type": "map",
1577                "values": values_schema
1578            })
1579        }
1580        DataType::Struct(fields) => {
1581            let avro_fields = fields
1582                .iter()
1583                .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1584                .collect::<Result<Vec<_>, _>>()?;
1585            // Prefer avro.name/avro.namespace when provided on the struct field metadata
1586            let chosen_name = metadata
1587                .get(AVRO_NAME_METADATA_KEY)
1588                .map(|s| sanitise_avro_name(s))
1589                .unwrap_or_else(|| name_gen.make_unique(field_name));
1590            let mut obj = JsonMap::from_iter([
1591                ("type".into(), json!("record")),
1592                ("name".into(), json!(chosen_name)),
1593                ("fields".into(), Value::Array(avro_fields)),
1594            ]);
1595            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1596                obj.insert("namespace".into(), json!(ns));
1597            }
1598            Value::Object(obj)
1599        }
1600        DataType::Dictionary(_, value) => {
1601            if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1602                let symbols: Vec<&str> =
1603                    serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1604                // Prefer avro.name/namespace when provided for enums
1605                let chosen_name = metadata
1606                    .get(AVRO_NAME_METADATA_KEY)
1607                    .map(|s| sanitise_avro_name(s))
1608                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1609                let mut obj = JsonMap::from_iter([
1610                    ("type".into(), json!("enum")),
1611                    ("name".into(), json!(chosen_name)),
1612                    ("symbols".into(), json!(symbols)),
1613                ]);
1614                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1615                    obj.insert("namespace".into(), json!(ns));
1616                }
1617                Value::Object(obj)
1618            } else {
1619                process_datatype(
1620                    value.as_ref(),
1621                    field_name,
1622                    metadata,
1623                    name_gen,
1624                    null_order,
1625                    false,
1626                    strip,
1627                )?
1628            }
1629        }
1630        #[cfg(feature = "avro_custom_types")]
1631        DataType::RunEndEncoded(run_ends, values) => {
1632            let bits = match run_ends.data_type() {
1633                DataType::Int16 => 16,
1634                DataType::Int32 => 32,
1635                DataType::Int64 => 64,
1636                other => {
1637                    return Err(ArrowError::SchemaError(format!(
1638                        "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1639                    )));
1640                }
1641            };
1642            // Build the value site schema, preserving its own nullability
1643            let (value_schema, value_extras) = datatype_to_avro(
1644                values.data_type(),
1645                values.name(),
1646                values.metadata(),
1647                name_gen,
1648                null_order,
1649                strip,
1650            )?;
1651            let mut merged = merge_extras(value_schema, value_extras);
1652            if values.is_nullable() {
1653                merged = wrap_nullable(merged, null_order);
1654            }
1655            let mut extras = JsonMap::new();
1656            extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1657            extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1658            return Ok((merged, extras));
1659        }
1660        #[cfg(not(feature = "avro_custom_types"))]
1661        DataType::RunEndEncoded(_run_ends, values) => {
1662            let (value_schema, _extras) = datatype_to_avro(
1663                values.data_type(),
1664                values.name(),
1665                values.metadata(),
1666                name_gen,
1667                null_order,
1668                strip,
1669            )?;
1670            return Ok((value_schema, JsonMap::new()));
1671        }
1672        DataType::Union(fields, mode) => {
1673            let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1674            let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1675            for (type_id, field_ref) in fields.iter() {
1676                // NOTE: `process_datatype` would wrap nullability; force is_nullable=false here.
1677                let (branch_schema, _branch_extras) = datatype_to_avro(
1678                    field_ref.data_type(),
1679                    field_ref.name(),
1680                    field_ref.metadata(),
1681                    name_gen,
1682                    null_order,
1683                    strip,
1684                )?;
1685                // Avro unions cannot immediately contain another union
1686                if matches!(branch_schema, Value::Array(_)) {
1687                    return Err(ArrowError::SchemaError(
1688                        "Avro union may not immediately contain another union".into(),
1689                    ));
1690                }
1691                branches.push(branch_schema);
1692                type_ids.push(type_id as i32);
1693            }
1694            let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1695            for b in &branches {
1696                let sig = union_branch_signature(b)?;
1697                if !seen.insert(sig) {
1698                    return Err(ArrowError::SchemaError(
1699                        "Avro union contains duplicate branch types (disallowed by spec)".into(),
1700                    ));
1701                }
1702            }
1703            if !strip {
1704                extras.insert(
1705                    "arrowUnionMode".into(),
1706                    Value::String(
1707                        match mode {
1708                            UnionMode::Sparse => "sparse",
1709                            UnionMode::Dense => "dense",
1710                        }
1711                        .to_string(),
1712                    ),
1713                );
1714                extras.insert(
1715                    "arrowUnionTypeIds".into(),
1716                    Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1717                );
1718            }
1719            Value::Array(branches)
1720        }
1721        #[cfg(not(feature = "small_decimals"))]
1722        other => {
1723            return Err(ArrowError::NotYetImplemented(format!(
1724                "Arrow type {other:?} has no Avro representation"
1725            )));
1726        }
1727    };
1728    Ok((val, extras))
1729}
1730
1731fn process_datatype(
1732    dt: &DataType,
1733    field_name: &str,
1734    metadata: &HashMap<String, String>,
1735    name_gen: &mut NameGenerator,
1736    null_order: Nullability,
1737    is_nullable: bool,
1738    strip: bool,
1739) -> Result<Value, ArrowError> {
1740    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1741    let mut merged = merge_extras(schema, extras);
1742    if is_nullable {
1743        merged = wrap_nullable(merged, null_order)
1744    }
1745    Ok(merged)
1746}
1747
1748fn arrow_field_to_avro(
1749    field: &ArrowField,
1750    name_gen: &mut NameGenerator,
1751    null_order: Nullability,
1752    strip: bool,
1753) -> Result<Value, ArrowError> {
1754    let avro_name = sanitise_avro_name(field.name());
1755    let schema_value = process_datatype(
1756        field.data_type(),
1757        &avro_name,
1758        field.metadata(),
1759        name_gen,
1760        null_order,
1761        field.is_nullable(),
1762        strip,
1763    )?;
1764    // Build the field map
1765    let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1766    map.insert("name".into(), Value::String(avro_name));
1767    map.insert("type".into(), schema_value);
1768    // Transfer selected metadata
1769    for (meta_key, meta_val) in field.metadata() {
1770        if is_internal_arrow_key(meta_key) {
1771            continue;
1772        }
1773        match meta_key.as_str() {
1774            AVRO_DOC_METADATA_KEY => {
1775                map.insert("doc".into(), Value::String(meta_val.clone()));
1776            }
1777            AVRO_FIELD_DEFAULT_METADATA_KEY => {
1778                let default_value = serde_json::from_str(meta_val)
1779                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1780                map.insert("default".into(), default_value);
1781            }
1782            _ => {
1783                let json_val = serde_json::from_str(meta_val)
1784                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1785                map.insert(meta_key.clone(), json_val);
1786            }
1787        }
1788    }
1789    Ok(Value::Object(map))
1790}
1791
1792#[cfg(test)]
1793mod tests {
1794    use super::*;
1795    use crate::codec::{AvroField, AvroFieldBuilder};
1796    use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1797    use serde_json::json;
1798    use std::sync::Arc;
1799
1800    fn int_schema() -> Schema<'static> {
1801        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1802    }
1803
1804    fn record_schema() -> Schema<'static> {
1805        Schema::Complex(ComplexType::Record(Record {
1806            name: "record1",
1807            namespace: Some("test.namespace"),
1808            doc: Some(Cow::from("A test record")),
1809            aliases: vec![],
1810            fields: vec![
1811                Field {
1812                    name: "field1",
1813                    doc: Some(Cow::from("An integer field")),
1814                    r#type: int_schema(),
1815                    default: None,
1816                    aliases: vec![],
1817                },
1818                Field {
1819                    name: "field2",
1820                    doc: None,
1821                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1822                    default: None,
1823                    aliases: vec![],
1824                },
1825            ],
1826            attributes: Attributes::default(),
1827        }))
1828    }
1829
1830    fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
1831        let mut sb = SchemaBuilder::new();
1832        sb.push(field);
1833        sb.finish()
1834    }
1835
1836    fn assert_json_contains(avro_json: &str, needle: &str) {
1837        assert!(
1838            avro_json.contains(needle),
1839            "JSON did not contain `{needle}` : {avro_json}"
1840        )
1841    }
1842
1843    #[test]
1844    fn test_deserialize() {
1845        let t: Schema = serde_json::from_str("\"string\"").unwrap();
1846        assert_eq!(
1847            t,
1848            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
1849        );
1850
1851        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
1852        assert_eq!(
1853            t,
1854            Schema::Union(vec![
1855                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
1856                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1857            ])
1858        );
1859
1860        let t: Type = serde_json::from_str(
1861            r#"{
1862                   "type":"long",
1863                   "logicalType":"timestamp-micros"
1864                }"#,
1865        )
1866        .unwrap();
1867
1868        let timestamp = Type {
1869            r#type: TypeName::Primitive(PrimitiveType::Long),
1870            attributes: Attributes {
1871                logical_type: Some("timestamp-micros"),
1872                additional: Default::default(),
1873            },
1874        };
1875
1876        assert_eq!(t, timestamp);
1877
1878        let t: ComplexType = serde_json::from_str(
1879            r#"{
1880                   "type":"fixed",
1881                   "name":"fixed",
1882                   "namespace":"topLevelRecord.value",
1883                   "size":11,
1884                   "logicalType":"decimal",
1885                   "precision":25,
1886                   "scale":2
1887                }"#,
1888        )
1889        .unwrap();
1890
1891        let decimal = ComplexType::Fixed(Fixed {
1892            name: "fixed",
1893            namespace: Some("topLevelRecord.value"),
1894            aliases: vec![],
1895            size: 11,
1896            attributes: Attributes {
1897                logical_type: Some("decimal"),
1898                additional: vec![("precision", json!(25)), ("scale", json!(2))]
1899                    .into_iter()
1900                    .collect(),
1901            },
1902        });
1903
1904        assert_eq!(t, decimal);
1905
1906        let schema: Schema = serde_json::from_str(
1907            r#"{
1908               "type":"record",
1909               "name":"topLevelRecord",
1910               "fields":[
1911                  {
1912                     "name":"value",
1913                     "type":[
1914                        {
1915                           "type":"fixed",
1916                           "name":"fixed",
1917                           "namespace":"topLevelRecord.value",
1918                           "size":11,
1919                           "logicalType":"decimal",
1920                           "precision":25,
1921                           "scale":2
1922                        },
1923                        "null"
1924                     ]
1925                  }
1926               ]
1927            }"#,
1928        )
1929        .unwrap();
1930
1931        assert_eq!(
1932            schema,
1933            Schema::Complex(ComplexType::Record(Record {
1934                name: "topLevelRecord",
1935                namespace: None,
1936                doc: None,
1937                aliases: vec![],
1938                fields: vec![Field {
1939                    name: "value",
1940                    doc: None,
1941                    r#type: Schema::Union(vec![
1942                        Schema::Complex(decimal),
1943                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1944                    ]),
1945                    default: None,
1946                    aliases: vec![],
1947                },],
1948                attributes: Default::default(),
1949            }))
1950        );
1951
1952        let schema: Schema = serde_json::from_str(
1953            r#"{
1954                  "type": "record",
1955                  "name": "LongList",
1956                  "aliases": ["LinkedLongs"],
1957                  "fields" : [
1958                    {"name": "value", "type": "long"},
1959                    {"name": "next", "type": ["null", "LongList"]}
1960                  ]
1961                }"#,
1962        )
1963        .unwrap();
1964
1965        assert_eq!(
1966            schema,
1967            Schema::Complex(ComplexType::Record(Record {
1968                name: "LongList",
1969                namespace: None,
1970                doc: None,
1971                aliases: vec!["LinkedLongs"],
1972                fields: vec![
1973                    Field {
1974                        name: "value",
1975                        doc: None,
1976                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
1977                        default: None,
1978                        aliases: vec![],
1979                    },
1980                    Field {
1981                        name: "next",
1982                        doc: None,
1983                        r#type: Schema::Union(vec![
1984                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1985                            Schema::TypeName(TypeName::Ref("LongList")),
1986                        ]),
1987                        default: None,
1988                        aliases: vec![],
1989                    }
1990                ],
1991                attributes: Attributes::default(),
1992            }))
1993        );
1994
1995        // Recursive schema are not supported
1996        let err = AvroField::try_from(&schema).unwrap_err().to_string();
1997        assert_eq!(err, "Parser error: Failed to resolve .LongList");
1998
1999        let schema: Schema = serde_json::from_str(
2000            r#"{
2001               "type":"record",
2002               "name":"topLevelRecord",
2003               "fields":[
2004                  {
2005                     "name":"id",
2006                     "type":[
2007                        "int",
2008                        "null"
2009                     ]
2010                  },
2011                  {
2012                     "name":"timestamp_col",
2013                     "type":[
2014                        {
2015                           "type":"long",
2016                           "logicalType":"timestamp-micros"
2017                        },
2018                        "null"
2019                     ]
2020                  }
2021               ]
2022            }"#,
2023        )
2024        .unwrap();
2025
2026        assert_eq!(
2027            schema,
2028            Schema::Complex(ComplexType::Record(Record {
2029                name: "topLevelRecord",
2030                namespace: None,
2031                doc: None,
2032                aliases: vec![],
2033                fields: vec![
2034                    Field {
2035                        name: "id",
2036                        doc: None,
2037                        r#type: Schema::Union(vec![
2038                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2039                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2040                        ]),
2041                        default: None,
2042                        aliases: vec![],
2043                    },
2044                    Field {
2045                        name: "timestamp_col",
2046                        doc: None,
2047                        r#type: Schema::Union(vec![
2048                            Schema::Type(timestamp),
2049                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2050                        ]),
2051                        default: None,
2052                        aliases: vec![],
2053                    }
2054                ],
2055                attributes: Default::default(),
2056            }))
2057        );
2058        let codec = AvroField::try_from(&schema).unwrap();
2059        let expected_arrow_field = arrow_schema::Field::new(
2060            "topLevelRecord",
2061            DataType::Struct(Fields::from(vec![
2062                arrow_schema::Field::new("id", DataType::Int32, true),
2063                arrow_schema::Field::new(
2064                    "timestamp_col",
2065                    DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2066                    true,
2067                ),
2068            ])),
2069            false,
2070        )
2071        .with_metadata(std::collections::HashMap::from([(
2072            AVRO_NAME_METADATA_KEY.to_string(),
2073            "topLevelRecord".to_string(),
2074        )]));
2075
2076        assert_eq!(codec.field(), expected_arrow_field);
2077
2078        let schema: Schema = serde_json::from_str(
2079            r#"{
2080                  "type": "record",
2081                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
2082                  "fields": [
2083                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
2084                    {"name": "clientProtocol", "type": ["null", "string"]},
2085                    {"name": "serverHash", "type": "MD5"},
2086                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
2087                  ]
2088            }"#,
2089        )
2090        .unwrap();
2091
2092        assert_eq!(
2093            schema,
2094            Schema::Complex(ComplexType::Record(Record {
2095                name: "HandshakeRequest",
2096                namespace: Some("org.apache.avro.ipc"),
2097                doc: None,
2098                aliases: vec![],
2099                fields: vec![
2100                    Field {
2101                        name: "clientHash",
2102                        doc: None,
2103                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
2104                            name: "MD5",
2105                            namespace: None,
2106                            aliases: vec![],
2107                            size: 16,
2108                            attributes: Default::default(),
2109                        })),
2110                        default: None,
2111                        aliases: vec![],
2112                    },
2113                    Field {
2114                        name: "clientProtocol",
2115                        doc: None,
2116                        r#type: Schema::Union(vec![
2117                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2118                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2119                        ]),
2120                        default: None,
2121                        aliases: vec![],
2122                    },
2123                    Field {
2124                        name: "serverHash",
2125                        doc: None,
2126                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
2127                        default: None,
2128                        aliases: vec![],
2129                    },
2130                    Field {
2131                        name: "meta",
2132                        doc: None,
2133                        r#type: Schema::Union(vec![
2134                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2135                            Schema::Complex(ComplexType::Map(Map {
2136                                values: Box::new(Schema::TypeName(TypeName::Primitive(
2137                                    PrimitiveType::Bytes
2138                                ))),
2139                                attributes: Default::default(),
2140                            })),
2141                        ]),
2142                        default: None,
2143                        aliases: vec![],
2144                    }
2145                ],
2146                attributes: Default::default(),
2147            }))
2148        );
2149    }
2150
2151    #[test]
2152    fn test_canonical_form_generation_comprehensive_record() {
2153        // NOTE: This schema is identical to the one used in test_deserialize_comprehensive.
2154        let json_str = r#"{
2155          "type": "record",
2156          "name": "E2eComprehensive",
2157          "namespace": "org.apache.arrow.avrotests.v1",
2158          "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
2159          "fields": [
2160            {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
2161            {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
2162            {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
2163            {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
2164            {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
2165            {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
2166            {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
2167            {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
2168            {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
2169            {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
2170            {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
2171            {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
2172            {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
2173            {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
2174            {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
2175            {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
2176            {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
2177            {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
2178            {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
2179            {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
2180            {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
2181            {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
2182            {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
2183            {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
2184            {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
2185            {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
2186            {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
2187                {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
2188                {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
2189                {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
2190            ]}, "doc": "Embedded Address record"},
2191            {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
2192                {"name": "user", "type": "string", "doc": "Username"},
2193                {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
2194            ]}},
2195            {"name": "union_enum_record_array_map", "type": [
2196                {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
2197                {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
2198                {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
2199                {"type": "array", "items": "long"},
2200                {"type": "map", "values": "string"}
2201            ], "doc": "Union of enum, two records, array, and map"},
2202            {"name": "union_date_or_fixed4", "type": [
2203                {"type": "int", "logicalType": "date"},
2204                {"type": "fixed", "name": "Fx4", "size": 4}
2205            ], "doc": "Union of date(int) or fixed(4)"},
2206            {"name": "union_interval_or_string", "type": [
2207                {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
2208                "string"
2209            ], "doc": "Union of duration(fixed12) or string"},
2210            {"name": "union_uuid_or_fixed10", "type": [
2211                {"type": "string", "logicalType": "uuid"},
2212                {"type": "fixed", "name": "Fx10", "size": 10}
2213            ], "doc": "Union of UUID string or fixed(10)"},
2214            {"name": "array_records_with_union", "type": {"type": "array", "items": {
2215                "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
2216                "fields": [
2217                    {"name": "key", "type": "string"},
2218                    {"name": "val", "type": ["null", "int", "long"], "default": null}
2219                ]
2220            }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
2221            {"name": "union_map_or_array_int", "type": [
2222                {"type": "map", "values": "int"},
2223                {"type": "array", "items": "int"}
2224            ], "doc": "Union[map<string,int>, array<int>]"},
2225            {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
2226            {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
2227                {"name": "name", "type": "string"},
2228                {"name": "age", "type": "int", "default": 0}
2229            ]}, "doc": "Record using type alias for schema evolution tests"}
2230          ]
2231        }"#;
2232        let avro = AvroSchema::new(json_str.to_string());
2233        let parsed = avro.schema().expect("schema should deserialize");
2234        let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrotests.v1.types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
2235        let canonical_form =
2236            AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
2237        assert_eq!(
2238            canonical_form, expected_canonical_form,
2239            "Canonical form must match Avro spec PCF exactly"
2240        );
2241    }
2242
2243    #[test]
2244    fn test_new_schema_store() {
2245        let store = SchemaStore::new();
2246        assert!(store.schemas.is_empty());
2247    }
2248
2249    #[test]
2250    fn test_try_from_schemas_rabin() {
2251        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2252        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2253        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2254        schemas.insert(
2255            int_avro_schema
2256                .fingerprint(FingerprintAlgorithm::Rabin)
2257                .unwrap(),
2258            int_avro_schema.clone(),
2259        );
2260        schemas.insert(
2261            record_avro_schema
2262                .fingerprint(FingerprintAlgorithm::Rabin)
2263                .unwrap(),
2264            record_avro_schema.clone(),
2265        );
2266        let store = SchemaStore::try_from(schemas).unwrap();
2267        let int_fp = int_avro_schema
2268            .fingerprint(FingerprintAlgorithm::Rabin)
2269            .unwrap();
2270        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2271        let rec_fp = record_avro_schema
2272            .fingerprint(FingerprintAlgorithm::Rabin)
2273            .unwrap();
2274        assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
2275    }
2276
2277    #[test]
2278    fn test_try_from_with_duplicates() {
2279        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2280        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2281        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2282        schemas.insert(
2283            int_avro_schema
2284                .fingerprint(FingerprintAlgorithm::Rabin)
2285                .unwrap(),
2286            int_avro_schema.clone(),
2287        );
2288        schemas.insert(
2289            record_avro_schema
2290                .fingerprint(FingerprintAlgorithm::Rabin)
2291                .unwrap(),
2292            record_avro_schema.clone(),
2293        );
2294        // Insert duplicate of int schema
2295        schemas.insert(
2296            int_avro_schema
2297                .fingerprint(FingerprintAlgorithm::Rabin)
2298                .unwrap(),
2299            int_avro_schema.clone(),
2300        );
2301        let store = SchemaStore::try_from(schemas).unwrap();
2302        assert_eq!(store.schemas.len(), 2);
2303        let int_fp = int_avro_schema
2304            .fingerprint(FingerprintAlgorithm::Rabin)
2305            .unwrap();
2306        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2307    }
2308
2309    #[test]
2310    fn test_register_and_lookup_rabin() {
2311        let mut store = SchemaStore::new();
2312        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2313        let fp_enum = store.register(schema.clone()).unwrap();
2314        match fp_enum {
2315            Fingerprint::Rabin(fp_val) => {
2316                assert_eq!(
2317                    store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
2318                    Some(schema.clone())
2319                );
2320                assert!(
2321                    store
2322                        .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
2323                        .is_none()
2324                );
2325            }
2326            Fingerprint::Id(_id) => {
2327                unreachable!("This test should only generate Rabin fingerprints")
2328            }
2329            Fingerprint::Id64(_id) => {
2330                unreachable!("This test should only generate Rabin fingerprints")
2331            }
2332            #[cfg(feature = "md5")]
2333            Fingerprint::MD5(_id) => {
2334                unreachable!("This test should only generate Rabin fingerprints")
2335            }
2336            #[cfg(feature = "sha256")]
2337            Fingerprint::SHA256(_id) => {
2338                unreachable!("This test should only generate Rabin fingerprints")
2339            }
2340        }
2341    }
2342
2343    #[test]
2344    fn test_set_and_lookup_id() {
2345        let mut store = SchemaStore::new();
2346        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2347        let id = 42u32;
2348        let fp = Fingerprint::Id(id);
2349        let out_fp = store.set(fp, schema.clone()).unwrap();
2350        assert_eq!(out_fp, fp);
2351        assert_eq!(store.lookup(&fp).cloned(), Some(schema.clone()));
2352        assert!(store.lookup(&Fingerprint::Id(id.wrapping_add(1))).is_none());
2353    }
2354
2355    #[test]
2356    fn test_set_and_lookup_id64() {
2357        let mut store = SchemaStore::new();
2358        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2359        let id64: u64 = 0xDEAD_BEEF_DEAD_BEEF;
2360        let fp = Fingerprint::Id64(id64);
2361        let out_fp = store.set(fp, schema.clone()).unwrap();
2362        assert_eq!(out_fp, fp, "set should return the same Id64 fingerprint");
2363        assert_eq!(
2364            store.lookup(&fp).cloned(),
2365            Some(schema.clone()),
2366            "lookup should find the schema by Id64"
2367        );
2368        assert!(
2369            store
2370                .lookup(&Fingerprint::Id64(id64.wrapping_add(1)))
2371                .is_none(),
2372            "lookup with a different Id64 must return None"
2373        );
2374    }
2375
2376    #[test]
2377    fn test_fingerprint_id64_conversions() {
2378        let algo_from_fp = FingerprintAlgorithm::from(&Fingerprint::Id64(123));
2379        assert_eq!(algo_from_fp, FingerprintAlgorithm::Id64);
2380        let fp_from_algo = Fingerprint::from(FingerprintAlgorithm::Id64);
2381        assert!(matches!(fp_from_algo, Fingerprint::Id64(0)));
2382        let strategy_from_fp = FingerprintStrategy::from(Fingerprint::Id64(5));
2383        assert!(matches!(strategy_from_fp, FingerprintStrategy::Id64(0)));
2384        let algo_from_strategy = FingerprintAlgorithm::from(strategy_from_fp);
2385        assert_eq!(algo_from_strategy, FingerprintAlgorithm::Id64);
2386    }
2387
2388    #[test]
2389    fn test_register_duplicate_schema() {
2390        let mut store = SchemaStore::new();
2391        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2392        let schema2 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2393        let fingerprint1 = store.register(schema1).unwrap();
2394        let fingerprint2 = store.register(schema2).unwrap();
2395        assert_eq!(fingerprint1, fingerprint2);
2396        assert_eq!(store.schemas.len(), 1);
2397    }
2398
2399    #[test]
2400    fn test_set_and_lookup_with_provided_fingerprint() {
2401        let mut store = SchemaStore::new();
2402        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2403        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2404        let out_fp = store.set(fp, schema.clone()).unwrap();
2405        assert_eq!(out_fp, fp);
2406        assert_eq!(store.lookup(&fp).cloned(), Some(schema));
2407    }
2408
2409    #[test]
2410    fn test_set_duplicate_same_schema_ok() {
2411        let mut store = SchemaStore::new();
2412        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2413        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2414        let _ = store.set(fp, schema.clone()).unwrap();
2415        let _ = store.set(fp, schema.clone()).unwrap();
2416        assert_eq!(store.schemas.len(), 1);
2417    }
2418
2419    #[test]
2420    fn test_set_duplicate_different_schema_collision_error() {
2421        let mut store = SchemaStore::new();
2422        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2423        let schema2 = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2424        // Use the same Fingerprint::Id to simulate a collision across different schemas
2425        let fp = Fingerprint::Id(123);
2426        let _ = store.set(fp, schema1).unwrap();
2427        let err = store.set(fp, schema2).unwrap_err();
2428        let msg = format!("{err}");
2429        assert!(msg.contains("Schema fingerprint collision"));
2430    }
2431
2432    #[test]
2433    fn test_canonical_form_generation_primitive() {
2434        let schema = int_schema();
2435        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2436        assert_eq!(canonical_form, r#""int""#);
2437    }
2438
2439    #[test]
2440    fn test_canonical_form_generation_record() {
2441        let schema = record_schema();
2442        let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2443        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2444        assert_eq!(canonical_form, expected_canonical_form);
2445    }
2446
2447    #[test]
2448    fn test_fingerprint_calculation() {
2449        let canonical_form = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
2450        let expected_fingerprint = 10505236152925314060;
2451        let fingerprint = compute_fingerprint_rabin(canonical_form);
2452        assert_eq!(fingerprint, expected_fingerprint);
2453    }
2454
2455    #[test]
2456    fn test_register_and_lookup_complex_schema() {
2457        let mut store = SchemaStore::new();
2458        let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2459        let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2460        let expected_fingerprint = Fingerprint::Rabin(compute_fingerprint_rabin(canonical_form));
2461        let fingerprint = store.register(schema.clone()).unwrap();
2462        assert_eq!(fingerprint, expected_fingerprint);
2463        let looked_up = store.lookup(&fingerprint).cloned();
2464        assert_eq!(looked_up, Some(schema));
2465    }
2466
2467    #[test]
2468    fn test_fingerprints_returns_all_keys() {
2469        let mut store = SchemaStore::new();
2470        let fp_int = store
2471            .register(AvroSchema::new(
2472                serde_json::to_string(&int_schema()).unwrap(),
2473            ))
2474            .unwrap();
2475        let fp_record = store
2476            .register(AvroSchema::new(
2477                serde_json::to_string(&record_schema()).unwrap(),
2478            ))
2479            .unwrap();
2480        let fps = store.fingerprints();
2481        assert_eq!(fps.len(), 2);
2482        assert!(fps.contains(&fp_int));
2483        assert!(fps.contains(&fp_record));
2484    }
2485
2486    #[test]
2487    fn test_canonical_form_strips_attributes() {
2488        let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
2489            name: "record_with_attrs",
2490            namespace: None,
2491            doc: Some(Cow::from("This doc should be stripped")),
2492            aliases: vec!["alias1", "alias2"],
2493            fields: vec![Field {
2494                name: "f1",
2495                doc: Some(Cow::from("field doc")),
2496                r#type: Schema::Type(Type {
2497                    r#type: TypeName::Primitive(PrimitiveType::Bytes),
2498                    attributes: Attributes {
2499                        logical_type: None,
2500                        additional: HashMap::from([("precision", json!(4))]),
2501                    },
2502                }),
2503                default: None,
2504                aliases: vec![],
2505            }],
2506            attributes: Attributes {
2507                logical_type: None,
2508                additional: HashMap::from([("custom_attr", json!("value"))]),
2509            },
2510        }));
2511        let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
2512        let canonical_form = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
2513        assert_eq!(canonical_form, expected_canonical_form);
2514    }
2515
2516    #[test]
2517    fn test_primitive_mappings() {
2518        let cases = vec![
2519            (DataType::Boolean, "\"boolean\""),
2520            (DataType::Int8, "\"int\""),
2521            (DataType::Int16, "\"int\""),
2522            (DataType::Int32, "\"int\""),
2523            (DataType::Int64, "\"long\""),
2524            (DataType::UInt8, "\"int\""),
2525            (DataType::UInt16, "\"int\""),
2526            (DataType::UInt32, "\"long\""),
2527            (DataType::UInt64, "\"long\""),
2528            (DataType::Float16, "\"float\""),
2529            (DataType::Float32, "\"float\""),
2530            (DataType::Float64, "\"double\""),
2531            (DataType::Utf8, "\"string\""),
2532            (DataType::Binary, "\"bytes\""),
2533        ];
2534        for (dt, avro_token) in cases {
2535            let field = ArrowField::new("col", dt.clone(), false);
2536            let arrow_schema = single_field_schema(field);
2537            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2538            assert_json_contains(&avro.json_string, avro_token);
2539        }
2540    }
2541
2542    #[test]
2543    fn test_temporal_mappings() {
2544        let cases = vec![
2545            (DataType::Date32, "\"logicalType\":\"date\""),
2546            (
2547                DataType::Time32(TimeUnit::Millisecond),
2548                "\"logicalType\":\"time-millis\"",
2549            ),
2550            (
2551                DataType::Time64(TimeUnit::Microsecond),
2552                "\"logicalType\":\"time-micros\"",
2553            ),
2554            (
2555                DataType::Timestamp(TimeUnit::Millisecond, None),
2556                "\"logicalType\":\"local-timestamp-millis\"",
2557            ),
2558            (
2559                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2560                "\"logicalType\":\"timestamp-micros\"",
2561            ),
2562        ];
2563        for (dt, needle) in cases {
2564            let field = ArrowField::new("ts", dt.clone(), true);
2565            let arrow_schema = single_field_schema(field);
2566            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2567            assert_json_contains(&avro.json_string, needle);
2568        }
2569    }
2570
2571    #[test]
2572    fn test_decimal_and_uuid() {
2573        let decimal_field = ArrowField::new("amount", DataType::Decimal128(25, 2), false);
2574        let dec_schema = single_field_schema(decimal_field);
2575        let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
2576        assert_json_contains(&avro_dec.json_string, "\"logicalType\":\"decimal\"");
2577        assert_json_contains(&avro_dec.json_string, "\"precision\":25");
2578        assert_json_contains(&avro_dec.json_string, "\"scale\":2");
2579        let mut md = HashMap::new();
2580        md.insert("logicalType".into(), "uuid".into());
2581        let uuid_field =
2582            ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(md);
2583        let uuid_schema = single_field_schema(uuid_field);
2584        let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
2585        assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
2586    }
2587
2588    #[cfg(feature = "avro_custom_types")]
2589    #[test]
2590    fn test_interval_duration() {
2591        let interval_field = ArrowField::new(
2592            "span",
2593            DataType::Interval(IntervalUnit::MonthDayNano),
2594            false,
2595        );
2596        let s = single_field_schema(interval_field);
2597        let avro = AvroSchema::try_from(&s).unwrap();
2598        assert_json_contains(&avro.json_string, "\"logicalType\":\"duration\"");
2599        assert_json_contains(&avro.json_string, "\"size\":12");
2600        let dur_field = ArrowField::new("latency", DataType::Duration(TimeUnit::Nanosecond), false);
2601        let s2 = single_field_schema(dur_field);
2602        let avro2 = AvroSchema::try_from(&s2).unwrap();
2603        assert_json_contains(
2604            &avro2.json_string,
2605            "\"logicalType\":\"arrow.duration-nanos\"",
2606        );
2607    }
2608
2609    #[test]
2610    fn test_complex_types() {
2611        let list_dt = DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true)));
2612        let list_schema = single_field_schema(ArrowField::new("numbers", list_dt, false));
2613        let avro_list = AvroSchema::try_from(&list_schema).unwrap();
2614        assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
2615        assert_json_contains(&avro_list.json_string, "\"items\"");
2616        let value_field = ArrowField::new("value", DataType::Boolean, true);
2617        let entries_struct = ArrowField::new(
2618            "entries",
2619            DataType::Struct(Fields::from(vec![
2620                ArrowField::new("key", DataType::Utf8, false),
2621                value_field.clone(),
2622            ])),
2623            false,
2624        );
2625        let map_dt = DataType::Map(Arc::new(entries_struct), false);
2626        let map_schema = single_field_schema(ArrowField::new("props", map_dt, false));
2627        let avro_map = AvroSchema::try_from(&map_schema).unwrap();
2628        assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
2629        assert_json_contains(&avro_map.json_string, "\"values\"");
2630        let struct_dt = DataType::Struct(Fields::from(vec![
2631            ArrowField::new("f1", DataType::Int64, false),
2632            ArrowField::new("f2", DataType::Utf8, true),
2633        ]));
2634        let struct_schema = single_field_schema(ArrowField::new("person", struct_dt, true));
2635        let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
2636        assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
2637        assert_json_contains(&avro_struct.json_string, "\"null\"");
2638    }
2639
2640    #[test]
2641    fn test_enum_dictionary() {
2642        let mut md = HashMap::new();
2643        md.insert(
2644            AVRO_ENUM_SYMBOLS_METADATA_KEY.into(),
2645            "[\"OPEN\",\"CLOSED\"]".into(),
2646        );
2647        let enum_dt = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
2648        let field = ArrowField::new("status", enum_dt, false).with_metadata(md);
2649        let schema = single_field_schema(field);
2650        let avro = AvroSchema::try_from(&schema).unwrap();
2651        assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
2652        assert_json_contains(&avro.json_string, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
2653    }
2654
2655    #[test]
2656    fn test_run_end_encoded() {
2657        let ree_dt = DataType::RunEndEncoded(
2658            Arc::new(ArrowField::new("run_ends", DataType::Int32, false)),
2659            Arc::new(ArrowField::new("values", DataType::Utf8, false)),
2660        );
2661        let s = single_field_schema(ArrowField::new("text", ree_dt, false));
2662        let avro = AvroSchema::try_from(&s).unwrap();
2663        assert_json_contains(&avro.json_string, "\"string\"");
2664    }
2665
2666    #[test]
2667    fn test_dense_union() {
2668        let uf: UnionFields = vec![
2669            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
2670            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
2671        ]
2672        .into_iter()
2673        .collect();
2674        let union_dt = DataType::Union(uf, UnionMode::Dense);
2675        let s = single_field_schema(ArrowField::new("u", union_dt, false));
2676        let avro =
2677            AvroSchema::try_from(&s).expect("Arrow Union -> Avro union conversion should succeed");
2678        let v: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
2679        let fields = v
2680            .get("fields")
2681            .and_then(|x| x.as_array())
2682            .expect("fields array");
2683        let u_field = fields
2684            .iter()
2685            .find(|f| f.get("name").and_then(|n| n.as_str()) == Some("u"))
2686            .expect("field 'u'");
2687        let union = u_field.get("type").expect("u.type");
2688        let arr = union.as_array().expect("u.type must be Avro union array");
2689        assert_eq!(arr.len(), 2, "expected two union branches");
2690        let first = &arr[0];
2691        let obj = first
2692            .as_object()
2693            .expect("first branch should be an object with metadata");
2694        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
2695        assert_eq!(
2696            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
2697            Some("dense")
2698        );
2699        let type_ids: Vec<i64> = obj
2700            .get("arrowUnionTypeIds")
2701            .and_then(|a| a.as_array())
2702            .expect("arrowUnionTypeIds array")
2703            .iter()
2704            .map(|n| n.as_i64().expect("i64"))
2705            .collect();
2706        assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");
2707        assert_eq!(arr[1], Value::String("string".into()));
2708    }
2709
2710    #[test]
2711    fn round_trip_primitive() {
2712        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", DataType::Int32, false)]);
2713        let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
2714        let decoded = avro_schema.schema().unwrap();
2715        assert!(matches!(decoded, Schema::Complex(_)));
2716    }
2717
2718    #[test]
2719    fn test_name_generator_sanitization_and_uniqueness() {
2720        let f1 = ArrowField::new("weird-name", DataType::FixedSizeBinary(8), false);
2721        let f2 = ArrowField::new("weird name", DataType::FixedSizeBinary(8), false);
2722        let f3 = ArrowField::new("123bad", DataType::FixedSizeBinary(8), false);
2723        let arrow_schema = ArrowSchema::new(vec![f1, f2, f3]);
2724        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2725        assert_json_contains(&avro.json_string, "\"name\":\"weird_name\"");
2726        assert_json_contains(&avro.json_string, "\"name\":\"weird_name_1\"");
2727        assert_json_contains(&avro.json_string, "\"name\":\"_123bad\"");
2728    }
2729
2730    #[test]
2731    fn test_date64_logical_type_mapping() {
2732        let field = ArrowField::new("d", DataType::Date64, true);
2733        let schema = single_field_schema(field);
2734        let avro = AvroSchema::try_from(&schema).unwrap();
2735        assert_json_contains(
2736            &avro.json_string,
2737            "\"logicalType\":\"local-timestamp-millis\"",
2738        );
2739    }
2740
2741    #[cfg(feature = "avro_custom_types")]
2742    #[test]
2743    fn test_duration_list_extras_propagated() {
2744        let child = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
2745        let list_dt = DataType::List(Arc::new(child));
2746        let arrow_schema = single_field_schema(ArrowField::new("durations", list_dt, false));
2747        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2748        assert_json_contains(
2749            &avro.json_string,
2750            "\"logicalType\":\"arrow.duration-micros\"",
2751        );
2752    }
2753
2754    #[test]
2755    fn test_interval_yearmonth_extra() {
2756        let field = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
2757        let schema = single_field_schema(field);
2758        let avro = AvroSchema::try_from(&schema).unwrap();
2759        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
2760    }
2761
2762    #[test]
2763    fn test_interval_daytime_extra() {
2764        let field = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
2765        let schema = single_field_schema(field);
2766        let avro = AvroSchema::try_from(&schema).unwrap();
2767        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"daytime\"");
2768    }
2769
2770    #[test]
2771    fn test_fixed_size_list_extra() {
2772        let child = ArrowField::new("item", DataType::Int32, false);
2773        let dt = DataType::FixedSizeList(Arc::new(child), 3);
2774        let schema = single_field_schema(ArrowField::new("triples", dt, false));
2775        let avro = AvroSchema::try_from(&schema).unwrap();
2776        assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
2777    }
2778
2779    #[cfg(feature = "avro_custom_types")]
2780    #[test]
2781    fn test_map_duration_value_extra() {
2782        let val_field = ArrowField::new("value", DataType::Duration(TimeUnit::Second), true);
2783        let entries_struct = ArrowField::new(
2784            "entries",
2785            DataType::Struct(Fields::from(vec![
2786                ArrowField::new("key", DataType::Utf8, false),
2787                val_field,
2788            ])),
2789            false,
2790        );
2791        let map_dt = DataType::Map(Arc::new(entries_struct), false);
2792        let schema = single_field_schema(ArrowField::new("metrics", map_dt, false));
2793        let avro = AvroSchema::try_from(&schema).unwrap();
2794        assert_json_contains(
2795            &avro.json_string,
2796            "\"logicalType\":\"arrow.duration-seconds\"",
2797        );
2798    }
2799
2800    #[test]
2801    fn test_schema_with_non_string_defaults_decodes_successfully() {
2802        let schema_json = r#"{
2803            "type": "record",
2804            "name": "R",
2805            "fields": [
2806                {"name": "a", "type": "int", "default": 0},
2807                {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
2808                {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
2809                {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
2810                    {"name": "flag", "type": "boolean", "default": true},
2811                    {"name": "name", "type": "string", "default": "hi"}
2812                ]}, "default": {"flag": false, "name": "d"}},
2813                {"name": "u", "type": ["int", "null"], "default": 42}
2814            ]
2815        }"#;
2816        let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
2817        match &schema {
2818            Schema::Complex(ComplexType::Record(_)) => {}
2819            other => panic!("expected record schema, got: {:?}", other),
2820        }
2821        // Avro to Arrow conversion
2822        let field = crate::codec::AvroField::try_from(&schema)
2823            .expect("Avro->Arrow conversion should succeed");
2824        let arrow_field = field.field();
2825        // Build expected Arrow field
2826        let expected_list_item = ArrowField::new(
2827            arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
2828            DataType::Int64,
2829            false,
2830        );
2831        let expected_b = ArrowField::new("b", DataType::List(Arc::new(expected_list_item)), false);
2832
2833        let expected_map_value = ArrowField::new("value", DataType::Float64, false);
2834        let expected_entries = ArrowField::new(
2835            "entries",
2836            DataType::Struct(Fields::from(vec![
2837                ArrowField::new("key", DataType::Utf8, false),
2838                expected_map_value,
2839            ])),
2840            false,
2841        );
2842        let expected_c =
2843            ArrowField::new("c", DataType::Map(Arc::new(expected_entries), false), false);
2844        let mut inner_md = std::collections::HashMap::new();
2845        inner_md.insert(AVRO_NAME_METADATA_KEY.to_string(), "Inner".to_string());
2846        let expected_inner = ArrowField::new(
2847            "inner",
2848            DataType::Struct(Fields::from(vec![
2849                ArrowField::new("flag", DataType::Boolean, false),
2850                ArrowField::new("name", DataType::Utf8, false),
2851            ])),
2852            false,
2853        )
2854        .with_metadata(inner_md);
2855        let mut root_md = std::collections::HashMap::new();
2856        root_md.insert(AVRO_NAME_METADATA_KEY.to_string(), "R".to_string());
2857        let expected = ArrowField::new(
2858            "R",
2859            DataType::Struct(Fields::from(vec![
2860                ArrowField::new("a", DataType::Int32, false),
2861                expected_b,
2862                expected_c,
2863                expected_inner,
2864                ArrowField::new("u", DataType::Int32, true),
2865            ])),
2866            false,
2867        )
2868        .with_metadata(root_md);
2869        assert_eq!(arrow_field, expected);
2870    }
2871
2872    #[test]
2873    fn default_order_is_consistent() {
2874        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
2875        let a = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
2876        let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
2877        assert_eq!(a, b.unwrap().json_string);
2878    }
2879
2880    #[test]
2881    fn test_union_branch_missing_name_errors() {
2882        for t in ["record", "enum", "fixed"] {
2883            let branch = json!({ "type": t });
2884            let err = union_branch_signature(&branch).unwrap_err().to_string();
2885            assert!(
2886                err.contains(&format!("Union branch '{t}' missing required 'name'")),
2887                "expected missing-name error for {t}, got: {err}"
2888            );
2889        }
2890    }
2891
2892    #[test]
2893    fn test_union_branch_named_type_signature_includes_name() {
2894        let rec = json!({ "type": "record", "name": "Foo" });
2895        assert_eq!(union_branch_signature(&rec).unwrap(), "N:record:Foo");
2896        let en = json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] });
2897        assert_eq!(union_branch_signature(&en).unwrap(), "N:enum:Color");
2898        let fx = json!({ "type": "fixed", "name": "Bytes16", "size": 16 });
2899        assert_eq!(union_branch_signature(&fx).unwrap(), "N:fixed:Bytes16");
2900    }
2901
2902    #[test]
2903    fn test_record_field_alias_resolution_without_default() {
2904        let writer_json = r#"{
2905          "type":"record",
2906          "name":"R",
2907          "fields":[{"name":"old","type":"int"}]
2908        }"#;
2909        let reader_json = r#"{
2910          "type":"record",
2911          "name":"R",
2912          "fields":[{"name":"new","aliases":["old"],"type":"int"}]
2913        }"#;
2914        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2915        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2916        let resolved = AvroFieldBuilder::new(&writer)
2917            .with_reader_schema(&reader)
2918            .with_utf8view(false)
2919            .with_strict_mode(false)
2920            .build()
2921            .unwrap();
2922        let expected = ArrowField::new(
2923            "R",
2924            DataType::Struct(Fields::from(vec![ArrowField::new(
2925                "new",
2926                DataType::Int32,
2927                false,
2928            )])),
2929            false,
2930        );
2931        assert_eq!(resolved.field(), expected);
2932    }
2933
2934    #[test]
2935    fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
2936        let writer_json = r#"{
2937          "type":"record",
2938          "name":"R",
2939          "fields":[
2940            {"name":"a","type":"int","aliases":["old"]},
2941            {"name":"b","type":"int","aliases":["old"]}
2942          ]
2943        }"#;
2944        let reader_json = r#"{
2945          "type":"record",
2946          "name":"R",
2947          "fields":[{"name":"target","type":"int","aliases":["old"]}]
2948        }"#;
2949        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2950        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2951        let err = AvroFieldBuilder::new(&writer)
2952            .with_reader_schema(&reader)
2953            .with_utf8view(false)
2954            .with_strict_mode(true)
2955            .build()
2956            .unwrap_err()
2957            .to_string();
2958        assert!(
2959            err.contains("Ambiguous alias 'old'"),
2960            "expected ambiguous-alias error, got: {err}"
2961        );
2962    }
2963
2964    #[test]
2965    fn test_pragmatic_writer_field_alias_mapping_non_strict() {
2966        let writer_json = r#"{
2967          "type":"record",
2968          "name":"R",
2969          "fields":[{"name":"before","type":"int","aliases":["now"]}]
2970        }"#;
2971        let reader_json = r#"{
2972          "type":"record",
2973          "name":"R",
2974          "fields":[{"name":"now","type":"int"}]
2975        }"#;
2976        let writer: Schema = serde_json::from_str(writer_json).unwrap();
2977        let reader: Schema = serde_json::from_str(reader_json).unwrap();
2978        let resolved = AvroFieldBuilder::new(&writer)
2979            .with_reader_schema(&reader)
2980            .with_utf8view(false)
2981            .with_strict_mode(false)
2982            .build()
2983            .unwrap();
2984        let expected = ArrowField::new(
2985            "R",
2986            DataType::Struct(Fields::from(vec![ArrowField::new(
2987                "now",
2988                DataType::Int32,
2989                false,
2990            )])),
2991            false,
2992        );
2993        assert_eq!(resolved.field(), expected);
2994    }
2995
2996    #[test]
2997    fn test_missing_reader_field_null_first_no_default_is_ok() {
2998        let writer_json = r#"{
2999          "type":"record",
3000          "name":"R",
3001          "fields":[{"name":"a","type":"int"}]
3002        }"#;
3003        let reader_json = r#"{
3004          "type":"record",
3005          "name":"R",
3006          "fields":[
3007            {"name":"a","type":"int"},
3008            {"name":"b","type":["null","int"]}
3009          ]
3010        }"#;
3011        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3012        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3013        let resolved = AvroFieldBuilder::new(&writer)
3014            .with_reader_schema(&reader)
3015            .with_utf8view(false)
3016            .with_strict_mode(false)
3017            .build()
3018            .unwrap();
3019        let expected = ArrowField::new(
3020            "R",
3021            DataType::Struct(Fields::from(vec![
3022                ArrowField::new("a", DataType::Int32, false),
3023                ArrowField::new("b", DataType::Int32, true).with_metadata(HashMap::from([(
3024                    AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
3025                    "null".to_string(),
3026                )])),
3027            ])),
3028            false,
3029        );
3030        assert_eq!(resolved.field(), expected);
3031    }
3032
3033    #[test]
3034    fn test_missing_reader_field_null_second_without_default_errors() {
3035        let writer_json = r#"{
3036          "type":"record",
3037          "name":"R",
3038          "fields":[{"name":"a","type":"int"}]
3039        }"#;
3040        let reader_json = r#"{
3041          "type":"record",
3042          "name":"R",
3043          "fields":[
3044            {"name":"a","type":"int"},
3045            {"name":"b","type":["int","null"]}
3046          ]
3047        }"#;
3048        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3049        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3050        let err = AvroFieldBuilder::new(&writer)
3051            .with_reader_schema(&reader)
3052            .with_utf8view(false)
3053            .with_strict_mode(false)
3054            .build()
3055            .unwrap_err()
3056            .to_string();
3057        assert!(
3058            err.contains("must have a default value"),
3059            "expected missing-default error, got: {err}"
3060        );
3061    }
3062
3063    #[test]
3064    fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
3065        let field = ArrowField::new("x", DataType::Int32, true);
3066        let injected_json =
3067            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3068                .to_string();
3069        let mut md = HashMap::new();
3070        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json.clone());
3071        md.insert("custom".to_string(), "123".to_string());
3072        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3073        let opts = AvroSchemaOptions {
3074            null_order: Some(Nullability::NullSecond),
3075            strip_metadata: false,
3076        };
3077        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3078        assert_eq!(
3079            out.json_string, injected_json,
3080            "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
3081        );
3082        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3083        assert_eq!(v.get("type").and_then(|t| t.as_str()), Some("record"));
3084        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("Injected"));
3085    }
3086
3087    #[test]
3088    fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
3089        let field = ArrowField::new("x", DataType::Int32, true);
3090        let injected_json =
3091            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3092                .to_string();
3093        let mut md = HashMap::new();
3094        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json);
3095        md.insert("custom_meta".to_string(), "7".to_string());
3096        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3097        let opts = AvroSchemaOptions {
3098            null_order: Some(Nullability::NullFirst),
3099            strip_metadata: true,
3100        };
3101        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3102        assert_json_contains(&out.json_string, "\"type\":\"record\"");
3103        assert_json_contains(&out.json_string, "\"name\":\"topLevelRecord\"");
3104        assert_json_contains(&out.json_string, "\"custom_meta\":7");
3105    }
3106
3107    #[test]
3108    fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
3109        let field = ArrowField::new("s", DataType::Utf8, true);
3110        let arrow_schema = single_field_schema(field);
3111        let opts = AvroSchemaOptions {
3112            null_order: Some(Nullability::NullFirst),
3113            strip_metadata: true,
3114        };
3115        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3116        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3117        let arr = v["fields"][0]["type"]
3118            .as_array()
3119            .expect("nullable primitive should be Avro union array");
3120        assert_eq!(arr[0], Value::String("null".into()));
3121        assert_eq!(arr[1], Value::String("string".into()));
3122    }
3123
3124    #[test]
3125    fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
3126        let field = ArrowField::new("s", DataType::Utf8, true);
3127        let arrow_schema = single_field_schema(field);
3128        let opts = AvroSchemaOptions {
3129            null_order: Some(Nullability::NullSecond),
3130            strip_metadata: true,
3131        };
3132        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3133        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3134        let arr = v["fields"][0]["type"]
3135            .as_array()
3136            .expect("nullable primitive should be Avro union array");
3137        assert_eq!(arr[0], Value::String("string".into()));
3138        assert_eq!(arr[1], Value::String("null".into()));
3139    }
3140
3141    #[test]
3142    fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
3143        let uf: UnionFields = vec![
3144            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
3145            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
3146        ]
3147        .into_iter()
3148        .collect();
3149        let union_dt = DataType::Union(uf, UnionMode::Dense);
3150        let arrow_schema = single_field_schema(ArrowField::new("u", union_dt, true));
3151        let with_extras = AvroSchema::from_arrow_with_options(
3152            &arrow_schema,
3153            Some(AvroSchemaOptions {
3154                null_order: Some(Nullability::NullFirst),
3155                strip_metadata: false,
3156            }),
3157        )
3158        .unwrap();
3159        let v_with: Value = serde_json::from_str(&with_extras.json_string).unwrap();
3160        let union_arr = v_with["fields"][0]["type"].as_array().expect("union array");
3161        let first_obj = union_arr
3162            .iter()
3163            .find(|b| b.is_object())
3164            .expect("expected an object branch with extras");
3165        let obj = first_obj.as_object().unwrap();
3166        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
3167        assert_eq!(
3168            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
3169            Some("dense")
3170        );
3171        let type_ids: Vec<i64> = obj["arrowUnionTypeIds"]
3172            .as_array()
3173            .expect("arrowUnionTypeIds array")
3174            .iter()
3175            .map(|n| n.as_i64().expect("i64"))
3176            .collect();
3177        assert_eq!(type_ids, vec![2, 7]);
3178        let stripped = AvroSchema::from_arrow_with_options(
3179            &arrow_schema,
3180            Some(AvroSchemaOptions {
3181                null_order: Some(Nullability::NullFirst),
3182                strip_metadata: true,
3183            }),
3184        )
3185        .unwrap();
3186        let v_stripped: Value = serde_json::from_str(&stripped.json_string).unwrap();
3187        let union_arr2 = v_stripped["fields"][0]["type"]
3188            .as_array()
3189            .expect("union array");
3190        assert!(
3191            !union_arr2.iter().any(|b| b
3192                .as_object()
3193                .is_some_and(|m| m.contains_key("arrowUnionMode"))),
3194            "extras must be removed when strip_metadata=true"
3195        );
3196        assert_eq!(union_arr2[0], Value::String("null".into()));
3197        assert_eq!(union_arr2[1], Value::String("int".into()));
3198        assert_eq!(union_arr2[2], Value::String("string".into()));
3199    }
3200
3201    #[test]
3202    fn test_project_empty_projection() {
3203        let schema_json = r#"{
3204            "type": "record",
3205            "name": "Test",
3206            "fields": [
3207                {"name": "a", "type": "int"},
3208                {"name": "b", "type": "string"}
3209            ]
3210        }"#;
3211        let schema = AvroSchema::new(schema_json.to_string());
3212        let projected = schema.project(&[]).unwrap();
3213        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3214        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3215        assert!(
3216            fields.is_empty(),
3217            "Empty projection should yield empty fields"
3218        );
3219    }
3220
3221    #[test]
3222    fn test_project_single_field() {
3223        let schema_json = r#"{
3224            "type": "record",
3225            "name": "Test",
3226            "fields": [
3227                {"name": "a", "type": "int"},
3228                {"name": "b", "type": "string"},
3229                {"name": "c", "type": "long"}
3230            ]
3231        }"#;
3232        let schema = AvroSchema::new(schema_json.to_string());
3233        let projected = schema.project(&[1]).unwrap();
3234        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3235        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3236        assert_eq!(fields.len(), 1);
3237        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("b"));
3238    }
3239
3240    #[test]
3241    fn test_project_multiple_fields() {
3242        let schema_json = r#"{
3243            "type": "record",
3244            "name": "Test",
3245            "fields": [
3246                {"name": "a", "type": "int"},
3247                {"name": "b", "type": "string"},
3248                {"name": "c", "type": "long"},
3249                {"name": "d", "type": "boolean"}
3250            ]
3251        }"#;
3252        let schema = AvroSchema::new(schema_json.to_string());
3253        let projected = schema.project(&[0, 2, 3]).unwrap();
3254        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3255        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3256        assert_eq!(fields.len(), 3);
3257        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
3258        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("c"));
3259        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("d"));
3260    }
3261
3262    #[test]
3263    fn test_project_all_fields() {
3264        let schema_json = r#"{
3265            "type": "record",
3266            "name": "Test",
3267            "fields": [
3268                {"name": "a", "type": "int"},
3269                {"name": "b", "type": "string"}
3270            ]
3271        }"#;
3272        let schema = AvroSchema::new(schema_json.to_string());
3273        let projected = schema.project(&[0, 1]).unwrap();
3274        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3275        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3276        assert_eq!(fields.len(), 2);
3277        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
3278        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("b"));
3279    }
3280
3281    #[test]
3282    fn test_project_reorder_fields() {
3283        let schema_json = r#"{
3284            "type": "record",
3285            "name": "Test",
3286            "fields": [
3287                {"name": "a", "type": "int"},
3288                {"name": "b", "type": "string"},
3289                {"name": "c", "type": "long"}
3290            ]
3291        }"#;
3292        let schema = AvroSchema::new(schema_json.to_string());
3293        // Project in reverse order
3294        let projected = schema.project(&[2, 0, 1]).unwrap();
3295        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3296        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3297        assert_eq!(fields.len(), 3);
3298        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("c"));
3299        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("a"));
3300        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("b"));
3301    }
3302
3303    #[test]
3304    fn test_project_preserves_record_metadata() {
3305        let schema_json = r#"{
3306            "type": "record",
3307            "name": "MyRecord",
3308            "namespace": "com.example",
3309            "doc": "A test record",
3310            "aliases": ["OldRecord"],
3311            "fields": [
3312                {"name": "a", "type": "int"},
3313                {"name": "b", "type": "string"}
3314            ]
3315        }"#;
3316        let schema = AvroSchema::new(schema_json.to_string());
3317        let projected = schema.project(&[0]).unwrap();
3318        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3319        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("MyRecord"));
3320        assert_eq!(
3321            v.get("namespace").and_then(|n| n.as_str()),
3322            Some("com.example")
3323        );
3324        assert_eq!(v.get("doc").and_then(|n| n.as_str()), Some("A test record"));
3325        assert!(v.get("aliases").is_some());
3326    }
3327
3328    #[test]
3329    fn test_project_preserves_field_metadata() {
3330        let schema_json = r#"{
3331            "type": "record",
3332            "name": "Test",
3333            "fields": [
3334                {"name": "a", "type": "int", "doc": "Field A", "default": 0},
3335                {"name": "b", "type": "string"}
3336            ]
3337        }"#;
3338        let schema = AvroSchema::new(schema_json.to_string());
3339        let projected = schema.project(&[0]).unwrap();
3340        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3341        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3342        assert_eq!(
3343            fields[0].get("doc").and_then(|d| d.as_str()),
3344            Some("Field A")
3345        );
3346        assert_eq!(fields[0].get("default").and_then(|d| d.as_i64()), Some(0));
3347    }
3348
3349    #[test]
3350    fn test_project_with_nested_record() {
3351        let schema_json = r#"{
3352            "type": "record",
3353            "name": "Outer",
3354            "fields": [
3355                {"name": "id", "type": "int"},
3356                {"name": "inner", "type": {
3357                    "type": "record",
3358                    "name": "Inner",
3359                    "fields": [
3360                        {"name": "x", "type": "int"},
3361                        {"name": "y", "type": "string"}
3362                    ]
3363                }},
3364                {"name": "value", "type": "double"}
3365            ]
3366        }"#;
3367        let schema = AvroSchema::new(schema_json.to_string());
3368        let projected = schema.project(&[1]).unwrap();
3369        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3370        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3371        assert_eq!(fields.len(), 1);
3372        assert_eq!(
3373            fields[0].get("name").and_then(|n| n.as_str()),
3374            Some("inner")
3375        );
3376        // Verify nested record structure is preserved
3377        let inner_type = fields[0].get("type").unwrap();
3378        assert_eq!(
3379            inner_type.get("type").and_then(|t| t.as_str()),
3380            Some("record")
3381        );
3382        assert_eq!(
3383            inner_type.get("name").and_then(|n| n.as_str()),
3384            Some("Inner")
3385        );
3386    }
3387
3388    #[test]
3389    fn test_project_with_complex_field_types() {
3390        let schema_json = r#"{
3391            "type": "record",
3392            "name": "Test",
3393            "fields": [
3394                {"name": "arr", "type": {"type": "array", "items": "int"}},
3395                {"name": "map", "type": {"type": "map", "values": "string"}},
3396                {"name": "union", "type": ["null", "int"]}
3397            ]
3398        }"#;
3399        let schema = AvroSchema::new(schema_json.to_string());
3400        let projected = schema.project(&[0, 2]).unwrap();
3401        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3402        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3403        assert_eq!(fields.len(), 2);
3404        // Verify array type is preserved
3405        let arr_type = fields[0].get("type").unwrap();
3406        assert_eq!(arr_type.get("type").and_then(|t| t.as_str()), Some("array"));
3407        // Verify union type is preserved
3408        let union_type = fields[1].get("type").unwrap();
3409        assert!(union_type.is_array());
3410    }
3411
3412    #[test]
3413    fn test_project_error_invalid_json() {
3414        let schema = AvroSchema::new("not valid json".to_string());
3415        let err = schema.project(&[0]).unwrap_err();
3416        let msg = err.to_string();
3417        assert!(
3418            msg.contains("Invalid Avro schema JSON"),
3419            "Expected parse error, got: {msg}"
3420        );
3421    }
3422
3423    #[test]
3424    fn test_project_error_not_object() {
3425        // Primitive type schema (not a JSON object)
3426        let schema = AvroSchema::new(r#""string""#.to_string());
3427        let err = schema.project(&[0]).unwrap_err();
3428        let msg = err.to_string();
3429        assert!(
3430            msg.contains("must be a JSON object"),
3431            "Expected object error, got: {msg}"
3432        );
3433    }
3434
3435    #[test]
3436    fn test_project_error_array_schema() {
3437        // Array (list) is a valid JSON but not a record
3438        let schema = AvroSchema::new(r#"["null", "int"]"#.to_string());
3439        let err = schema.project(&[0]).unwrap_err();
3440        let msg = err.to_string();
3441        assert!(
3442            msg.contains("must be a JSON object"),
3443            "Expected object error for array schema, got: {msg}"
3444        );
3445    }
3446
3447    #[test]
3448    fn test_project_error_type_not_record() {
3449        let schema_json = r#"{
3450            "type": "enum",
3451            "name": "Color",
3452            "symbols": ["RED", "GREEN", "BLUE"]
3453        }"#;
3454        let schema = AvroSchema::new(schema_json.to_string());
3455        let err = schema.project(&[0]).unwrap_err();
3456        let msg = err.to_string();
3457        assert!(
3458            msg.contains("must be an Avro record") && msg.contains("'enum'"),
3459            "Expected type mismatch error, got: {msg}"
3460        );
3461    }
3462
3463    #[test]
3464    fn test_project_error_type_array() {
3465        let schema_json = r#"{
3466            "type": "array",
3467            "items": "int"
3468        }"#;
3469        let schema = AvroSchema::new(schema_json.to_string());
3470        let err = schema.project(&[0]).unwrap_err();
3471        let msg = err.to_string();
3472        assert!(
3473            msg.contains("must be an Avro record") && msg.contains("'array'"),
3474            "Expected type mismatch error for array type, got: {msg}"
3475        );
3476    }
3477
3478    #[test]
3479    fn test_project_error_type_fixed() {
3480        let schema_json = r#"{
3481            "type": "fixed",
3482            "name": "MD5",
3483            "size": 16
3484        }"#;
3485        let schema = AvroSchema::new(schema_json.to_string());
3486        let err = schema.project(&[0]).unwrap_err();
3487        let msg = err.to_string();
3488        assert!(
3489            msg.contains("must be an Avro record") && msg.contains("'fixed'"),
3490            "Expected type mismatch error for fixed type, got: {msg}"
3491        );
3492    }
3493
3494    #[test]
3495    fn test_project_error_type_map() {
3496        let schema_json = r#"{
3497            "type": "map",
3498            "values": "string"
3499        }"#;
3500        let schema = AvroSchema::new(schema_json.to_string());
3501        let err = schema.project(&[0]).unwrap_err();
3502        let msg = err.to_string();
3503        assert!(
3504            msg.contains("must be an Avro record") && msg.contains("'map'"),
3505            "Expected type mismatch error for map type, got: {msg}"
3506        );
3507    }
3508
3509    #[test]
3510    fn test_project_error_missing_type_field() {
3511        let schema_json = r#"{
3512            "name": "Test",
3513            "fields": [{"name": "a", "type": "int"}]
3514        }"#;
3515        let schema = AvroSchema::new(schema_json.to_string());
3516        let err = schema.project(&[0]).unwrap_err();
3517        let msg = err.to_string();
3518        assert!(
3519            msg.contains("missing required 'type' field"),
3520            "Expected missing type error, got: {msg}"
3521        );
3522    }
3523
3524    #[test]
3525    fn test_project_error_missing_fields() {
3526        let schema_json = r#"{
3527            "type": "record",
3528            "name": "Test"
3529        }"#;
3530        let schema = AvroSchema::new(schema_json.to_string());
3531        let err = schema.project(&[0]).unwrap_err();
3532        let msg = err.to_string();
3533        assert!(
3534            msg.contains("missing required 'fields'"),
3535            "Expected missing fields error, got: {msg}"
3536        );
3537    }
3538
3539    #[test]
3540    fn test_project_error_fields_not_array() {
3541        let schema_json = r#"{
3542            "type": "record",
3543            "name": "Test",
3544            "fields": "not an array"
3545        }"#;
3546        let schema = AvroSchema::new(schema_json.to_string());
3547        let err = schema.project(&[0]).unwrap_err();
3548        let msg = err.to_string();
3549        assert!(
3550            msg.contains("'fields' must be an array"),
3551            "Expected fields array error, got: {msg}"
3552        );
3553    }
3554
3555    #[test]
3556    fn test_project_error_index_out_of_bounds() {
3557        let schema_json = r#"{
3558            "type": "record",
3559            "name": "Test",
3560            "fields": [
3561                {"name": "a", "type": "int"},
3562                {"name": "b", "type": "string"}
3563            ]
3564        }"#;
3565        let schema = AvroSchema::new(schema_json.to_string());
3566        let err = schema.project(&[5]).unwrap_err();
3567        let msg = err.to_string();
3568        assert!(
3569            msg.contains("out of bounds") && msg.contains("5") && msg.contains("2"),
3570            "Expected out of bounds error, got: {msg}"
3571        );
3572    }
3573
3574    #[test]
3575    fn test_project_error_index_out_of_bounds_edge() {
3576        let schema_json = r#"{
3577            "type": "record",
3578            "name": "Test",
3579            "fields": [
3580                {"name": "a", "type": "int"}
3581            ]
3582        }"#;
3583        let schema = AvroSchema::new(schema_json.to_string());
3584        // Index 1 is just out of bounds for a 1-element array
3585        let err = schema.project(&[1]).unwrap_err();
3586        let msg = err.to_string();
3587        assert!(
3588            msg.contains("out of bounds") && msg.contains("1"),
3589            "Expected out of bounds error for edge case, got: {msg}"
3590        );
3591    }
3592
3593    #[test]
3594    fn test_project_error_duplicate_index() {
3595        let schema_json = r#"{
3596            "type": "record",
3597            "name": "Test",
3598            "fields": [
3599                {"name": "a", "type": "int"},
3600                {"name": "b", "type": "string"},
3601                {"name": "c", "type": "long"}
3602            ]
3603        }"#;
3604        let schema = AvroSchema::new(schema_json.to_string());
3605        let err = schema.project(&[0, 1, 0]).unwrap_err();
3606        let msg = err.to_string();
3607        assert!(
3608            msg.contains("Duplicate projection index") && msg.contains("0"),
3609            "Expected duplicate index error, got: {msg}"
3610        );
3611    }
3612
3613    #[test]
3614    fn test_project_error_duplicate_index_consecutive() {
3615        let schema_json = r#"{
3616            "type": "record",
3617            "name": "Test",
3618            "fields": [
3619                {"name": "a", "type": "int"},
3620                {"name": "b", "type": "string"}
3621            ]
3622        }"#;
3623        let schema = AvroSchema::new(schema_json.to_string());
3624        let err = schema.project(&[1, 1]).unwrap_err();
3625        let msg = err.to_string();
3626        assert!(
3627            msg.contains("Duplicate projection index") && msg.contains("1"),
3628            "Expected duplicate index error for consecutive duplicates, got: {msg}"
3629        );
3630    }
3631
3632    #[test]
3633    fn test_project_with_empty_fields() {
3634        let schema_json = r#"{
3635            "type": "record",
3636            "name": "EmptyRecord",
3637            "fields": []
3638        }"#;
3639        let schema = AvroSchema::new(schema_json.to_string());
3640        // Projecting empty from empty should succeed
3641        let projected = schema.project(&[]).unwrap();
3642        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3643        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3644        assert!(fields.is_empty());
3645    }
3646
3647    #[test]
3648    fn test_project_empty_fields_index_out_of_bounds() {
3649        let schema_json = r#"{
3650            "type": "record",
3651            "name": "EmptyRecord",
3652            "fields": []
3653        }"#;
3654        let schema = AvroSchema::new(schema_json.to_string());
3655        let err = schema.project(&[0]).unwrap_err();
3656        let msg = err.to_string();
3657        assert!(
3658            msg.contains("out of bounds") && msg.contains("0 fields"),
3659            "Expected out of bounds error for empty record, got: {msg}"
3660        );
3661    }
3662
3663    #[test]
3664    fn test_project_result_is_valid_avro_schema() {
3665        let schema_json = r#"{
3666            "type": "record",
3667            "name": "Test",
3668            "namespace": "com.example",
3669            "fields": [
3670                {"name": "id", "type": "long"},
3671                {"name": "name", "type": "string"},
3672                {"name": "active", "type": "boolean"}
3673            ]
3674        }"#;
3675        let schema = AvroSchema::new(schema_json.to_string());
3676        let projected = schema.project(&[0, 2]).unwrap();
3677        // Verify the projected schema can be parsed as a valid Avro schema
3678        let parsed = projected.schema();
3679        assert!(parsed.is_ok(), "Projected schema should be valid Avro");
3680        match parsed.unwrap() {
3681            Schema::Complex(ComplexType::Record(r)) => {
3682                assert_eq!(r.name, "Test");
3683                assert_eq!(r.namespace, Some("com.example"));
3684                assert_eq!(r.fields.len(), 2);
3685                assert_eq!(r.fields[0].name, "id");
3686                assert_eq!(r.fields[1].name, "active");
3687            }
3688            _ => panic!("Expected Record schema"),
3689        }
3690    }
3691
3692    #[test]
3693    fn test_project_non_contiguous_indices() {
3694        let schema_json = r#"{
3695            "type": "record",
3696            "name": "Test",
3697            "fields": [
3698                {"name": "f0", "type": "int"},
3699                {"name": "f1", "type": "int"},
3700                {"name": "f2", "type": "int"},
3701                {"name": "f3", "type": "int"},
3702                {"name": "f4", "type": "int"}
3703            ]
3704        }"#;
3705        let schema = AvroSchema::new(schema_json.to_string());
3706        // Select every other field
3707        let projected = schema.project(&[0, 2, 4]).unwrap();
3708        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3709        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3710        assert_eq!(fields.len(), 3);
3711        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f0"));
3712        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("f2"));
3713        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("f4"));
3714    }
3715
3716    #[test]
3717    fn test_project_single_field_from_many() {
3718        let schema_json = r#"{
3719            "type": "record",
3720            "name": "BigRecord",
3721            "fields": [
3722                {"name": "f0", "type": "int"},
3723                {"name": "f1", "type": "int"},
3724                {"name": "f2", "type": "int"},
3725                {"name": "f3", "type": "int"},
3726                {"name": "f4", "type": "int"},
3727                {"name": "f5", "type": "int"},
3728                {"name": "f6", "type": "int"},
3729                {"name": "f7", "type": "int"},
3730                {"name": "f8", "type": "int"},
3731                {"name": "f9", "type": "int"}
3732            ]
3733        }"#;
3734        let schema = AvroSchema::new(schema_json.to_string());
3735        // Select only the last field
3736        let projected = schema.project(&[9]).unwrap();
3737        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3738        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3739        assert_eq!(fields.len(), 1);
3740        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f9"));
3741    }
3742}