Skip to main content

arrow_avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Schema representations for Arrow.
19
20#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23    ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24    UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
/// The Avro single-object encoding "magic" bytes (`0xC3 0x01`)
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// The Confluent "magic" byte (`0x00`)
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// The maximum possible length of a prefix, in bytes.
///
/// This is the largest fingerprint payload, SHA256 (32 bytes), plus the
/// single-object magic (2 bytes).
pub const MAX_PREFIX_LEN: usize = 34;

/// The metadata key used for storing the JSON encoded `Schema`
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Metadata key used to represent Avro enum symbols in an Arrow schema.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key used to store the default value of a field in an Avro schema.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Metadata key used to store the name of a type in an Avro schema.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";

/// Metadata key used to store the namespace of a type in an Avro schema.
pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";

/// Metadata key used to store the documentation for a type in an Avro schema.
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Default name for the root record in an Avro schema.
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
/// Position of the `null` branch inside a nullable Avro union.
///
/// Avro itself has no nullable types: an optional value is written as a union
/// in which one branch is `null`. Two-variant unions of that shape are treated
/// specially and mapped onto Arrow's notion of nullability; this enum records
/// which of the two branches holds the `null`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) enum Nullability {
    /// `null` is the first branch of the union (the default)
    #[default]
    NullFirst,
    /// `null` is the second branch of the union
    NullSecond,
}

impl Nullability {
    /// Returns the union index of the branch that carries the actual value.
    pub(crate) fn non_null_index(&self) -> usize {
        // The value branch is whichever of the two slots `null` does not occupy.
        if matches!(self, Self::NullFirst) {
            1
        } else {
            0
        }
    }
}
90
/// A type name in an Avro schema: either a [`PrimitiveType`] or a reference
/// to a previously defined named type.
///
/// The enum is `untagged`, so it is (de)serialized as the bare JSON string;
/// serde tries the variants in declaration order, meaning a string is treated
/// as a reference only when it is not a primitive type name.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum TypeName<'a> {
    /// A primitive type like null, boolean, int, etc.
    Primitive(PrimitiveType),
    /// A reference to another named type, borrowed from the input
    Ref(&'a str),
}
105
/// A primitive type
///
/// Both the serde representation (`rename_all = "camelCase"`) and the
/// `AsRef<str>` representation (strum `lowercase`) start with a lower-case
/// letter, e.g. `Null` is written as `null`.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub(crate) enum PrimitiveType {
    /// null: no value
    Null,
    /// boolean: a binary value
    Boolean,
    /// int: 32-bit signed integer
    Int,
    /// long: 64-bit signed integer
    Long,
    /// float: single precision (32-bit) IEEE 754 floating-point number
    Float,
    /// double: double precision (64-bit) IEEE 754 floating-point number
    Double,
    /// bytes: sequence of 8-bit unsigned bytes
    Bytes,
    /// string: Unicode character sequence
    String,
}
130
/// Additional attributes within a `Schema`
///
/// This struct is embedded (via `#[serde(flatten)]`) in the other schema
/// structs of this module to capture the logical type and any JSON keys that
/// are not modelled explicitly.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Attributes<'a> {
    /// A logical type name (JSON key `logicalType`); `None` when absent
    ///
    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
    #[serde(default)]
    pub(crate) logical_type: Option<&'a str>,

    /// Additional JSON attributes, keyed by their JSON property name
    #[serde(flatten)]
    pub(crate) additional: HashMap<&'a str, Value>,
}
147
148impl Attributes<'_> {
149    /// Returns the field metadata for this [`Attributes`]
150    pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
151        self.additional
152            .iter()
153            .map(|(k, v)| (k.to_string(), v.to_string()))
154            .collect()
155    }
156}
157
/// A type definition that is not a variant of [`ComplexType`]
///
/// This is the JSON object form `{"type": <name>, ...attributes}` where the
/// `type` key holds a [`TypeName`].
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Type<'a> {
    /// The type of this Avro data structure
    #[serde(borrow)]
    pub(crate) r#type: TypeName<'a>,
    /// Additional attributes associated with this type (flattened into the same JSON object)
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
169
/// An Avro schema
///
/// This represents the different shapes of Avro schemas as defined in the specification.
/// See <https://avro.apache.org/docs/1.11.1/specification/#schemas> for more details.
///
/// The enum is `untagged`: there is no discriminator in the JSON, and serde
/// tries the variants in the order they are declared here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum Schema<'a> {
    /// A direct type name (primitive or reference)
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A union of multiple schemas (e.g., ["null", "string"])
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A complex type such as record, array, map, etc.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// A type with attributes
    #[serde(borrow)]
    Type(Type<'a>),
}
190
/// A complex type
///
/// The variant is selected by the JSON `type` key (`tag = "type"`), whose
/// value is the variant name starting with a lower-case letter, e.g. `record`.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub(crate) enum ComplexType<'a> {
    /// Record type: a sequence of fields with names and types
    #[serde(borrow)]
    Record(Record<'a>),
    /// Enum type: a set of named values
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// Array type: a sequence of values of the same type
    #[serde(borrow)]
    Array(Array<'a>),
    /// Map type: a mapping from strings to values of the same type
    #[serde(borrow)]
    Map(Map<'a>),
    /// Fixed type: a fixed-size byte array
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
213
/// A record
///
/// String data is borrowed from the JSON input where possible.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Record<'a> {
    /// Name of the record
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the record, provides a way to organize names
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string for the record
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this record (empty when the key is absent)
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The fields contained in this record
    #[serde(borrow)]
    pub(crate) fields: Vec<Field<'a>>,
    /// Additional attributes for this record
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
238
239fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
240where
241    D: serde::Deserializer<'de>,
242{
243    Value::deserialize(deserializer).map(Some)
244}
245
/// A field within a [`Record`]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Field<'a> {
    /// Name of the field within the record
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional documentation for this field
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// The field's type definition
    #[serde(borrow)]
    pub(crate) r#type: Schema<'a>,
    /// Optional default value for this field
    ///
    /// `None` when the `default` key is absent; a present key is read through
    /// `deserialize_default`, which wraps whatever value it finds in `Some`.
    #[serde(deserialize_with = "deserialize_default", default)]
    pub(crate) default: Option<Value>,
    /// Alternative names (aliases) for this field (Avro spec: field-level aliases).
    /// Borrowed from input JSON where possible.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
}
266
/// An enumeration
///
/// String data is borrowed from the JSON input.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Enum<'a> {
    /// Name of the enum
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the enum, provides organizational structure
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string describing the enum
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this enum (empty when the key is absent)
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The symbols (values) that this enum can have
    #[serde(borrow)]
    pub(crate) symbols: Vec<&'a str>,
    /// Optional default symbol for this enum
    #[serde(borrow, default)]
    pub(crate) default: Option<&'a str>,
    /// Additional attributes for this enum
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
294
/// An array
///
/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Array<'a> {
    /// The schema for items in this array (boxed because `Schema` is recursive)
    #[serde(borrow)]
    pub(crate) items: Box<Schema<'a>>,
    /// Additional attributes for this array
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
307
/// A map
///
/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Map<'a> {
    /// The schema for values in this map (boxed because `Schema` is recursive)
    #[serde(borrow)]
    pub(crate) values: Box<Schema<'a>>,
    /// Additional attributes for this map
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
320
/// A fixed length binary array
///
/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Fixed<'a> {
    /// Name of the fixed type
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional namespace for the fixed type
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Alternative names for this fixed type (empty when the key is absent)
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The number of bytes in this fixed type
    pub(crate) size: usize,
    /// Additional attributes for this fixed type
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
341
/// Options controlling how an Arrow schema is converted to Avro JSON by
/// `AvroSchema::from_arrow_with_options`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) struct AvroSchemaOptions {
    /// Where the `null` branch is placed in nullable unions; `None` falls
    /// back to `Nullability::default()`.
    pub(crate) null_order: Option<Nullability>,
    /// When `false`, Avro JSON already stored under `SCHEMA_METADATA_KEY` in
    /// the Arrow schema metadata is returned as-is instead of generating new
    /// JSON. The flag is also forwarded to the per-field conversion.
    pub(crate) strip_metadata: bool,
}
347
/// A wrapper for an Avro schema in its JSON string representation.
///
/// The JSON is kept as an owned string and parsed on demand.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The Avro schema as a JSON string.
    pub json_string: String,
}
354
355impl TryFrom<&ArrowSchema> for AvroSchema {
356    type Error = ArrowError;
357
358    /// Converts an `ArrowSchema` to `AvroSchema`, delegating to
359    /// `AvroSchema::from_arrow_with_options` with `None` so that the
360    /// union null ordering is decided by `Nullability::default()`.
361    fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
362        AvroSchema::from_arrow_with_options(schema, None)
363    }
364}
365
366impl AvroSchema {
367    /// Creates a new `AvroSchema` from a JSON string.
368    pub fn new(json_string: String) -> Self {
369        Self { json_string }
370    }
371
372    pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
373        serde_json::from_str(self.json_string.as_str())
374            .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
375    }
376
377    /// Returns the fingerprint of the schema, computed using the specified [`FingerprintAlgorithm`].
378    ///
379    /// The fingerprint is computed over the schema's Parsed Canonical Form
380    /// as defined by the Avro specification. Depending on `hash_type`, this
381    /// will return one of the supported [`Fingerprint`] variants:
382    /// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`]
383    /// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5`
384    /// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256`
385    ///
386    /// Note: [`FingerprintAlgorithm::Id`] or [`FingerprintAlgorithm::Id64`] cannot be used to generate a fingerprint
387    /// and will result in an error. If you intend to use a Schema Registry ID-based
388    /// wire format, either use [`SchemaStore::set`] or load the [`Fingerprint::Id`] directly via [`Fingerprint::load_fingerprint_id`] or for
389    /// [`Fingerprint::Id64`] via [`Fingerprint::load_fingerprint_id64`].
390    ///
391    /// See also: <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
392    ///
393    /// # Errors
394    /// Returns an error if deserializing the schema fails, if generating the
395    /// canonical form of the schema fails, or if `hash_type` is [`FingerprintAlgorithm::Id`].
396    ///
397    /// # Examples
398    /// ```
399    /// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm};
400    ///
401    /// let avro = AvroSchema::new("\"string\"".to_string());
402    /// let fp = avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
403    /// ```
404    pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
405        Self::generate_fingerprint(&self.schema()?, hash_type)
406    }
407
408    pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
409        let mut value: Value = serde_json::from_str(&self.json_string)
410            .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
411        let obj = value.as_object_mut().ok_or_else(|| {
412            ArrowError::AvroError(
413                "Projected schema must be a JSON object Avro record schema".to_string(),
414            )
415        })?;
416        match obj.get("type").and_then(|v| v.as_str()) {
417            Some("record") => {}
418            Some(other) => {
419                return Err(ArrowError::AvroError(format!(
420                    "Projected schema must be an Avro record, found type '{other}'"
421                )));
422            }
423            None => {
424                return Err(ArrowError::AvroError(
425                    "Projected schema missing required 'type' field".to_string(),
426                ));
427            }
428        }
429        let fields_val = obj.get_mut("fields").ok_or_else(|| {
430            ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
431        })?;
432        let projected_fields = {
433            let mut original_fields = match fields_val {
434                Value::Array(arr) => std::mem::take(arr),
435                _ => {
436                    return Err(ArrowError::AvroError(
437                        "Avro record schema 'fields' must be an array".to_string(),
438                    ));
439                }
440            };
441            let len = original_fields.len();
442            let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
443            let mut out: Vec<Value> = Vec::with_capacity(projection.len());
444            for &i in projection {
445                if i >= len {
446                    return Err(ArrowError::AvroError(format!(
447                        "Projection index {i} out of bounds for record with {len} fields"
448                    )));
449                }
450                if !seen.insert(i) {
451                    return Err(ArrowError::AvroError(format!(
452                        "Duplicate projection index {i}"
453                    )));
454                }
455                out.push(std::mem::replace(&mut original_fields[i], Value::Null));
456            }
457            out
458        };
459        *fields_val = Value::Array(projected_fields);
460        let json_string = serde_json::to_string(&value).map_err(|e| {
461            ArrowError::AvroError(format!(
462                "Failed to serialize projected Avro schema JSON: {e}"
463            ))
464        })?;
465        Ok(Self::new(json_string))
466    }
467
468    pub(crate) fn generate_fingerprint(
469        schema: &Schema,
470        hash_type: FingerprintAlgorithm,
471    ) -> Result<Fingerprint, ArrowError> {
472        let canonical = Self::generate_canonical_form(schema).map_err(|e| {
473            ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
474        })?;
475        match hash_type {
476            FingerprintAlgorithm::Rabin => {
477                Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
478            }
479            FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
480                "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
481                if using Fingerprint::Id, pass the registry ID in instead using the set method."
482                    .to_string(),
483            )),
484            #[cfg(feature = "md5")]
485            FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
486            #[cfg(feature = "sha256")]
487            FingerprintAlgorithm::SHA256 => {
488                Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
489            }
490        }
491    }
492
493    /// Generates the Parsed Canonical Form for the given `Schema`.
494    ///
495    /// The canonical form is a standardized JSON representation of the schema,
496    /// primarily used for generating a schema fingerprint for equality checking.
497    ///
498    /// This form strips attributes that do not affect the schema's identity,
499    /// such as `doc` fields, `aliases`, and any properties not defined in the
500    /// Avro specification.
501    ///
502    /// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
503    pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
504        build_canonical(schema, None)
505    }
506
507    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order and optionally stripping internal Arrow metadata.
508    ///
509    /// If the input Arrow schema already contains Avro JSON in
510    /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
511    /// the exact header encoding alignment; otherwise, a new JSON is generated
512    /// honoring `null_union_order` at **all nullable sites**.
513    pub(crate) fn from_arrow_with_options(
514        schema: &ArrowSchema,
515        options: Option<AvroSchemaOptions>,
516    ) -> Result<AvroSchema, ArrowError> {
517        let opts = options.unwrap_or_default();
518        let order = opts.null_order.unwrap_or_default();
519        let strip = opts.strip_metadata;
520        if !strip {
521            if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
522                return Ok(AvroSchema::new(json.clone()));
523            }
524        }
525        let mut name_gen = NameGenerator::default();
526        let fields_json = schema
527            .fields()
528            .iter()
529            .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
530            .collect::<Result<Vec<_>, _>>()?;
531        let record_name = schema
532            .metadata
533            .get(AVRO_NAME_METADATA_KEY)
534            .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
535        let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
536        record.insert("type".into(), Value::String("record".into()));
537        record.insert(
538            "name".into(),
539            Value::String(sanitise_avro_name(record_name)),
540        );
541        if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
542            record.insert("namespace".into(), Value::String(ns.clone()));
543        }
544        if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
545            record.insert("doc".into(), Value::String(doc.clone()));
546        }
547        record.insert("fields".into(), Value::Array(fields_json));
548        extend_with_passthrough_metadata(&mut record, &schema.metadata);
549        let json_string = serde_json::to_string(&Value::Object(record))
550            .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
551        Ok(AvroSchema::new(json_string))
552    }
553}
554
555/// A stack-allocated, fixed-size buffer for the prefix.
556#[derive(Debug, Copy, Clone)]
557pub(crate) struct Prefix {
558    buf: [u8; MAX_PREFIX_LEN],
559    len: u8,
560}
561
562impl Prefix {
563    #[inline]
564    pub(crate) fn as_slice(&self) -> &[u8] {
565        &self.buf[..self.len as usize]
566    }
567}
568
/// Defines the strategy for generating the per-record prefix for an Avro binary stream.
///
/// Hash-based variants carry no data; the registry variants carry the ID to
/// use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// Use the 64-bit Rabin fingerprint (default for single-object encoding).
    #[default]
    Rabin,
    /// Use a Confluent Schema Registry 32-bit ID.
    Id(u32),
    /// Use an Apicurio Schema Registry 64-bit ID.
    Id64(u64),
    #[cfg(feature = "md5")]
    /// Use the 128-bit MD5 fingerprint (requires the `md5` feature).
    MD5,
    #[cfg(feature = "sha256")]
    /// Use the 256-bit SHA-256 fingerprint (requires the `sha256` feature).
    SHA256,
}
586
587impl From<Fingerprint> for FingerprintStrategy {
588    fn from(f: Fingerprint) -> Self {
589        Self::from(&f)
590    }
591}
592
593impl From<FingerprintAlgorithm> for FingerprintStrategy {
594    fn from(f: FingerprintAlgorithm) -> Self {
595        match f {
596            FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
597            FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
598            FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
599            #[cfg(feature = "md5")]
600            FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
601            #[cfg(feature = "sha256")]
602            FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
603        }
604    }
605}
606
607impl From<&Fingerprint> for FingerprintStrategy {
608    fn from(f: &Fingerprint) -> Self {
609        match f {
610            Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
611            Fingerprint::Id(_) => FingerprintStrategy::Id(0),
612            Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
613            #[cfg(feature = "md5")]
614            Fingerprint::MD5(_) => FingerprintStrategy::MD5,
615            #[cfg(feature = "sha256")]
616            Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
617        }
618    }
619}
620
/// Supported fingerprint algorithms for Avro schema identification.
///
/// For Schema Registry IDs, which are not derived from a hash, use
/// [`FingerprintAlgorithm::Id`] (32-bit) or [`FingerprintAlgorithm::Id64`] (64-bit).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit CRC-64-AVRO Rabin fingerprint.
    #[default]
    Rabin,
    /// Represents a 32 bit fingerprint not based on a hash algorithm, (e.g., a 32-bit Schema Registry ID.)
    Id,
    /// Represents a 64 bit fingerprint not based on a hash algorithm, (e.g., a 64-bit Schema Registry ID.)
    Id64,
    #[cfg(feature = "md5")]
    /// 128-bit MD5 message digest (requires the `md5` feature).
    MD5,
    #[cfg(feature = "sha256")]
    /// 256-bit SHA-256 digest (requires the `sha256` feature).
    SHA256,
}
639
640/// Allow easy extraction of the algorithm used to create a fingerprint.
641impl From<&Fingerprint> for FingerprintAlgorithm {
642    fn from(fp: &Fingerprint) -> Self {
643        match fp {
644            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
645            Fingerprint::Id(_) => FingerprintAlgorithm::Id,
646            Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
647            #[cfg(feature = "md5")]
648            Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
649            #[cfg(feature = "sha256")]
650            Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
651        }
652    }
653}
654
655impl From<FingerprintStrategy> for FingerprintAlgorithm {
656    fn from(s: FingerprintStrategy) -> Self {
657        Self::from(&s)
658    }
659}
660
661impl From<&FingerprintStrategy> for FingerprintAlgorithm {
662    fn from(s: &FingerprintStrategy) -> Self {
663        match s {
664            FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
665            FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
666            FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
667            #[cfg(feature = "md5")]
668            FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
669            #[cfg(feature = "sha256")]
670            FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
671        }
672    }
673}
674
/// A schema fingerprint in one of the supported formats.
///
/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
/// instance always stores only one variant, matching its configured
/// `FingerprintAlgorithm`, but the enum makes the API uniform.
///
/// Each variant holds the fingerprint value itself: either the hash output or
/// the registry-assigned ID.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
/// <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// A 64-bit Rabin fingerprint.
    Rabin(u64),
    /// A 32-bit Schema Registry ID.
    Id(u32),
    /// A 64-bit Schema Registry ID.
    Id64(u64),
    #[cfg(feature = "md5")]
    /// A 128-bit MD5 fingerprint (requires the `md5` feature).
    MD5([u8; 16]),
    #[cfg(feature = "sha256")]
    /// A 256-bit SHA-256 fingerprint (requires the `sha256` feature).
    SHA256([u8; 32]),
}
698
699impl From<FingerprintStrategy> for Fingerprint {
700    fn from(s: FingerprintStrategy) -> Self {
701        Self::from(&s)
702    }
703}
704
705impl From<&FingerprintStrategy> for Fingerprint {
706    fn from(s: &FingerprintStrategy) -> Self {
707        match s {
708            FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
709            FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
710            FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
711            #[cfg(feature = "md5")]
712            FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
713            #[cfg(feature = "sha256")]
714            FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
715        }
716    }
717}
718
719impl From<FingerprintAlgorithm> for Fingerprint {
720    fn from(s: FingerprintAlgorithm) -> Self {
721        match s {
722            FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
723            FingerprintAlgorithm::Id => Fingerprint::Id(0),
724            FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
725            #[cfg(feature = "md5")]
726            FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
727            #[cfg(feature = "sha256")]
728            FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
729        }
730    }
731}
732
733impl Fingerprint {
734    /// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID).
735    ///
736    /// The provided `id` is in big-endian wire order; this converts it to host order
737    /// and returns `Fingerprint::Id`.
738    ///
739    /// # Returns
740    /// A `Fingerprint::Id` variant containing the 32-bit fingerprint.
741    pub fn load_fingerprint_id(id: u32) -> Self {
742        Fingerprint::Id(u32::from_be(id))
743    }
744
745    /// Loads the 64-bit Schema Registry fingerprint (Apicurio Schema Registry ID).
746    ///
747    /// The provided `id` is in big-endian wire order; this converts it to host order
748    /// and returns `Fingerprint::Id64`.
749    ///
750    /// # Returns
751    /// A `Fingerprint::Id64` variant containing the 64-bit fingerprint.
752    pub fn load_fingerprint_id64(id: u64) -> Self {
753        Fingerprint::Id64(u64::from_be(id))
754    }
755
756    /// Constructs a serialized prefix represented as a `Vec<u8>` based on the variant of the enum.
757    ///
758    /// This method serializes data in different formats depending on the variant of `self`:
759    /// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
760    ///   followed by the big-endian byte representation of the `id`.
761    /// - **`Id64(id)`**: Uses the Apicurio wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
762    ///   followed by the big-endian 8-byte representation of the `id`.
763    /// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header
764    ///   (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`.
765    /// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the
766    ///   `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
767    /// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is
768    ///   a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`.
769    ///
770    /// # Returns
771    ///
772    /// A `Prefix` containing the serialized prefix data.
773    ///
774    /// # Features
775    ///
776    /// - You can optionally enable the `md5` feature to include the `MD5` variant.
777    /// - You can optionally enable the `sha256` feature to include the `SHA256` variant.
778    ///
779    pub(crate) fn make_prefix(&self) -> Prefix {
780        let mut buf = [0u8; MAX_PREFIX_LEN];
781        let len = match self {
782            Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
783            Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
784            Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
785            #[cfg(feature = "md5")]
786            Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
787            #[cfg(feature = "sha256")]
788            Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
789        };
790        Prefix { buf, len }
791    }
792}
793
794fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
795    buf: &mut [u8; MAX_PREFIX_LEN],
796    magic: &[u8; MAGIC_LEN],
797    payload: &[u8; PAYLOAD_LEN],
798) -> u8 {
799    debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
800    let total = MAGIC_LEN + PAYLOAD_LEN;
801    let prefix_slice = &mut buf[..total];
802    prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
803    prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
804    total as u8
805}
806
/// An in-memory cache of Avro schemas, indexed by their fingerprint.
///
/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently.
/// Each schema is stored under a [`Fingerprint`] key, which is either computed from the
/// schema by [`SchemaStore::register`] or supplied by the caller through
/// [`SchemaStore::set`] (for example a schema-registry ID).
///
/// A `SchemaStore` instance is configured with a single [`FingerprintAlgorithm`]
/// (64-bit Rabin by default), which `register` uses for every fingerprint it computes.
/// This ensures consistency when generating fingerprints and looking up schemas:
/// lookups must use a fingerprint equal to the key the schema was stored under.
///
/// NOTE(review): earlier wording described MD5 and SHA256 as "not yet supported";
/// feature-gated MD5 / SHA-256 helpers exist in this module, so confirm the current
/// status against `AvroSchema::generate_fingerprint`.
///
/// # Examples
///
/// ```no_run
/// // Create a new store with the default Rabin fingerprinting.
/// use arrow_avro::schema::{AvroSchema, SchemaStore};
///
/// let mut store = SchemaStore::new();
/// let schema = AvroSchema::new("\"string\"".to_string());
/// // Register the schema to get its fingerprint.
/// let fingerprint = store.register(schema.clone()).unwrap();
/// // Use the fingerprint to look up the schema.
/// let retrieved_schema = store.lookup(&fingerprint).cloned();
/// assert_eq!(retrieved_schema, Some(schema));
/// ```
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// The hashing algorithm used for generating fingerprints.
    fingerprint_algorithm: FingerprintAlgorithm,
    /// A map from a schema's fingerprint to the schema itself.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
840
841impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
842    type Error = ArrowError;
843
844    /// Creates a `SchemaStore` from a HashMap of schemas.
845    /// Each schema in the HashMap is registered with the new store.
846    fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
847        Ok(Self {
848            schemas,
849            ..Self::default()
850        })
851    }
852}
853
854impl SchemaStore {
855    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
856    pub fn new() -> Self {
857        Self::default()
858    }
859
860    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
861    pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
862        Self {
863            fingerprint_algorithm,
864            ..Self::default()
865        }
866    }
867
868    /// Registers a schema with the store and the provided fingerprint.
869    /// Note: Confluent wire format implementations should leverage this method.
870    ///
871    /// A schema is set in the store, using the provided fingerprint. If a schema
872    /// with the same fingerprint does not already exist in the store, the new schema
873    /// is inserted. If the fingerprint already exists, the existing schema is not overwritten.
874    ///
875    /// # Arguments
876    ///
877    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to register.
878    /// * `schema` - The `AvroSchema` to register.
879    ///
880    /// # Returns
881    ///
882    /// A `Result` returning the provided `Fingerprint` of the schema if successful,
883    /// or an `ArrowError` on failure.
884    pub fn set(
885        &mut self,
886        fingerprint: Fingerprint,
887        schema: AvroSchema,
888    ) -> Result<Fingerprint, ArrowError> {
889        match self.schemas.entry(fingerprint) {
890            Entry::Occupied(entry) => {
891                if entry.get() != &schema {
892                    return Err(ArrowError::ComputeError(format!(
893                        "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
894                    )));
895                }
896            }
897            Entry::Vacant(entry) => {
898                entry.insert(schema);
899            }
900        }
901        Ok(fingerprint)
902    }
903
904    /// Registers a schema with the store and returns its fingerprint.
905    ///
906    /// A fingerprint is calculated for the given schema using the store's configured
907    /// hash type. If a schema with the same fingerprint does not already exist in the
908    /// store, the new schema is inserted. If the fingerprint already exists, the
909    /// existing schema is not overwritten. If FingerprintAlgorithm is set to Id or Id64, this
910    /// method will return an error. Confluent wire format implementations should leverage the
911    /// set method instead.
912    ///
913    /// # Arguments
914    ///
915    /// * `schema` - The `AvroSchema` to register.
916    ///
917    /// # Returns
918    ///
919    /// A `Result` containing the `Fingerprint` of the schema if successful,
920    /// or an `ArrowError` on failure.
921    pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
922        if self.fingerprint_algorithm == FingerprintAlgorithm::Id
923            || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
924        {
925            return Err(ArrowError::SchemaError(
926                "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
927            Use the set method directly instead, providing a valid fingerprint"
928                    .to_string(),
929            ));
930        }
931        let fingerprint =
932            AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
933        self.set(fingerprint, schema)?;
934        Ok(fingerprint)
935    }
936
937    /// Looks up a schema by its `Fingerprint`.
938    ///
939    /// # Arguments
940    ///
941    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up.
942    ///
943    /// # Returns
944    ///
945    /// An `Option` containing a clone of the `AvroSchema` if found, otherwise `None`.
946    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
947        self.schemas.get(fingerprint)
948    }
949
950    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
951    /// held by this [`SchemaStore`].
952    ///
953    /// The order of the returned fingerprints is unspecified and should not be
954    /// relied upon.
955    pub fn fingerprints(&self) -> Vec<Fingerprint> {
956        self.schemas.keys().copied().collect()
957    }
958
959    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
960    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
961        self.fingerprint_algorithm
962    }
963}
964
965fn quote(s: &str) -> Result<String, ArrowError> {
966    serde_json::to_string(s)
967        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
968}
969
// Avro names are defined by a `name` and an optional `namespace`.
// The full name is composed of the namespace and the name, separated by a dot.
//
// Avro specification defines two ways to specify a full name:
// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
//    In this case, the `namespace` attribute is ignored.
// 2. The `name` attribute contains the simple name (e.g., "d") and the
//    `namespace` attribute contains the namespace (e.g., "a.b.c").
//
// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
// Complex paths with quotes or backticks like `a."hi".b` are not supported.
//
// This function constructs the full name and extracts the namespace,
// handling both ways of specifying the name. It prioritizes a namespace
// defined within the `name` attribute itself, then the explicit `namespace_attr`,
// and finally the `enclosing_ns`.
//
// Per the Avro specification the empty string denotes the *null namespace*. When the
// selected namespace is empty, the full name is just `name` (never ".name"), and the
// namespace is returned as `Some("")` so that callers propagating it to nested types
// keep the null namespace instead of falling back to an outer one.
//
// Returns `(full_name, namespace)`; `namespace` is `None` only when no namespace was
// specified anywhere.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    // `name` already contains a dot then treat as full-name, ignore namespace.
    if let Some((ns, _)) = name.rsplit_once('.') {
        return (name.to_string(), Some(ns.to_string()));
    }
    match namespace_attr.or(enclosing_ns) {
        // Empty namespace == null namespace: do not emit a leading dot.
        Some("") => (name.to_string(), Some(String::new())),
        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
        None => (name.to_string(), None),
    }
}
1000
1001fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
1002    Ok(match schema {
1003        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
1004            TypeName::Primitive(pt) => quote(pt.as_ref())?,
1005            TypeName::Ref(name) => {
1006                let (full_name, _) = make_full_name(name, None, enclosing_ns);
1007                quote(&full_name)?
1008            }
1009        },
1010        Schema::Union(branches) => format!(
1011            "[{}]",
1012            branches
1013                .iter()
1014                .map(|b| build_canonical(b, enclosing_ns))
1015                .collect::<Result<Vec<_>, _>>()?
1016                .join(",")
1017        ),
1018        Schema::Complex(ct) => match ct {
1019            ComplexType::Record(r) => {
1020                let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
1021                let fields = r
1022                    .fields
1023                    .iter()
1024                    .map(|f| {
1025                        // PCF [STRIP] per Avro spec: keep only attributes relevant to parsing
1026                        // ("name" and "type" for fields) and **strip others** such as doc,
1027                        // default, order, and **aliases**. This preserves canonicalization. See:
1028                        // https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas
1029                        let field_type =
1030                            build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
1031                        Ok(format!(
1032                            r#"{{"name":{},"type":{}}}"#,
1033                            quote(f.name)?,
1034                            field_type
1035                        ))
1036                    })
1037                    .collect::<Result<Vec<_>, ArrowError>>()?
1038                    .join(",");
1039                format!(
1040                    r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
1041                    quote(&full_name)?,
1042                )
1043            }
1044            ComplexType::Enum(e) => {
1045                let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
1046                let symbols = e
1047                    .symbols
1048                    .iter()
1049                    .map(|s| quote(s))
1050                    .collect::<Result<Vec<_>, _>>()?
1051                    .join(",");
1052                format!(
1053                    r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
1054                    quote(&full_name)?
1055                )
1056            }
1057            ComplexType::Array(arr) => format!(
1058                r#"{{"type":"array","items":{}}}"#,
1059                build_canonical(&arr.items, enclosing_ns)?
1060            ),
1061            ComplexType::Map(map) => format!(
1062                r#"{{"type":"map","values":{}}}"#,
1063                build_canonical(&map.values, enclosing_ns)?
1064            ),
1065            ComplexType::Fixed(f) => {
1066                let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
1067                format!(
1068                    r#"{{"name":{},"type":"fixed","size":{}}}"#,
1069                    quote(&full_name)?,
1070                    f.size
1071                )
1072            }
1073        },
1074    })
1075}
1076
/// Seed of the 64-bit Rabin fingerprint (`EMPTY` in the Avro specification).
///
/// Used both as the initial fingerprint value and as the polynomial mask XOR-ed in
/// while building the lookup table.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Computes the lookup-table entry for byte value `i`.
///
/// The entry is obtained by running eight shift-and-conditionally-XOR steps.
///
/// A `while` loop is used because `for` loops rely on `Iterator::next`, which is not
/// `const` on stable Rust (see the `const_for` feature, tracking issue #87575).
const fn one_entry(i: usize) -> u64 {
    let mut entry = i as u64;
    let mut remaining_bits = 8;
    while remaining_bits > 0 {
        // XOR in the polynomial only when the bit about to be shifted out is set.
        let mask = if entry & 1 == 1 { EMPTY } else { 0 };
        entry = (entry >> 1) ^ mask;
        remaining_bits -= 1;
    }
    entry
}

/// Builds the full 256-entry lookup table at compile time, one entry per byte value.
///
/// As in `one_entry`, a `while` loop stands in for `for`, which is not available in a
/// `const fn` on stable Rust (tracking issue #87575).
const fn build_table() -> [u64; 256] {
    let mut entries = [0u64; 256];
    let mut idx = 0;
    while idx < entries.len() {
        entries[idx] = one_entry(idx);
        idx += 1;
    }
    entries
}

/// The pre-computed lookup table, indexed by `(low fingerprint byte) ^ (input byte)`.
static FINGERPRINT_TABLE: [u64; 256] = build_table();

/// Computes the 64-bit Rabin fingerprint for a given canonical schema string.
///
/// Starting from `EMPTY`, each input byte selects a table entry that is folded into
/// the running fingerprint. An empty input therefore yields `EMPTY`. This
/// implementation is based on the Avro specification for schema fingerprinting.
pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
    canonical_form.bytes().fold(EMPTY, |fp, byte| {
        let idx = usize::from(fp as u8 ^ byte);
        (fp >> 8) ^ FINGERPRINT_TABLE[idx]
    })
}
1125
#[cfg(feature = "md5")]
/// Computes the **128-bit MD5** fingerprint of the canonical form.
///
/// The UTF-8 bytes of `canonical_form` are hashed and the complete 16-byte digest
/// is returned as a `[u8; 16]`.
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    md5::compute(canonical_form.as_bytes()).0
}
1136
#[cfg(feature = "sha256")]
/// Computes the **256-bit SHA-256** fingerprint of the canonical form.
///
/// The UTF-8 bytes of `canonical_form` are hashed in one shot and the complete
/// 32-byte digest is returned as a `[u8; 32]`.
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    Sha256::digest(canonical_form.as_bytes()).into()
}
1148
1149#[inline]
1150fn is_internal_arrow_key(key: &str) -> bool {
1151    key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1152}
1153
1154/// Copies Arrow schema metadata entries to the provided JSON map,
1155/// skipping keys that are Avro-reserved, internal Arrow keys, or
1156/// nested under the `avro.schema.` namespace. Values that parse as
1157/// JSON are inserted as JSON; otherwise the raw string is preserved.
1158fn extend_with_passthrough_metadata(
1159    target: &mut JsonMap<String, Value>,
1160    metadata: &HashMap<String, String>,
1161) {
1162    for (meta_key, meta_val) in metadata {
1163        if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1164            continue;
1165        }
1166        let json_val =
1167            serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1168        target.insert(meta_key.clone(), json_val);
1169    }
1170}
1171
// Sanitize an arbitrary string so it is a valid Avro field or type name.
//
// Every character other than an ASCII letter, ASCII digit, or `_` is replaced by `_`
// (one `_` per character). A leading ASCII digit gets a `_` prepended, and an empty
// input becomes "_".
fn sanitise_avro_name(base_name: &str) -> String {
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    for (idx, ch) in base_name.chars().enumerate() {
        // Names may not start with a digit: guard it with an underscore.
        if idx == 0 && ch.is_ascii_digit() {
            sanitised.push('_');
        }
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        sanitised.push(if keep { ch } else { '_' });
    }
    if sanitised.is_empty() {
        sanitised.push('_');
    }
    sanitised
}
1192
1193#[derive(Default)]
1194struct NameGenerator {
1195    used: HashSet<String>,
1196    counters: HashMap<String, usize>,
1197}
1198
1199impl NameGenerator {
1200    fn make_unique(&mut self, field_name: &str) -> String {
1201        let field_name = sanitise_avro_name(field_name);
1202        if self.used.insert(field_name.clone()) {
1203            self.counters.insert(field_name.clone(), 1);
1204            return field_name;
1205        }
1206        let counter = self.counters.entry(field_name.clone()).or_insert(1);
1207        loop {
1208            let candidate = format!("{field_name}_{}", *counter);
1209            if self.used.insert(candidate.clone()) {
1210                return candidate;
1211            }
1212            *counter += 1;
1213        }
1214    }
1215}
1216
1217fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1218    if extras.is_empty() {
1219        return schema;
1220    }
1221    match schema {
1222        Value::Object(mut map) => {
1223            map.extend(extras);
1224            Value::Object(map)
1225        }
1226        Value::Array(mut union) => {
1227            // For unions, we cannot attach attributes to the array itself (per Avro spec).
1228            // As a fallback for extension metadata, attach extras to the first non-null branch object.
1229            if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1230                let original = std::mem::take(non_null);
1231                *non_null = merge_extras(original, extras);
1232            }
1233            Value::Array(union)
1234        }
1235        primitive => {
1236            let mut map = JsonMap::with_capacity(extras.len() + 1);
1237            map.insert("type".into(), primitive);
1238            map.extend(extras);
1239            Value::Object(map)
1240        }
1241    }
1242}
1243
1244#[inline]
1245fn is_avro_json_null(v: &Value) -> bool {
1246    matches!(v, Value::String(s) if s == "null")
1247}
1248
1249fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1250    let null = Value::String("null".into());
1251    match inner {
1252        Value::Array(mut union) => {
1253            // If this site is already a union and already contains "null",
1254            // preserve the branch order exactly. Reordering "null" breaks
1255            // the correspondence between Arrow union child order (type_ids)
1256            // and the Avro branch index written on the wire.
1257            if union.iter().any(is_avro_json_null) {
1258                return Value::Array(union);
1259            }
1260            // Otherwise, inject "null" without reordering existing branches.
1261            match null_order {
1262                Nullability::NullFirst => union.insert(0, null),
1263                Nullability::NullSecond => union.push(null),
1264            }
1265            Value::Array(union)
1266        }
1267        other => match null_order {
1268            Nullability::NullFirst => Value::Array(vec![null, other]),
1269            Nullability::NullSecond => Value::Array(vec![other, null]),
1270        },
1271    }
1272}
1273
/// Returns the smallest `fixed` size, in bytes, whose maximum decimal precision is at
/// least `p`, saturating at 32 bytes for larger precisions.
fn min_fixed_bytes_for_precision(p: usize) -> usize {
    // From the spec: `MAX_P[n - 1]` is the max precision for n bytes, n = 1..=32.
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    // The table is ascending, so the first entry that is >= p marks the answer.
    let first_fit = MAX_P.partition_point(|&max_p| max_p < p);
    // No fit means p exceeds every entry: saturate at the largest size (Decimal256).
    (first_fit + 1).min(MAX_P.len())
}
1288
1289fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1290    match branch {
1291        Value::String(t) => Ok(format!("P:{t}")),
1292        Value::Object(map) => {
1293            let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1294                ArrowError::SchemaError("Union branch object missing string 'type'".into())
1295            })?;
1296            match t {
1297                "record" | "enum" | "fixed" => {
1298                    let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1299                        ArrowError::SchemaError(format!(
1300                            "Union branch '{t}' missing required 'name'"
1301                        ))
1302                    })?;
1303                    Ok(format!("N:{t}:{name}"))
1304                }
1305                "array" | "map" => Ok(format!("C:{t}")),
1306                other => Ok(format!("P:{other}")),
1307            }
1308        }
1309        Value::Array(_) => Err(ArrowError::SchemaError(
1310            "Avro union may not immediately contain another union".into(),
1311        )),
1312        _ => Err(ArrowError::SchemaError(
1313            "Invalid JSON for Avro union branch".into(),
1314        )),
1315    }
1316}
1317
1318fn datatype_to_avro(
1319    dt: &DataType,
1320    field_name: &str,
1321    metadata: &HashMap<String, String>,
1322    name_gen: &mut NameGenerator,
1323    null_order: Nullability,
1324    strip: bool,
1325) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1326    let mut extras = JsonMap::new();
1327    let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1328        if *scale < 0 {
1329            return Err(ArrowError::SchemaError(format!(
1330                "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1331            )));
1332        }
1333        if (*scale as usize) > (*precision as usize) {
1334            return Err(ArrowError::SchemaError(format!(
1335                "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1336                 must be <= precision ({precision})"
1337            )));
1338        }
1339        let mut meta = JsonMap::from_iter([
1340            ("logicalType".into(), json!("decimal")),
1341            ("precision".into(), json!(*precision)),
1342            ("scale".into(), json!(*scale)),
1343        ]);
1344        let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1345        let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1346            || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1347        if fixed_size.is_none() && carries_name {
1348            fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1349        }
1350        if let Some(size) = fixed_size {
1351            meta.insert("type".into(), json!("fixed"));
1352            meta.insert("size".into(), json!(size));
1353            let chosen_name = metadata
1354                .get(AVRO_NAME_METADATA_KEY)
1355                .map(|s| sanitise_avro_name(s))
1356                .unwrap_or_else(|| name_gen.make_unique(field_name));
1357            meta.insert("name".into(), json!(chosen_name));
1358            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1359                meta.insert("namespace".into(), json!(ns));
1360            }
1361        } else {
1362            // default to bytes-backed decimal
1363            meta.insert("type".into(), json!("bytes"));
1364        }
1365        Ok(Value::Object(meta))
1366    };
1367    let val = match dt {
1368        DataType::Null => Value::String("null".into()),
1369        DataType::Boolean => Value::String("boolean".into()),
1370        #[cfg(not(feature = "avro_custom_types"))]
1371        DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 => {
1372            Value::String("int".into())
1373        }
1374        DataType::Int32 => Value::String("int".into()),
1375        #[cfg(feature = "avro_custom_types")]
1376        DataType::Int8 => json!({ "type": "int", "logicalType": "arrow.int8" }),
1377        #[cfg(feature = "avro_custom_types")]
1378        DataType::Int16 => json!({ "type": "int", "logicalType": "arrow.int16" }),
1379        #[cfg(feature = "avro_custom_types")]
1380        DataType::UInt8 => json!({ "type": "int", "logicalType": "arrow.uint8" }),
1381        #[cfg(feature = "avro_custom_types")]
1382        DataType::UInt16 => json!({ "type": "int", "logicalType": "arrow.uint16" }),
1383        #[cfg(not(feature = "avro_custom_types"))]
1384        DataType::UInt32 => Value::String("long".into()),
1385        #[cfg(feature = "avro_custom_types")]
1386        DataType::UInt32 => json!({ "type": "long", "logicalType": "arrow.uint32" }),
1387        DataType::Int64 => Value::String("long".into()),
1388        #[cfg(not(feature = "avro_custom_types"))]
1389        DataType::UInt64 => Value::String("long".into()),
1390        #[cfg(feature = "avro_custom_types")]
1391        DataType::UInt64 => {
1392            // UInt64 must use fixed(8) to avoid overflow
1393            let chosen_name = metadata
1394                .get(AVRO_NAME_METADATA_KEY)
1395                .map(|s| sanitise_avro_name(s))
1396                .unwrap_or_else(|| name_gen.make_unique(field_name));
1397            let mut obj = JsonMap::from_iter([
1398                ("type".into(), json!("fixed")),
1399                ("name".into(), json!(chosen_name)),
1400                ("size".into(), json!(8)),
1401                ("logicalType".into(), json!("arrow.uint64")),
1402            ]);
1403            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1404                obj.insert("namespace".into(), json!(ns));
1405            }
1406            json!(obj)
1407        }
1408        #[cfg(not(feature = "avro_custom_types"))]
1409        DataType::Float16 => Value::String("float".into()),
1410        #[cfg(feature = "avro_custom_types")]
1411        DataType::Float16 => {
1412            // Float16 uses fixed(2) for IEEE-754 bits
1413            let chosen_name = metadata
1414                .get(AVRO_NAME_METADATA_KEY)
1415                .map(|s| sanitise_avro_name(s))
1416                .unwrap_or_else(|| name_gen.make_unique(field_name));
1417            let mut obj = JsonMap::from_iter([
1418                ("type".into(), json!("fixed")),
1419                ("name".into(), json!(chosen_name)),
1420                ("size".into(), json!(2)),
1421                ("logicalType".into(), json!("arrow.float16")),
1422            ]);
1423            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1424                obj.insert("namespace".into(), json!(ns));
1425            }
1426            json!(obj)
1427        }
1428        DataType::Float32 => Value::String("float".into()),
1429        DataType::Float64 => Value::String("double".into()),
1430        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1431        DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1432        DataType::BinaryView => {
1433            if !strip {
1434                extras.insert("arrowBinaryView".into(), Value::Bool(true));
1435            }
1436            Value::String("bytes".into())
1437        }
1438        DataType::FixedSizeBinary(len) => {
1439            let md_is_uuid = metadata
1440                .get("logicalType")
1441                .map(|s| s.trim_matches('"') == "uuid")
1442                .unwrap_or(false);
1443            #[cfg(feature = "canonical_extension_types")]
1444            let ext_is_uuid = metadata
1445                .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1446                .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1447                .unwrap_or(false);
1448            #[cfg(not(feature = "canonical_extension_types"))]
1449            let ext_is_uuid = false;
1450            let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1451            if is_uuid {
1452                json!({ "type": "string", "logicalType": "uuid" })
1453            } else {
1454                let chosen_name = metadata
1455                    .get(AVRO_NAME_METADATA_KEY)
1456                    .map(|s| sanitise_avro_name(s))
1457                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1458                let mut obj = JsonMap::from_iter([
1459                    ("type".into(), json!("fixed")),
1460                    ("name".into(), json!(chosen_name)),
1461                    ("size".into(), json!(len)),
1462                ]);
1463                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1464                    obj.insert("namespace".into(), json!(ns));
1465                }
1466                Value::Object(obj)
1467            }
1468        }
1469        #[cfg(feature = "small_decimals")]
1470        DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1471            handle_decimal(precision, scale)?
1472        }
1473        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1474            handle_decimal(precision, scale)?
1475        }
1476        DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1477        #[cfg(not(feature = "avro_custom_types"))]
1478        DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1479        #[cfg(feature = "avro_custom_types")]
1480        DataType::Date64 => json!({ "type": "long", "logicalType": "arrow.date64" }),
1481        DataType::Time32(unit) => match unit {
1482            TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1483            #[cfg(not(feature = "avro_custom_types"))]
1484            TimeUnit::Second => {
1485                // Encoder converts seconds to milliseconds, so use time-millis
1486                if !strip {
1487                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1488                }
1489                json!({ "type": "int", "logicalType": "time-millis" })
1490            }
1491            #[cfg(feature = "avro_custom_types")]
1492            TimeUnit::Second => {
1493                json!({ "type": "int", "logicalType": "arrow.time32-second" })
1494            }
1495            _ => Value::String("int".into()),
1496        },
1497        DataType::Time64(unit) => match unit {
1498            TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1499            #[cfg(not(feature = "avro_custom_types"))]
1500            TimeUnit::Nanosecond => {
1501                // Encoder truncates nanoseconds to microseconds, so use time-micros
1502                if !strip {
1503                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1504                }
1505                json!({ "type": "long", "logicalType": "time-micros" })
1506            }
1507            #[cfg(feature = "avro_custom_types")]
1508            TimeUnit::Nanosecond => {
1509                json!({ "type": "long", "logicalType": "arrow.time64-nanosecond" })
1510            }
1511            _ => Value::String("long".into()),
1512        },
1513        DataType::Timestamp(unit, tz) => {
1514            #[cfg(feature = "avro_custom_types")]
1515            if matches!(unit, TimeUnit::Second) {
1516                let logical_type = if tz.is_some() {
1517                    "arrow.timestamp-second"
1518                } else {
1519                    "arrow.local-timestamp-second"
1520                };
1521                return Ok((
1522                    json!({ "type": "long", "logicalType": logical_type }),
1523                    extras,
1524                ));
1525            }
1526            let logical_type = match (unit, tz.is_some()) {
1527                (TimeUnit::Millisecond, true) => "timestamp-millis",
1528                (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1529                (TimeUnit::Microsecond, true) => "timestamp-micros",
1530                (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1531                (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1532                (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1533                (TimeUnit::Second, has_tz) => {
1534                    // Encoder converts seconds to milliseconds, so use timestamp-millis
1535                    if !strip {
1536                        extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1537                    }
1538                    let ts_logical_type = if has_tz {
1539                        "timestamp-millis"
1540                    } else {
1541                        "local-timestamp-millis"
1542                    };
1543                    return Ok((
1544                        json!({ "type": "long", "logicalType": ts_logical_type }),
1545                        extras,
1546                    ));
1547                }
1548            };
1549            if !strip && matches!(unit, TimeUnit::Nanosecond) {
1550                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1551            }
1552            json!({ "type": "long", "logicalType": logical_type })
1553        }
1554        #[cfg(not(feature = "avro_custom_types"))]
1555        DataType::Duration(_unit) => Value::String("long".into()),
1556        #[cfg(feature = "avro_custom_types")]
1557        DataType::Duration(unit) => {
1558            // When the feature is enabled, create an Avro schema object
1559            // with the correct `logicalType` annotation.
1560            let logical_type = match unit {
1561                TimeUnit::Second => "arrow.duration-seconds",
1562                TimeUnit::Millisecond => "arrow.duration-millis",
1563                TimeUnit::Microsecond => "arrow.duration-micros",
1564                TimeUnit::Nanosecond => "arrow.duration-nanos",
1565            };
1566            json!({ "type": "long", "logicalType": logical_type })
1567        }
1568        #[cfg(not(feature = "avro_custom_types"))]
1569        DataType::Interval(IntervalUnit::MonthDayNano) => {
1570            // Avro duration logical type: fixed(12) with months/days/millis per spec.
1571            let chosen_name = metadata
1572                .get(AVRO_NAME_METADATA_KEY)
1573                .map(|s| sanitise_avro_name(s))
1574                .unwrap_or_else(|| name_gen.make_unique(field_name));
1575            let mut obj = JsonMap::from_iter([
1576                ("type".into(), json!("fixed")),
1577                ("name".into(), json!(chosen_name)),
1578                ("size".into(), json!(12)),
1579                ("logicalType".into(), json!("duration")),
1580            ]);
1581            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1582                obj.insert("namespace".into(), json!(ns));
1583            }
1584            json!(obj)
1585        }
1586        #[cfg(feature = "avro_custom_types")]
1587        DataType::Interval(IntervalUnit::MonthDayNano) => {
1588            // Arrow MonthDayNano interval: i32 months + i32 days + i64 nanos (16 bytes).
1589            // We preserve the Arrow native representation via a custom logical type.
1590            let chosen_name = metadata
1591                .get(AVRO_NAME_METADATA_KEY)
1592                .map(|s| sanitise_avro_name(s))
1593                .unwrap_or_else(|| name_gen.make_unique(field_name));
1594            let mut obj = JsonMap::from_iter([
1595                ("type".into(), json!("fixed")),
1596                ("name".into(), json!(chosen_name)),
1597                ("size".into(), json!(16)),
1598                ("logicalType".into(), json!("arrow.interval-month-day-nano")),
1599            ]);
1600            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1601                obj.insert("namespace".into(), json!(ns));
1602            }
1603            json!(obj)
1604        }
1605        #[cfg(not(feature = "avro_custom_types"))]
1606        DataType::Interval(IntervalUnit::YearMonth) => {
1607            // Encode as Avro `duration` (fixed(12)) like MonthDayNano
1608            let chosen_name = metadata
1609                .get(AVRO_NAME_METADATA_KEY)
1610                .map(|s| sanitise_avro_name(s))
1611                .unwrap_or_else(|| name_gen.make_unique(field_name));
1612            let mut extras = JsonMap::from_iter([
1613                ("type".into(), json!("fixed")),
1614                ("name".into(), json!(chosen_name)),
1615                ("size".into(), json!(12)),
1616                ("logicalType".into(), json!("duration")),
1617            ]);
1618            if !strip {
1619                extras.insert(
1620                    "arrowIntervalUnit".into(),
1621                    Value::String("yearmonth".into()),
1622                );
1623            }
1624            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1625                extras.insert("namespace".into(), json!(ns));
1626            }
1627            json!(extras)
1628        }
1629        #[cfg(feature = "avro_custom_types")]
1630        DataType::Interval(IntervalUnit::YearMonth) => {
1631            let chosen_name = metadata
1632                .get(AVRO_NAME_METADATA_KEY)
1633                .map(|s| sanitise_avro_name(s))
1634                .unwrap_or_else(|| name_gen.make_unique(field_name));
1635            let mut obj = JsonMap::from_iter([
1636                ("type".into(), json!("fixed")),
1637                ("name".into(), json!(chosen_name)),
1638                ("size".into(), json!(4)),
1639                ("logicalType".into(), json!("arrow.interval-year-month")),
1640            ]);
1641            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1642                obj.insert("namespace".into(), json!(ns));
1643            }
1644            json!(obj)
1645        }
1646        #[cfg(not(feature = "avro_custom_types"))]
1647        DataType::Interval(IntervalUnit::DayTime) => {
1648            // Encode as Avro `duration` (fixed(12)) like MonthDayNano
1649            let chosen_name = metadata
1650                .get(AVRO_NAME_METADATA_KEY)
1651                .map(|s| sanitise_avro_name(s))
1652                .unwrap_or_else(|| name_gen.make_unique(field_name));
1653            let mut obj = JsonMap::from_iter([
1654                ("type".into(), json!("fixed")),
1655                ("name".into(), json!(chosen_name)),
1656                ("size".into(), json!(12)),
1657                ("logicalType".into(), json!("duration")),
1658            ]);
1659            if !strip {
1660                obj.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1661            }
1662            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1663                obj.insert("namespace".into(), json!(ns));
1664            }
1665            json!(obj)
1666        }
1667        #[cfg(feature = "avro_custom_types")]
1668        DataType::Interval(IntervalUnit::DayTime) => {
1669            let chosen_name = metadata
1670                .get(AVRO_NAME_METADATA_KEY)
1671                .map(|s| sanitise_avro_name(s))
1672                .unwrap_or_else(|| name_gen.make_unique(field_name));
1673            let mut obj = JsonMap::from_iter([
1674                ("type".into(), json!("fixed")),
1675                ("name".into(), json!(chosen_name)),
1676                ("size".into(), json!(8)),
1677                ("logicalType".into(), json!("arrow.interval-day-time")),
1678            ]);
1679            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1680                obj.insert("namespace".into(), json!(ns));
1681            }
1682            json!(obj)
1683        }
1684        DataType::List(child) | DataType::LargeList(child) => {
1685            if matches!(dt, DataType::LargeList(_)) && !strip {
1686                extras.insert("arrowLargeList".into(), Value::Bool(true));
1687            }
1688            let items_schema = process_datatype(
1689                child.data_type(),
1690                child.name(),
1691                child.metadata(),
1692                name_gen,
1693                null_order,
1694                child.is_nullable(),
1695                strip,
1696            )?;
1697            json!({
1698                "type": "array",
1699                "items": items_schema
1700            })
1701        }
1702        DataType::ListView(child) | DataType::LargeListView(child) => {
1703            if matches!(dt, DataType::LargeListView(_)) && !strip {
1704                extras.insert("arrowLargeList".into(), Value::Bool(true));
1705            }
1706            if !strip {
1707                extras.insert("arrowListView".into(), Value::Bool(true));
1708            }
1709            let items_schema = process_datatype(
1710                child.data_type(),
1711                child.name(),
1712                child.metadata(),
1713                name_gen,
1714                null_order,
1715                child.is_nullable(),
1716                strip,
1717            )?;
1718            json!({
1719                "type": "array",
1720                "items": items_schema
1721            })
1722        }
1723        DataType::FixedSizeList(child, len) => {
1724            if !strip {
1725                extras.insert("arrowFixedSize".into(), json!(len));
1726            }
1727            let items_schema = process_datatype(
1728                child.data_type(),
1729                child.name(),
1730                child.metadata(),
1731                name_gen,
1732                null_order,
1733                child.is_nullable(),
1734                strip,
1735            )?;
1736            json!({
1737                "type": "array",
1738                "items": items_schema
1739            })
1740        }
1741        DataType::Map(entries, _) => {
1742            let value_field = match entries.data_type() {
1743                DataType::Struct(fs) => &fs[1],
1744                _ => {
1745                    return Err(ArrowError::SchemaError(
1746                        "Map 'entries' field must be Struct(key,value)".into(),
1747                    ));
1748                }
1749            };
1750            let values_schema = process_datatype(
1751                value_field.data_type(),
1752                value_field.name(),
1753                value_field.metadata(),
1754                name_gen,
1755                null_order,
1756                value_field.is_nullable(),
1757                strip,
1758            )?;
1759            json!({
1760                "type": "map",
1761                "values": values_schema
1762            })
1763        }
1764        DataType::Struct(fields) => {
1765            let avro_fields = fields
1766                .iter()
1767                .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1768                .collect::<Result<Vec<_>, _>>()?;
1769            // Prefer avro.name/avro.namespace when provided on the struct field metadata
1770            let chosen_name = metadata
1771                .get(AVRO_NAME_METADATA_KEY)
1772                .map(|s| sanitise_avro_name(s))
1773                .unwrap_or_else(|| name_gen.make_unique(field_name));
1774            let mut obj = JsonMap::from_iter([
1775                ("type".into(), json!("record")),
1776                ("name".into(), json!(chosen_name)),
1777                ("fields".into(), Value::Array(avro_fields)),
1778            ]);
1779            if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1780                obj.insert("namespace".into(), json!(ns));
1781            }
1782            Value::Object(obj)
1783        }
1784        DataType::Dictionary(_, value) => {
1785            if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1786                let symbols: Vec<&str> =
1787                    serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1788                // Prefer avro.name/namespace when provided for enums
1789                let chosen_name = metadata
1790                    .get(AVRO_NAME_METADATA_KEY)
1791                    .map(|s| sanitise_avro_name(s))
1792                    .unwrap_or_else(|| name_gen.make_unique(field_name));
1793                let mut obj = JsonMap::from_iter([
1794                    ("type".into(), json!("enum")),
1795                    ("name".into(), json!(chosen_name)),
1796                    ("symbols".into(), json!(symbols)),
1797                ]);
1798                if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1799                    obj.insert("namespace".into(), json!(ns));
1800                }
1801                Value::Object(obj)
1802            } else {
1803                process_datatype(
1804                    value.as_ref(),
1805                    field_name,
1806                    metadata,
1807                    name_gen,
1808                    null_order,
1809                    false,
1810                    strip,
1811                )?
1812            }
1813        }
1814        #[cfg(feature = "avro_custom_types")]
1815        DataType::RunEndEncoded(run_ends, values) => {
1816            let bits = match run_ends.data_type() {
1817                DataType::Int16 => 16,
1818                DataType::Int32 => 32,
1819                DataType::Int64 => 64,
1820                other => {
1821                    return Err(ArrowError::SchemaError(format!(
1822                        "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1823                    )));
1824                }
1825            };
1826            // Build the value site schema, preserving its own nullability
1827            let (value_schema, value_extras) = datatype_to_avro(
1828                values.data_type(),
1829                values.name(),
1830                values.metadata(),
1831                name_gen,
1832                null_order,
1833                strip,
1834            )?;
1835            let mut merged = merge_extras(value_schema, value_extras);
1836            if values.is_nullable() {
1837                merged = wrap_nullable(merged, null_order);
1838            }
1839            let mut extras = JsonMap::new();
1840            extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1841            extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1842            return Ok((merged, extras));
1843        }
1844        #[cfg(not(feature = "avro_custom_types"))]
1845        DataType::RunEndEncoded(_run_ends, values) => {
1846            let (value_schema, _extras) = datatype_to_avro(
1847                values.data_type(),
1848                values.name(),
1849                values.metadata(),
1850                name_gen,
1851                null_order,
1852                strip,
1853            )?;
1854            return Ok((value_schema, JsonMap::new()));
1855        }
1856        DataType::Union(fields, mode) => {
1857            let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1858            let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1859            for (type_id, field_ref) in fields.iter() {
1860                // NOTE: `process_datatype` would wrap nullability; force is_nullable=false here.
1861                let (branch_schema, _branch_extras) = datatype_to_avro(
1862                    field_ref.data_type(),
1863                    field_ref.name(),
1864                    field_ref.metadata(),
1865                    name_gen,
1866                    null_order,
1867                    strip,
1868                )?;
1869                // Avro unions cannot immediately contain another union
1870                if matches!(branch_schema, Value::Array(_)) {
1871                    return Err(ArrowError::SchemaError(
1872                        "Avro union may not immediately contain another union".into(),
1873                    ));
1874                }
1875                branches.push(branch_schema);
1876                type_ids.push(type_id as i32);
1877            }
1878            let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1879            for b in &branches {
1880                let sig = union_branch_signature(b)?;
1881                if !seen.insert(sig) {
1882                    return Err(ArrowError::SchemaError(
1883                        "Avro union contains duplicate branch types (disallowed by spec)".into(),
1884                    ));
1885                }
1886            }
1887            if !strip {
1888                extras.insert(
1889                    "arrowUnionMode".into(),
1890                    Value::String(
1891                        match mode {
1892                            UnionMode::Sparse => "sparse",
1893                            UnionMode::Dense => "dense",
1894                        }
1895                        .to_string(),
1896                    ),
1897                );
1898                extras.insert(
1899                    "arrowUnionTypeIds".into(),
1900                    Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1901                );
1902            }
1903            Value::Array(branches)
1904        }
1905        #[cfg(not(feature = "small_decimals"))]
1906        other => {
1907            return Err(ArrowError::NotYetImplemented(format!(
1908                "Arrow type {other:?} has no Avro representation"
1909            )));
1910        }
1911    };
1912    Ok((val, extras))
1913}
1914
1915fn process_datatype(
1916    dt: &DataType,
1917    field_name: &str,
1918    metadata: &HashMap<String, String>,
1919    name_gen: &mut NameGenerator,
1920    null_order: Nullability,
1921    is_nullable: bool,
1922    strip: bool,
1923) -> Result<Value, ArrowError> {
1924    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1925    let mut merged = merge_extras(schema, extras);
1926    if is_nullable {
1927        merged = wrap_nullable(merged, null_order)
1928    }
1929    Ok(merged)
1930}
1931
1932fn arrow_field_to_avro(
1933    field: &ArrowField,
1934    name_gen: &mut NameGenerator,
1935    null_order: Nullability,
1936    strip: bool,
1937) -> Result<Value, ArrowError> {
1938    let avro_name = sanitise_avro_name(field.name());
1939    let schema_value = process_datatype(
1940        field.data_type(),
1941        &avro_name,
1942        field.metadata(),
1943        name_gen,
1944        null_order,
1945        field.is_nullable(),
1946        strip,
1947    )?;
1948    // Build the field map
1949    let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1950    map.insert("name".into(), Value::String(avro_name));
1951    map.insert("type".into(), schema_value);
1952    // Transfer selected metadata
1953    for (meta_key, meta_val) in field.metadata() {
1954        if is_internal_arrow_key(meta_key) {
1955            continue;
1956        }
1957        match meta_key.as_str() {
1958            AVRO_DOC_METADATA_KEY => {
1959                map.insert("doc".into(), Value::String(meta_val.clone()));
1960            }
1961            AVRO_FIELD_DEFAULT_METADATA_KEY => {
1962                let default_value = serde_json::from_str(meta_val)
1963                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1964                map.insert("default".into(), default_value);
1965            }
1966            _ => {
1967                let json_val = serde_json::from_str(meta_val)
1968                    .unwrap_or_else(|_| Value::String(meta_val.clone()));
1969                map.insert(meta_key.clone(), json_val);
1970            }
1971        }
1972    }
1973    Ok(Value::Object(map))
1974}
1975
1976#[cfg(test)]
1977mod tests {
1978    use super::*;
1979    use crate::codec::{AvroField, AvroFieldBuilder};
1980    use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1981    use serde_json::json;
1982    use std::sync::Arc;
1983
1984    fn int_schema() -> Schema<'static> {
1985        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1986    }
1987
1988    fn record_schema() -> Schema<'static> {
1989        Schema::Complex(ComplexType::Record(Record {
1990            name: "record1",
1991            namespace: Some("test.namespace"),
1992            doc: Some(Cow::from("A test record")),
1993            aliases: vec![],
1994            fields: vec![
1995                Field {
1996                    name: "field1",
1997                    doc: Some(Cow::from("An integer field")),
1998                    r#type: int_schema(),
1999                    default: None,
2000                    aliases: vec![],
2001                },
2002                Field {
2003                    name: "field2",
2004                    doc: None,
2005                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2006                    default: None,
2007                    aliases: vec![],
2008                },
2009            ],
2010            attributes: Attributes::default(),
2011        }))
2012    }
2013
2014    fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
2015        let mut sb = SchemaBuilder::new();
2016        sb.push(field);
2017        sb.finish()
2018    }
2019
2020    fn assert_json_contains(avro_json: &str, needle: &str) {
2021        assert!(
2022            avro_json.contains(needle),
2023            "JSON did not contain `{needle}` : {avro_json}"
2024        )
2025    }
2026
2027    #[test]
2028    fn test_deserialize() {
2029        let t: Schema = serde_json::from_str("\"string\"").unwrap();
2030        assert_eq!(
2031            t,
2032            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
2033        );
2034
2035        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
2036        assert_eq!(
2037            t,
2038            Schema::Union(vec![
2039                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2040                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2041            ])
2042        );
2043
2044        let t: Type = serde_json::from_str(
2045            r#"{
2046                   "type":"long",
2047                   "logicalType":"timestamp-micros"
2048                }"#,
2049        )
2050        .unwrap();
2051
2052        let timestamp = Type {
2053            r#type: TypeName::Primitive(PrimitiveType::Long),
2054            attributes: Attributes {
2055                logical_type: Some("timestamp-micros"),
2056                additional: Default::default(),
2057            },
2058        };
2059
2060        assert_eq!(t, timestamp);
2061
2062        let t: ComplexType = serde_json::from_str(
2063            r#"{
2064                   "type":"fixed",
2065                   "name":"fixed",
2066                   "namespace":"topLevelRecord.value",
2067                   "size":11,
2068                   "logicalType":"decimal",
2069                   "precision":25,
2070                   "scale":2
2071                }"#,
2072        )
2073        .unwrap();
2074
2075        let decimal = ComplexType::Fixed(Fixed {
2076            name: "fixed",
2077            namespace: Some("topLevelRecord.value"),
2078            aliases: vec![],
2079            size: 11,
2080            attributes: Attributes {
2081                logical_type: Some("decimal"),
2082                additional: vec![("precision", json!(25)), ("scale", json!(2))]
2083                    .into_iter()
2084                    .collect(),
2085            },
2086        });
2087
2088        assert_eq!(t, decimal);
2089
2090        let schema: Schema = serde_json::from_str(
2091            r#"{
2092               "type":"record",
2093               "name":"topLevelRecord",
2094               "fields":[
2095                  {
2096                     "name":"value",
2097                     "type":[
2098                        {
2099                           "type":"fixed",
2100                           "name":"fixed",
2101                           "namespace":"topLevelRecord.value",
2102                           "size":11,
2103                           "logicalType":"decimal",
2104                           "precision":25,
2105                           "scale":2
2106                        },
2107                        "null"
2108                     ]
2109                  }
2110               ]
2111            }"#,
2112        )
2113        .unwrap();
2114
2115        assert_eq!(
2116            schema,
2117            Schema::Complex(ComplexType::Record(Record {
2118                name: "topLevelRecord",
2119                namespace: None,
2120                doc: None,
2121                aliases: vec![],
2122                fields: vec![Field {
2123                    name: "value",
2124                    doc: None,
2125                    r#type: Schema::Union(vec![
2126                        Schema::Complex(decimal),
2127                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2128                    ]),
2129                    default: None,
2130                    aliases: vec![],
2131                },],
2132                attributes: Default::default(),
2133            }))
2134        );
2135
2136        let schema: Schema = serde_json::from_str(
2137            r#"{
2138                  "type": "record",
2139                  "name": "LongList",
2140                  "aliases": ["LinkedLongs"],
2141                  "fields" : [
2142                    {"name": "value", "type": "long"},
2143                    {"name": "next", "type": ["null", "LongList"]}
2144                  ]
2145                }"#,
2146        )
2147        .unwrap();
2148
2149        assert_eq!(
2150            schema,
2151            Schema::Complex(ComplexType::Record(Record {
2152                name: "LongList",
2153                namespace: None,
2154                doc: None,
2155                aliases: vec!["LinkedLongs"],
2156                fields: vec![
2157                    Field {
2158                        name: "value",
2159                        doc: None,
2160                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2161                        default: None,
2162                        aliases: vec![],
2163                    },
2164                    Field {
2165                        name: "next",
2166                        doc: None,
2167                        r#type: Schema::Union(vec![
2168                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2169                            Schema::TypeName(TypeName::Ref("LongList")),
2170                        ]),
2171                        default: None,
2172                        aliases: vec![],
2173                    }
2174                ],
2175                attributes: Attributes::default(),
2176            }))
2177        );
2178
2179        // Recursive schema are not supported
2180        let err = AvroField::try_from(&schema).unwrap_err().to_string();
2181        assert_eq!(err, "Parser error: Failed to resolve .LongList");
2182
2183        let schema: Schema = serde_json::from_str(
2184            r#"{
2185               "type":"record",
2186               "name":"topLevelRecord",
2187               "fields":[
2188                  {
2189                     "name":"id",
2190                     "type":[
2191                        "int",
2192                        "null"
2193                     ]
2194                  },
2195                  {
2196                     "name":"timestamp_col",
2197                     "type":[
2198                        {
2199                           "type":"long",
2200                           "logicalType":"timestamp-micros"
2201                        },
2202                        "null"
2203                     ]
2204                  }
2205               ]
2206            }"#,
2207        )
2208        .unwrap();
2209
2210        assert_eq!(
2211            schema,
2212            Schema::Complex(ComplexType::Record(Record {
2213                name: "topLevelRecord",
2214                namespace: None,
2215                doc: None,
2216                aliases: vec![],
2217                fields: vec![
2218                    Field {
2219                        name: "id",
2220                        doc: None,
2221                        r#type: Schema::Union(vec![
2222                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2223                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2224                        ]),
2225                        default: None,
2226                        aliases: vec![],
2227                    },
2228                    Field {
2229                        name: "timestamp_col",
2230                        doc: None,
2231                        r#type: Schema::Union(vec![
2232                            Schema::Type(timestamp),
2233                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2234                        ]),
2235                        default: None,
2236                        aliases: vec![],
2237                    }
2238                ],
2239                attributes: Default::default(),
2240            }))
2241        );
2242        let codec = AvroField::try_from(&schema).unwrap();
2243        let expected_arrow_field = arrow_schema::Field::new(
2244            "topLevelRecord",
2245            DataType::Struct(Fields::from(vec![
2246                arrow_schema::Field::new("id", DataType::Int32, true),
2247                arrow_schema::Field::new(
2248                    "timestamp_col",
2249                    DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2250                    true,
2251                ),
2252            ])),
2253            false,
2254        )
2255        .with_metadata(std::collections::HashMap::from([(
2256            AVRO_NAME_METADATA_KEY.to_string(),
2257            "topLevelRecord".to_string(),
2258        )]));
2259
2260        assert_eq!(codec.field(), expected_arrow_field);
2261
2262        let schema: Schema = serde_json::from_str(
2263            r#"{
2264                  "type": "record",
2265                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
2266                  "fields": [
2267                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
2268                    {"name": "clientProtocol", "type": ["null", "string"]},
2269                    {"name": "serverHash", "type": "MD5"},
2270                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
2271                  ]
2272            }"#,
2273        )
2274        .unwrap();
2275
2276        assert_eq!(
2277            schema,
2278            Schema::Complex(ComplexType::Record(Record {
2279                name: "HandshakeRequest",
2280                namespace: Some("org.apache.avro.ipc"),
2281                doc: None,
2282                aliases: vec![],
2283                fields: vec![
2284                    Field {
2285                        name: "clientHash",
2286                        doc: None,
2287                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
2288                            name: "MD5",
2289                            namespace: None,
2290                            aliases: vec![],
2291                            size: 16,
2292                            attributes: Default::default(),
2293                        })),
2294                        default: None,
2295                        aliases: vec![],
2296                    },
2297                    Field {
2298                        name: "clientProtocol",
2299                        doc: None,
2300                        r#type: Schema::Union(vec![
2301                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2302                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2303                        ]),
2304                        default: None,
2305                        aliases: vec![],
2306                    },
2307                    Field {
2308                        name: "serverHash",
2309                        doc: None,
2310                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
2311                        default: None,
2312                        aliases: vec![],
2313                    },
2314                    Field {
2315                        name: "meta",
2316                        doc: None,
2317                        r#type: Schema::Union(vec![
2318                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2319                            Schema::Complex(ComplexType::Map(Map {
2320                                values: Box::new(Schema::TypeName(TypeName::Primitive(
2321                                    PrimitiveType::Bytes
2322                                ))),
2323                                attributes: Default::default(),
2324                            })),
2325                        ]),
2326                        default: None,
2327                        aliases: vec![],
2328                    }
2329                ],
2330                attributes: Default::default(),
2331            }))
2332        );
2333    }
2334
2335    #[test]
2336    fn test_canonical_form_generation_comprehensive_record() {
2337        // NOTE: This schema is identical to the one used in test_deserialize_comprehensive.
2338        let json_str = r#"{
2339          "type": "record",
2340          "name": "E2eComprehensive",
2341          "namespace": "org.apache.arrow.avrotests.v1",
2342          "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
2343          "fields": [
2344            {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
2345            {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
2346            {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
2347            {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
2348            {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
2349            {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
2350            {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
2351            {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
2352            {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
2353            {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
2354            {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
2355            {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
2356            {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
2357            {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
2358            {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
2359            {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
2360            {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
2361            {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
2362            {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
2363            {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
2364            {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
2365            {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
2366            {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
2367            {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
2368            {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
2369            {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
2370            {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
2371                {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
2372                {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
2373                {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
2374            ]}, "doc": "Embedded Address record"},
2375            {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
2376                {"name": "user", "type": "string", "doc": "Username"},
2377                {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
2378            ]}},
2379            {"name": "union_enum_record_array_map", "type": [
2380                {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
2381                {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
2382                {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
2383                {"type": "array", "items": "long"},
2384                {"type": "map", "values": "string"}
2385            ], "doc": "Union of enum, two records, array, and map"},
2386            {"name": "union_date_or_fixed4", "type": [
2387                {"type": "int", "logicalType": "date"},
2388                {"type": "fixed", "name": "Fx4", "size": 4}
2389            ], "doc": "Union of date(int) or fixed(4)"},
2390            {"name": "union_interval_or_string", "type": [
2391                {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
2392                "string"
2393            ], "doc": "Union of duration(fixed12) or string"},
2394            {"name": "union_uuid_or_fixed10", "type": [
2395                {"type": "string", "logicalType": "uuid"},
2396                {"type": "fixed", "name": "Fx10", "size": 10}
2397            ], "doc": "Union of UUID string or fixed(10)"},
2398            {"name": "array_records_with_union", "type": {"type": "array", "items": {
2399                "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
2400                "fields": [
2401                    {"name": "key", "type": "string"},
2402                    {"name": "val", "type": ["null", "int", "long"], "default": null}
2403                ]
2404            }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
2405            {"name": "union_map_or_array_int", "type": [
2406                {"type": "map", "values": "int"},
2407                {"type": "array", "items": "int"}
2408            ], "doc": "Union[map<string,int>, array<int>]"},
2409            {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
2410            {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
2411                {"name": "name", "type": "string"},
2412                {"name": "age", "type": "int", "default": 0}
2413            ]}, "doc": "Record using type alias for schema evolution tests"}
2414          ]
2415        }"#;
2416        let avro = AvroSchema::new(json_str.to_string());
2417        let parsed = avro.schema().expect("schema should deserialize");
2418        let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrote
sts.v1.types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
2419        let canonical_form =
2420            AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
2421        assert_eq!(
2422            canonical_form, expected_canonical_form,
2423            "Canonical form must match Avro spec PCF exactly"
2424        );
2425    }
2426
2427    #[test]
2428    fn test_new_schema_store() {
2429        let store = SchemaStore::new();
2430        assert!(store.schemas.is_empty());
2431    }
2432
2433    #[test]
2434    fn test_try_from_schemas_rabin() {
2435        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2436        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2437        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2438        schemas.insert(
2439            int_avro_schema
2440                .fingerprint(FingerprintAlgorithm::Rabin)
2441                .unwrap(),
2442            int_avro_schema.clone(),
2443        );
2444        schemas.insert(
2445            record_avro_schema
2446                .fingerprint(FingerprintAlgorithm::Rabin)
2447                .unwrap(),
2448            record_avro_schema.clone(),
2449        );
2450        let store = SchemaStore::try_from(schemas).unwrap();
2451        let int_fp = int_avro_schema
2452            .fingerprint(FingerprintAlgorithm::Rabin)
2453            .unwrap();
2454        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2455        let rec_fp = record_avro_schema
2456            .fingerprint(FingerprintAlgorithm::Rabin)
2457            .unwrap();
2458        assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
2459    }
2460
2461    #[test]
2462    fn test_try_from_with_duplicates() {
2463        let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2464        let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2465        let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2466        schemas.insert(
2467            int_avro_schema
2468                .fingerprint(FingerprintAlgorithm::Rabin)
2469                .unwrap(),
2470            int_avro_schema.clone(),
2471        );
2472        schemas.insert(
2473            record_avro_schema
2474                .fingerprint(FingerprintAlgorithm::Rabin)
2475                .unwrap(),
2476            record_avro_schema.clone(),
2477        );
2478        // Insert duplicate of int schema
2479        schemas.insert(
2480            int_avro_schema
2481                .fingerprint(FingerprintAlgorithm::Rabin)
2482                .unwrap(),
2483            int_avro_schema.clone(),
2484        );
2485        let store = SchemaStore::try_from(schemas).unwrap();
2486        assert_eq!(store.schemas.len(), 2);
2487        let int_fp = int_avro_schema
2488            .fingerprint(FingerprintAlgorithm::Rabin)
2489            .unwrap();
2490        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2491    }
2492
2493    #[test]
2494    fn test_register_and_lookup_rabin() {
2495        let mut store = SchemaStore::new();
2496        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2497        let fp_enum = store.register(schema.clone()).unwrap();
2498        match fp_enum {
2499            Fingerprint::Rabin(fp_val) => {
2500                assert_eq!(
2501                    store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
2502                    Some(schema.clone())
2503                );
2504                assert!(
2505                    store
2506                        .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
2507                        .is_none()
2508                );
2509            }
2510            Fingerprint::Id(_id) => {
2511                unreachable!("This test should only generate Rabin fingerprints")
2512            }
2513            Fingerprint::Id64(_id) => {
2514                unreachable!("This test should only generate Rabin fingerprints")
2515            }
2516            #[cfg(feature = "md5")]
2517            Fingerprint::MD5(_id) => {
2518                unreachable!("This test should only generate Rabin fingerprints")
2519            }
2520            #[cfg(feature = "sha256")]
2521            Fingerprint::SHA256(_id) => {
2522                unreachable!("This test should only generate Rabin fingerprints")
2523            }
2524        }
2525    }
2526
2527    #[test]
2528    fn test_set_and_lookup_id() {
2529        let mut store = SchemaStore::new();
2530        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2531        let id = 42u32;
2532        let fp = Fingerprint::Id(id);
2533        let out_fp = store.set(fp, schema.clone()).unwrap();
2534        assert_eq!(out_fp, fp);
2535        assert_eq!(store.lookup(&fp).cloned(), Some(schema.clone()));
2536        assert!(store.lookup(&Fingerprint::Id(id.wrapping_add(1))).is_none());
2537    }
2538
2539    #[test]
2540    fn test_set_and_lookup_id64() {
2541        let mut store = SchemaStore::new();
2542        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2543        let id64: u64 = 0xDEAD_BEEF_DEAD_BEEF;
2544        let fp = Fingerprint::Id64(id64);
2545        let out_fp = store.set(fp, schema.clone()).unwrap();
2546        assert_eq!(out_fp, fp, "set should return the same Id64 fingerprint");
2547        assert_eq!(
2548            store.lookup(&fp).cloned(),
2549            Some(schema.clone()),
2550            "lookup should find the schema by Id64"
2551        );
2552        assert!(
2553            store
2554                .lookup(&Fingerprint::Id64(id64.wrapping_add(1)))
2555                .is_none(),
2556            "lookup with a different Id64 must return None"
2557        );
2558    }
2559
2560    #[test]
2561    fn test_fingerprint_id64_conversions() {
2562        let algo_from_fp = FingerprintAlgorithm::from(&Fingerprint::Id64(123));
2563        assert_eq!(algo_from_fp, FingerprintAlgorithm::Id64);
2564        let fp_from_algo = Fingerprint::from(FingerprintAlgorithm::Id64);
2565        assert!(matches!(fp_from_algo, Fingerprint::Id64(0)));
2566        let strategy_from_fp = FingerprintStrategy::from(Fingerprint::Id64(5));
2567        assert!(matches!(strategy_from_fp, FingerprintStrategy::Id64(0)));
2568        let algo_from_strategy = FingerprintAlgorithm::from(strategy_from_fp);
2569        assert_eq!(algo_from_strategy, FingerprintAlgorithm::Id64);
2570    }
2571
2572    #[test]
2573    fn test_register_duplicate_schema() {
2574        let mut store = SchemaStore::new();
2575        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2576        let schema2 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2577        let fingerprint1 = store.register(schema1).unwrap();
2578        let fingerprint2 = store.register(schema2).unwrap();
2579        assert_eq!(fingerprint1, fingerprint2);
2580        assert_eq!(store.schemas.len(), 1);
2581    }
2582
2583    #[test]
2584    fn test_set_and_lookup_with_provided_fingerprint() {
2585        let mut store = SchemaStore::new();
2586        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2587        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2588        let out_fp = store.set(fp, schema.clone()).unwrap();
2589        assert_eq!(out_fp, fp);
2590        assert_eq!(store.lookup(&fp).cloned(), Some(schema));
2591    }
2592
2593    #[test]
2594    fn test_set_duplicate_same_schema_ok() {
2595        let mut store = SchemaStore::new();
2596        let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2597        let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2598        let _ = store.set(fp, schema.clone()).unwrap();
2599        let _ = store.set(fp, schema.clone()).unwrap();
2600        assert_eq!(store.schemas.len(), 1);
2601    }
2602
2603    #[test]
2604    fn test_set_duplicate_different_schema_collision_error() {
2605        let mut store = SchemaStore::new();
2606        let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2607        let schema2 = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2608        // Use the same Fingerprint::Id to simulate a collision across different schemas
2609        let fp = Fingerprint::Id(123);
2610        let _ = store.set(fp, schema1).unwrap();
2611        let err = store.set(fp, schema2).unwrap_err();
2612        let msg = format!("{err}");
2613        assert!(msg.contains("Schema fingerprint collision"));
2614    }
2615
2616    #[test]
2617    fn test_canonical_form_generation_primitive() {
2618        let schema = int_schema();
2619        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2620        assert_eq!(canonical_form, r#""int""#);
2621    }
2622
2623    #[test]
2624    fn test_canonical_form_generation_record() {
2625        let schema = record_schema();
2626        let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2627        let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2628        assert_eq!(canonical_form, expected_canonical_form);
2629    }
2630
2631    #[test]
2632    fn test_fingerprint_calculation() {
2633        let canonical_form = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
2634        let expected_fingerprint = 10505236152925314060;
2635        let fingerprint = compute_fingerprint_rabin(canonical_form);
2636        assert_eq!(fingerprint, expected_fingerprint);
2637    }
2638
2639    #[test]
2640    fn test_register_and_lookup_complex_schema() {
2641        let mut store = SchemaStore::new();
2642        let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2643        let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2644        let expected_fingerprint = Fingerprint::Rabin(compute_fingerprint_rabin(canonical_form));
2645        let fingerprint = store.register(schema.clone()).unwrap();
2646        assert_eq!(fingerprint, expected_fingerprint);
2647        let looked_up = store.lookup(&fingerprint).cloned();
2648        assert_eq!(looked_up, Some(schema));
2649    }
2650
2651    #[test]
2652    fn test_fingerprints_returns_all_keys() {
2653        let mut store = SchemaStore::new();
2654        let fp_int = store
2655            .register(AvroSchema::new(
2656                serde_json::to_string(&int_schema()).unwrap(),
2657            ))
2658            .unwrap();
2659        let fp_record = store
2660            .register(AvroSchema::new(
2661                serde_json::to_string(&record_schema()).unwrap(),
2662            ))
2663            .unwrap();
2664        let fps = store.fingerprints();
2665        assert_eq!(fps.len(), 2);
2666        assert!(fps.contains(&fp_int));
2667        assert!(fps.contains(&fp_record));
2668    }
2669
2670    #[test]
2671    fn test_canonical_form_strips_attributes() {
2672        let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
2673            name: "record_with_attrs",
2674            namespace: None,
2675            doc: Some(Cow::from("This doc should be stripped")),
2676            aliases: vec!["alias1", "alias2"],
2677            fields: vec![Field {
2678                name: "f1",
2679                doc: Some(Cow::from("field doc")),
2680                r#type: Schema::Type(Type {
2681                    r#type: TypeName::Primitive(PrimitiveType::Bytes),
2682                    attributes: Attributes {
2683                        logical_type: None,
2684                        additional: HashMap::from([("precision", json!(4))]),
2685                    },
2686                }),
2687                default: None,
2688                aliases: vec![],
2689            }],
2690            attributes: Attributes {
2691                logical_type: None,
2692                additional: HashMap::from([("custom_attr", json!("value"))]),
2693            },
2694        }));
2695        let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
2696        let canonical_form = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
2697        assert_eq!(canonical_form, expected_canonical_form);
2698    }
2699
2700    #[cfg(not(feature = "avro_custom_types"))]
2701    #[test]
2702    fn test_primitive_mappings() {
2703        let cases = vec![
2704            (DataType::Boolean, "\"boolean\""),
2705            (DataType::Int8, "\"int\""),
2706            (DataType::Int16, "\"int\""),
2707            (DataType::Int32, "\"int\""),
2708            (DataType::Int64, "\"long\""),
2709            (DataType::UInt8, "\"int\""),
2710            (DataType::UInt16, "\"int\""),
2711            (DataType::UInt32, "\"long\""),
2712            (DataType::UInt64, "\"long\""),
2713            (DataType::Float16, "\"float\""),
2714            (DataType::Float32, "\"float\""),
2715            (DataType::Float64, "\"double\""),
2716            (DataType::Utf8, "\"string\""),
2717            (DataType::Binary, "\"bytes\""),
2718        ];
2719        for (dt, avro_token) in cases {
2720            let field = ArrowField::new("col", dt.clone(), false);
2721            let arrow_schema = single_field_schema(field);
2722            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2723            assert_json_contains(&avro.json_string, avro_token);
2724        }
2725    }
2726
2727    #[cfg(feature = "avro_custom_types")]
2728    #[test]
2729    fn test_primitive_mappings() {
2730        let cases = vec![
2731            (DataType::Boolean, "\"boolean\""),
2732            (DataType::Int8, "\"logicalType\":\"arrow.int8\""),
2733            (DataType::Int16, "\"logicalType\":\"arrow.int16\""),
2734            (DataType::Int32, "\"int\""),
2735            (DataType::Int64, "\"long\""),
2736            (DataType::UInt8, "\"logicalType\":\"arrow.uint8\""),
2737            (DataType::UInt16, "\"logicalType\":\"arrow.uint16\""),
2738            (DataType::UInt32, "\"logicalType\":\"arrow.uint32\""),
2739            (DataType::UInt64, "\"logicalType\":\"arrow.uint64\""),
2740            (DataType::Float16, "\"logicalType\":\"arrow.float16\""),
2741            (DataType::Float32, "\"float\""),
2742            (DataType::Float64, "\"double\""),
2743            (DataType::Utf8, "\"string\""),
2744            (DataType::Binary, "\"bytes\""),
2745        ];
2746        for (dt, avro_token) in cases {
2747            let field = ArrowField::new("col", dt.clone(), false);
2748            let arrow_schema = single_field_schema(field);
2749            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2750            assert_json_contains(&avro.json_string, avro_token);
2751        }
2752    }
2753
2754    #[cfg(feature = "avro_custom_types")]
2755    #[test]
2756    fn test_custom_fixed_logical_types_preserve_namespace_metadata() {
2757        let namespace = "com.example.types";
2758
2759        let mut md_u64 = HashMap::new();
2760        md_u64.insert(AVRO_NAME_METADATA_KEY.to_string(), "U64Type".to_string());
2761        md_u64.insert(
2762            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2763            namespace.to_string(),
2764        );
2765
2766        let mut md_f16 = HashMap::new();
2767        md_f16.insert(AVRO_NAME_METADATA_KEY.to_string(), "F16Type".to_string());
2768        md_f16.insert(
2769            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2770            namespace.to_string(),
2771        );
2772
2773        let mut md_iv_ym = HashMap::new();
2774        md_iv_ym.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvYmType".to_string());
2775        md_iv_ym.insert(
2776            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2777            namespace.to_string(),
2778        );
2779
2780        let mut md_iv_dt = HashMap::new();
2781        md_iv_dt.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvDtType".to_string());
2782        md_iv_dt.insert(
2783            AVRO_NAMESPACE_METADATA_KEY.to_string(),
2784            namespace.to_string(),
2785        );
2786
2787        let arrow_schema = ArrowSchema::new(vec![
2788            ArrowField::new("u64_col", DataType::UInt64, false).with_metadata(md_u64),
2789            ArrowField::new("f16_col", DataType::Float16, false).with_metadata(md_f16),
2790            ArrowField::new(
2791                "iv_ym_col",
2792                DataType::Interval(IntervalUnit::YearMonth),
2793                false,
2794            )
2795            .with_metadata(md_iv_ym),
2796            ArrowField::new(
2797                "iv_dt_col",
2798                DataType::Interval(IntervalUnit::DayTime),
2799                false,
2800            )
2801            .with_metadata(md_iv_dt),
2802        ]);
2803
2804        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2805        let root: Value = serde_json::from_str(&avro.json_string).unwrap();
2806        let fields = root
2807            .get("fields")
2808            .and_then(|f| f.as_array())
2809            .expect("record fields array");
2810
2811        let expected = [
2812            ("u64_col", "arrow.uint64"),
2813            ("f16_col", "arrow.float16"),
2814            ("iv_ym_col", "arrow.interval-year-month"),
2815            ("iv_dt_col", "arrow.interval-day-time"),
2816        ];
2817
2818        for (field_name, logical_type) in expected {
2819            let field = fields
2820                .iter()
2821                .find(|f| f.get("name").and_then(Value::as_str) == Some(field_name))
2822                .unwrap_or_else(|| panic!("missing field {field_name}"));
2823            let ty = field
2824                .get("type")
2825                .and_then(Value::as_object)
2826                .unwrap_or_else(|| panic!("field {field_name} type must be object"));
2827
2828            assert_eq!(ty.get("type").and_then(Value::as_str), Some("fixed"));
2829            assert_eq!(
2830                ty.get("logicalType").and_then(Value::as_str),
2831                Some(logical_type)
2832            );
2833            assert_eq!(
2834                ty.get("namespace").and_then(Value::as_str),
2835                Some(namespace),
2836                "field {field_name} must preserve avro.namespace metadata"
2837            );
2838        }
2839    }
2840
2841    #[cfg(feature = "avro_custom_types")]
2842    #[test]
2843    fn test_custom_fixed_logical_types_omit_namespace_without_metadata() {
2844        let mut md_u64 = HashMap::new();
2845        md_u64.insert(AVRO_NAME_METADATA_KEY.to_string(), "U64Type".to_string());
2846
2847        let mut md_f16 = HashMap::new();
2848        md_f16.insert(AVRO_NAME_METADATA_KEY.to_string(), "F16Type".to_string());
2849
2850        let mut md_iv_ym = HashMap::new();
2851        md_iv_ym.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvYmType".to_string());
2852
2853        let mut md_iv_dt = HashMap::new();
2854        md_iv_dt.insert(AVRO_NAME_METADATA_KEY.to_string(), "IvDtType".to_string());
2855
2856        let arrow_schema = ArrowSchema::new(vec![
2857            ArrowField::new("u64_col", DataType::UInt64, false).with_metadata(md_u64),
2858            ArrowField::new("f16_col", DataType::Float16, false).with_metadata(md_f16),
2859            ArrowField::new(
2860                "iv_ym_col",
2861                DataType::Interval(IntervalUnit::YearMonth),
2862                false,
2863            )
2864            .with_metadata(md_iv_ym),
2865            ArrowField::new(
2866                "iv_dt_col",
2867                DataType::Interval(IntervalUnit::DayTime),
2868                false,
2869            )
2870            .with_metadata(md_iv_dt),
2871        ]);
2872
2873        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2874        let root: Value = serde_json::from_str(&avro.json_string).unwrap();
2875        let fields = root
2876            .get("fields")
2877            .and_then(|f| f.as_array())
2878            .expect("record fields array");
2879
2880        for field_name in ["u64_col", "f16_col", "iv_ym_col", "iv_dt_col"] {
2881            let field = fields
2882                .iter()
2883                .find(|f| f.get("name").and_then(Value::as_str) == Some(field_name))
2884                .unwrap_or_else(|| panic!("missing field {field_name}"));
2885            let ty = field
2886                .get("type")
2887                .and_then(Value::as_object)
2888                .unwrap_or_else(|| panic!("field {field_name} type must be object"));
2889
2890            assert_eq!(ty.get("type").and_then(Value::as_str), Some("fixed"));
2891            assert!(
2892                !ty.contains_key("namespace"),
2893                "field {field_name} should not include namespace when metadata lacks avro.namespace"
2894            );
2895        }
2896    }
2897
2898    #[test]
2899    fn test_temporal_mappings() {
2900        let cases = vec![
2901            (DataType::Date32, "\"logicalType\":\"date\""),
2902            (
2903                DataType::Time32(TimeUnit::Millisecond),
2904                "\"logicalType\":\"time-millis\"",
2905            ),
2906            (
2907                DataType::Time64(TimeUnit::Microsecond),
2908                "\"logicalType\":\"time-micros\"",
2909            ),
2910            (
2911                DataType::Timestamp(TimeUnit::Millisecond, None),
2912                "\"logicalType\":\"local-timestamp-millis\"",
2913            ),
2914            (
2915                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2916                "\"logicalType\":\"timestamp-micros\"",
2917            ),
2918        ];
2919        for (dt, needle) in cases {
2920            let field = ArrowField::new("ts", dt.clone(), true);
2921            let arrow_schema = single_field_schema(field);
2922            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2923            assert_json_contains(&avro.json_string, needle);
2924        }
2925    }
2926
2927    #[test]
2928    fn test_decimal_and_uuid() {
2929        let decimal_field = ArrowField::new("amount", DataType::Decimal128(25, 2), false);
2930        let dec_schema = single_field_schema(decimal_field);
2931        let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
2932        assert_json_contains(&avro_dec.json_string, "\"logicalType\":\"decimal\"");
2933        assert_json_contains(&avro_dec.json_string, "\"precision\":25");
2934        assert_json_contains(&avro_dec.json_string, "\"scale\":2");
2935        let mut md = HashMap::new();
2936        md.insert("logicalType".into(), "uuid".into());
2937        let uuid_field =
2938            ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(md);
2939        let uuid_schema = single_field_schema(uuid_field);
2940        let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
2941        assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
2942    }
2943
2944    #[cfg(not(feature = "avro_custom_types"))]
2945    #[test]
2946    fn test_interval_month_day_nano_duration_schema() {
2947        let interval_field = ArrowField::new(
2948            "span",
2949            DataType::Interval(IntervalUnit::MonthDayNano),
2950            false,
2951        );
2952        let s = single_field_schema(interval_field);
2953        let avro = AvroSchema::try_from(&s).unwrap();
2954        assert_json_contains(&avro.json_string, "\"logicalType\":\"duration\"");
2955        assert_json_contains(&avro.json_string, "\"size\":12");
2956    }
2957
2958    #[cfg(feature = "avro_custom_types")]
2959    #[test]
2960    fn test_interval_month_day_nano_custom_schema() {
2961        let interval_field = ArrowField::new(
2962            "span",
2963            DataType::Interval(IntervalUnit::MonthDayNano),
2964            false,
2965        );
2966        let s = single_field_schema(interval_field);
2967        let avro = AvroSchema::try_from(&s).unwrap();
2968        assert_json_contains(
2969            &avro.json_string,
2970            "\"logicalType\":\"arrow.interval-month-day-nano\"",
2971        );
2972        assert_json_contains(&avro.json_string, "\"size\":16");
2973    }
2974
2975    #[cfg(feature = "avro_custom_types")]
2976    #[test]
2977    fn test_duration_custom_logical_type() {
2978        let dur_field = ArrowField::new("latency", DataType::Duration(TimeUnit::Nanosecond), false);
2979        let s2 = single_field_schema(dur_field);
2980        let avro2 = AvroSchema::try_from(&s2).unwrap();
2981        assert_json_contains(
2982            &avro2.json_string,
2983            "\"logicalType\":\"arrow.duration-nanos\"",
2984        );
2985    }
2986
2987    #[test]
2988    fn test_complex_types() {
2989        let list_dt = DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true)));
2990        let list_schema = single_field_schema(ArrowField::new("numbers", list_dt, false));
2991        let avro_list = AvroSchema::try_from(&list_schema).unwrap();
2992        assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
2993        assert_json_contains(&avro_list.json_string, "\"items\"");
2994        let value_field = ArrowField::new("value", DataType::Boolean, true);
2995        let entries_struct = ArrowField::new(
2996            "entries",
2997            DataType::Struct(Fields::from(vec![
2998                ArrowField::new("key", DataType::Utf8, false),
2999                value_field.clone(),
3000            ])),
3001            false,
3002        );
3003        let map_dt = DataType::Map(Arc::new(entries_struct), false);
3004        let map_schema = single_field_schema(ArrowField::new("props", map_dt, false));
3005        let avro_map = AvroSchema::try_from(&map_schema).unwrap();
3006        assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
3007        assert_json_contains(&avro_map.json_string, "\"values\"");
3008        let struct_dt = DataType::Struct(Fields::from(vec![
3009            ArrowField::new("f1", DataType::Int64, false),
3010            ArrowField::new("f2", DataType::Utf8, true),
3011        ]));
3012        let struct_schema = single_field_schema(ArrowField::new("person", struct_dt, true));
3013        let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
3014        assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
3015        assert_json_contains(&avro_struct.json_string, "\"null\"");
3016    }
3017
3018    #[test]
3019    fn test_enum_dictionary() {
3020        let mut md = HashMap::new();
3021        md.insert(
3022            AVRO_ENUM_SYMBOLS_METADATA_KEY.into(),
3023            "[\"OPEN\",\"CLOSED\"]".into(),
3024        );
3025        let enum_dt = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
3026        let field = ArrowField::new("status", enum_dt, false).with_metadata(md);
3027        let schema = single_field_schema(field);
3028        let avro = AvroSchema::try_from(&schema).unwrap();
3029        assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
3030        assert_json_contains(&avro.json_string, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
3031    }
3032
3033    #[test]
3034    fn test_run_end_encoded() {
3035        let ree_dt = DataType::RunEndEncoded(
3036            Arc::new(ArrowField::new("run_ends", DataType::Int32, false)),
3037            Arc::new(ArrowField::new("values", DataType::Utf8, false)),
3038        );
3039        let s = single_field_schema(ArrowField::new("text", ree_dt, false));
3040        let avro = AvroSchema::try_from(&s).unwrap();
3041        assert_json_contains(&avro.json_string, "\"string\"");
3042    }
3043
3044    #[test]
3045    fn test_dense_union() {
3046        let uf: UnionFields = vec![
3047            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
3048            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
3049        ]
3050        .into_iter()
3051        .collect();
3052        let union_dt = DataType::Union(uf, UnionMode::Dense);
3053        let s = single_field_schema(ArrowField::new("u", union_dt, false));
3054        let avro =
3055            AvroSchema::try_from(&s).expect("Arrow Union -> Avro union conversion should succeed");
3056        let v: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
3057        let fields = v
3058            .get("fields")
3059            .and_then(|x| x.as_array())
3060            .expect("fields array");
3061        let u_field = fields
3062            .iter()
3063            .find(|f| f.get("name").and_then(|n| n.as_str()) == Some("u"))
3064            .expect("field 'u'");
3065        let union = u_field.get("type").expect("u.type");
3066        let arr = union.as_array().expect("u.type must be Avro union array");
3067        assert_eq!(arr.len(), 2, "expected two union branches");
3068        let first = &arr[0];
3069        let obj = first
3070            .as_object()
3071            .expect("first branch should be an object with metadata");
3072        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
3073        assert_eq!(
3074            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
3075            Some("dense")
3076        );
3077        let type_ids: Vec<i64> = obj
3078            .get("arrowUnionTypeIds")
3079            .and_then(|a| a.as_array())
3080            .expect("arrowUnionTypeIds array")
3081            .iter()
3082            .map(|n| n.as_i64().expect("i64"))
3083            .collect();
3084        assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");
3085        assert_eq!(arr[1], Value::String("string".into()));
3086    }
3087
3088    #[test]
3089    fn round_trip_primitive() {
3090        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", DataType::Int32, false)]);
3091        let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
3092        let decoded = avro_schema.schema().unwrap();
3093        assert!(matches!(decoded, Schema::Complex(_)));
3094    }
3095
3096    #[test]
3097    fn test_name_generator_sanitization_and_uniqueness() {
3098        let f1 = ArrowField::new("weird-name", DataType::FixedSizeBinary(8), false);
3099        let f2 = ArrowField::new("weird name", DataType::FixedSizeBinary(8), false);
3100        let f3 = ArrowField::new("123bad", DataType::FixedSizeBinary(8), false);
3101        let arrow_schema = ArrowSchema::new(vec![f1, f2, f3]);
3102        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
3103        assert_json_contains(&avro.json_string, "\"name\":\"weird_name\"");
3104        assert_json_contains(&avro.json_string, "\"name\":\"weird_name_1\"");
3105        assert_json_contains(&avro.json_string, "\"name\":\"_123bad\"");
3106    }
3107
3108    #[cfg(not(feature = "avro_custom_types"))]
3109    #[test]
3110    fn test_date64_logical_type_mapping() {
3111        let field = ArrowField::new("d", DataType::Date64, true);
3112        let schema = single_field_schema(field);
3113        let avro = AvroSchema::try_from(&schema).unwrap();
3114        assert_json_contains(
3115            &avro.json_string,
3116            "\"logicalType\":\"local-timestamp-millis\"",
3117        );
3118    }
3119
3120    #[cfg(feature = "avro_custom_types")]
3121    #[test]
3122    fn test_date64_logical_type_mapping_custom() {
3123        let field = ArrowField::new("d", DataType::Date64, true);
3124        let schema = single_field_schema(field);
3125        let avro = AvroSchema::try_from(&schema).unwrap();
3126        assert_json_contains(&avro.json_string, "\"logicalType\":\"arrow.date64\"");
3127    }
3128
3129    #[cfg(feature = "avro_custom_types")]
3130    #[test]
3131    fn test_duration_list_extras_propagated() {
3132        let child = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
3133        let list_dt = DataType::List(Arc::new(child));
3134        let arrow_schema = single_field_schema(ArrowField::new("durations", list_dt, false));
3135        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
3136        assert_json_contains(
3137            &avro.json_string,
3138            "\"logicalType\":\"arrow.duration-micros\"",
3139        );
3140    }
3141
3142    #[cfg(not(feature = "avro_custom_types"))]
3143    #[test]
3144    fn test_interval_yearmonth_extra() {
3145        let field = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
3146        let schema = single_field_schema(field);
3147        let avro = AvroSchema::try_from(&schema).unwrap();
3148        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
3149    }
3150
3151    #[cfg(not(feature = "avro_custom_types"))]
3152    #[test]
3153    fn test_interval_daytime_extra() {
3154        let field = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
3155        let schema = single_field_schema(field);
3156        let avro = AvroSchema::try_from(&schema).unwrap();
3157        assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"daytime\"");
3158    }
3159
3160    #[cfg(feature = "avro_custom_types")]
3161    #[test]
3162    fn test_interval_yearmonth_custom() {
3163        let field = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
3164        let schema = single_field_schema(field);
3165        let avro = AvroSchema::try_from(&schema).unwrap();
3166        assert_json_contains(
3167            &avro.json_string,
3168            "\"logicalType\":\"arrow.interval-year-month\"",
3169        );
3170    }
3171
3172    #[cfg(feature = "avro_custom_types")]
3173    #[test]
3174    fn test_interval_daytime_custom() {
3175        let field = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
3176        let schema = single_field_schema(field);
3177        let avro = AvroSchema::try_from(&schema).unwrap();
3178        assert_json_contains(
3179            &avro.json_string,
3180            "\"logicalType\":\"arrow.interval-day-time\"",
3181        );
3182    }
3183
3184    #[test]
3185    fn test_fixed_size_list_extra() {
3186        let child = ArrowField::new("item", DataType::Int32, false);
3187        let dt = DataType::FixedSizeList(Arc::new(child), 3);
3188        let schema = single_field_schema(ArrowField::new("triples", dt, false));
3189        let avro = AvroSchema::try_from(&schema).unwrap();
3190        assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
3191    }
3192
3193    #[cfg(feature = "avro_custom_types")]
3194    #[test]
3195    fn test_map_duration_value_extra() {
3196        let val_field = ArrowField::new("value", DataType::Duration(TimeUnit::Second), true);
3197        let entries_struct = ArrowField::new(
3198            "entries",
3199            DataType::Struct(Fields::from(vec![
3200                ArrowField::new("key", DataType::Utf8, false),
3201                val_field,
3202            ])),
3203            false,
3204        );
3205        let map_dt = DataType::Map(Arc::new(entries_struct), false);
3206        let schema = single_field_schema(ArrowField::new("metrics", map_dt, false));
3207        let avro = AvroSchema::try_from(&schema).unwrap();
3208        assert_json_contains(
3209            &avro.json_string,
3210            "\"logicalType\":\"arrow.duration-seconds\"",
3211        );
3212    }
3213
3214    #[test]
3215    fn test_schema_with_non_string_defaults_decodes_successfully() {
3216        let schema_json = r#"{
3217            "type": "record",
3218            "name": "R",
3219            "fields": [
3220                {"name": "a", "type": "int", "default": 0},
3221                {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
3222                {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
3223                {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
3224                    {"name": "flag", "type": "boolean", "default": true},
3225                    {"name": "name", "type": "string", "default": "hi"}
3226                ]}, "default": {"flag": false, "name": "d"}},
3227                {"name": "u", "type": ["int", "null"], "default": 42}
3228            ]
3229        }"#;
3230        let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
3231        match &schema {
3232            Schema::Complex(ComplexType::Record(_)) => {}
3233            other => panic!("expected record schema, got: {:?}", other),
3234        }
3235        // Avro to Arrow conversion
3236        let field = crate::codec::AvroField::try_from(&schema)
3237            .expect("Avro->Arrow conversion should succeed");
3238        let arrow_field = field.field();
3239        // Build expected Arrow field
3240        let expected_list_item = ArrowField::new(
3241            arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
3242            DataType::Int64,
3243            false,
3244        );
3245        let expected_b = ArrowField::new("b", DataType::List(Arc::new(expected_list_item)), false);
3246
3247        let expected_map_value = ArrowField::new("value", DataType::Float64, false);
3248        let expected_entries = ArrowField::new(
3249            "entries",
3250            DataType::Struct(Fields::from(vec![
3251                ArrowField::new("key", DataType::Utf8, false),
3252                expected_map_value,
3253            ])),
3254            false,
3255        );
3256        let expected_c =
3257            ArrowField::new("c", DataType::Map(Arc::new(expected_entries), false), false);
3258        let mut inner_md = std::collections::HashMap::new();
3259        inner_md.insert(AVRO_NAME_METADATA_KEY.to_string(), "Inner".to_string());
3260        let expected_inner = ArrowField::new(
3261            "inner",
3262            DataType::Struct(Fields::from(vec![
3263                ArrowField::new("flag", DataType::Boolean, false),
3264                ArrowField::new("name", DataType::Utf8, false),
3265            ])),
3266            false,
3267        )
3268        .with_metadata(inner_md);
3269        let mut root_md = std::collections::HashMap::new();
3270        root_md.insert(AVRO_NAME_METADATA_KEY.to_string(), "R".to_string());
3271        let expected = ArrowField::new(
3272            "R",
3273            DataType::Struct(Fields::from(vec![
3274                ArrowField::new("a", DataType::Int32, false),
3275                expected_b,
3276                expected_c,
3277                expected_inner,
3278                ArrowField::new("u", DataType::Int32, true),
3279            ])),
3280            false,
3281        )
3282        .with_metadata(root_md);
3283        assert_eq!(arrow_field, expected);
3284    }
3285
3286    #[test]
3287    fn default_order_is_consistent() {
3288        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
3289        let a = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
3290        let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
3291        assert_eq!(a, b.unwrap().json_string);
3292    }
3293
3294    #[test]
3295    fn test_union_branch_missing_name_errors() {
3296        for t in ["record", "enum", "fixed"] {
3297            let branch = json!({ "type": t });
3298            let err = union_branch_signature(&branch).unwrap_err().to_string();
3299            assert!(
3300                err.contains(&format!("Union branch '{t}' missing required 'name'")),
3301                "expected missing-name error for {t}, got: {err}"
3302            );
3303        }
3304    }
3305
3306    #[test]
3307    fn test_union_branch_named_type_signature_includes_name() {
3308        let rec = json!({ "type": "record", "name": "Foo" });
3309        assert_eq!(union_branch_signature(&rec).unwrap(), "N:record:Foo");
3310        let en = json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] });
3311        assert_eq!(union_branch_signature(&en).unwrap(), "N:enum:Color");
3312        let fx = json!({ "type": "fixed", "name": "Bytes16", "size": 16 });
3313        assert_eq!(union_branch_signature(&fx).unwrap(), "N:fixed:Bytes16");
3314    }
3315
3316    #[test]
3317    fn test_record_field_alias_resolution_without_default() {
3318        let writer_json = r#"{
3319          "type":"record",
3320          "name":"R",
3321          "fields":[{"name":"old","type":"int"}]
3322        }"#;
3323        let reader_json = r#"{
3324          "type":"record",
3325          "name":"R",
3326          "fields":[{"name":"new","aliases":["old"],"type":"int"}]
3327        }"#;
3328        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3329        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3330        let resolved = AvroFieldBuilder::new(&writer)
3331            .with_reader_schema(&reader)
3332            .with_utf8view(false)
3333            .with_strict_mode(false)
3334            .build()
3335            .unwrap();
3336        let expected = ArrowField::new(
3337            "R",
3338            DataType::Struct(Fields::from(vec![ArrowField::new(
3339                "new",
3340                DataType::Int32,
3341                false,
3342            )])),
3343            false,
3344        )
3345        .with_metadata(HashMap::from_iter([(
3346            "avro.name".to_owned(),
3347            "R".to_owned(),
3348        )]));
3349        assert_eq!(resolved.field(), expected);
3350    }
3351
3352    #[test]
3353    fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
3354        let writer_json = r#"{
3355          "type":"record",
3356          "name":"R",
3357          "fields":[
3358            {"name":"a","type":"int","aliases":["old"]},
3359            {"name":"b","type":"int","aliases":["old"]}
3360          ]
3361        }"#;
3362        let reader_json = r#"{
3363          "type":"record",
3364          "name":"R",
3365          "fields":[{"name":"target","type":"int","aliases":["old"]}]
3366        }"#;
3367        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3368        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3369        let err = AvroFieldBuilder::new(&writer)
3370            .with_reader_schema(&reader)
3371            .with_utf8view(false)
3372            .with_strict_mode(true)
3373            .build()
3374            .unwrap_err()
3375            .to_string();
3376        assert!(
3377            err.contains("Ambiguous alias 'old'"),
3378            "expected ambiguous-alias error, got: {err}"
3379        );
3380    }
3381
3382    #[test]
3383    fn test_pragmatic_writer_field_alias_mapping_non_strict() {
3384        let writer_json = r#"{
3385          "type":"record",
3386          "name":"R",
3387          "fields":[{"name":"before","type":"int","aliases":["now"]}]
3388        }"#;
3389        let reader_json = r#"{
3390          "type":"record",
3391          "name":"R",
3392          "fields":[{"name":"now","type":"int"}]
3393        }"#;
3394        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3395        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3396        let resolved = AvroFieldBuilder::new(&writer)
3397            .with_reader_schema(&reader)
3398            .with_utf8view(false)
3399            .with_strict_mode(false)
3400            .build()
3401            .unwrap();
3402        let expected = ArrowField::new(
3403            "R",
3404            DataType::Struct(Fields::from(vec![ArrowField::new(
3405                "now",
3406                DataType::Int32,
3407                false,
3408            )])),
3409            false,
3410        )
3411        .with_metadata(HashMap::from_iter([(
3412            "avro.name".to_owned(),
3413            "R".to_owned(),
3414        )]));
3415        assert_eq!(resolved.field(), expected);
3416    }
3417
3418    #[test]
3419    fn test_missing_reader_field_null_first_no_default_is_ok() {
3420        let writer_json = r#"{
3421          "type":"record",
3422          "name":"R",
3423          "fields":[{"name":"a","type":"int"}]
3424        }"#;
3425        let reader_json = r#"{
3426          "type":"record",
3427          "name":"R",
3428          "fields":[
3429            {"name":"a","type":"int"},
3430            {"name":"b","type":["null","int"]}
3431          ]
3432        }"#;
3433        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3434        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3435        let resolved = AvroFieldBuilder::new(&writer)
3436            .with_reader_schema(&reader)
3437            .with_utf8view(false)
3438            .with_strict_mode(false)
3439            .build()
3440            .unwrap();
3441        let expected = ArrowField::new(
3442            "R",
3443            DataType::Struct(Fields::from(vec![
3444                ArrowField::new("a", DataType::Int32, false),
3445                ArrowField::new("b", DataType::Int32, true).with_metadata(HashMap::from([(
3446                    AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
3447                    "null".to_string(),
3448                )])),
3449            ])),
3450            false,
3451        )
3452        .with_metadata(HashMap::from_iter([(
3453            "avro.name".to_owned(),
3454            "R".to_owned(),
3455        )]));
3456        assert_eq!(resolved.field(), expected);
3457    }
3458
3459    #[test]
3460    fn test_missing_reader_field_null_second_without_default_errors() {
3461        let writer_json = r#"{
3462          "type":"record",
3463          "name":"R",
3464          "fields":[{"name":"a","type":"int"}]
3465        }"#;
3466        let reader_json = r#"{
3467          "type":"record",
3468          "name":"R",
3469          "fields":[
3470            {"name":"a","type":"int"},
3471            {"name":"b","type":["int","null"]}
3472          ]
3473        }"#;
3474        let writer: Schema = serde_json::from_str(writer_json).unwrap();
3475        let reader: Schema = serde_json::from_str(reader_json).unwrap();
3476        let err = AvroFieldBuilder::new(&writer)
3477            .with_reader_schema(&reader)
3478            .with_utf8view(false)
3479            .with_strict_mode(false)
3480            .build()
3481            .unwrap_err()
3482            .to_string();
3483        assert!(
3484            err.contains("must have a default value"),
3485            "expected missing-default error, got: {err}"
3486        );
3487    }
3488
3489    #[test]
3490    fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
3491        let field = ArrowField::new("x", DataType::Int32, true);
3492        let injected_json =
3493            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3494                .to_string();
3495        let mut md = HashMap::new();
3496        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json.clone());
3497        md.insert("custom".to_string(), "123".to_string());
3498        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3499        let opts = AvroSchemaOptions {
3500            null_order: Some(Nullability::NullSecond),
3501            strip_metadata: false,
3502        };
3503        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3504        assert_eq!(
3505            out.json_string, injected_json,
3506            "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
3507        );
3508        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3509        assert_eq!(v.get("type").and_then(|t| t.as_str()), Some("record"));
3510        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("Injected"));
3511    }
3512
3513    #[test]
3514    fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
3515        let field = ArrowField::new("x", DataType::Int32, true);
3516        let injected_json =
3517            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
3518                .to_string();
3519        let mut md = HashMap::new();
3520        md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json);
3521        md.insert("custom_meta".to_string(), "7".to_string());
3522        let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
3523        let opts = AvroSchemaOptions {
3524            null_order: Some(Nullability::NullFirst),
3525            strip_metadata: true,
3526        };
3527        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3528        assert_json_contains(&out.json_string, "\"type\":\"record\"");
3529        assert_json_contains(&out.json_string, "\"name\":\"topLevelRecord\"");
3530        assert_json_contains(&out.json_string, "\"custom_meta\":7");
3531    }
3532
3533    #[test]
3534    fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
3535        let field = ArrowField::new("s", DataType::Utf8, true);
3536        let arrow_schema = single_field_schema(field);
3537        let opts = AvroSchemaOptions {
3538            null_order: Some(Nullability::NullFirst),
3539            strip_metadata: true,
3540        };
3541        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3542        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3543        let arr = v["fields"][0]["type"]
3544            .as_array()
3545            .expect("nullable primitive should be Avro union array");
3546        assert_eq!(arr[0], Value::String("null".into()));
3547        assert_eq!(arr[1], Value::String("string".into()));
3548    }
3549
3550    #[test]
3551    fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
3552        let field = ArrowField::new("s", DataType::Utf8, true);
3553        let arrow_schema = single_field_schema(field);
3554        let opts = AvroSchemaOptions {
3555            null_order: Some(Nullability::NullSecond),
3556            strip_metadata: true,
3557        };
3558        let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
3559        let v: Value = serde_json::from_str(&out.json_string).unwrap();
3560        let arr = v["fields"][0]["type"]
3561            .as_array()
3562            .expect("nullable primitive should be Avro union array");
3563        assert_eq!(arr[0], Value::String("string".into()));
3564        assert_eq!(arr[1], Value::String("null".into()));
3565    }
3566
3567    #[test]
3568    fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
3569        let uf: UnionFields = vec![
3570            (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
3571            (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
3572        ]
3573        .into_iter()
3574        .collect();
3575        let union_dt = DataType::Union(uf, UnionMode::Dense);
3576        let arrow_schema = single_field_schema(ArrowField::new("u", union_dt, true));
3577        let with_extras = AvroSchema::from_arrow_with_options(
3578            &arrow_schema,
3579            Some(AvroSchemaOptions {
3580                null_order: Some(Nullability::NullFirst),
3581                strip_metadata: false,
3582            }),
3583        )
3584        .unwrap();
3585        let v_with: Value = serde_json::from_str(&with_extras.json_string).unwrap();
3586        let union_arr = v_with["fields"][0]["type"].as_array().expect("union array");
3587        let first_obj = union_arr
3588            .iter()
3589            .find(|b| b.is_object())
3590            .expect("expected an object branch with extras");
3591        let obj = first_obj.as_object().unwrap();
3592        assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
3593        assert_eq!(
3594            obj.get("arrowUnionMode").and_then(|m| m.as_str()),
3595            Some("dense")
3596        );
3597        let type_ids: Vec<i64> = obj["arrowUnionTypeIds"]
3598            .as_array()
3599            .expect("arrowUnionTypeIds array")
3600            .iter()
3601            .map(|n| n.as_i64().expect("i64"))
3602            .collect();
3603        assert_eq!(type_ids, vec![2, 7]);
3604        let stripped = AvroSchema::from_arrow_with_options(
3605            &arrow_schema,
3606            Some(AvroSchemaOptions {
3607                null_order: Some(Nullability::NullFirst),
3608                strip_metadata: true,
3609            }),
3610        )
3611        .unwrap();
3612        let v_stripped: Value = serde_json::from_str(&stripped.json_string).unwrap();
3613        let union_arr2 = v_stripped["fields"][0]["type"]
3614            .as_array()
3615            .expect("union array");
3616        assert!(
3617            !union_arr2.iter().any(|b| b
3618                .as_object()
3619                .is_some_and(|m| m.contains_key("arrowUnionMode"))),
3620            "extras must be removed when strip_metadata=true"
3621        );
3622        assert_eq!(union_arr2[0], Value::String("null".into()));
3623        assert_eq!(union_arr2[1], Value::String("int".into()));
3624        assert_eq!(union_arr2[2], Value::String("string".into()));
3625    }
3626
3627    #[test]
3628    fn test_project_empty_projection() {
3629        let schema_json = r#"{
3630            "type": "record",
3631            "name": "Test",
3632            "fields": [
3633                {"name": "a", "type": "int"},
3634                {"name": "b", "type": "string"}
3635            ]
3636        }"#;
3637        let schema = AvroSchema::new(schema_json.to_string());
3638        let projected = schema.project(&[]).unwrap();
3639        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3640        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3641        assert!(
3642            fields.is_empty(),
3643            "Empty projection should yield empty fields"
3644        );
3645    }
3646
3647    #[test]
3648    fn test_project_single_field() {
3649        let schema_json = r#"{
3650            "type": "record",
3651            "name": "Test",
3652            "fields": [
3653                {"name": "a", "type": "int"},
3654                {"name": "b", "type": "string"},
3655                {"name": "c", "type": "long"}
3656            ]
3657        }"#;
3658        let schema = AvroSchema::new(schema_json.to_string());
3659        let projected = schema.project(&[1]).unwrap();
3660        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3661        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3662        assert_eq!(fields.len(), 1);
3663        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("b"));
3664    }
3665
3666    #[test]
3667    fn test_project_multiple_fields() {
3668        let schema_json = r#"{
3669            "type": "record",
3670            "name": "Test",
3671            "fields": [
3672                {"name": "a", "type": "int"},
3673                {"name": "b", "type": "string"},
3674                {"name": "c", "type": "long"},
3675                {"name": "d", "type": "boolean"}
3676            ]
3677        }"#;
3678        let schema = AvroSchema::new(schema_json.to_string());
3679        let projected = schema.project(&[0, 2, 3]).unwrap();
3680        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3681        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3682        assert_eq!(fields.len(), 3);
3683        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
3684        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("c"));
3685        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("d"));
3686    }
3687
3688    #[test]
3689    fn test_project_all_fields() {
3690        let schema_json = r#"{
3691            "type": "record",
3692            "name": "Test",
3693            "fields": [
3694                {"name": "a", "type": "int"},
3695                {"name": "b", "type": "string"}
3696            ]
3697        }"#;
3698        let schema = AvroSchema::new(schema_json.to_string());
3699        let projected = schema.project(&[0, 1]).unwrap();
3700        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3701        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3702        assert_eq!(fields.len(), 2);
3703        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
3704        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("b"));
3705    }
3706
3707    #[test]
3708    fn test_project_reorder_fields() {
3709        let schema_json = r#"{
3710            "type": "record",
3711            "name": "Test",
3712            "fields": [
3713                {"name": "a", "type": "int"},
3714                {"name": "b", "type": "string"},
3715                {"name": "c", "type": "long"}
3716            ]
3717        }"#;
3718        let schema = AvroSchema::new(schema_json.to_string());
3719        // Project in reverse order
3720        let projected = schema.project(&[2, 0, 1]).unwrap();
3721        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3722        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3723        assert_eq!(fields.len(), 3);
3724        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("c"));
3725        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("a"));
3726        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("b"));
3727    }
3728
3729    #[test]
3730    fn test_project_preserves_record_metadata() {
3731        let schema_json = r#"{
3732            "type": "record",
3733            "name": "MyRecord",
3734            "namespace": "com.example",
3735            "doc": "A test record",
3736            "aliases": ["OldRecord"],
3737            "fields": [
3738                {"name": "a", "type": "int"},
3739                {"name": "b", "type": "string"}
3740            ]
3741        }"#;
3742        let schema = AvroSchema::new(schema_json.to_string());
3743        let projected = schema.project(&[0]).unwrap();
3744        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3745        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("MyRecord"));
3746        assert_eq!(
3747            v.get("namespace").and_then(|n| n.as_str()),
3748            Some("com.example")
3749        );
3750        assert_eq!(v.get("doc").and_then(|n| n.as_str()), Some("A test record"));
3751        assert!(v.get("aliases").is_some());
3752    }
3753
3754    #[test]
3755    fn test_project_preserves_field_metadata() {
3756        let schema_json = r#"{
3757            "type": "record",
3758            "name": "Test",
3759            "fields": [
3760                {"name": "a", "type": "int", "doc": "Field A", "default": 0},
3761                {"name": "b", "type": "string"}
3762            ]
3763        }"#;
3764        let schema = AvroSchema::new(schema_json.to_string());
3765        let projected = schema.project(&[0]).unwrap();
3766        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3767        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3768        assert_eq!(
3769            fields[0].get("doc").and_then(|d| d.as_str()),
3770            Some("Field A")
3771        );
3772        assert_eq!(fields[0].get("default").and_then(|d| d.as_i64()), Some(0));
3773    }
3774
3775    #[test]
3776    fn test_project_with_nested_record() {
3777        let schema_json = r#"{
3778            "type": "record",
3779            "name": "Outer",
3780            "fields": [
3781                {"name": "id", "type": "int"},
3782                {"name": "inner", "type": {
3783                    "type": "record",
3784                    "name": "Inner",
3785                    "fields": [
3786                        {"name": "x", "type": "int"},
3787                        {"name": "y", "type": "string"}
3788                    ]
3789                }},
3790                {"name": "value", "type": "double"}
3791            ]
3792        }"#;
3793        let schema = AvroSchema::new(schema_json.to_string());
3794        let projected = schema.project(&[1]).unwrap();
3795        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3796        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3797        assert_eq!(fields.len(), 1);
3798        assert_eq!(
3799            fields[0].get("name").and_then(|n| n.as_str()),
3800            Some("inner")
3801        );
3802        // Verify nested record structure is preserved
3803        let inner_type = fields[0].get("type").unwrap();
3804        assert_eq!(
3805            inner_type.get("type").and_then(|t| t.as_str()),
3806            Some("record")
3807        );
3808        assert_eq!(
3809            inner_type.get("name").and_then(|n| n.as_str()),
3810            Some("Inner")
3811        );
3812    }
3813
3814    #[test]
3815    fn test_project_with_complex_field_types() {
3816        let schema_json = r#"{
3817            "type": "record",
3818            "name": "Test",
3819            "fields": [
3820                {"name": "arr", "type": {"type": "array", "items": "int"}},
3821                {"name": "map", "type": {"type": "map", "values": "string"}},
3822                {"name": "union", "type": ["null", "int"]}
3823            ]
3824        }"#;
3825        let schema = AvroSchema::new(schema_json.to_string());
3826        let projected = schema.project(&[0, 2]).unwrap();
3827        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
3828        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
3829        assert_eq!(fields.len(), 2);
3830        // Verify array type is preserved
3831        let arr_type = fields[0].get("type").unwrap();
3832        assert_eq!(arr_type.get("type").and_then(|t| t.as_str()), Some("array"));
3833        // Verify union type is preserved
3834        let union_type = fields[1].get("type").unwrap();
3835        assert!(union_type.is_array());
3836    }
3837
3838    #[test]
3839    fn test_project_error_invalid_json() {
3840        let schema = AvroSchema::new("not valid json".to_string());
3841        let err = schema.project(&[0]).unwrap_err();
3842        let msg = err.to_string();
3843        assert!(
3844            msg.contains("Invalid Avro schema JSON"),
3845            "Expected parse error, got: {msg}"
3846        );
3847    }
3848
3849    #[test]
3850    fn test_project_error_not_object() {
3851        // Primitive type schema (not a JSON object)
3852        let schema = AvroSchema::new(r#""string""#.to_string());
3853        let err = schema.project(&[0]).unwrap_err();
3854        let msg = err.to_string();
3855        assert!(
3856            msg.contains("must be a JSON object"),
3857            "Expected object error, got: {msg}"
3858        );
3859    }
3860
3861    #[test]
3862    fn test_project_error_array_schema() {
3863        // Array (list) is a valid JSON but not a record
3864        let schema = AvroSchema::new(r#"["null", "int"]"#.to_string());
3865        let err = schema.project(&[0]).unwrap_err();
3866        let msg = err.to_string();
3867        assert!(
3868            msg.contains("must be a JSON object"),
3869            "Expected object error for array schema, got: {msg}"
3870        );
3871    }
3872
3873    #[test]
3874    fn test_project_error_type_not_record() {
3875        let schema_json = r#"{
3876            "type": "enum",
3877            "name": "Color",
3878            "symbols": ["RED", "GREEN", "BLUE"]
3879        }"#;
3880        let schema = AvroSchema::new(schema_json.to_string());
3881        let err = schema.project(&[0]).unwrap_err();
3882        let msg = err.to_string();
3883        assert!(
3884            msg.contains("must be an Avro record") && msg.contains("'enum'"),
3885            "Expected type mismatch error, got: {msg}"
3886        );
3887    }
3888
3889    #[test]
3890    fn test_project_error_type_array() {
3891        let schema_json = r#"{
3892            "type": "array",
3893            "items": "int"
3894        }"#;
3895        let schema = AvroSchema::new(schema_json.to_string());
3896        let err = schema.project(&[0]).unwrap_err();
3897        let msg = err.to_string();
3898        assert!(
3899            msg.contains("must be an Avro record") && msg.contains("'array'"),
3900            "Expected type mismatch error for array type, got: {msg}"
3901        );
3902    }
3903
3904    #[test]
3905    fn test_project_error_type_fixed() {
3906        let schema_json = r#"{
3907            "type": "fixed",
3908            "name": "MD5",
3909            "size": 16
3910        }"#;
3911        let schema = AvroSchema::new(schema_json.to_string());
3912        let err = schema.project(&[0]).unwrap_err();
3913        let msg = err.to_string();
3914        assert!(
3915            msg.contains("must be an Avro record") && msg.contains("'fixed'"),
3916            "Expected type mismatch error for fixed type, got: {msg}"
3917        );
3918    }
3919
3920    #[test]
3921    fn test_project_error_type_map() {
3922        let schema_json = r#"{
3923            "type": "map",
3924            "values": "string"
3925        }"#;
3926        let schema = AvroSchema::new(schema_json.to_string());
3927        let err = schema.project(&[0]).unwrap_err();
3928        let msg = err.to_string();
3929        assert!(
3930            msg.contains("must be an Avro record") && msg.contains("'map'"),
3931            "Expected type mismatch error for map type, got: {msg}"
3932        );
3933    }
3934
3935    #[test]
3936    fn test_project_error_missing_type_field() {
3937        let schema_json = r#"{
3938            "name": "Test",
3939            "fields": [{"name": "a", "type": "int"}]
3940        }"#;
3941        let schema = AvroSchema::new(schema_json.to_string());
3942        let err = schema.project(&[0]).unwrap_err();
3943        let msg = err.to_string();
3944        assert!(
3945            msg.contains("missing required 'type' field"),
3946            "Expected missing type error, got: {msg}"
3947        );
3948    }
3949
3950    #[test]
3951    fn test_project_error_missing_fields() {
3952        let schema_json = r#"{
3953            "type": "record",
3954            "name": "Test"
3955        }"#;
3956        let schema = AvroSchema::new(schema_json.to_string());
3957        let err = schema.project(&[0]).unwrap_err();
3958        let msg = err.to_string();
3959        assert!(
3960            msg.contains("missing required 'fields'"),
3961            "Expected missing fields error, got: {msg}"
3962        );
3963    }
3964
3965    #[test]
3966    fn test_project_error_fields_not_array() {
3967        let schema_json = r#"{
3968            "type": "record",
3969            "name": "Test",
3970            "fields": "not an array"
3971        }"#;
3972        let schema = AvroSchema::new(schema_json.to_string());
3973        let err = schema.project(&[0]).unwrap_err();
3974        let msg = err.to_string();
3975        assert!(
3976            msg.contains("'fields' must be an array"),
3977            "Expected fields array error, got: {msg}"
3978        );
3979    }
3980
3981    #[test]
3982    fn test_project_error_index_out_of_bounds() {
3983        let schema_json = r#"{
3984            "type": "record",
3985            "name": "Test",
3986            "fields": [
3987                {"name": "a", "type": "int"},
3988                {"name": "b", "type": "string"}
3989            ]
3990        }"#;
3991        let schema = AvroSchema::new(schema_json.to_string());
3992        let err = schema.project(&[5]).unwrap_err();
3993        let msg = err.to_string();
3994        assert!(
3995            msg.contains("out of bounds") && msg.contains("5") && msg.contains("2"),
3996            "Expected out of bounds error, got: {msg}"
3997        );
3998    }
3999
4000    #[test]
4001    fn test_project_error_index_out_of_bounds_edge() {
4002        let schema_json = r#"{
4003            "type": "record",
4004            "name": "Test",
4005            "fields": [
4006                {"name": "a", "type": "int"}
4007            ]
4008        }"#;
4009        let schema = AvroSchema::new(schema_json.to_string());
4010        // Index 1 is just out of bounds for a 1-element array
4011        let err = schema.project(&[1]).unwrap_err();
4012        let msg = err.to_string();
4013        assert!(
4014            msg.contains("out of bounds") && msg.contains("1"),
4015            "Expected out of bounds error for edge case, got: {msg}"
4016        );
4017    }
4018
4019    #[test]
4020    fn test_project_error_duplicate_index() {
4021        let schema_json = r#"{
4022            "type": "record",
4023            "name": "Test",
4024            "fields": [
4025                {"name": "a", "type": "int"},
4026                {"name": "b", "type": "string"},
4027                {"name": "c", "type": "long"}
4028            ]
4029        }"#;
4030        let schema = AvroSchema::new(schema_json.to_string());
4031        let err = schema.project(&[0, 1, 0]).unwrap_err();
4032        let msg = err.to_string();
4033        assert!(
4034            msg.contains("Duplicate projection index") && msg.contains("0"),
4035            "Expected duplicate index error, got: {msg}"
4036        );
4037    }
4038
4039    #[test]
4040    fn test_project_error_duplicate_index_consecutive() {
4041        let schema_json = r#"{
4042            "type": "record",
4043            "name": "Test",
4044            "fields": [
4045                {"name": "a", "type": "int"},
4046                {"name": "b", "type": "string"}
4047            ]
4048        }"#;
4049        let schema = AvroSchema::new(schema_json.to_string());
4050        let err = schema.project(&[1, 1]).unwrap_err();
4051        let msg = err.to_string();
4052        assert!(
4053            msg.contains("Duplicate projection index") && msg.contains("1"),
4054            "Expected duplicate index error for consecutive duplicates, got: {msg}"
4055        );
4056    }
4057
4058    #[test]
4059    fn test_project_with_empty_fields() {
4060        let schema_json = r#"{
4061            "type": "record",
4062            "name": "EmptyRecord",
4063            "fields": []
4064        }"#;
4065        let schema = AvroSchema::new(schema_json.to_string());
4066        // Projecting empty from empty should succeed
4067        let projected = schema.project(&[]).unwrap();
4068        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
4069        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
4070        assert!(fields.is_empty());
4071    }
4072
4073    #[test]
4074    fn test_project_empty_fields_index_out_of_bounds() {
4075        let schema_json = r#"{
4076            "type": "record",
4077            "name": "EmptyRecord",
4078            "fields": []
4079        }"#;
4080        let schema = AvroSchema::new(schema_json.to_string());
4081        let err = schema.project(&[0]).unwrap_err();
4082        let msg = err.to_string();
4083        assert!(
4084            msg.contains("out of bounds") && msg.contains("0 fields"),
4085            "Expected out of bounds error for empty record, got: {msg}"
4086        );
4087    }
4088
4089    #[test]
4090    fn test_project_result_is_valid_avro_schema() {
4091        let schema_json = r#"{
4092            "type": "record",
4093            "name": "Test",
4094            "namespace": "com.example",
4095            "fields": [
4096                {"name": "id", "type": "long"},
4097                {"name": "name", "type": "string"},
4098                {"name": "active", "type": "boolean"}
4099            ]
4100        }"#;
4101        let schema = AvroSchema::new(schema_json.to_string());
4102        let projected = schema.project(&[0, 2]).unwrap();
4103        // Verify the projected schema can be parsed as a valid Avro schema
4104        let parsed = projected.schema();
4105        assert!(parsed.is_ok(), "Projected schema should be valid Avro");
4106        match parsed.unwrap() {
4107            Schema::Complex(ComplexType::Record(r)) => {
4108                assert_eq!(r.name, "Test");
4109                assert_eq!(r.namespace, Some("com.example"));
4110                assert_eq!(r.fields.len(), 2);
4111                assert_eq!(r.fields[0].name, "id");
4112                assert_eq!(r.fields[1].name, "active");
4113            }
4114            _ => panic!("Expected Record schema"),
4115        }
4116    }
4117
4118    #[test]
4119    fn test_project_non_contiguous_indices() {
4120        let schema_json = r#"{
4121            "type": "record",
4122            "name": "Test",
4123            "fields": [
4124                {"name": "f0", "type": "int"},
4125                {"name": "f1", "type": "int"},
4126                {"name": "f2", "type": "int"},
4127                {"name": "f3", "type": "int"},
4128                {"name": "f4", "type": "int"}
4129            ]
4130        }"#;
4131        let schema = AvroSchema::new(schema_json.to_string());
4132        // Select every other field
4133        let projected = schema.project(&[0, 2, 4]).unwrap();
4134        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
4135        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
4136        assert_eq!(fields.len(), 3);
4137        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f0"));
4138        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("f2"));
4139        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("f4"));
4140    }
4141
4142    #[test]
4143    fn test_project_single_field_from_many() {
4144        let schema_json = r#"{
4145            "type": "record",
4146            "name": "BigRecord",
4147            "fields": [
4148                {"name": "f0", "type": "int"},
4149                {"name": "f1", "type": "int"},
4150                {"name": "f2", "type": "int"},
4151                {"name": "f3", "type": "int"},
4152                {"name": "f4", "type": "int"},
4153                {"name": "f5", "type": "int"},
4154                {"name": "f6", "type": "int"},
4155                {"name": "f7", "type": "int"},
4156                {"name": "f8", "type": "int"},
4157                {"name": "f9", "type": "int"}
4158            ]
4159        }"#;
4160        let schema = AvroSchema::new(schema_json.to_string());
4161        // Select only the last field
4162        let projected = schema.project(&[9]).unwrap();
4163        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
4164        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
4165        assert_eq!(fields.len(), 1);
4166        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f9"));
4167    }
4168}