1#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23 ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24 UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
/// Two-byte marker (`0xC3 0x01`) that `Fingerprint::make_prefix` writes ahead of
/// Rabin, MD5 and SHA-256 fingerprints.
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// One-byte marker (`0x00`) that `Fingerprint::make_prefix` writes ahead of
/// `Fingerprint::Id` / `Fingerprint::Id64` identifiers.
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// Capacity in bytes of the buffer held by `Prefix`. The longest prefix produced in
/// this file is the 2-byte [`SINGLE_OBJECT_MAGIC`] followed by a 32-byte SHA-256 digest.
pub const MAX_PREFIX_LEN: usize = 34;

/// Arrow schema metadata key holding a complete Avro schema as JSON text. When present
/// (and metadata stripping is off) `AvroSchema::from_arrow_with_options` returns that
/// text verbatim instead of deriving a schema.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Metadata key `avro.enum.symbols`.
/// NOTE(review): presumably carries the symbol list of an Avro enum; it is not used in
/// the portion of the file visible here, so confirm against its callers.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key `avro.field.default`.
/// NOTE(review): presumably carries an Avro field default value; it is not used in the
/// portion of the file visible here, so confirm against its callers.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Metadata key whose value is used as the Avro name of the generated record (schema
/// level) or named type (field level) instead of a generated one.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";

/// Metadata key whose value is emitted as the Avro `namespace` attribute.
pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";

/// Metadata key whose value is emitted as the Avro `doc` attribute of the root record.
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Name given to the root Avro record when the Arrow schema has no
/// [`AVRO_NAME_METADATA_KEY`] entry.
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
/// Position of the `"null"` branch in a two-branch union that models a nullable value.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) enum Nullability {
    /// `"null"` comes first, i.e. `["null", T]`. This is the default.
    #[default]
    NullFirst,
    /// `"null"` comes second, i.e. `[T, "null"]`.
    NullSecond,
}
80
81impl Nullability {
82 pub(crate) fn non_null_index(&self) -> usize {
84 match self {
85 Nullability::NullFirst => 1,
86 Nullability::NullSecond => 0,
87 }
88 }
89}
90
/// A schema `type` written as a bare JSON string.
///
/// The enum is `untagged`, so serde tries the variants in declaration order: a string
/// that is not a primitive type name falls through to [`TypeName::Ref`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum TypeName<'a> {
    /// One of the primitive type names listed in [`PrimitiveType`].
    Primitive(PrimitiveType),
    /// Any other string, borrowed from the schema JSON. `build_canonical` treats it as a
    /// (possibly namespace-qualified) reference to a named type.
    Ref(&'a str),
}
105
/// The primitive Avro type names.
///
/// Serde (de)serializes each variant using its camelCase name and `AsRef<str>` yields
/// the lowercase name; for these single-word variants both give the same text
/// (`"null"`, `"boolean"`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub(crate) enum PrimitiveType {
    /// `"null"`
    Null,
    /// `"boolean"`
    Boolean,
    /// `"int"`
    Int,
    /// `"long"`
    Long,
    /// `"float"`
    Float,
    /// `"double"`
    Double,
    /// `"bytes"`
    Bytes,
    /// `"string"`
    String,
}
130
/// Attributes attached to a schema object beyond the keys its own struct declares.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Attributes<'a> {
    /// Value of the `logicalType` JSON key, or `None` when the key is absent.
    #[serde(default)]
    pub(crate) logical_type: Option<&'a str>,

    /// Every remaining key/value pair of the JSON object (collected via `flatten`).
    #[serde(flatten)]
    pub(crate) additional: HashMap<&'a str, Value>,
}
147
148impl Attributes<'_> {
149 pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
151 self.additional
152 .iter()
153 .map(|(k, v)| (k.to_string(), v.to_string()))
154 .collect()
155 }
156}
157
/// A JSON object of the form `{"type": <name>, ...attributes}` whose `type` is a plain
/// type name rather than one of the complex kinds handled by `ComplexType`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Type<'a> {
    /// The `type` key: a primitive name or a reference to a named type.
    #[serde(borrow)]
    pub(crate) r#type: TypeName<'a>,
    /// All other keys of the object (`logicalType` plus any extras).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
169
/// A parsed Avro schema node that borrows its strings from the source JSON.
///
/// The enum is `untagged`: serde tries the variants in declaration order and keeps the
/// first that matches, so the order of the variants below is significant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum Schema<'a> {
    /// A bare JSON string naming a primitive or a named type.
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A JSON array of schemas, i.e. a union.
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A JSON object whose `type` is `record`, `enum`, `array`, `map` or `fixed`.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// A JSON object whose `type` is a type name, plus attributes.
    #[serde(borrow)]
    Type(Type<'a>),
}
190
/// The complex Avro schema kinds, selected by the object's `type` key
/// (`"record"`, `"enum"`, `"array"`, `"map"`, `"fixed"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub(crate) enum ComplexType<'a> {
    /// `"type": "record"`
    #[serde(borrow)]
    Record(Record<'a>),
    /// `"type": "enum"`
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// `"type": "array"`
    #[serde(borrow)]
    Array(Array<'a>),
    /// `"type": "map"`
    #[serde(borrow)]
    Map(Map<'a>),
    /// `"type": "fixed"`
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
213
/// An Avro `record` schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Record<'a> {
    /// Record name (required). May already be namespace-qualified with dots.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional `doc` text; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Optional `aliases`; empty when absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The record's fields (required), in declaration order.
    #[serde(borrow)]
    pub(crate) fields: Vec<Field<'a>>,
    /// Any other keys of the JSON object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
238
239fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
240where
241 D: serde::Deserializer<'de>,
242{
243 Value::deserialize(deserializer).map(Some)
244}
245
/// A single field of an Avro [`Record`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Field<'a> {
    /// Field name (required).
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `doc` text; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Schema of the field's value (required).
    #[serde(borrow)]
    pub(crate) r#type: Schema<'a>,
    /// The `default` value. `None` means the key was absent; a present key (even an
    /// explicit JSON `null`) is kept as `Some(..)` by `deserialize_default`.
    #[serde(deserialize_with = "deserialize_default", default)]
    pub(crate) default: Option<Value>,
    /// Optional `aliases`; empty when absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
}
266
/// An Avro `enum` schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Enum<'a> {
    /// Enum name (required). May already be namespace-qualified with dots.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional `doc` text; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Optional `aliases`; empty when absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The enum's symbols (required), in declaration order.
    #[serde(borrow)]
    pub(crate) symbols: Vec<&'a str>,
    /// Optional `default` symbol; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) default: Option<&'a str>,
    /// Any other keys of the JSON object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
294
/// An Avro `array` schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Array<'a> {
    /// Schema of the array's elements (the `items` key).
    #[serde(borrow)]
    pub(crate) items: Box<Schema<'a>>,
    /// Any other keys of the JSON object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
307
/// An Avro `map` schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Map<'a> {
    /// Schema of the map's values (the `values` key).
    #[serde(borrow)]
    pub(crate) values: Box<Schema<'a>>,
    /// Any other keys of the JSON object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
320
/// An Avro `fixed` schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Fixed<'a> {
    /// Type name (required). May already be namespace-qualified with dots.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional `aliases`; empty when absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The `size` key (required).
    pub(crate) size: usize,
    /// Any other keys of the JSON object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
341
/// Options for `AvroSchema::from_arrow_with_options`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) struct AvroSchemaOptions {
    /// Where to place `"null"` in generated nullable unions; `None` selects
    /// `Nullability::default()`.
    pub(crate) null_order: Option<Nullability>,
    /// When `false`, an Avro schema embedded under `SCHEMA_METADATA_KEY` is returned
    /// verbatim; when `true`, the schema is always derived from the Arrow fields.
    pub(crate) strip_metadata: bool,
}
347
/// An Avro schema held as its JSON text. Equality compares the text byte-for-byte.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The schema's JSON representation; parsed on demand by `AvroSchema::schema`.
    pub json_string: String,
}
354
355impl TryFrom<&ArrowSchema> for AvroSchema {
356 type Error = ArrowError;
357
358 fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
362 AvroSchema::from_arrow_with_options(schema, None)
363 }
364}
365
366impl AvroSchema {
367 pub fn new(json_string: String) -> Self {
369 Self { json_string }
370 }
371
372 pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
373 serde_json::from_str(self.json_string.as_str())
374 .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
375 }
376
377 pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
405 Self::generate_fingerprint(&self.schema()?, hash_type)
406 }
407
408 pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
409 let mut value: Value = serde_json::from_str(&self.json_string)
410 .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
411 let obj = value.as_object_mut().ok_or_else(|| {
412 ArrowError::AvroError(
413 "Projected schema must be a JSON object Avro record schema".to_string(),
414 )
415 })?;
416 match obj.get("type").and_then(|v| v.as_str()) {
417 Some("record") => {}
418 Some(other) => {
419 return Err(ArrowError::AvroError(format!(
420 "Projected schema must be an Avro record, found type '{other}'"
421 )));
422 }
423 None => {
424 return Err(ArrowError::AvroError(
425 "Projected schema missing required 'type' field".to_string(),
426 ));
427 }
428 }
429 let fields_val = obj.get_mut("fields").ok_or_else(|| {
430 ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
431 })?;
432 let projected_fields = {
433 let mut original_fields = match fields_val {
434 Value::Array(arr) => std::mem::take(arr),
435 _ => {
436 return Err(ArrowError::AvroError(
437 "Avro record schema 'fields' must be an array".to_string(),
438 ));
439 }
440 };
441 let len = original_fields.len();
442 let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
443 let mut out: Vec<Value> = Vec::with_capacity(projection.len());
444 for &i in projection {
445 if i >= len {
446 return Err(ArrowError::AvroError(format!(
447 "Projection index {i} out of bounds for record with {len} fields"
448 )));
449 }
450 if !seen.insert(i) {
451 return Err(ArrowError::AvroError(format!(
452 "Duplicate projection index {i}"
453 )));
454 }
455 out.push(std::mem::replace(&mut original_fields[i], Value::Null));
456 }
457 out
458 };
459 *fields_val = Value::Array(projected_fields);
460 let json_string = serde_json::to_string(&value).map_err(|e| {
461 ArrowError::AvroError(format!(
462 "Failed to serialize projected Avro schema JSON: {e}"
463 ))
464 })?;
465 Ok(Self::new(json_string))
466 }
467
468 pub(crate) fn generate_fingerprint(
469 schema: &Schema,
470 hash_type: FingerprintAlgorithm,
471 ) -> Result<Fingerprint, ArrowError> {
472 let canonical = Self::generate_canonical_form(schema).map_err(|e| {
473 ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
474 })?;
475 match hash_type {
476 FingerprintAlgorithm::Rabin => {
477 Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
478 }
479 FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
480 "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
481 if using Fingerprint::Id, pass the registry ID in instead using the set method."
482 .to_string(),
483 )),
484 #[cfg(feature = "md5")]
485 FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
486 #[cfg(feature = "sha256")]
487 FingerprintAlgorithm::SHA256 => {
488 Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
489 }
490 }
491 }
492
493 pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
504 build_canonical(schema, None)
505 }
506
507 pub(crate) fn from_arrow_with_options(
514 schema: &ArrowSchema,
515 options: Option<AvroSchemaOptions>,
516 ) -> Result<AvroSchema, ArrowError> {
517 let opts = options.unwrap_or_default();
518 let order = opts.null_order.unwrap_or_default();
519 let strip = opts.strip_metadata;
520 if !strip {
521 if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
522 return Ok(AvroSchema::new(json.clone()));
523 }
524 }
525 let mut name_gen = NameGenerator::default();
526 let fields_json = schema
527 .fields()
528 .iter()
529 .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
530 .collect::<Result<Vec<_>, _>>()?;
531 let record_name = schema
532 .metadata
533 .get(AVRO_NAME_METADATA_KEY)
534 .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
535 let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
536 record.insert("type".into(), Value::String("record".into()));
537 record.insert(
538 "name".into(),
539 Value::String(sanitise_avro_name(record_name)),
540 );
541 if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
542 record.insert("namespace".into(), Value::String(ns.clone()));
543 }
544 if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
545 record.insert("doc".into(), Value::String(doc.clone()));
546 }
547 record.insert("fields".into(), Value::Array(fields_json));
548 extend_with_passthrough_metadata(&mut record, &schema.metadata);
549 let json_string = serde_json::to_string(&Value::Object(record))
550 .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
551 Ok(AvroSchema::new(json_string))
552 }
553}
554
/// A message prefix (magic bytes followed by a fingerprint payload) stored inline in a
/// fixed-size buffer, as produced by `Fingerprint::make_prefix`.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Prefix {
    /// Backing storage; only the first `len` bytes are meaningful.
    buf: [u8; MAX_PREFIX_LEN],
    /// Number of valid bytes at the start of `buf`.
    len: u8,
}
561
562impl Prefix {
563 #[inline]
564 pub(crate) fn as_slice(&self) -> &[u8] {
565 &self.buf[..self.len as usize]
566 }
567}
568
/// Selects how a schema is identified: by a hash of the schema (`Rabin`, `MD5`,
/// `SHA256`) or by a caller-supplied identifier (`Id`, `Id64`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// 64-bit Rabin fingerprint. This is the default.
    #[default]
    Rabin,
    /// A caller-supplied 32-bit identifier.
    Id(u32),
    /// A caller-supplied 64-bit identifier.
    Id64(u64),
    /// MD5 digest (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
586
587impl From<Fingerprint> for FingerprintStrategy {
588 fn from(f: Fingerprint) -> Self {
589 Self::from(&f)
590 }
591}
592
593impl From<FingerprintAlgorithm> for FingerprintStrategy {
594 fn from(f: FingerprintAlgorithm) -> Self {
595 match f {
596 FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
597 FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
598 FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
599 #[cfg(feature = "md5")]
600 FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
601 #[cfg(feature = "sha256")]
602 FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
603 }
604 }
605}
606
607impl From<&Fingerprint> for FingerprintStrategy {
608 fn from(f: &Fingerprint) -> Self {
609 match f {
610 Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
611 Fingerprint::Id(_) => FingerprintStrategy::Id(0),
612 Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
613 #[cfg(feature = "md5")]
614 Fingerprint::MD5(_) => FingerprintStrategy::MD5,
615 #[cfg(feature = "sha256")]
616 Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
617 }
618 }
619}
620
/// The kind of a fingerprint, without any associated value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit Rabin fingerprint. This is the default.
    #[default]
    Rabin,
    /// A caller-supplied 32-bit identifier; cannot be computed from a schema
    /// (`AvroSchema::generate_fingerprint` rejects it).
    Id,
    /// A caller-supplied 64-bit identifier; cannot be computed from a schema
    /// (`AvroSchema::generate_fingerprint` rejects it).
    Id64,
    /// MD5 digest (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
639
640impl From<&Fingerprint> for FingerprintAlgorithm {
642 fn from(fp: &Fingerprint) -> Self {
643 match fp {
644 Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
645 Fingerprint::Id(_) => FingerprintAlgorithm::Id,
646 Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
647 #[cfg(feature = "md5")]
648 Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
649 #[cfg(feature = "sha256")]
650 Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
651 }
652 }
653}
654
655impl From<FingerprintStrategy> for FingerprintAlgorithm {
656 fn from(s: FingerprintStrategy) -> Self {
657 Self::from(&s)
658 }
659}
660
661impl From<&FingerprintStrategy> for FingerprintAlgorithm {
662 fn from(s: &FingerprintStrategy) -> Self {
663 match s {
664 FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
665 FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
666 FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
667 #[cfg(feature = "md5")]
668 FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
669 #[cfg(feature = "sha256")]
670 FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
671 }
672 }
673}
674
/// A concrete schema fingerprint or identifier; used as the key of `SchemaStore`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// 64-bit Rabin fingerprint of the schema's canonical form.
    Rabin(u64),
    /// A caller-supplied 32-bit identifier.
    Id(u32),
    /// A caller-supplied 64-bit identifier.
    Id64(u64),
    /// 16-byte MD5 digest of the canonical form (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5([u8; 16]),
    /// 32-byte SHA-256 digest of the canonical form (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256([u8; 32]),
}
698
699impl From<FingerprintStrategy> for Fingerprint {
700 fn from(s: FingerprintStrategy) -> Self {
701 Self::from(&s)
702 }
703}
704
705impl From<&FingerprintStrategy> for Fingerprint {
706 fn from(s: &FingerprintStrategy) -> Self {
707 match s {
708 FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
709 FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
710 FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
711 #[cfg(feature = "md5")]
712 FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
713 #[cfg(feature = "sha256")]
714 FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
715 }
716 }
717}
718
719impl From<FingerprintAlgorithm> for Fingerprint {
720 fn from(s: FingerprintAlgorithm) -> Self {
721 match s {
722 FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
723 FingerprintAlgorithm::Id => Fingerprint::Id(0),
724 FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
725 #[cfg(feature = "md5")]
726 FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
727 #[cfg(feature = "sha256")]
728 FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
729 }
730 }
731}
732
733impl Fingerprint {
734 pub fn load_fingerprint_id(id: u32) -> Self {
742 Fingerprint::Id(u32::from_be(id))
743 }
744
745 pub fn load_fingerprint_id64(id: u64) -> Self {
753 Fingerprint::Id64(u64::from_be(id))
754 }
755
756 pub(crate) fn make_prefix(&self) -> Prefix {
780 let mut buf = [0u8; MAX_PREFIX_LEN];
781 let len = match self {
782 Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
783 Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
784 Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
785 #[cfg(feature = "md5")]
786 Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
787 #[cfg(feature = "sha256")]
788 Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
789 };
790 Prefix { buf, len }
791 }
792}
793
794fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
795 buf: &mut [u8; MAX_PREFIX_LEN],
796 magic: &[u8; MAGIC_LEN],
797 payload: &[u8; PAYLOAD_LEN],
798) -> u8 {
799 debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
800 let total = MAGIC_LEN + PAYLOAD_LEN;
801 let prefix_slice = &mut buf[..total];
802 prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
803 prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
804 total as u8
805}
806
/// An in-memory registry of Avro schemas keyed by [`Fingerprint`].
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// Algorithm used by `register` to compute the key of a newly added schema.
    fingerprint_algorithm: FingerprintAlgorithm,
    /// The registered schemas.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
840
841impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
842 type Error = ArrowError;
843
844 fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
847 Ok(Self {
848 schemas,
849 ..Self::default()
850 })
851 }
852}
853
854impl SchemaStore {
855 pub fn new() -> Self {
857 Self::default()
858 }
859
860 pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
862 Self {
863 fingerprint_algorithm,
864 ..Self::default()
865 }
866 }
867
868 pub fn set(
885 &mut self,
886 fingerprint: Fingerprint,
887 schema: AvroSchema,
888 ) -> Result<Fingerprint, ArrowError> {
889 match self.schemas.entry(fingerprint) {
890 Entry::Occupied(entry) => {
891 if entry.get() != &schema {
892 return Err(ArrowError::ComputeError(format!(
893 "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
894 )));
895 }
896 }
897 Entry::Vacant(entry) => {
898 entry.insert(schema);
899 }
900 }
901 Ok(fingerprint)
902 }
903
904 pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
922 if self.fingerprint_algorithm == FingerprintAlgorithm::Id
923 || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
924 {
925 return Err(ArrowError::SchemaError(
926 "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
927 Use the set method directly instead, providing a valid fingerprint"
928 .to_string(),
929 ));
930 }
931 let fingerprint =
932 AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
933 self.set(fingerprint, schema)?;
934 Ok(fingerprint)
935 }
936
937 pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
947 self.schemas.get(fingerprint)
948 }
949
950 pub fn fingerprints(&self) -> Vec<Fingerprint> {
956 self.schemas.keys().copied().collect()
957 }
958
959 pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
961 self.fingerprint_algorithm
962 }
963}
964
965fn quote(s: &str) -> Result<String, ArrowError> {
966 serde_json::to_string(s)
967 .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
968}
969
/// Resolves the full name of a named Avro type and the namespace that applies to it.
///
/// * `name` - the type's `name`; if it contains a dot it is already a full name and the
///   namespace arguments are ignored.
/// * `namespace_attr` - the type's own `namespace` attribute, if any; it takes
///   precedence over `enclosing_ns`.
/// * `enclosing_ns` - the namespace inherited from the enclosing definition, if any.
///
/// Returns `(full_name, namespace)`. For a dotted `name` the namespace is everything
/// before the last dot. Otherwise the namespace is the one selected above, and the full
/// name is `"{namespace}.{name}"`, or just `name` when there is no namespace or the
/// selected namespace is the empty string (which Avro defines as "no namespace").
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    if let Some((ns, _)) = name.rsplit_once('.') {
        return (name.to_string(), Some(ns.to_string()));
    }
    let namespace = namespace_attr.or(enclosing_ns);
    let full_name = match namespace {
        // Never emit a leading dot for an empty namespace.
        Some(ns) if !ns.is_empty() => format!("{ns}.{name}"),
        _ => name.to_string(),
    };
    (full_name, namespace.map(str::to_string))
}
1000
1001fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
1002 Ok(match schema {
1003 Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
1004 TypeName::Primitive(pt) => quote(pt.as_ref())?,
1005 TypeName::Ref(name) => {
1006 let (full_name, _) = make_full_name(name, None, enclosing_ns);
1007 quote(&full_name)?
1008 }
1009 },
1010 Schema::Union(branches) => format!(
1011 "[{}]",
1012 branches
1013 .iter()
1014 .map(|b| build_canonical(b, enclosing_ns))
1015 .collect::<Result<Vec<_>, _>>()?
1016 .join(",")
1017 ),
1018 Schema::Complex(ct) => match ct {
1019 ComplexType::Record(r) => {
1020 let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
1021 let fields = r
1022 .fields
1023 .iter()
1024 .map(|f| {
1025 let field_type =
1030 build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
1031 Ok(format!(
1032 r#"{{"name":{},"type":{}}}"#,
1033 quote(f.name)?,
1034 field_type
1035 ))
1036 })
1037 .collect::<Result<Vec<_>, ArrowError>>()?
1038 .join(",");
1039 format!(
1040 r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
1041 quote(&full_name)?,
1042 )
1043 }
1044 ComplexType::Enum(e) => {
1045 let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
1046 let symbols = e
1047 .symbols
1048 .iter()
1049 .map(|s| quote(s))
1050 .collect::<Result<Vec<_>, _>>()?
1051 .join(",");
1052 format!(
1053 r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
1054 quote(&full_name)?
1055 )
1056 }
1057 ComplexType::Array(arr) => format!(
1058 r#"{{"type":"array","items":{}}}"#,
1059 build_canonical(&arr.items, enclosing_ns)?
1060 ),
1061 ComplexType::Map(map) => format!(
1062 r#"{{"type":"map","values":{}}}"#,
1063 build_canonical(&map.values, enclosing_ns)?
1064 ),
1065 ComplexType::Fixed(f) => {
1066 let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
1067 format!(
1068 r#"{{"name":{},"type":"fixed","size":{}}}"#,
1069 quote(&full_name)?,
1070 f.size
1071 )
1072 }
1073 },
1074 })
1075}
1076
/// Seed of the 64-bit Rabin fingerprint: it is the fingerprint of empty input and the
/// constant XOR-ed in by `one_entry` when the lookup table is built.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;
1079
1080const fn one_entry(i: usize) -> u64 {
1087 let mut fp = i as u64;
1088 let mut j = 0;
1089 while j < 8 {
1090 fp = (fp >> 1) ^ (EMPTY & (0u64.wrapping_sub(fp & 1)));
1091 j += 1;
1092 }
1093 fp
1094}
1095
1096const fn build_table() -> [u64; 256] {
1103 let mut table = [0u64; 256];
1104 let mut i = 0;
1105 while i < 256 {
1106 table[i] = one_entry(i);
1107 i += 1;
1108 }
1109 table
1110}
1111
/// Byte-indexed lookup table used by `compute_fingerprint_rabin`, built at compile time
/// by `build_table`.
static FINGERPRINT_TABLE: [u64; 256] = build_table();
1114
1115pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
1118 let mut fp = EMPTY;
1119 for &byte in canonical_form.as_bytes() {
1120 let idx = ((fp as u8) ^ byte) as usize;
1121 fp = (fp >> 8) ^ FINGERPRINT_TABLE[idx];
1122 }
1123 fp
1124}
1125
/// Computes the 16-byte MD5 digest of the UTF-8 bytes of `canonical_form`.
#[cfg(feature = "md5")]
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    let md5::Digest(bytes) = md5::compute(canonical_form.as_bytes());
    bytes
}
1136
/// Computes the 32-byte SHA-256 digest of the UTF-8 bytes of `canonical_form`.
#[cfg(feature = "sha256")]
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    // One-shot hashing; equivalent to `new` + `update` + `finalize`.
    Sha256::digest(canonical_form.as_bytes()).into()
}
1148
1149#[inline]
1150fn is_internal_arrow_key(key: &str) -> bool {
1151 key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1152}
1153
1154fn extend_with_passthrough_metadata(
1159 target: &mut JsonMap<String, Value>,
1160 metadata: &HashMap<String, String>,
1161) {
1162 for (meta_key, meta_val) in metadata {
1163 if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1164 continue;
1165 }
1166 let json_val =
1167 serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1168 target.insert(meta_key.clone(), json_val);
1169 }
1170}
1171
/// Turns an arbitrary string into a name made only of ASCII letters, digits and `_`
/// that does not start with a digit.
///
/// Every other character (including non-ASCII ones) is replaced by a single `_`; an
/// empty input yields `"_"`; a leading digit is kept but prefixed with `_`.
fn sanitise_avro_name(base_name: &str) -> String {
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    match base_name.chars().next() {
        None => return "_".to_owned(),
        // A digit survives the replacement below, so guard it with an underscore.
        Some(first) if first.is_ascii_digit() => sanitised.push('_'),
        Some(_) => {}
    }
    for ch in base_name.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        sanitised.push(if keep { ch } else { '_' });
    }
    sanitised
}
1192
/// Hands out unique, sanitised names (see `NameGenerator::make_unique`).
#[derive(Default)]
struct NameGenerator {
    /// Every name returned so far.
    used: HashSet<String>,
    /// Per base name, the numeric suffix to try next when that name is requested again.
    counters: HashMap<String, usize>,
}
1198
1199impl NameGenerator {
1200 fn make_unique(&mut self, field_name: &str) -> String {
1201 let field_name = sanitise_avro_name(field_name);
1202 if self.used.insert(field_name.clone()) {
1203 self.counters.insert(field_name.clone(), 1);
1204 return field_name;
1205 }
1206 let counter = self.counters.entry(field_name.clone()).or_insert(1);
1207 loop {
1208 let candidate = format!("{field_name}_{}", *counter);
1209 if self.used.insert(candidate.clone()) {
1210 return candidate;
1211 }
1212 *counter += 1;
1213 }
1214 }
1215}
1216
1217fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1218 if extras.is_empty() {
1219 return schema;
1220 }
1221 match schema {
1222 Value::Object(mut map) => {
1223 map.extend(extras);
1224 Value::Object(map)
1225 }
1226 Value::Array(mut union) => {
1227 if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1230 let original = std::mem::take(non_null);
1231 *non_null = merge_extras(original, extras);
1232 }
1233 Value::Array(union)
1234 }
1235 primitive => {
1236 let mut map = JsonMap::with_capacity(extras.len() + 1);
1237 map.insert("type".into(), primitive);
1238 map.extend(extras);
1239 Value::Object(map)
1240 }
1241 }
1242}
1243
1244#[inline]
1245fn is_avro_json_null(v: &Value) -> bool {
1246 matches!(v, Value::String(s) if s == "null")
1247}
1248
1249fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1250 let null = Value::String("null".into());
1251 match inner {
1252 Value::Array(mut union) => {
1253 if union.iter().any(is_avro_json_null) {
1258 return Value::Array(union);
1259 }
1260 match null_order {
1262 Nullability::NullFirst => union.insert(0, null),
1263 Nullability::NullSecond => union.push(null),
1264 }
1265 Value::Array(union)
1266 }
1267 other => match null_order {
1268 Nullability::NullFirst => Value::Array(vec![null, other]),
1269 Nullability::NullSecond => Value::Array(vec![other, null]),
1270 },
1271 }
1272}
1273
/// Returns the smallest byte width (1..=32) whose maximum decimal precision, per the
/// table below, is at least `p`. Precisions beyond the last entry saturate at 32 bytes.
fn min_fixed_bytes_for_precision(p: usize) -> usize {
    // MAX_P[n - 1] is the largest precision that fits in n bytes.
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    MAX_P
        .iter()
        .position(|&max_p| p <= max_p)
        .map_or(32, |index| index + 1)
}
1288
1289fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1290 match branch {
1291 Value::String(t) => Ok(format!("P:{t}")),
1292 Value::Object(map) => {
1293 let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1294 ArrowError::SchemaError("Union branch object missing string 'type'".into())
1295 })?;
1296 match t {
1297 "record" | "enum" | "fixed" => {
1298 let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1299 ArrowError::SchemaError(format!(
1300 "Union branch '{t}' missing required 'name'"
1301 ))
1302 })?;
1303 Ok(format!("N:{t}:{name}"))
1304 }
1305 "array" | "map" => Ok(format!("C:{t}")),
1306 other => Ok(format!("P:{other}")),
1307 }
1308 }
1309 Value::Array(_) => Err(ArrowError::SchemaError(
1310 "Avro union may not immediately contain another union".into(),
1311 )),
1312 _ => Err(ArrowError::SchemaError(
1313 "Invalid JSON for Avro union branch".into(),
1314 )),
1315 }
1316}
1317
1318fn datatype_to_avro(
1319 dt: &DataType,
1320 field_name: &str,
1321 metadata: &HashMap<String, String>,
1322 name_gen: &mut NameGenerator,
1323 null_order: Nullability,
1324 strip: bool,
1325) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1326 let mut extras = JsonMap::new();
1327 let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1328 if *scale < 0 {
1329 return Err(ArrowError::SchemaError(format!(
1330 "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1331 )));
1332 }
1333 if (*scale as usize) > (*precision as usize) {
1334 return Err(ArrowError::SchemaError(format!(
1335 "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1336 must be <= precision ({precision})"
1337 )));
1338 }
1339 let mut meta = JsonMap::from_iter([
1340 ("logicalType".into(), json!("decimal")),
1341 ("precision".into(), json!(*precision)),
1342 ("scale".into(), json!(*scale)),
1343 ]);
1344 let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1345 let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1346 || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1347 if fixed_size.is_none() && carries_name {
1348 fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1349 }
1350 if let Some(size) = fixed_size {
1351 meta.insert("type".into(), json!("fixed"));
1352 meta.insert("size".into(), json!(size));
1353 let chosen_name = metadata
1354 .get(AVRO_NAME_METADATA_KEY)
1355 .map(|s| sanitise_avro_name(s))
1356 .unwrap_or_else(|| name_gen.make_unique(field_name));
1357 meta.insert("name".into(), json!(chosen_name));
1358 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1359 meta.insert("namespace".into(), json!(ns));
1360 }
1361 } else {
1362 meta.insert("type".into(), json!("bytes"));
1364 }
1365 Ok(Value::Object(meta))
1366 };
1367 let val = match dt {
1368 DataType::Null => Value::String("null".into()),
1369 DataType::Boolean => Value::String("boolean".into()),
1370 #[cfg(not(feature = "avro_custom_types"))]
1371 DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 => {
1372 Value::String("int".into())
1373 }
1374 DataType::Int32 => Value::String("int".into()),
1375 #[cfg(feature = "avro_custom_types")]
1376 DataType::Int8 => json!({ "type": "int", "logicalType": "arrow.int8" }),
1377 #[cfg(feature = "avro_custom_types")]
1378 DataType::Int16 => json!({ "type": "int", "logicalType": "arrow.int16" }),
1379 #[cfg(feature = "avro_custom_types")]
1380 DataType::UInt8 => json!({ "type": "int", "logicalType": "arrow.uint8" }),
1381 #[cfg(feature = "avro_custom_types")]
1382 DataType::UInt16 => json!({ "type": "int", "logicalType": "arrow.uint16" }),
1383 #[cfg(not(feature = "avro_custom_types"))]
1384 DataType::UInt32 => Value::String("long".into()),
1385 #[cfg(feature = "avro_custom_types")]
1386 DataType::UInt32 => json!({ "type": "long", "logicalType": "arrow.uint32" }),
1387 DataType::Int64 => Value::String("long".into()),
1388 #[cfg(not(feature = "avro_custom_types"))]
1389 DataType::UInt64 => Value::String("long".into()),
1390 #[cfg(feature = "avro_custom_types")]
1391 DataType::UInt64 => {
1392 let chosen_name = metadata
1394 .get(AVRO_NAME_METADATA_KEY)
1395 .map(|s| sanitise_avro_name(s))
1396 .unwrap_or_else(|| name_gen.make_unique(field_name));
1397 let mut obj = JsonMap::from_iter([
1398 ("type".into(), json!("fixed")),
1399 ("name".into(), json!(chosen_name)),
1400 ("size".into(), json!(8)),
1401 ("logicalType".into(), json!("arrow.uint64")),
1402 ]);
1403 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1404 obj.insert("namespace".into(), json!(ns));
1405 }
1406 json!(obj)
1407 }
1408 #[cfg(not(feature = "avro_custom_types"))]
1409 DataType::Float16 => Value::String("float".into()),
1410 #[cfg(feature = "avro_custom_types")]
1411 DataType::Float16 => {
1412 let chosen_name = metadata
1414 .get(AVRO_NAME_METADATA_KEY)
1415 .map(|s| sanitise_avro_name(s))
1416 .unwrap_or_else(|| name_gen.make_unique(field_name));
1417 let mut obj = JsonMap::from_iter([
1418 ("type".into(), json!("fixed")),
1419 ("name".into(), json!(chosen_name)),
1420 ("size".into(), json!(2)),
1421 ("logicalType".into(), json!("arrow.float16")),
1422 ]);
1423 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1424 obj.insert("namespace".into(), json!(ns));
1425 }
1426 json!(obj)
1427 }
1428 DataType::Float32 => Value::String("float".into()),
1429 DataType::Float64 => Value::String("double".into()),
1430 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1431 DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1432 DataType::BinaryView => {
1433 if !strip {
1434 extras.insert("arrowBinaryView".into(), Value::Bool(true));
1435 }
1436 Value::String("bytes".into())
1437 }
1438 DataType::FixedSizeBinary(len) => {
1439 let md_is_uuid = metadata
1440 .get("logicalType")
1441 .map(|s| s.trim_matches('"') == "uuid")
1442 .unwrap_or(false);
1443 #[cfg(feature = "canonical_extension_types")]
1444 let ext_is_uuid = metadata
1445 .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1446 .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1447 .unwrap_or(false);
1448 #[cfg(not(feature = "canonical_extension_types"))]
1449 let ext_is_uuid = false;
1450 let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1451 if is_uuid {
1452 json!({ "type": "string", "logicalType": "uuid" })
1453 } else {
1454 let chosen_name = metadata
1455 .get(AVRO_NAME_METADATA_KEY)
1456 .map(|s| sanitise_avro_name(s))
1457 .unwrap_or_else(|| name_gen.make_unique(field_name));
1458 let mut obj = JsonMap::from_iter([
1459 ("type".into(), json!("fixed")),
1460 ("name".into(), json!(chosen_name)),
1461 ("size".into(), json!(len)),
1462 ]);
1463 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1464 obj.insert("namespace".into(), json!(ns));
1465 }
1466 Value::Object(obj)
1467 }
1468 }
1469 #[cfg(feature = "small_decimals")]
1470 DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1471 handle_decimal(precision, scale)?
1472 }
1473 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1474 handle_decimal(precision, scale)?
1475 }
1476 DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1477 #[cfg(not(feature = "avro_custom_types"))]
1478 DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1479 #[cfg(feature = "avro_custom_types")]
1480 DataType::Date64 => json!({ "type": "long", "logicalType": "arrow.date64" }),
1481 DataType::Time32(unit) => match unit {
1482 TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1483 #[cfg(not(feature = "avro_custom_types"))]
1484 TimeUnit::Second => {
1485 if !strip {
1487 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1488 }
1489 json!({ "type": "int", "logicalType": "time-millis" })
1490 }
1491 #[cfg(feature = "avro_custom_types")]
1492 TimeUnit::Second => {
1493 json!({ "type": "int", "logicalType": "arrow.time32-second" })
1494 }
1495 _ => Value::String("int".into()),
1496 },
1497 DataType::Time64(unit) => match unit {
1498 TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1499 #[cfg(not(feature = "avro_custom_types"))]
1500 TimeUnit::Nanosecond => {
1501 if !strip {
1503 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1504 }
1505 json!({ "type": "long", "logicalType": "time-micros" })
1506 }
1507 #[cfg(feature = "avro_custom_types")]
1508 TimeUnit::Nanosecond => {
1509 json!({ "type": "long", "logicalType": "arrow.time64-nanosecond" })
1510 }
1511 _ => Value::String("long".into()),
1512 },
1513 DataType::Timestamp(unit, tz) => {
1514 #[cfg(feature = "avro_custom_types")]
1515 if matches!(unit, TimeUnit::Second) {
1516 let logical_type = if tz.is_some() {
1517 "arrow.timestamp-second"
1518 } else {
1519 "arrow.local-timestamp-second"
1520 };
1521 return Ok((
1522 json!({ "type": "long", "logicalType": logical_type }),
1523 extras,
1524 ));
1525 }
1526 let logical_type = match (unit, tz.is_some()) {
1527 (TimeUnit::Millisecond, true) => "timestamp-millis",
1528 (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1529 (TimeUnit::Microsecond, true) => "timestamp-micros",
1530 (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1531 (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1532 (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1533 (TimeUnit::Second, has_tz) => {
1534 if !strip {
1536 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1537 }
1538 let ts_logical_type = if has_tz {
1539 "timestamp-millis"
1540 } else {
1541 "local-timestamp-millis"
1542 };
1543 return Ok((
1544 json!({ "type": "long", "logicalType": ts_logical_type }),
1545 extras,
1546 ));
1547 }
1548 };
1549 if !strip && matches!(unit, TimeUnit::Nanosecond) {
1550 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1551 }
1552 json!({ "type": "long", "logicalType": logical_type })
1553 }
1554 #[cfg(not(feature = "avro_custom_types"))]
1555 DataType::Duration(_unit) => Value::String("long".into()),
1556 #[cfg(feature = "avro_custom_types")]
1557 DataType::Duration(unit) => {
1558 let logical_type = match unit {
1561 TimeUnit::Second => "arrow.duration-seconds",
1562 TimeUnit::Millisecond => "arrow.duration-millis",
1563 TimeUnit::Microsecond => "arrow.duration-micros",
1564 TimeUnit::Nanosecond => "arrow.duration-nanos",
1565 };
1566 json!({ "type": "long", "logicalType": logical_type })
1567 }
1568 #[cfg(not(feature = "avro_custom_types"))]
1569 DataType::Interval(IntervalUnit::MonthDayNano) => {
1570 let chosen_name = metadata
1572 .get(AVRO_NAME_METADATA_KEY)
1573 .map(|s| sanitise_avro_name(s))
1574 .unwrap_or_else(|| name_gen.make_unique(field_name));
1575 let mut obj = JsonMap::from_iter([
1576 ("type".into(), json!("fixed")),
1577 ("name".into(), json!(chosen_name)),
1578 ("size".into(), json!(12)),
1579 ("logicalType".into(), json!("duration")),
1580 ]);
1581 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1582 obj.insert("namespace".into(), json!(ns));
1583 }
1584 json!(obj)
1585 }
1586 #[cfg(feature = "avro_custom_types")]
1587 DataType::Interval(IntervalUnit::MonthDayNano) => {
1588 let chosen_name = metadata
1591 .get(AVRO_NAME_METADATA_KEY)
1592 .map(|s| sanitise_avro_name(s))
1593 .unwrap_or_else(|| name_gen.make_unique(field_name));
1594 let mut obj = JsonMap::from_iter([
1595 ("type".into(), json!("fixed")),
1596 ("name".into(), json!(chosen_name)),
1597 ("size".into(), json!(16)),
1598 ("logicalType".into(), json!("arrow.interval-month-day-nano")),
1599 ]);
1600 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1601 obj.insert("namespace".into(), json!(ns));
1602 }
1603 json!(obj)
1604 }
1605 #[cfg(not(feature = "avro_custom_types"))]
1606 DataType::Interval(IntervalUnit::YearMonth) => {
1607 let chosen_name = metadata
1609 .get(AVRO_NAME_METADATA_KEY)
1610 .map(|s| sanitise_avro_name(s))
1611 .unwrap_or_else(|| name_gen.make_unique(field_name));
1612 let mut extras = JsonMap::from_iter([
1613 ("type".into(), json!("fixed")),
1614 ("name".into(), json!(chosen_name)),
1615 ("size".into(), json!(12)),
1616 ("logicalType".into(), json!("duration")),
1617 ]);
1618 if !strip {
1619 extras.insert(
1620 "arrowIntervalUnit".into(),
1621 Value::String("yearmonth".into()),
1622 );
1623 }
1624 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1625 extras.insert("namespace".into(), json!(ns));
1626 }
1627 json!(extras)
1628 }
1629 #[cfg(feature = "avro_custom_types")]
1630 DataType::Interval(IntervalUnit::YearMonth) => {
1631 let chosen_name = metadata
1632 .get(AVRO_NAME_METADATA_KEY)
1633 .map(|s| sanitise_avro_name(s))
1634 .unwrap_or_else(|| name_gen.make_unique(field_name));
1635 let mut obj = JsonMap::from_iter([
1636 ("type".into(), json!("fixed")),
1637 ("name".into(), json!(chosen_name)),
1638 ("size".into(), json!(4)),
1639 ("logicalType".into(), json!("arrow.interval-year-month")),
1640 ]);
1641 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1642 obj.insert("namespace".into(), json!(ns));
1643 }
1644 json!(obj)
1645 }
1646 #[cfg(not(feature = "avro_custom_types"))]
1647 DataType::Interval(IntervalUnit::DayTime) => {
1648 let chosen_name = metadata
1650 .get(AVRO_NAME_METADATA_KEY)
1651 .map(|s| sanitise_avro_name(s))
1652 .unwrap_or_else(|| name_gen.make_unique(field_name));
1653 let mut obj = JsonMap::from_iter([
1654 ("type".into(), json!("fixed")),
1655 ("name".into(), json!(chosen_name)),
1656 ("size".into(), json!(12)),
1657 ("logicalType".into(), json!("duration")),
1658 ]);
1659 if !strip {
1660 obj.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1661 }
1662 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1663 obj.insert("namespace".into(), json!(ns));
1664 }
1665 json!(obj)
1666 }
1667 #[cfg(feature = "avro_custom_types")]
1668 DataType::Interval(IntervalUnit::DayTime) => {
1669 let chosen_name = metadata
1670 .get(AVRO_NAME_METADATA_KEY)
1671 .map(|s| sanitise_avro_name(s))
1672 .unwrap_or_else(|| name_gen.make_unique(field_name));
1673 let mut obj = JsonMap::from_iter([
1674 ("type".into(), json!("fixed")),
1675 ("name".into(), json!(chosen_name)),
1676 ("size".into(), json!(8)),
1677 ("logicalType".into(), json!("arrow.interval-day-time")),
1678 ]);
1679 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1680 obj.insert("namespace".into(), json!(ns));
1681 }
1682 json!(obj)
1683 }
1684 DataType::List(child) | DataType::LargeList(child) => {
1685 if matches!(dt, DataType::LargeList(_)) && !strip {
1686 extras.insert("arrowLargeList".into(), Value::Bool(true));
1687 }
1688 let items_schema = process_datatype(
1689 child.data_type(),
1690 child.name(),
1691 child.metadata(),
1692 name_gen,
1693 null_order,
1694 child.is_nullable(),
1695 strip,
1696 )?;
1697 json!({
1698 "type": "array",
1699 "items": items_schema
1700 })
1701 }
1702 DataType::ListView(child) | DataType::LargeListView(child) => {
1703 if matches!(dt, DataType::LargeListView(_)) && !strip {
1704 extras.insert("arrowLargeList".into(), Value::Bool(true));
1705 }
1706 if !strip {
1707 extras.insert("arrowListView".into(), Value::Bool(true));
1708 }
1709 let items_schema = process_datatype(
1710 child.data_type(),
1711 child.name(),
1712 child.metadata(),
1713 name_gen,
1714 null_order,
1715 child.is_nullable(),
1716 strip,
1717 )?;
1718 json!({
1719 "type": "array",
1720 "items": items_schema
1721 })
1722 }
1723 DataType::FixedSizeList(child, len) => {
1724 if !strip {
1725 extras.insert("arrowFixedSize".into(), json!(len));
1726 }
1727 let items_schema = process_datatype(
1728 child.data_type(),
1729 child.name(),
1730 child.metadata(),
1731 name_gen,
1732 null_order,
1733 child.is_nullable(),
1734 strip,
1735 )?;
1736 json!({
1737 "type": "array",
1738 "items": items_schema
1739 })
1740 }
1741 DataType::Map(entries, _) => {
1742 let value_field = match entries.data_type() {
1743 DataType::Struct(fs) => &fs[1],
1744 _ => {
1745 return Err(ArrowError::SchemaError(
1746 "Map 'entries' field must be Struct(key,value)".into(),
1747 ));
1748 }
1749 };
1750 let values_schema = process_datatype(
1751 value_field.data_type(),
1752 value_field.name(),
1753 value_field.metadata(),
1754 name_gen,
1755 null_order,
1756 value_field.is_nullable(),
1757 strip,
1758 )?;
1759 json!({
1760 "type": "map",
1761 "values": values_schema
1762 })
1763 }
1764 DataType::Struct(fields) => {
1765 let avro_fields = fields
1766 .iter()
1767 .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1768 .collect::<Result<Vec<_>, _>>()?;
1769 let chosen_name = metadata
1771 .get(AVRO_NAME_METADATA_KEY)
1772 .map(|s| sanitise_avro_name(s))
1773 .unwrap_or_else(|| name_gen.make_unique(field_name));
1774 let mut obj = JsonMap::from_iter([
1775 ("type".into(), json!("record")),
1776 ("name".into(), json!(chosen_name)),
1777 ("fields".into(), Value::Array(avro_fields)),
1778 ]);
1779 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1780 obj.insert("namespace".into(), json!(ns));
1781 }
1782 Value::Object(obj)
1783 }
1784 DataType::Dictionary(_, value) => {
1785 if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1786 let symbols: Vec<&str> =
1787 serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1788 let chosen_name = metadata
1790 .get(AVRO_NAME_METADATA_KEY)
1791 .map(|s| sanitise_avro_name(s))
1792 .unwrap_or_else(|| name_gen.make_unique(field_name));
1793 let mut obj = JsonMap::from_iter([
1794 ("type".into(), json!("enum")),
1795 ("name".into(), json!(chosen_name)),
1796 ("symbols".into(), json!(symbols)),
1797 ]);
1798 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1799 obj.insert("namespace".into(), json!(ns));
1800 }
1801 Value::Object(obj)
1802 } else {
1803 process_datatype(
1804 value.as_ref(),
1805 field_name,
1806 metadata,
1807 name_gen,
1808 null_order,
1809 false,
1810 strip,
1811 )?
1812 }
1813 }
1814 #[cfg(feature = "avro_custom_types")]
1815 DataType::RunEndEncoded(run_ends, values) => {
1816 let bits = match run_ends.data_type() {
1817 DataType::Int16 => 16,
1818 DataType::Int32 => 32,
1819 DataType::Int64 => 64,
1820 other => {
1821 return Err(ArrowError::SchemaError(format!(
1822 "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1823 )));
1824 }
1825 };
1826 let (value_schema, value_extras) = datatype_to_avro(
1828 values.data_type(),
1829 values.name(),
1830 values.metadata(),
1831 name_gen,
1832 null_order,
1833 strip,
1834 )?;
1835 let mut merged = merge_extras(value_schema, value_extras);
1836 if values.is_nullable() {
1837 merged = wrap_nullable(merged, null_order);
1838 }
1839 let mut extras = JsonMap::new();
1840 extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1841 extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1842 return Ok((merged, extras));
1843 }
1844 #[cfg(not(feature = "avro_custom_types"))]
1845 DataType::RunEndEncoded(_run_ends, values) => {
1846 let (value_schema, _extras) = datatype_to_avro(
1847 values.data_type(),
1848 values.name(),
1849 values.metadata(),
1850 name_gen,
1851 null_order,
1852 strip,
1853 )?;
1854 return Ok((value_schema, JsonMap::new()));
1855 }
1856 DataType::Union(fields, mode) => {
1857 let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1858 let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1859 for (type_id, field_ref) in fields.iter() {
1860 let (branch_schema, _branch_extras) = datatype_to_avro(
1862 field_ref.data_type(),
1863 field_ref.name(),
1864 field_ref.metadata(),
1865 name_gen,
1866 null_order,
1867 strip,
1868 )?;
1869 if matches!(branch_schema, Value::Array(_)) {
1871 return Err(ArrowError::SchemaError(
1872 "Avro union may not immediately contain another union".into(),
1873 ));
1874 }
1875 branches.push(branch_schema);
1876 type_ids.push(type_id as i32);
1877 }
1878 let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1879 for b in &branches {
1880 let sig = union_branch_signature(b)?;
1881 if !seen.insert(sig) {
1882 return Err(ArrowError::SchemaError(
1883 "Avro union contains duplicate branch types (disallowed by spec)".into(),
1884 ));
1885 }
1886 }
1887 if !strip {
1888 extras.insert(
1889 "arrowUnionMode".into(),
1890 Value::String(
1891 match mode {
1892 UnionMode::Sparse => "sparse",
1893 UnionMode::Dense => "dense",
1894 }
1895 .to_string(),
1896 ),
1897 );
1898 extras.insert(
1899 "arrowUnionTypeIds".into(),
1900 Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1901 );
1902 }
1903 Value::Array(branches)
1904 }
1905 #[cfg(not(feature = "small_decimals"))]
1906 other => {
1907 return Err(ArrowError::NotYetImplemented(format!(
1908 "Arrow type {other:?} has no Avro representation"
1909 )));
1910 }
1911 };
1912 Ok((val, extras))
1913}
1914
1915fn process_datatype(
1916 dt: &DataType,
1917 field_name: &str,
1918 metadata: &HashMap<String, String>,
1919 name_gen: &mut NameGenerator,
1920 null_order: Nullability,
1921 is_nullable: bool,
1922 strip: bool,
1923) -> Result<Value, ArrowError> {
1924 let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1925 let mut merged = merge_extras(schema, extras);
1926 if is_nullable {
1927 merged = wrap_nullable(merged, null_order)
1928 }
1929 Ok(merged)
1930}
1931
1932fn arrow_field_to_avro(
1933 field: &ArrowField,
1934 name_gen: &mut NameGenerator,
1935 null_order: Nullability,
1936 strip: bool,
1937) -> Result<Value, ArrowError> {
1938 let avro_name = sanitise_avro_name(field.name());
1939 let schema_value = process_datatype(
1940 field.data_type(),
1941 &avro_name,
1942 field.metadata(),
1943 name_gen,
1944 null_order,
1945 field.is_nullable(),
1946 strip,
1947 )?;
1948 let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1950 map.insert("name".into(), Value::String(avro_name));
1951 map.insert("type".into(), schema_value);
1952 for (meta_key, meta_val) in field.metadata() {
1954 if is_internal_arrow_key(meta_key) {
1955 continue;
1956 }
1957 match meta_key.as_str() {
1958 AVRO_DOC_METADATA_KEY => {
1959 map.insert("doc".into(), Value::String(meta_val.clone()));
1960 }
1961 AVRO_FIELD_DEFAULT_METADATA_KEY => {
1962 let default_value = serde_json::from_str(meta_val)
1963 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1964 map.insert("default".into(), default_value);
1965 }
1966 _ => {
1967 let json_val = serde_json::from_str(meta_val)
1968 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1969 map.insert(meta_key.clone(), json_val);
1970 }
1971 }
1972 }
1973 Ok(Value::Object(map))
1974}
1975
1976#[cfg(test)]
1977mod tests {
1978 use super::*;
1979 use crate::codec::{AvroField, AvroFieldBuilder};
1980 use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1981 use serde_json::json;
1982 use std::sync::Arc;
1983
1984 fn int_schema() -> Schema<'static> {
1985 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1986 }
1987
1988 fn record_schema() -> Schema<'static> {
1989 Schema::Complex(ComplexType::Record(Record {
1990 name: "record1",
1991 namespace: Some("test.namespace"),
1992 doc: Some(Cow::from("A test record")),
1993 aliases: vec![],
1994 fields: vec![
1995 Field {
1996 name: "field1",
1997 doc: Some(Cow::from("An integer field")),
1998 r#type: int_schema(),
1999 default: None,
2000 aliases: vec![],
2001 },
2002 Field {
2003 name: "field2",
2004 doc: None,
2005 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2006 default: None,
2007 aliases: vec![],
2008 },
2009 ],
2010 attributes: Attributes::default(),
2011 }))
2012 }
2013
2014 fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
2015 let mut sb = SchemaBuilder::new();
2016 sb.push(field);
2017 sb.finish()
2018 }
2019
2020 fn assert_json_contains(avro_json: &str, needle: &str) {
2021 assert!(
2022 avro_json.contains(needle),
2023 "JSON did not contain `{needle}` : {avro_json}"
2024 )
2025 }
2026
    /// Checks that Avro schema JSON deserializes into the expected `Schema`, `Type` and
    /// `ComplexType` values, and that converting two of the parsed schemas with
    /// `AvroField::try_from` yields the asserted error message and Arrow field respectively.
    #[test]
    fn test_deserialize() {
        // A bare JSON string naming a primitive parses to a primitive type name.
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(
            t,
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
        );

        // A JSON array parses to a union; the branches are expected in declared order.
        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            ])
        );

        // An object with a primitive "type" plus "logicalType" parses to `Type`, with the
        // logical type captured in its attributes.
        let t: Type = serde_json::from_str(
            r#"{
 "type":"long",
 "logicalType":"timestamp-micros"
 }"#,
        )
        .unwrap();

        // Reused further down as a union branch of the `timestamp_col` field.
        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        };

        assert_eq!(t, timestamp);

        // A named `fixed` carrying a decimal logical type: "precision" and "scale" are
        // expected to land in `attributes.additional`.
        let t: ComplexType = serde_json::from_str(
            r#"{
 "type":"fixed",
 "name":"fixed",
 "namespace":"topLevelRecord.value",
 "size":11,
 "logicalType":"decimal",
 "precision":25,
 "scale":2
 }"#,
        )
        .unwrap();

        // Reused below as the non-null branch of the record's `value` field.
        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        });

        assert_eq!(t, decimal);

        // A record whose single field is a union of the decimal fixed type and null.
        let schema: Schema = serde_json::from_str(
            r#"{
 "type":"record",
 "name":"topLevelRecord",
 "fields":[
 {
 "name":"value",
 "type":[
 {
 "type":"fixed",
 "name":"fixed",
 "namespace":"topLevelRecord.value",
 "size":11,
 "logicalType":"decimal",
 "precision":25,
 "scale":2
 },
 "null"
 ]
 }
 ]
 }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal),
                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    ]),
                    default: None,
                    aliases: vec![],
                },],
                attributes: Default::default(),
            }))
        );

        // A self-referential record: the `next` field names the enclosing record, which is
        // expected to parse as a `TypeName::Ref` rather than a primitive.
        let schema: Schema = serde_json::from_str(
            r#"{
 "type": "record",
 "name": "LongList",
 "aliases": ["LinkedLongs"],
 "fields" : [
 {"name": "value", "type": "long"},
 {"name": "next", "type": ["null", "LongList"]}
 ]
 }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "next",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                        aliases: vec![],
                    }
                ],
                attributes: Attributes::default(),
            }))
        );

        // Converting the self-referential schema is expected to fail with this exact
        // resolution error for the `LongList` reference.
        let err = AvroField::try_from(&schema).unwrap_err().to_string();
        assert_eq!(err, "Parser error: Failed to resolve .LongList");

        // A record with a nullable int and a nullable timestamp-micros field.
        let schema: Schema = serde_json::from_str(
            r#"{
 "type":"record",
 "name":"topLevelRecord",
 "fields":[
 {
 "name":"id",
 "type":[
 "int",
 "null"
 ]
 },
 {
 "name":"timestamp_col",
 "type":[
 {
 "type":"long",
 "logicalType":"timestamp-micros"
 },
 "null"
 ]
 }
 ]
 }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                        aliases: vec![],
                    }
                ],
                attributes: Default::default(),
            }))
        );
        // This schema converts successfully. The expected Arrow field is a non-nullable
        // struct with nullable `Int32` and `Timestamp(Microsecond, "+00:00")` children,
        // and the record name stored under `AVRO_NAME_METADATA_KEY`.
        let codec = AvroField::try_from(&schema).unwrap();
        let expected_arrow_field = arrow_schema::Field::new(
            "topLevelRecord",
            DataType::Struct(Fields::from(vec![
                arrow_schema::Field::new("id", DataType::Int32, true),
                arrow_schema::Field::new(
                    "timestamp_col",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                    true,
                ),
            ])),
            false,
        )
        .with_metadata(std::collections::HashMap::from([(
            AVRO_NAME_METADATA_KEY.to_string(),
            "topLevelRecord".to_string(),
        )]));

        assert_eq!(codec.field(), expected_arrow_field);

        // A namespaced record that defines the named fixed `MD5` inline in one field and
        // refers to it by name in another; it also has a nullable map of bytes.
        let schema: Schema = serde_json::from_str(
            r#"{
 "type": "record",
 "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
 "fields": [
 {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
 {"name": "clientProtocol", "type": ["null", "string"]},
 {"name": "serverHash", "type": "MD5"},
 {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
 ]
 }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                        ]),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(Schema::TypeName(TypeName::Primitive(
                                    PrimitiveType::Bytes
                                ))),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                        aliases: vec![],
                    }
                ],
                attributes: Default::default(),
            }))
        );
    }
2334
2335 #[test]
2336 fn test_canonical_form_generation_comprehensive_record() {
2337 let json_str = r#"{
2339 "type": "record",
2340 "name": "E2eComprehensive",
2341 "namespace": "org.apache.arrow.avrotests.v1",
2342 "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
2343 "fields": [
2344 {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
2345 {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
2346 {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
2347 {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
2348 {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
2349 {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
2350 {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
2351 {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
2352 {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
2353 {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
2354 {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
2355 {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
2356 {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
2357 {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
2358 {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
2359 {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
2360 {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
2361 {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
2362 {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
2363 {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
2364 {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
2365 {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
2366 {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
2367 {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
2368 {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
2369 {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
2370 {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
2371 {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
2372 {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
2373 {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
2374 ]}, "doc": "Embedded Address record"},
2375 {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
2376 {"name": "user", "type": "string", "doc": "Username"},
2377 {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
2378 ]}},
2379 {"name": "union_enum_record_array_map", "type": [
2380 {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
2381 {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
2382 {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
2383 {"type": "array", "items": "long"},
2384 {"type": "map", "values": "string"}
2385 ], "doc": "Union of enum, two records, array, and map"},
2386 {"name": "union_date_or_fixed4", "type": [
2387 {"type": "int", "logicalType": "date"},
2388 {"type": "fixed", "name": "Fx4", "size": 4}
2389 ], "doc": "Union of date(int) or fixed(4)"},
2390 {"name": "union_interval_or_string", "type": [
2391 {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
2392 "string"
2393 ], "doc": "Union of duration(fixed12) or string"},
2394 {"name": "union_uuid_or_fixed10", "type": [
2395 {"type": "string", "logicalType": "uuid"},
2396 {"type": "fixed", "name": "Fx10", "size": 10}
2397 ], "doc": "Union of UUID string or fixed(10)"},
2398 {"name": "array_records_with_union", "type": {"type": "array", "items": {
2399 "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
2400 "fields": [
2401 {"name": "key", "type": "string"},
2402 {"name": "val", "type": ["null", "int", "long"], "default": null}
2403 ]
2404 }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
2405 {"name": "union_map_or_array_int", "type": [
2406 {"type": "map", "values": "int"},
2407 {"type": "array", "items": "int"}
2408 ], "doc": "Union[map<string,int>, array<int>]"},
2409 {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
2410 {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
2411 {"name": "name", "type": "string"},
2412 {"name": "age", "type": "int", "default": 0}
2413 ]}, "doc": "Record using type alias for schema evolution tests"}
2414 ]
2415 }"#;
2416 let avro = AvroSchema::new(json_str.to_string());
2417 let parsed = avro.schema().expect("schema should deserialize");
2418 let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrotests.v1.
types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
2419 let canonical_form =
2420 AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
2421 assert_eq!(
2422 canonical_form, expected_canonical_form,
2423 "Canonical form must match Avro spec PCF exactly"
2424 );
2425 }
2426
/// A store created with `SchemaStore::new` starts out with no schemas.
#[test]
fn test_new_schema_store() {
    let empty_store = SchemaStore::new();
    assert!(empty_store.schemas.is_empty());
}
2432
/// Builds a store from a fingerprint-keyed map holding two schemas and checks
/// that each schema can be looked up again through its Rabin fingerprint.
#[test]
fn test_try_from_schemas_rabin() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rec_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let int_fp = int_avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let rec_fp = rec_avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let by_fingerprint: HashMap<Fingerprint, AvroSchema> =
        HashMap::from([(int_fp, int_avro.clone()), (rec_fp, rec_avro.clone())]);
    let store = SchemaStore::try_from(by_fingerprint).unwrap();
    assert_eq!(store.lookup(&int_fp), Some(&int_avro));
    assert_eq!(store.lookup(&rec_fp), Some(&rec_avro));
}
2460
/// Inserts the int schema twice (plus the record schema once) under Rabin
/// fingerprints and checks that the resulting store holds two entries.
#[test]
fn test_try_from_with_duplicates() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rec_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let mut by_fingerprint: HashMap<Fingerprint, AvroSchema> = HashMap::new();
    // The third entry repeats the first one, so it lands on the same map key.
    for candidate in [&int_avro, &rec_avro, &int_avro] {
        let fp = candidate.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
        by_fingerprint.insert(fp, candidate.clone());
    }
    let store = SchemaStore::try_from(by_fingerprint).unwrap();
    assert_eq!(store.schemas.len(), 2);
    let int_fp = int_avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    assert_eq!(store.lookup(&int_fp), Some(&int_avro));
}
2492
/// Registers a schema in a fresh store and verifies that the returned Rabin
/// fingerprint resolves to it, while the next fingerprint value does not.
#[test]
fn test_register_and_lookup_rabin() {
    let mut store = SchemaStore::new();
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    // Any non-Rabin variant (including the feature-gated ones) fails the test.
    let Fingerprint::Rabin(rabin) = store.register(int_avro.clone()).unwrap() else {
        unreachable!("This test should only generate Rabin fingerprints")
    };
    assert_eq!(store.lookup(&Fingerprint::Rabin(rabin)), Some(&int_avro));
    let neighbour = Fingerprint::Rabin(rabin.wrapping_add(1));
    assert!(store.lookup(&neighbour).is_none());
}
2526
/// Stores a schema under an explicit `Fingerprint::Id` and checks that only
/// that exact id resolves to it.
#[test]
fn test_set_and_lookup_id() {
    let mut store = SchemaStore::new();
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let known_id = 42u32;
    let key = Fingerprint::Id(known_id);
    let returned = store.set(key, int_avro.clone()).unwrap();
    assert_eq!(returned, key);
    assert_eq!(store.lookup(&key), Some(&int_avro));
    let absent = Fingerprint::Id(known_id.wrapping_add(1));
    assert!(store.lookup(&absent).is_none());
}
2538
/// Stores a schema under an explicit `Fingerprint::Id64` and checks that only
/// that exact 64-bit id resolves to it.
#[test]
fn test_set_and_lookup_id64() {
    let mut store = SchemaStore::new();
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let known_id: u64 = 0xDEAD_BEEF_DEAD_BEEF;
    let key = Fingerprint::Id64(known_id);
    let returned = store.set(key, int_avro.clone()).unwrap();
    assert_eq!(returned, key, "set should return the same Id64 fingerprint");
    assert_eq!(
        store.lookup(&key),
        Some(&int_avro),
        "lookup should find the schema by Id64"
    );
    let absent = Fingerprint::Id64(known_id.wrapping_add(1));
    assert!(
        store.lookup(&absent).is_none(),
        "lookup with a different Id64 must return None"
    );
}
2559
/// Exercises the `From` conversions between `Fingerprint`,
/// `FingerprintAlgorithm` and `FingerprintStrategy` for the `Id64` variant.
#[test]
fn test_fingerprint_id64_conversions() {
    let algo: FingerprintAlgorithm = (&Fingerprint::Id64(123)).into();
    assert_eq!(algo, FingerprintAlgorithm::Id64);

    let fingerprint: Fingerprint = FingerprintAlgorithm::Id64.into();
    assert!(matches!(fingerprint, Fingerprint::Id64(0)));

    // The strategy is expected to carry 0 rather than the source id (5).
    let strategy: FingerprintStrategy = Fingerprint::Id64(5).into();
    assert!(matches!(strategy, FingerprintStrategy::Id64(0)));

    let algo_from_strategy: FingerprintAlgorithm = strategy.into();
    assert_eq!(algo_from_strategy, FingerprintAlgorithm::Id64);
}
2571
/// Registering two equal schemas yields the same fingerprint and leaves a
/// single entry in the store.
#[test]
fn test_register_duplicate_schema() {
    let make_int_avro = || AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();
    let first = store.register(make_int_avro()).unwrap();
    let second = store.register(make_int_avro()).unwrap();
    assert_eq!(first, second);
    assert_eq!(store.schemas.len(), 1);
}
2582
/// Stores a schema under its own precomputed Rabin fingerprint and reads it
/// back through that fingerprint.
#[test]
fn test_set_and_lookup_with_provided_fingerprint() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rabin = int_avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let mut store = SchemaStore::new();
    let returned = store.set(rabin, int_avro.clone()).unwrap();
    assert_eq!(returned, rabin);
    assert_eq!(store.lookup(&rabin), Some(&int_avro));
}
2592
/// Setting the same schema twice under the same fingerprint succeeds both
/// times and keeps exactly one entry.
#[test]
fn test_set_duplicate_same_schema_ok() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rabin = int_avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let mut store = SchemaStore::new();
    for _ in 0..2 {
        let _ = store.set(rabin, int_avro.clone()).unwrap();
    }
    assert_eq!(store.schemas.len(), 1);
}
2602
/// Setting a different schema under an already used fingerprint must fail
/// with a collision error.
#[test]
fn test_set_duplicate_different_schema_collision_error() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rec_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let shared_key = Fingerprint::Id(123);
    let mut store = SchemaStore::new();
    let _ = store.set(shared_key, int_avro).unwrap();
    let collision = store.set(shared_key, rec_avro).unwrap_err();
    assert!(collision.to_string().contains("Schema fingerprint collision"));
}
2615
/// The canonical form of the primitive int schema is the JSON string `"int"`.
#[test]
fn test_canonical_form_generation_primitive() {
    let canonical = AvroSchema::generate_canonical_form(&int_schema()).unwrap();
    assert_eq!(canonical, r#""int""#);
}
2622
/// The canonical form of the sample record uses the fully qualified name and
/// lists `name`, `type`, `fields` in that order.
#[test]
fn test_canonical_form_generation_record() {
    let canonical = AvroSchema::generate_canonical_form(&record_schema()).unwrap();
    assert_eq!(
        canonical,
        r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#
    );
}
2630
/// Pins the Rabin fingerprint of a fixed canonical-form string to a known value.
#[test]
fn test_fingerprint_calculation() {
    let pcf = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
    assert_eq!(compute_fingerprint_rabin(pcf), 10505236152925314060);
}
2638
/// Registering the sample record returns the Rabin fingerprint of its
/// canonical form, and that fingerprint resolves back to the schema.
#[test]
fn test_register_and_lookup_complex_schema() {
    let rec_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let pcf = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
    let mut store = SchemaStore::new();
    let registered = store.register(rec_avro.clone()).unwrap();
    assert_eq!(
        registered,
        Fingerprint::Rabin(compute_fingerprint_rabin(pcf))
    );
    assert_eq!(store.lookup(&registered), Some(&rec_avro));
}
2650
/// `fingerprints()` reports every fingerprint returned by `register`.
#[test]
fn test_fingerprints_returns_all_keys() {
    let mut store = SchemaStore::new();
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rec_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let registered = [
        store.register(int_avro).unwrap(),
        store.register(rec_avro).unwrap(),
    ];
    let fps = store.fingerprints();
    assert_eq!(fps.len(), 2);
    for fp in &registered {
        assert!(fps.contains(fp));
    }
}
2669
/// Docs, aliases and extra attributes on a record and its field must not
/// appear in the generated canonical form.
#[test]
fn test_canonical_form_strips_attributes() {
    // Field type: plain bytes carrying an extra "precision" attribute.
    let bytes_with_precision = Schema::Type(Type {
        r#type: TypeName::Primitive(PrimitiveType::Bytes),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("precision", json!(4))]),
        },
    });
    let documented_field = Field {
        name: "f1",
        doc: Some(Cow::from("field doc")),
        r#type: bytes_with_precision,
        default: None,
        aliases: vec![],
    };
    let record = Record {
        name: "record_with_attrs",
        namespace: None,
        doc: Some(Cow::from("This doc should be stripped")),
        aliases: vec!["alias1", "alias2"],
        fields: vec![documented_field],
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("custom_attr", json!("value"))]),
        },
    };
    let schema_with_attrs = Schema::Complex(ComplexType::Record(record));
    let canonical = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
    assert_eq!(
        canonical,
        r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#
    );
}
2699
/// Without `avro_custom_types`, each Arrow primitive must be written as the
/// listed plain Avro primitive token.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_primitive_mappings() {
    // Fixed table of (Arrow type, token expected somewhere in the Avro JSON).
    let cases = [
        (DataType::Boolean, "\"boolean\""),
        (DataType::Int8, "\"int\""),
        (DataType::Int16, "\"int\""),
        (DataType::Int32, "\"int\""),
        (DataType::Int64, "\"long\""),
        (DataType::UInt8, "\"int\""),
        (DataType::UInt16, "\"int\""),
        (DataType::UInt32, "\"long\""),
        (DataType::UInt64, "\"long\""),
        (DataType::Float16, "\"float\""),
        (DataType::Float32, "\"float\""),
        (DataType::Float64, "\"double\""),
        (DataType::Utf8, "\"string\""),
        (DataType::Binary, "\"bytes\""),
    ];
    for (dt, avro_token) in cases {
        // `dt` is owned by the loop, so it is moved into the field.
        let field = ArrowField::new("col", dt, false);
        let arrow_schema = single_field_schema(field);
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, avro_token);
    }
}
2726
/// With `avro_custom_types`, Arrow primitives that have no direct Avro
/// counterpart must be tagged with the listed `arrow.*` logical type; the
/// rest keep their plain Avro primitive token.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_primitive_mappings() {
    // Fixed table of (Arrow type, token expected somewhere in the Avro JSON).
    let cases = [
        (DataType::Boolean, "\"boolean\""),
        (DataType::Int8, "\"logicalType\":\"arrow.int8\""),
        (DataType::Int16, "\"logicalType\":\"arrow.int16\""),
        (DataType::Int32, "\"int\""),
        (DataType::Int64, "\"long\""),
        (DataType::UInt8, "\"logicalType\":\"arrow.uint8\""),
        (DataType::UInt16, "\"logicalType\":\"arrow.uint16\""),
        (DataType::UInt32, "\"logicalType\":\"arrow.uint32\""),
        (DataType::UInt64, "\"logicalType\":\"arrow.uint64\""),
        (DataType::Float16, "\"logicalType\":\"arrow.float16\""),
        (DataType::Float32, "\"float\""),
        (DataType::Float64, "\"double\""),
        (DataType::Utf8, "\"string\""),
        (DataType::Binary, "\"bytes\""),
    ];
    for (dt, avro_token) in cases {
        // `dt` is owned by the loop, so it is moved into the field.
        let field = ArrowField::new("col", dt, false);
        let arrow_schema = single_field_schema(field);
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, avro_token);
    }
}
2753
/// Fields that carry both `avro.name` and `avro.namespace` metadata must be
/// written as `fixed` with the expected `arrow.*` logical type and must keep
/// the namespace in the generated type object.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_custom_fixed_logical_types_preserve_namespace_metadata() {
    let namespace = "com.example.types";
    // Metadata map naming the Avro type and placing it in `namespace`.
    let named_in_namespace = |type_name: &str| {
        HashMap::from([
            (AVRO_NAME_METADATA_KEY.to_string(), type_name.to_string()),
            (
                AVRO_NAMESPACE_METADATA_KEY.to_string(),
                namespace.to_string(),
            ),
        ])
    };

    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("u64_col", DataType::UInt64, false)
            .with_metadata(named_in_namespace("U64Type")),
        ArrowField::new("f16_col", DataType::Float16, false)
            .with_metadata(named_in_namespace("F16Type")),
        ArrowField::new(
            "iv_ym_col",
            DataType::Interval(IntervalUnit::YearMonth),
            false,
        )
        .with_metadata(named_in_namespace("IvYmType")),
        ArrowField::new(
            "iv_dt_col",
            DataType::Interval(IntervalUnit::DayTime),
            false,
        )
        .with_metadata(named_in_namespace("IvDtType")),
    ]);

    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let root: Value = serde_json::from_str(&avro.json_string).unwrap();
    let fields = root["fields"].as_array().expect("record fields array");

    let expectations = [
        ("u64_col", "arrow.uint64"),
        ("f16_col", "arrow.float16"),
        ("iv_ym_col", "arrow.interval-year-month"),
        ("iv_dt_col", "arrow.interval-day-time"),
    ];
    for (field_name, logical_type) in expectations {
        let Some(field) = fields
            .iter()
            .find(|candidate| candidate["name"].as_str() == Some(field_name))
        else {
            panic!("missing field {field_name}")
        };
        let Some(ty) = field["type"].as_object() else {
            panic!("field {field_name} type must be object")
        };

        assert_eq!(ty.get("type").and_then(Value::as_str), Some("fixed"));
        assert_eq!(
            ty.get("logicalType").and_then(Value::as_str),
            Some(logical_type)
        );
        assert_eq!(
            ty.get("namespace").and_then(Value::as_str),
            Some(namespace),
            "field {field_name} must preserve avro.namespace metadata"
        );
    }
}
2840
/// Fields that carry only `avro.name` metadata must still be written as
/// `fixed`, but the generated type object must not contain a `namespace` key.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_custom_fixed_logical_types_omit_namespace_without_metadata() {
    // Metadata map that names the Avro type but provides no namespace.
    let named_only = |type_name: &str| {
        HashMap::from([(AVRO_NAME_METADATA_KEY.to_string(), type_name.to_string())])
    };

    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("u64_col", DataType::UInt64, false).with_metadata(named_only("U64Type")),
        ArrowField::new("f16_col", DataType::Float16, false).with_metadata(named_only("F16Type")),
        ArrowField::new(
            "iv_ym_col",
            DataType::Interval(IntervalUnit::YearMonth),
            false,
        )
        .with_metadata(named_only("IvYmType")),
        ArrowField::new(
            "iv_dt_col",
            DataType::Interval(IntervalUnit::DayTime),
            false,
        )
        .with_metadata(named_only("IvDtType")),
    ]);

    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let root: Value = serde_json::from_str(&avro.json_string).unwrap();
    let fields = root["fields"].as_array().expect("record fields array");

    for field_name in ["u64_col", "f16_col", "iv_ym_col", "iv_dt_col"] {
        let Some(field) = fields
            .iter()
            .find(|candidate| candidate["name"].as_str() == Some(field_name))
        else {
            panic!("missing field {field_name}")
        };
        let Some(ty) = field["type"].as_object() else {
            panic!("field {field_name} type must be object")
        };

        assert_eq!(ty.get("type").and_then(Value::as_str), Some("fixed"));
        assert!(
            !ty.contains_key("namespace"),
            "field {field_name} should not include namespace when metadata lacks avro.namespace"
        );
    }
}
2897
/// Each Arrow temporal type must be written with the listed Avro
/// `logicalType`. A timestamp without a time zone is expected to map to the
/// `local-timestamp-*` family, one with a zone to `timestamp-*`.
#[test]
fn test_temporal_mappings() {
    // Fixed table of (Arrow type, token expected somewhere in the Avro JSON).
    let cases = [
        (DataType::Date32, "\"logicalType\":\"date\""),
        (
            DataType::Time32(TimeUnit::Millisecond),
            "\"logicalType\":\"time-millis\"",
        ),
        (
            DataType::Time64(TimeUnit::Microsecond),
            "\"logicalType\":\"time-micros\"",
        ),
        (
            DataType::Timestamp(TimeUnit::Millisecond, None),
            "\"logicalType\":\"local-timestamp-millis\"",
        ),
        (
            DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
            "\"logicalType\":\"timestamp-micros\"",
        ),
    ];
    for (dt, needle) in cases {
        // `dt` is owned by the loop, so it is moved into the field.
        let field = ArrowField::new("ts", dt, true);
        let arrow_schema = single_field_schema(field);
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, needle);
    }
}
2926
/// A `Decimal128(25, 2)` field must carry the decimal logical type with its
/// precision and scale; a 16-byte fixed binary tagged `logicalType = uuid` in
/// its metadata must be written with the uuid logical type.
#[test]
fn test_decimal_and_uuid() {
    let dec_schema =
        single_field_schema(ArrowField::new("amount", DataType::Decimal128(25, 2), false));
    let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
    for needle in [
        "\"logicalType\":\"decimal\"",
        "\"precision\":25",
        "\"scale\":2",
    ] {
        assert_json_contains(&avro_dec.json_string, needle);
    }

    let uuid_md = HashMap::from([("logicalType".to_string(), "uuid".to_string())]);
    let uuid_schema = single_field_schema(
        ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(uuid_md),
    );
    let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
    assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
}
2943
/// Without `avro_custom_types`, a month-day-nano interval must be written
/// with the `duration` logical type and a size of 12.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_interval_month_day_nano_duration_schema() {
    let span_type = DataType::Interval(IntervalUnit::MonthDayNano);
    let arrow_schema = single_field_schema(ArrowField::new("span", span_type, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    for needle in ["\"logicalType\":\"duration\"", "\"size\":12"] {
        assert_json_contains(&avro.json_string, needle);
    }
}
2957
/// With `avro_custom_types`, a month-day-nano interval must be written with
/// the `arrow.interval-month-day-nano` logical type and a size of 16.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_schema() {
    let span_type = DataType::Interval(IntervalUnit::MonthDayNano);
    let arrow_schema = single_field_schema(ArrowField::new("span", span_type, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    for needle in [
        "\"logicalType\":\"arrow.interval-month-day-nano\"",
        "\"size\":16",
    ] {
        assert_json_contains(&avro.json_string, needle);
    }
}
2974
/// With `avro_custom_types`, a nanosecond duration must be written with the
/// `arrow.duration-nanos` logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_custom_logical_type() {
    let latency = ArrowField::new("latency", DataType::Duration(TimeUnit::Nanosecond), false);
    let arrow_schema = single_field_schema(latency);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let expected = "\"logicalType\":\"arrow.duration-nanos\"";
    assert_json_contains(&avro.json_string, expected);
}
2986
/// Arrow list, map and struct columns must be written as Avro `array`, `map`
/// and `record` respectively; the nullable struct column must also mention
/// `"null"`.
#[test]
fn test_complex_types() {
    // List<Int32> -> array
    let item = ArrowField::new("item", DataType::Int32, true);
    let list_schema =
        single_field_schema(ArrowField::new("numbers", DataType::List(Arc::new(item)), false));
    let avro_list = AvroSchema::try_from(&list_schema).unwrap();
    for needle in ["\"type\":\"array\"", "\"items\""] {
        assert_json_contains(&avro_list.json_string, needle);
    }

    // Map<Utf8, Boolean> -> map
    let entry_fields = Fields::from(vec![
        ArrowField::new("key", DataType::Utf8, false),
        ArrowField::new("value", DataType::Boolean, true),
    ]);
    let entries = ArrowField::new("entries", DataType::Struct(entry_fields), false);
    let map_schema = single_field_schema(ArrowField::new(
        "props",
        DataType::Map(Arc::new(entries), false),
        false,
    ));
    let avro_map = AvroSchema::try_from(&map_schema).unwrap();
    for needle in ["\"type\":\"map\"", "\"values\""] {
        assert_json_contains(&avro_map.json_string, needle);
    }

    // Nullable Struct -> record (with a null branch)
    let person_fields = Fields::from(vec![
        ArrowField::new("f1", DataType::Int64, false),
        ArrowField::new("f2", DataType::Utf8, true),
    ]);
    let struct_schema =
        single_field_schema(ArrowField::new("person", DataType::Struct(person_fields), true));
    let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
    for needle in ["\"type\":\"record\"", "\"null\""] {
        assert_json_contains(&avro_struct.json_string, needle);
    }
}
3017
/// A dictionary-encoded string column tagged with enum symbols in its
/// metadata must be written as an Avro `enum` with those symbols.
#[test]
fn test_enum_dictionary() {
    let symbols_md: HashMap<String, String> = HashMap::from([(
        AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
        "[\"OPEN\",\"CLOSED\"]".to_string(),
    )]);
    let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let status = ArrowField::new("status", dictionary, false).with_metadata(symbols_md);
    let arrow_schema = single_field_schema(status);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    for needle in ["\"type\":\"enum\"", "\"symbols\":[\"OPEN\",\"CLOSED\"]"] {
        assert_json_contains(&avro.json_string, needle);
    }
}
3032
/// A run-end-encoded column with string values must produce Avro JSON that
/// mentions the `"string"` type.
#[test]
fn test_run_end_encoded() {
    let run_ends = ArrowField::new("run_ends", DataType::Int32, false);
    let values = ArrowField::new("values", DataType::Utf8, false);
    let ree = DataType::RunEndEncoded(Arc::new(run_ends), Arc::new(values));
    let arrow_schema = single_field_schema(ArrowField::new("text", ree, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&avro.json_string, "\"string\"");
}
3043
/// Converts a dense Arrow union with type ids 2 and 7 and inspects the Avro
/// union: the first branch must be an `int` object carrying the union mode
/// and the type ids in order, the second branch must be the plain `"string"`.
#[test]
fn test_dense_union() {
    let branches: UnionFields = vec![
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let arrow_schema = single_field_schema(ArrowField::new(
        "u",
        DataType::Union(branches, UnionMode::Dense),
        false,
    ));
    let avro = AvroSchema::try_from(&arrow_schema)
        .expect("Arrow Union -> Avro union conversion should succeed");

    let root: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
    let fields = root["fields"].as_array().expect("fields array");
    let u_field = fields
        .iter()
        .find(|candidate| candidate["name"].as_str() == Some("u"))
        .expect("field 'u'");
    let members = u_field
        .get("type")
        .expect("u.type")
        .as_array()
        .expect("u.type must be Avro union array");
    assert_eq!(members.len(), 2, "expected two union branches");

    // The Arrow-specific union metadata is expected on the first branch.
    let head = members[0]
        .as_object()
        .expect("first branch should be an object with metadata");
    assert_eq!(head.get("type").and_then(|t| t.as_str()), Some("int"));
    assert_eq!(
        head.get("arrowUnionMode").and_then(|m| m.as_str()),
        Some("dense")
    );
    let id_values = head
        .get("arrowUnionTypeIds")
        .and_then(|ids| ids.as_array())
        .expect("arrowUnionTypeIds array");
    let type_ids: Vec<i64> = id_values
        .iter()
        .map(|id| id.as_i64().expect("i64"))
        .collect();
    assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");
    assert_eq!(members[1], Value::String("string".into()));
}
3087
/// An Arrow schema with a single Int32 column converts to an Avro schema
/// whose parsed form is a complex type.
#[test]
fn round_trip_primitive() {
    let only_column = ArrowField::new("f1", DataType::Int32, false);
    let arrow_schema = ArrowSchema::new(vec![only_column]);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let parsed = avro.schema().unwrap();
    assert!(matches!(parsed, Schema::Complex(_)));
}
3095
/// Column names that are not valid Avro names must be sanitized, and names
/// that collide after sanitization must be made unique with a numeric suffix.
#[test]
fn test_name_generator_sanitization_and_uniqueness() {
    let columns: Vec<ArrowField> = ["weird-name", "weird name", "123bad"]
        .into_iter()
        .map(|raw_name| ArrowField::new(raw_name, DataType::FixedSizeBinary(8), false))
        .collect();
    let arrow_schema = ArrowSchema::new(columns);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    for needle in [
        "\"name\":\"weird_name\"",
        "\"name\":\"weird_name_1\"",
        "\"name\":\"_123bad\"",
    ] {
        assert_json_contains(&avro.json_string, needle);
    }
}
3107
/// Without `avro_custom_types`, Date64 must be written with the
/// `local-timestamp-millis` logical type.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_date64_logical_type_mapping() {
    let arrow_schema = single_field_schema(ArrowField::new("d", DataType::Date64, true));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let expected = "\"logicalType\":\"local-timestamp-millis\"";
    assert_json_contains(&avro.json_string, expected);
}
3119
/// With `avro_custom_types`, Date64 must be written with the `arrow.date64`
/// logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_date64_logical_type_mapping_custom() {
    let arrow_schema = single_field_schema(ArrowField::new("d", DataType::Date64, true));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let expected = "\"logicalType\":\"arrow.date64\"";
    assert_json_contains(&avro.json_string, expected);
}
3128
/// With `avro_custom_types`, the duration logical type of a list's element
/// must show up in the Avro JSON generated for the list column.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_list_extras_propagated() {
    let element = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
    let durations = ArrowField::new("durations", DataType::List(Arc::new(element)), false);
    let arrow_schema = single_field_schema(durations);
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    let expected = "\"logicalType\":\"arrow.duration-micros\"";
    assert_json_contains(&avro.json_string, expected);
}
3141
/// Without `avro_custom_types`, a year-month interval must be annotated with
/// `arrowIntervalUnit = yearmonth`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_interval_yearmonth_extra() {
    let interval = DataType::Interval(IntervalUnit::YearMonth);
    let arrow_schema = single_field_schema(ArrowField::new("iv", interval, false));
    let avro = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
}
3150
/// Without custom types, a day-time interval is expected to carry an
/// `arrowIntervalUnit` attribute with the value `daytime`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_interval_daytime_extra() {
    let interval = DataType::Interval(IntervalUnit::DayTime);
    let arrow_schema = single_field_schema(ArrowField::new("iv_dt", interval, false));
    let rendered = AvroSchema::try_from(&arrow_schema).unwrap();
    let needle = "\"arrowIntervalUnit\":\"daytime\"";
    assert_json_contains(&rendered.json_string, needle);
}
3159
/// With custom types enabled, a year-month interval is expected to use the
/// `arrow.interval-year-month` logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_yearmonth_custom() {
    let interval = DataType::Interval(IntervalUnit::YearMonth);
    let arrow_schema = single_field_schema(ArrowField::new("iv", interval, false));
    let rendered = AvroSchema::try_from(&arrow_schema).unwrap();
    let needle = "\"logicalType\":\"arrow.interval-year-month\"";
    assert_json_contains(&rendered.json_string, needle);
}
3171
/// With custom types enabled, a day-time interval is expected to use the
/// `arrow.interval-day-time` logical type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_daytime_custom() {
    let interval = DataType::Interval(IntervalUnit::DayTime);
    let arrow_schema = single_field_schema(ArrowField::new("iv_dt", interval, false));
    let rendered = AvroSchema::try_from(&arrow_schema).unwrap();
    let needle = "\"logicalType\":\"arrow.interval-day-time\"";
    assert_json_contains(&rendered.json_string, needle);
}
3183
/// A fixed-size list of three `Int32` items is expected to expose its length via
/// the `arrowFixedSize` attribute.
#[test]
fn test_fixed_size_list_extra() {
    let item = Arc::new(ArrowField::new("item", DataType::Int32, false));
    let triples = ArrowField::new("triples", DataType::FixedSizeList(item, 3), false);
    let arrow_schema = single_field_schema(triples);
    let rendered = AvroSchema::try_from(&arrow_schema).unwrap();
    assert_json_contains(&rendered.json_string, "\"arrowFixedSize\":3");
}
3192
/// A map whose values are second-resolution `Duration`s must surface the
/// `arrow.duration-seconds` logical type in the generated Avro JSON.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_map_duration_value_extra() {
    let key = ArrowField::new("key", DataType::Utf8, false);
    let value = ArrowField::new("value", DataType::Duration(TimeUnit::Second), true);
    let entries = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![key, value])),
        false,
    );
    let metrics = ArrowField::new("metrics", DataType::Map(Arc::new(entries), false), false);
    let arrow_schema = single_field_schema(metrics);
    let rendered = AvroSchema::try_from(&arrow_schema).unwrap();
    let needle = "\"logicalType\":\"arrow.duration-seconds\"";
    assert_json_contains(&rendered.json_string, needle);
}
3213
/// Parses a record schema whose field defaults are JSON numbers, arrays, objects
/// and booleans, converts it through `AvroField`, and compares the resulting Arrow
/// field against a hand-built expectation.
#[test]
fn test_schema_with_non_string_defaults_decodes_successfully() {
    let schema_json = r#"{
        "type": "record",
        "name": "R",
        "fields": [
            {"name": "a", "type": "int", "default": 0},
            {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
            {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
            {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
                {"name": "flag", "type": "boolean", "default": true},
                {"name": "name", "type": "string", "default": "hi"}
            ]}, "default": {"flag": false, "name": "d"}},
            {"name": "u", "type": ["int", "null"], "default": 42}
        ]
    }"#;
    let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
    if !matches!(&schema, Schema::Complex(ComplexType::Record(_))) {
        panic!("expected record schema, got: {:?}", schema);
    }
    let converted = crate::codec::AvroField::try_from(&schema)
        .expect("Avro->Arrow conversion should succeed");
    let actual = converted.field();

    // Field "b": list of non-nullable Int64 items.
    let list_item = ArrowField::new(ArrowField::LIST_FIELD_DEFAULT_NAME, DataType::Int64, false);
    let want_b = ArrowField::new("b", DataType::List(Arc::new(list_item)), false);

    // Field "c": map from Utf8 keys to non-nullable Float64 values.
    let map_entries = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("key", DataType::Utf8, false),
            ArrowField::new("value", DataType::Float64, false),
        ])),
        false,
    );
    let want_c = ArrowField::new("c", DataType::Map(Arc::new(map_entries), false), false);

    // Field "inner": nested record carrying its Avro name as metadata.
    let inner_fields = Fields::from(vec![
        ArrowField::new("flag", DataType::Boolean, false),
        ArrowField::new("name", DataType::Utf8, false),
    ]);
    let want_inner = ArrowField::new("inner", DataType::Struct(inner_fields), false)
        .with_metadata(HashMap::from([(
            AVRO_NAME_METADATA_KEY.to_string(),
            "Inner".to_string(),
        )]));

    // Root record, again tagged with its Avro name.
    let root_fields = Fields::from(vec![
        ArrowField::new("a", DataType::Int32, false),
        want_b,
        want_c,
        want_inner,
        ArrowField::new("u", DataType::Int32, true),
    ]);
    let expected = ArrowField::new("R", DataType::Struct(root_fields), false).with_metadata(
        HashMap::from([(AVRO_NAME_METADATA_KEY.to_string(), "R".to_string())]),
    );
    assert_eq!(actual, expected);
}
3285
/// `AvroSchema::try_from` and `from_arrow_with_options(.., None)` must yield the
/// same JSON for the same Arrow schema.
#[test]
fn default_order_is_consistent() {
    let nullable_string = ArrowField::new("s", DataType::Utf8, true);
    let arrow_schema = ArrowSchema::new(vec![nullable_string]);
    let via_try_from = AvroSchema::try_from(&arrow_schema).unwrap();
    let via_options = AvroSchema::from_arrow_with_options(&arrow_schema, None).unwrap();
    assert_eq!(via_try_from.json_string, via_options.json_string);
}
3293
/// Named Avro kinds (`record`, `enum`, `fixed`) used as union branches without a
/// `name` must be rejected with a descriptive error.
#[test]
fn test_union_branch_missing_name_errors() {
    for kind in ["record", "enum", "fixed"] {
        let wanted = format!("Union branch '{kind}' missing required 'name'");
        let message = union_branch_signature(&json!({ "type": kind }))
            .unwrap_err()
            .to_string();
        assert!(
            message.contains(&wanted),
            "expected missing-name error for {kind}, got: {message}"
        );
    }
}
3305
/// The signature of a named union branch has the form `N:<kind>:<name>`.
#[test]
fn test_union_branch_named_type_signature_includes_name() {
    let cases = [
        (json!({ "type": "record", "name": "Foo" }), "N:record:Foo"),
        (
            json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] }),
            "N:enum:Color",
        ),
        (
            json!({ "type": "fixed", "name": "Bytes16", "size": 16 }),
            "N:fixed:Bytes16",
        ),
    ];
    for (branch, signature) in &cases {
        assert_eq!(union_branch_signature(branch).unwrap(), *signature);
    }
}
3315
/// A reader field that lists the writer's field name among its aliases resolves
/// against that writer field, and the resolved Arrow field uses the reader's name.
#[test]
fn test_record_field_alias_resolution_without_default() {
    let writer_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[{"name":"old","type":"int"}]
    }"#;
    let reader_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[{"name":"new","aliases":["old"],"type":"int"}]
    }"#;
    let writer: Schema = serde_json::from_str(writer_src).unwrap();
    let reader: Schema = serde_json::from_str(reader_src).unwrap();

    let record_metadata = HashMap::from([("avro.name".to_owned(), "R".to_owned())]);
    let renamed = ArrowField::new("new", DataType::Int32, false);
    let expected = ArrowField::new("R", DataType::Struct(Fields::from(vec![renamed])), false)
        .with_metadata(record_metadata);

    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    assert_eq!(resolved.field(), expected);
}
3351
/// In strict mode, an alias shared by two writer fields makes resolution fail with
/// an "Ambiguous alias" error.
#[test]
fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
    let writer_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[
            {"name":"a","type":"int","aliases":["old"]},
            {"name":"b","type":"int","aliases":["old"]}
        ]
    }"#;
    let reader_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[{"name":"target","type":"int","aliases":["old"]}]
    }"#;
    let writer: Schema = serde_json::from_str(writer_src).unwrap();
    let reader: Schema = serde_json::from_str(reader_src).unwrap();
    let outcome = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(true)
        .build();
    let err = outcome.unwrap_err().to_string();
    let is_ambiguous = err.contains("Ambiguous alias 'old'");
    assert!(is_ambiguous, "expected ambiguous-alias error, got: {err}");
}
3381
/// In non-strict mode, a writer field whose alias equals the reader field name is
/// mapped onto that reader field.
#[test]
fn test_pragmatic_writer_field_alias_mapping_non_strict() {
    let writer_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[{"name":"before","type":"int","aliases":["now"]}]
    }"#;
    let reader_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[{"name":"now","type":"int"}]
    }"#;
    let writer: Schema = serde_json::from_str(writer_src).unwrap();
    let reader: Schema = serde_json::from_str(reader_src).unwrap();

    let record_metadata = HashMap::from([("avro.name".to_owned(), "R".to_owned())]);
    let mapped = ArrowField::new("now", DataType::Int32, false);
    let expected = ArrowField::new("R", DataType::Struct(Fields::from(vec![mapped])), false)
        .with_metadata(record_metadata);

    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    assert_eq!(resolved.field(), expected);
}
3417
/// A reader-only field typed `["null","int"]` with no explicit default resolves
/// successfully; the expected Arrow field records a `null` default in its metadata.
#[test]
fn test_missing_reader_field_null_first_no_default_is_ok() {
    let writer_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[{"name":"a","type":"int"}]
    }"#;
    let reader_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[
            {"name":"a","type":"int"},
            {"name":"b","type":["null","int"]}
        ]
    }"#;
    let writer: Schema = serde_json::from_str(writer_src).unwrap();
    let reader: Schema = serde_json::from_str(reader_src).unwrap();

    let field_a = ArrowField::new("a", DataType::Int32, false);
    let default_metadata = HashMap::from([(
        AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
        "null".to_string(),
    )]);
    let field_b = ArrowField::new("b", DataType::Int32, true).with_metadata(default_metadata);
    let record_metadata = HashMap::from([("avro.name".to_owned(), "R".to_owned())]);
    let expected = ArrowField::new(
        "R",
        DataType::Struct(Fields::from(vec![field_a, field_b])),
        false,
    )
    .with_metadata(record_metadata);

    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    assert_eq!(resolved.field(), expected);
}
3458
/// A reader-only field typed `["int","null"]` with no default cannot be resolved
/// and must produce a "must have a default value" error.
#[test]
fn test_missing_reader_field_null_second_without_default_errors() {
    let writer_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[{"name":"a","type":"int"}]
    }"#;
    let reader_src = r#"{
        "type":"record",
        "name":"R",
        "fields":[
            {"name":"a","type":"int"},
            {"name":"b","type":["int","null"]}
        ]
    }"#;
    let writer: Schema = serde_json::from_str(writer_src).unwrap();
    let reader: Schema = serde_json::from_str(reader_src).unwrap();
    let outcome = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    let err = outcome.unwrap_err().to_string();
    let complains_about_default = err.contains("must have a default value");
    assert!(
        complains_about_default,
        "expected missing-default error, got: {err}"
    );
}
3488
/// With `strip_metadata = false`, an Avro schema embedded under the
/// `avro.schema` metadata key is returned verbatim.
#[test]
fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
    let injected_json =
        r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
            .to_string();
    let metadata = HashMap::from([
        (SCHEMA_METADATA_KEY.to_string(), injected_json.clone()),
        ("custom".to_string(), "123".to_string()),
    ]);
    let columns = vec![ArrowField::new("x", DataType::Int32, true)];
    let arrow_schema = ArrowSchema::new_with_metadata(columns, metadata);
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: false,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    assert_eq!(
        out.json_string, injected_json,
        "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
    );
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    assert_eq!(parsed["type"].as_str(), Some("record"));
    assert_eq!(parsed["name"].as_str(), Some("Injected"));
}
3512
/// With `strip_metadata = true`, the embedded `avro.schema` is ignored and a fresh
/// schema is generated, while other schema-level metadata is still passed through.
#[test]
fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
    let injected_json =
        r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
            .to_string();
    let metadata = HashMap::from([
        (SCHEMA_METADATA_KEY.to_string(), injected_json),
        ("custom_meta".to_string(), "7".to_string()),
    ]);
    let columns = vec![ArrowField::new("x", DataType::Int32, true)];
    let arrow_schema = ArrowSchema::new_with_metadata(columns, metadata);
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let needles = [
        "\"type\":\"record\"",
        "\"name\":\"topLevelRecord\"",
        "\"custom_meta\":7",
    ];
    for needle in needles {
        assert_json_contains(&out.json_string, needle);
    }
}
3532
/// `Nullability::NullFirst` places `"null"` ahead of the value type in the union
/// generated for a nullable primitive.
#[test]
fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = parsed["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");
    assert_eq!(branches[0], json!("null"));
    assert_eq!(branches[1], json!("string"));
}
3549
/// `Nullability::NullSecond` places the value type ahead of `"null"` in the union
/// generated for a nullable primitive.
#[test]
fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: true,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = parsed["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");
    assert_eq!(branches[0], json!("string"));
    assert_eq!(branches[1], json!("null"));
}
3566
/// Arrow union extras (`arrowUnionMode`, `arrowUnionTypeIds`) are emitted on an
/// object branch when metadata is kept, and are absent when `strip_metadata` is set.
#[test]
fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
    let members = vec![
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
    ];
    let union_fields: UnionFields = members.into_iter().collect();
    let union_field = ArrowField::new("u", DataType::Union(union_fields, UnionMode::Dense), true);
    let arrow_schema = single_field_schema(union_field);

    // Renders the schema with null-first ordering and the requested stripping mode.
    let render = |strip_metadata: bool| -> Value {
        let options = AvroSchemaOptions {
            null_order: Some(Nullability::NullFirst),
            strip_metadata,
        };
        let avro = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
        serde_json::from_str(&avro.json_string).unwrap()
    };

    // Metadata kept: the first object branch carries the union extras.
    let kept = render(false);
    let kept_branches = kept["fields"][0]["type"].as_array().expect("union array");
    let extras = kept_branches
        .iter()
        .find_map(Value::as_object)
        .expect("expected an object branch with extras");
    assert_eq!(extras.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        extras.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let type_ids: Vec<i64> = extras["arrowUnionTypeIds"]
        .as_array()
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|id| id.as_i64().expect("i64"))
        .collect();
    assert_eq!(type_ids, vec![2, 7]);

    // Metadata stripped: no branch may mention the union mode.
    let stripped = render(true);
    let stripped_branches = stripped["fields"][0]["type"]
        .as_array()
        .expect("union array");
    let leaked = stripped_branches
        .iter()
        .filter_map(Value::as_object)
        .any(|branch| branch.contains_key("arrowUnionMode"));
    assert!(!leaked, "extras must be removed when strip_metadata=true");
    for (position, type_name) in ["null", "int", "string"].into_iter().enumerate() {
        assert_eq!(stripped_branches[position], Value::String(type_name.into()));
    }
}
3626
/// Projecting with no indices succeeds and leaves the record with an empty
/// `fields` array.
#[test]
fn test_project_empty_projection() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let remaining = doc["fields"].as_array().unwrap();
    assert!(
        remaining.is_empty(),
        "Empty projection should yield empty fields"
    );
}
3646
/// Projecting index 1 out of three fields keeps only field `b`.
#[test]
fn test_project_single_field() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[1]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<Option<&str>> = doc["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|field| field.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, vec![Some("b")]);
}
3665
/// Projecting indices 0, 2 and 3 keeps fields `a`, `c` and `d` in that order.
#[test]
fn test_project_multiple_fields() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long"},
            {"name": "d", "type": "boolean"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[0, 2, 3]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<Option<&str>> = doc["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|field| field.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, vec![Some("a"), Some("c"), Some("d")]);
}
3687
/// Projecting every index in order reproduces both fields.
#[test]
fn test_project_all_fields() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[0, 1]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<Option<&str>> = doc["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|field| field.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, vec![Some("a"), Some("b")]);
}
3706
/// The projection order is honoured: indices `[2, 0, 1]` yield `c`, `a`, `b`.
#[test]
fn test_project_reorder_fields() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[2, 0, 1]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<Option<&str>> = doc["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|field| field.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, vec![Some("c"), Some("a"), Some("b")]);
}
3728
/// Record-level attributes (`name`, `namespace`, `doc`, `aliases`) survive a
/// projection.
#[test]
fn test_project_preserves_record_metadata() {
    let schema_json = r#"{
        "type": "record",
        "name": "MyRecord",
        "namespace": "com.example",
        "doc": "A test record",
        "aliases": ["OldRecord"],
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[0]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let string_attributes = [
        ("name", "MyRecord"),
        ("namespace", "com.example"),
        ("doc", "A test record"),
    ];
    for (key, wanted) in string_attributes {
        assert_eq!(doc.get(key).and_then(Value::as_str), Some(wanted));
    }
    assert!(doc.get("aliases").is_some());
}
3753
/// Field-level attributes (`doc`, `default`) of a projected field are retained.
#[test]
fn test_project_preserves_field_metadata() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int", "doc": "Field A", "default": 0},
            {"name": "b", "type": "string"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[0]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = &doc["fields"].as_array().unwrap()[0];
    assert_eq!(kept["doc"].as_str(), Some("Field A"));
    assert_eq!(kept["default"].as_i64(), Some(0));
}
3774
/// Projecting a field whose type is a nested record keeps that nested record
/// definition intact.
#[test]
fn test_project_with_nested_record() {
    let schema_json = r#"{
        "type": "record",
        "name": "Outer",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "inner", "type": {
                "type": "record",
                "name": "Inner",
                "fields": [
                    {"name": "x", "type": "int"},
                    {"name": "y", "type": "string"}
                ]
            }},
            {"name": "value", "type": "double"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[1]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let fields = doc["fields"].as_array().unwrap();
    assert_eq!(fields.len(), 1);
    let kept = &fields[0];
    assert_eq!(kept["name"].as_str(), Some("inner"));
    // The nested type must still be the full `Inner` record definition.
    let nested = kept.get("type").unwrap();
    assert_eq!(nested["type"].as_str(), Some("record"));
    assert_eq!(nested["name"].as_str(), Some("Inner"));
}
3813
/// Array- and union-typed fields keep their type definitions after projection.
#[test]
fn test_project_with_complex_field_types() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "arr", "type": {"type": "array", "items": "int"}},
            {"name": "map", "type": {"type": "map", "values": "string"}},
            {"name": "union", "type": ["null", "int"]}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[0, 2]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let fields = doc["fields"].as_array().unwrap();
    assert_eq!(fields.len(), 2);
    // First kept field is the array.
    let array_type = fields[0].get("type").unwrap();
    assert_eq!(array_type["type"].as_str(), Some("array"));
    // Second kept field is the union, encoded as a JSON array.
    let union_type = fields[1].get("type").unwrap();
    assert!(union_type.is_array());
}
3837
/// Projecting a schema whose text is not JSON reports an
/// "Invalid Avro schema JSON" error.
#[test]
fn test_project_error_invalid_json() {
    let schema = AvroSchema::new(String::from("not valid json"));
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let is_parse_error = msg.contains("Invalid Avro schema JSON");
    assert!(is_parse_error, "Expected parse error, got: {msg}");
}
3848
/// A schema that is a bare JSON string cannot be projected; the error says the
/// schema must be a JSON object.
#[test]
fn test_project_error_not_object() {
    let schema = AvroSchema::new(String::from(r#""string""#));
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let is_object_error = msg.contains("must be a JSON object");
    assert!(is_object_error, "Expected object error, got: {msg}");
}
3860
/// A top-level union (JSON array) schema cannot be projected; the error says the
/// schema must be a JSON object.
#[test]
fn test_project_error_array_schema() {
    let schema = AvroSchema::new(String::from(r#"["null", "int"]"#));
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let is_object_error = msg.contains("must be a JSON object");
    assert!(
        is_object_error,
        "Expected object error for array schema, got: {msg}"
    );
}
3872
/// Projecting an `enum` schema fails with an error naming the offending type.
#[test]
fn test_project_error_type_not_record() {
    let schema_json = r#"{
        "type": "enum",
        "name": "Color",
        "symbols": ["RED", "GREEN", "BLUE"]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let wants_record = msg.contains("must be an Avro record");
    let names_kind = msg.contains("'enum'");
    assert!(
        wants_record && names_kind,
        "Expected type mismatch error, got: {msg}"
    );
}
3888
/// Projecting an `array` schema fails with an error naming the offending type.
#[test]
fn test_project_error_type_array() {
    let schema_json = r#"{
        "type": "array",
        "items": "int"
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let wants_record = msg.contains("must be an Avro record");
    let names_kind = msg.contains("'array'");
    assert!(
        wants_record && names_kind,
        "Expected type mismatch error for array type, got: {msg}"
    );
}
3903
/// Projecting a `fixed` schema fails with an error naming the offending type.
#[test]
fn test_project_error_type_fixed() {
    let schema_json = r#"{
        "type": "fixed",
        "name": "MD5",
        "size": 16
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let wants_record = msg.contains("must be an Avro record");
    let names_kind = msg.contains("'fixed'");
    assert!(
        wants_record && names_kind,
        "Expected type mismatch error for fixed type, got: {msg}"
    );
}
3919
/// Projecting a `map` schema fails with an error naming the offending type.
#[test]
fn test_project_error_type_map() {
    let schema_json = r#"{
        "type": "map",
        "values": "string"
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let wants_record = msg.contains("must be an Avro record");
    let names_kind = msg.contains("'map'");
    assert!(
        wants_record && names_kind,
        "Expected type mismatch error for map type, got: {msg}"
    );
}
3934
/// A schema object without a `type` key is rejected with a
/// "missing required 'type' field" error.
#[test]
fn test_project_error_missing_type_field() {
    let schema_json = r#"{
        "name": "Test",
        "fields": [{"name": "a", "type": "int"}]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let reports_missing_type = msg.contains("missing required 'type' field");
    assert!(
        reports_missing_type,
        "Expected missing type error, got: {msg}"
    );
}
3949
/// A record schema without a `fields` key is rejected with a
/// "missing required 'fields'" error.
#[test]
fn test_project_error_missing_fields() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test"
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let reports_missing_fields = msg.contains("missing required 'fields'");
    assert!(
        reports_missing_fields,
        "Expected missing fields error, got: {msg}"
    );
}
3964
/// A record whose `fields` value is not a JSON array is rejected with a
/// "'fields' must be an array" error.
#[test]
fn test_project_error_fields_not_array() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": "not an array"
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let reports_bad_fields = msg.contains("'fields' must be an array");
    assert!(
        reports_bad_fields,
        "Expected fields array error, got: {msg}"
    );
}
3980
/// Projecting index 5 from a two-field record fails with an out-of-bounds error
/// that mentions both the index and the field count.
#[test]
fn test_project_error_index_out_of_bounds() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[5]).unwrap_err().to_string();
    let required_fragments = ["out of bounds", "5", "2"];
    assert!(
        required_fragments.iter().all(|fragment| msg.contains(*fragment)),
        "Expected out of bounds error, got: {msg}"
    );
}
3999
/// An index equal to the number of fields (1 for a one-field record) is already
/// out of bounds.
#[test]
fn test_project_error_index_out_of_bounds_edge() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[1]).unwrap_err().to_string();
    let required_fragments = ["out of bounds", "1"];
    assert!(
        required_fragments.iter().all(|fragment| msg.contains(*fragment)),
        "Expected out of bounds error for edge case, got: {msg}"
    );
}
4018
/// Repeating an index non-consecutively (`[0, 1, 0]`) is rejected as a duplicate
/// projection index.
#[test]
fn test_project_error_duplicate_index() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0, 1, 0]).unwrap_err().to_string();
    let required_fragments = ["Duplicate projection index", "0"];
    assert!(
        required_fragments.iter().all(|fragment| msg.contains(*fragment)),
        "Expected duplicate index error, got: {msg}"
    );
}
4038
/// Repeating an index back-to-back (`[1, 1]`) is rejected as a duplicate
/// projection index.
#[test]
fn test_project_error_duplicate_index_consecutive() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[1, 1]).unwrap_err().to_string();
    let required_fragments = ["Duplicate projection index", "1"];
    assert!(
        required_fragments.iter().all(|fragment| msg.contains(*fragment)),
        "Expected duplicate index error for consecutive duplicates, got: {msg}"
    );
}
4057
/// An empty projection of a record that has no fields succeeds and stays empty.
#[test]
fn test_project_with_empty_fields() {
    let schema_json = r#"{
        "type": "record",
        "name": "EmptyRecord",
        "fields": []
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let remaining = doc["fields"].as_array().unwrap();
    assert!(remaining.is_empty());
}
4072
/// Any index is out of bounds for a record with no fields; the error mentions
/// "0 fields".
#[test]
fn test_project_empty_fields_index_out_of_bounds() {
    let schema_json = r#"{
        "type": "record",
        "name": "EmptyRecord",
        "fields": []
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let msg = schema.project(&[0]).unwrap_err().to_string();
    let required_fragments = ["out of bounds", "0 fields"];
    assert!(
        required_fragments.iter().all(|fragment| msg.contains(*fragment)),
        "Expected out of bounds error for empty record, got: {msg}"
    );
}
4088
/// The projected JSON parses back into a `Schema` record with the original name
/// and namespace and only the selected fields.
#[test]
fn test_project_result_is_valid_avro_schema() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "namespace": "com.example",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"},
            {"name": "active", "type": "boolean"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[0, 2]).unwrap();
    let parsed = projected.schema();
    assert!(parsed.is_ok(), "Projected schema should be valid Avro");
    let Schema::Complex(ComplexType::Record(record)) = parsed.unwrap() else {
        panic!("Expected Record schema");
    };
    assert_eq!(record.name, "Test");
    assert_eq!(record.namespace, Some("com.example"));
    assert_eq!(record.fields.len(), 2);
    assert_eq!(record.fields[0].name, "id");
    assert_eq!(record.fields[1].name, "active");
}
4117
/// Non-adjacent indices `[0, 2, 4]` select `f0`, `f2` and `f4`.
#[test]
fn test_project_non_contiguous_indices() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "fields": [
            {"name": "f0", "type": "int"},
            {"name": "f1", "type": "int"},
            {"name": "f2", "type": "int"},
            {"name": "f3", "type": "int"},
            {"name": "f4", "type": "int"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[0, 2, 4]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<Option<&str>> = doc["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|field| field.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, vec![Some("f0"), Some("f2"), Some("f4")]);
}
4141
/// Selecting only the last of ten fields yields a single-field record with `f9`.
#[test]
fn test_project_single_field_from_many() {
    let schema_json = r#"{
        "type": "record",
        "name": "BigRecord",
        "fields": [
            {"name": "f0", "type": "int"},
            {"name": "f1", "type": "int"},
            {"name": "f2", "type": "int"},
            {"name": "f3", "type": "int"},
            {"name": "f4", "type": "int"},
            {"name": "f5", "type": "int"},
            {"name": "f6", "type": "int"},
            {"name": "f7", "type": "int"},
            {"name": "f8", "type": "int"},
            {"name": "f9", "type": "int"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_owned());
    let projected = schema.project(&[9]).unwrap();
    let doc: Value = serde_json::from_str(&projected.json_string).unwrap();
    let names: Vec<Option<&str>> = doc["fields"]
        .as_array()
        .unwrap()
        .iter()
        .map(|field| field.get("name").and_then(Value::as_str))
        .collect();
    assert_eq!(names, vec![Some("f9")]);
}
4168}