1use arrow_schema::{
19 ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
20 UnionMode,
21};
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Map as JsonMap, Value};
24#[cfg(feature = "sha256")]
25use sha2::{Digest, Sha256};
26use std::cmp::PartialEq;
27use std::collections::hash_map::Entry;
28use std::collections::{HashMap, HashSet};
29use strum_macros::AsRefStr;
30
/// Two-byte marker that [`Fingerprint::make_prefix`] writes ahead of Rabin,
/// MD5 and SHA-256 fingerprints.
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// One-byte marker that [`Fingerprint::make_prefix`] writes ahead of a numeric
/// schema ID ([`Fingerprint::Id`]).
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// Capacity, in bytes, of the buffer backing a [`Prefix`].
///
/// 34 bytes is enough for the 2-byte magic followed by a 32-byte SHA-256 digest.
pub const MAX_PREFIX_LEN: usize = 34;

/// Arrow schema metadata key holding a complete Avro schema as JSON. When it is
/// present, [`AvroSchema::from_arrow_with_options`] returns that JSON verbatim.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Arrow field metadata key holding a JSON array of enum symbols; read when a
/// `Dictionary` column is converted to an Avro `enum`.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key for a field default value.
// NOTE(review): not referenced in the visible part of this file; presumably
// consumed by the field-level conversion code — confirm.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Arrow schema metadata key that overrides the name of the generated root record.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";

/// Arrow schema metadata key whose value becomes the root record's `namespace`.
pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";

/// Arrow schema metadata key whose value becomes the root record's `doc`.
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Root record name used when [`AVRO_NAME_METADATA_KEY`] is absent.
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
61
62pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result<bool, ArrowError> {
65 let canon_writer = AvroSchema::generate_canonical_form(writer)?;
66 let canon_reader = AvroSchema::generate_canonical_form(reader)?;
67 Ok(canon_writer == canon_reader)
68}
69
/// Position of the `"null"` branch when a schema is wrapped in a nullable union.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub enum Nullability {
    /// `"null"` is the first union branch (the default).
    #[default]
    NullFirst,
    /// `"null"` is the last union branch.
    NullSecond,
}
83
/// A type given by name: either one of the built-in primitives or a reference
/// to some other named type.
///
/// Deserialized untagged, so `Primitive` is tried before `Ref`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TypeName<'a> {
    /// A built-in primitive type.
    Primitive(PrimitiveType),
    /// The (possibly namespace-qualified) name of a named type, borrowed from the input.
    Ref(&'a str),
}
98
/// The primitive type names.
///
/// Serde uses camelCase variant names and `AsRef<str>` yields the lowercase
/// name (e.g. `PrimitiveType::Int.as_ref() == "int"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub enum PrimitiveType {
    /// `"null"`
    Null,
    /// `"boolean"`
    Boolean,
    /// `"int"`
    Int,
    /// `"long"`
    Long,
    /// `"float"`
    Float,
    /// `"double"`
    Double,
    /// `"bytes"`
    Bytes,
    /// `"string"`
    String,
}
123
/// Attributes attached to a schema object beyond its structural keys.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Attributes<'a> {
    /// Value of the `logicalType` key, if present.
    #[serde(default)]
    pub logical_type: Option<&'a str>,

    /// Every remaining key of the JSON object (flattened), kept as raw JSON values.
    #[serde(flatten)]
    pub additional: HashMap<&'a str, Value>,
}
140
141impl Attributes<'_> {
142 pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
144 self.additional
145 .iter()
146 .map(|(k, v)| (k.to_string(), v.to_string()))
147 .collect()
148 }
149}
150
/// A JSON object of the form `{"type": <name>, ...attributes}`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Type<'a> {
    /// The `type` key: a primitive or a reference to a named type.
    #[serde(borrow)]
    pub r#type: TypeName<'a>,
    /// All other keys of the object (flattened).
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
162
/// A parsed schema node, borrowing its strings from the source JSON.
///
/// Deserialized untagged: variants are tried in declaration order, so the
/// order below is significant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Schema<'a> {
    /// A bare type name such as `"int"` or `"my.Record"`.
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A JSON array of branch schemas.
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A record, enum, array, map or fixed definition.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// An object whose `type` is a type name, plus attributes.
    #[serde(borrow)]
    Type(Type<'a>),
}
183
/// A complex type definition, selected by the JSON object's `type` key
/// (camelCase variant name: `record`, `enum`, `array`, `map`, `fixed`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ComplexType<'a> {
    /// `"type": "record"`
    #[serde(borrow)]
    Record(Record<'a>),
    /// `"type": "enum"`
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// `"type": "array"`
    #[serde(borrow)]
    Array(Array<'a>),
    /// `"type": "map"`
    #[serde(borrow)]
    Map(Map<'a>),
    /// `"type": "fixed"`
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
206
/// A record definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Record<'a> {
    /// Record name (required); may itself contain a dotted namespace.
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional `namespace` key; `None` when absent.
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Optional `doc` key; `None` when absent.
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// Optional `aliases` key; empty when absent.
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The record's fields (required), in declaration order.
    #[serde(borrow)]
    pub fields: Vec<Field<'a>>,
    /// All other keys of the object (flattened).
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
231
/// A single field of a [`Record`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Field<'a> {
    /// Field name (required).
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional `doc` key; `None` when absent.
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// The field's schema (required).
    #[serde(borrow)]
    pub r#type: Schema<'a>,
    /// Optional `default` key as raw JSON; `None` when absent.
    #[serde(default)]
    pub default: Option<Value>,
}
248
/// An enum definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Enum<'a> {
    /// Enum name (required); may itself contain a dotted namespace.
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional `namespace` key; `None` when absent.
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Optional `doc` key; `None` when absent.
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// Optional `aliases` key; empty when absent.
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The enum's symbols (required), in declaration order.
    #[serde(borrow)]
    pub symbols: Vec<&'a str>,
    /// Optional `default` symbol; `None` when absent.
    #[serde(borrow, default)]
    pub default: Option<&'a str>,
    /// All other keys of the object (flattened).
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
276
/// An array definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Array<'a> {
    /// Schema of the array's elements (the `items` key).
    #[serde(borrow)]
    pub items: Box<Schema<'a>>,
    /// All other keys of the object (flattened).
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
289
/// A map definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Map<'a> {
    /// Schema of the map's values (the `values` key).
    #[serde(borrow)]
    pub values: Box<Schema<'a>>,
    /// All other keys of the object (flattened).
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
302
/// A fixed-size binary definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Fixed<'a> {
    /// Type name (required); may itself contain a dotted namespace.
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional `namespace` key; `None` when absent.
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Optional `aliases` key; empty when absent.
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// Value of the `size` key (required).
    pub size: usize,
    /// All other keys of the object (flattened).
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
323
/// An owned Avro schema held as its JSON text.
///
/// The text is parsed on demand by [`AvroSchema::schema`]; equality compares
/// the raw strings.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The schema as a JSON string.
    pub json_string: String,
}
330
331impl TryFrom<&ArrowSchema> for AvroSchema {
332 type Error = ArrowError;
333
334 fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
338 AvroSchema::from_arrow_with_options(schema, None)
339 }
340}
341
342impl AvroSchema {
343 pub fn new(json_string: String) -> Self {
345 Self { json_string }
346 }
347
348 pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
349 serde_json::from_str(self.json_string.as_str())
350 .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
351 }
352
353 pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
381 Self::generate_fingerprint(&self.schema()?, hash_type)
382 }
383
384 pub(crate) fn generate_fingerprint(
385 schema: &Schema,
386 hash_type: FingerprintAlgorithm,
387 ) -> Result<Fingerprint, ArrowError> {
388 let canonical = Self::generate_canonical_form(schema).map_err(|e| {
389 ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
390 })?;
391 match hash_type {
392 FingerprintAlgorithm::Rabin => {
393 Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
394 }
395 FingerprintAlgorithm::None => Err(ArrowError::SchemaError(
396 "FingerprintAlgorithm of None cannot be used to generate a fingerprint; \
397 if using Fingerprint::Id, pass the registry ID in instead using the set method."
398 .to_string(),
399 )),
400 #[cfg(feature = "md5")]
401 FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
402 #[cfg(feature = "sha256")]
403 FingerprintAlgorithm::SHA256 => {
404 Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
405 }
406 }
407 }
408
409 pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> {
417 Self::generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
418 }
419
420 pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
431 build_canonical(schema, None)
432 }
433
434 pub fn from_arrow_with_options(
441 schema: &ArrowSchema,
442 null_order: Option<Nullability>,
443 ) -> Result<AvroSchema, ArrowError> {
444 if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
445 return Ok(AvroSchema::new(json.clone()));
446 }
447 let order = null_order.unwrap_or_default();
448 let mut name_gen = NameGenerator::default();
449 let fields_json = schema
450 .fields()
451 .iter()
452 .map(|f| arrow_field_to_avro(f, &mut name_gen, order))
453 .collect::<Result<Vec<_>, _>>()?;
454 let record_name = schema
455 .metadata
456 .get(AVRO_NAME_METADATA_KEY)
457 .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
458 let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
459 record.insert("type".into(), Value::String("record".into()));
460 record.insert(
461 "name".into(),
462 Value::String(sanitise_avro_name(record_name)),
463 );
464 if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
465 record.insert("namespace".into(), Value::String(ns.clone()));
466 }
467 if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
468 record.insert("doc".into(), Value::String(doc.clone()));
469 }
470 record.insert("fields".into(), Value::Array(fields_json));
471 extend_with_passthrough_metadata(&mut record, &schema.metadata);
472 let json_string = serde_json::to_string(&Value::Object(record))
473 .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
474 Ok(AvroSchema::new(json_string))
475 }
476}
477
/// A fixed-capacity byte prefix (magic bytes followed by a fingerprint or ID),
/// as produced by [`Fingerprint::make_prefix`].
#[derive(Debug, Copy, Clone)]
pub struct Prefix {
    /// Backing storage; only the first `len` bytes are meaningful.
    buf: [u8; MAX_PREFIX_LEN],
    /// Number of valid bytes in `buf`.
    len: u8,
}
484
485impl Prefix {
486 #[inline]
487 pub(crate) fn as_slice(&self) -> &[u8] {
488 &self.buf[..self.len as usize]
489 }
490}
491
/// Selects how a schema is identified: by a hash algorithm or by an explicit
/// numeric ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// 64-bit Rabin fingerprint (the default).
    #[default]
    Rabin,
    /// An explicit numeric schema ID.
    Id(u32),
    /// MD5 digest (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
507
508impl From<Fingerprint> for FingerprintStrategy {
509 fn from(f: Fingerprint) -> Self {
510 Self::from(&f)
511 }
512}
513
514impl From<FingerprintAlgorithm> for FingerprintStrategy {
515 fn from(f: FingerprintAlgorithm) -> Self {
516 match f {
517 FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
518 FingerprintAlgorithm::None => FingerprintStrategy::Id(0),
519 #[cfg(feature = "md5")]
520 FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
521 #[cfg(feature = "sha256")]
522 FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
523 }
524 }
525}
526
527impl From<&Fingerprint> for FingerprintStrategy {
528 fn from(f: &Fingerprint) -> Self {
529 match f {
530 Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
531 Fingerprint::Id(id) => FingerprintStrategy::Id(*id),
532 #[cfg(feature = "md5")]
533 Fingerprint::MD5(_) => FingerprintStrategy::MD5,
534 #[cfg(feature = "sha256")]
535 Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
536 }
537 }
538}
539
/// Hash algorithm used to compute a schema fingerprint.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit Rabin fingerprint (the default).
    #[default]
    Rabin,
    /// No hashing; fingerprints cannot be generated and must be supplied
    /// explicitly (this is what `Fingerprint::Id` maps to).
    None,
    /// MD5 digest (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
556
557impl From<&Fingerprint> for FingerprintAlgorithm {
559 fn from(fp: &Fingerprint) -> Self {
560 match fp {
561 Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
562 Fingerprint::Id(_) => FingerprintAlgorithm::None,
563 #[cfg(feature = "md5")]
564 Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
565 #[cfg(feature = "sha256")]
566 Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
567 }
568 }
569}
570
571impl From<FingerprintStrategy> for FingerprintAlgorithm {
572 fn from(s: FingerprintStrategy) -> Self {
573 Self::from(&s)
574 }
575}
576
577impl From<&FingerprintStrategy> for FingerprintAlgorithm {
578 fn from(s: &FingerprintStrategy) -> Self {
579 match s {
580 FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
581 FingerprintStrategy::Id(_) => FingerprintAlgorithm::None,
582 #[cfg(feature = "md5")]
583 FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
584 #[cfg(feature = "sha256")]
585 FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
586 }
587 }
588}
589
/// A schema identifier: a computed hash or an explicit numeric ID.
///
/// Used as the key of a [`SchemaStore`] and turned into a byte prefix by
/// [`Fingerprint::make_prefix`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// 64-bit Rabin fingerprint.
    Rabin(u64),
    /// Explicit numeric schema ID.
    Id(u32),
    /// 16-byte MD5 digest (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5([u8; 16]),
    /// 32-byte SHA-256 digest (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256([u8; 32]),
}
611
612impl From<FingerprintStrategy> for Fingerprint {
613 fn from(s: FingerprintStrategy) -> Self {
614 Self::from(&s)
615 }
616}
617
618impl From<&FingerprintStrategy> for Fingerprint {
619 fn from(s: &FingerprintStrategy) -> Self {
620 match s {
621 FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
622 FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
623 #[cfg(feature = "md5")]
624 FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
625 #[cfg(feature = "sha256")]
626 FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
627 }
628 }
629}
630
631impl From<FingerprintAlgorithm> for Fingerprint {
632 fn from(s: FingerprintAlgorithm) -> Self {
633 match s {
634 FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
635 FingerprintAlgorithm::None => Fingerprint::Id(0),
636 #[cfg(feature = "md5")]
637 FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
638 #[cfg(feature = "sha256")]
639 FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
640 }
641 }
642}
643
644impl Fingerprint {
645 pub fn load_fingerprint_id(id: u32) -> Self {
653 Fingerprint::Id(u32::from_be(id))
654 }
655
656 pub fn make_prefix(&self) -> Prefix {
678 let mut buf = [0u8; MAX_PREFIX_LEN];
679 let len = match self {
680 Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
681 Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
682 #[cfg(feature = "md5")]
683 Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
684 #[cfg(feature = "sha256")]
685 Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
686 };
687 Prefix { buf, len }
688 }
689}
690
691fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
692 buf: &mut [u8; MAX_PREFIX_LEN],
693 magic: &[u8; MAGIC_LEN],
694 payload: &[u8; PAYLOAD_LEN],
695) -> u8 {
696 debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
697 let total = MAGIC_LEN + PAYLOAD_LEN;
698 let prefix_slice = &mut buf[..total];
699 prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
700 prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
701 total as u8
702}
703
/// An in-memory collection of Avro schemas keyed by [`Fingerprint`].
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// Algorithm used by [`SchemaStore::register`] to fingerprint new schemas.
    fingerprint_algorithm: FingerprintAlgorithm,
    /// The stored schemas.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
737
738impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
739 type Error = ArrowError;
740
741 fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
744 Ok(Self {
745 schemas,
746 ..Self::default()
747 })
748 }
749}
750
751impl SchemaStore {
752 pub fn new() -> Self {
754 Self::default()
755 }
756
757 pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
759 Self {
760 fingerprint_algorithm,
761 ..Self::default()
762 }
763 }
764
765 pub fn set(
782 &mut self,
783 fingerprint: Fingerprint,
784 schema: AvroSchema,
785 ) -> Result<Fingerprint, ArrowError> {
786 match self.schemas.entry(fingerprint) {
787 Entry::Occupied(entry) => {
788 if entry.get() != &schema {
789 return Err(ArrowError::ComputeError(format!(
790 "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
791 )));
792 }
793 }
794 Entry::Vacant(entry) => {
795 entry.insert(schema);
796 }
797 }
798 Ok(fingerprint)
799 }
800
801 pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
819 if self.fingerprint_algorithm == FingerprintAlgorithm::None {
820 return Err(ArrowError::SchemaError(
821 "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
822 Use the set method directly instead, providing a valid fingerprint"
823 .to_string(),
824 ));
825 }
826 let fingerprint =
827 AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
828 self.set(fingerprint, schema)?;
829 Ok(fingerprint)
830 }
831
832 pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
842 self.schemas.get(fingerprint)
843 }
844
845 pub fn fingerprints(&self) -> Vec<Fingerprint> {
851 self.schemas.keys().copied().collect()
852 }
853
854 pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
856 self.fingerprint_algorithm
857 }
858}
859
860fn quote(s: &str) -> Result<String, ArrowError> {
861 serde_json::to_string(s)
862 .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
863}
864
/// Resolves a type name to `(full_name, namespace)`.
///
/// * If `name` contains a dot it is already a full name; the namespace is
///   everything before the last dot, and both namespace arguments are ignored.
/// * Otherwise the namespace is `namespace_attr`, falling back to
///   `enclosing_ns`, and the full name is `"{namespace}.{name}"`.
/// * With no namespace at all, the name is returned unchanged with `None`.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    if let Some(last_dot) = name.rfind('.') {
        let embedded_ns = &name[..last_dot];
        return (name.to_owned(), Some(embedded_ns.to_owned()));
    }
    if let Some(ns) = namespace_attr.or(enclosing_ns) {
        (format!("{ns}.{name}"), Some(ns.to_owned()))
    } else {
        (name.to_owned(), None)
    }
}
895
896fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
897 Ok(match schema {
898 Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
899 TypeName::Primitive(pt) => quote(pt.as_ref())?,
900 TypeName::Ref(name) => {
901 let (full_name, _) = make_full_name(name, None, enclosing_ns);
902 quote(&full_name)?
903 }
904 },
905 Schema::Union(branches) => format!(
906 "[{}]",
907 branches
908 .iter()
909 .map(|b| build_canonical(b, enclosing_ns))
910 .collect::<Result<Vec<_>, _>>()?
911 .join(",")
912 ),
913 Schema::Complex(ct) => match ct {
914 ComplexType::Record(r) => {
915 let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
916 let fields = r
917 .fields
918 .iter()
919 .map(|f| {
920 let field_type =
921 build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
922 Ok(format!(
923 r#"{{"name":{},"type":{}}}"#,
924 quote(f.name)?,
925 field_type
926 ))
927 })
928 .collect::<Result<Vec<_>, ArrowError>>()?
929 .join(",");
930 format!(
931 r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
932 quote(&full_name)?,
933 )
934 }
935 ComplexType::Enum(e) => {
936 let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
937 let symbols = e
938 .symbols
939 .iter()
940 .map(|s| quote(s))
941 .collect::<Result<Vec<_>, _>>()?
942 .join(",");
943 format!(
944 r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
945 quote(&full_name)?
946 )
947 }
948 ComplexType::Array(arr) => format!(
949 r#"{{"type":"array","items":{}}}"#,
950 build_canonical(&arr.items, enclosing_ns)?
951 ),
952 ComplexType::Map(map) => format!(
953 r#"{{"type":"map","values":{}}}"#,
954 build_canonical(&map.values, enclosing_ns)?
955 ),
956 ComplexType::Fixed(f) => {
957 let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
958 format!(
959 r#"{{"name":{},"type":"fixed","size":{}}}"#,
960 quote(&full_name)?,
961 f.size
962 )
963 }
964 },
965 })
966}
967
/// Initial fingerprint value; also the polynomial folded in by [`one_entry`].
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Computes lookup-table entry `i`: eight shift-right steps, XOR-ing in
/// `EMPTY` whenever the bit shifted out is 1.
const fn one_entry(i: usize) -> u64 {
    let mut fp = i as u64;
    let mut remaining = 8;
    while remaining > 0 {
        let shifted = fp >> 1;
        fp = if fp & 1 == 1 { shifted ^ EMPTY } else { shifted };
        remaining -= 1;
    }
    fp
}

/// Builds the 256-entry table, one entry per possible byte value.
const fn build_table() -> [u64; 256] {
    let mut table = [0u64; 256];
    let mut byte = 0usize;
    while byte < 256 {
        table[byte] = one_entry(byte);
        byte += 1;
    }
    table
}

/// Byte-indexed table used by [`compute_fingerprint_rabin`], evaluated at compile time.
static FINGERPRINT_TABLE: [u64; 256] = build_table();

/// Computes the 64-bit table-driven fingerprint of `canonical_form`'s UTF-8
/// bytes, starting from `EMPTY`. An empty input yields `EMPTY`.
pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
    canonical_form.bytes().fold(EMPTY, |fp, byte| {
        let idx = usize::from((fp as u8) ^ byte);
        (fp >> 8) ^ FINGERPRINT_TABLE[idx]
    })
}
1016
/// Returns the 16-byte MD5 digest of `canonical_form`'s UTF-8 bytes.
#[cfg(feature = "md5")]
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    md5::compute(canonical_form.as_bytes()).0
}
1027
/// Returns the 32-byte SHA-256 digest of `canonical_form`'s UTF-8 bytes.
#[cfg(feature = "sha256")]
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    // One-shot hashing; same result as new() + update() + finalize().
    Sha256::digest(canonical_form.as_bytes()).into()
}
1039
1040#[inline]
1041fn is_internal_arrow_key(key: &str) -> bool {
1042 key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1043}
1044
1045fn extend_with_passthrough_metadata(
1050 target: &mut JsonMap<String, Value>,
1051 metadata: &HashMap<String, String>,
1052) {
1053 for (meta_key, meta_val) in metadata {
1054 if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1055 continue;
1056 }
1057 let json_val =
1058 serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1059 target.insert(meta_key.clone(), json_val);
1060 }
1061}
1062
/// Rewrites `base_name` so it contains only ASCII letters, digits and `_` and
/// does not start with a digit.
///
/// Every other character becomes `_`; a leading digit gets a `_` prepended;
/// the empty string becomes `"_"`.
fn sanitise_avro_name(base_name: &str) -> String {
    if base_name.is_empty() {
        return String::from("_");
    }
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    for ch in base_name.chars() {
        let keep = ch == '_' || ch.is_ascii_alphanumeric();
        sanitised.push(if keep { ch } else { '_' });
    }
    if sanitised.starts_with(|first: char| first.is_ascii_digit()) {
        sanitised.insert(0, '_');
    }
    sanitised
}
1083
1084#[derive(Default)]
1085struct NameGenerator {
1086 used: HashSet<String>,
1087 counters: HashMap<String, usize>,
1088}
1089
1090impl NameGenerator {
1091 fn make_unique(&mut self, field_name: &str) -> String {
1092 let field_name = sanitise_avro_name(field_name);
1093 if self.used.insert(field_name.clone()) {
1094 self.counters.insert(field_name.clone(), 1);
1095 return field_name;
1096 }
1097 let counter = self.counters.entry(field_name.clone()).or_insert(1);
1098 loop {
1099 let candidate = format!("{field_name}_{}", *counter);
1100 if self.used.insert(candidate.clone()) {
1101 return candidate;
1102 }
1103 *counter += 1;
1104 }
1105 }
1106}
1107
1108fn merge_extras(schema: Value, mut extras: JsonMap<String, Value>) -> Value {
1109 if extras.is_empty() {
1110 return schema;
1111 }
1112 match schema {
1113 Value::Object(mut map) => {
1114 map.extend(extras);
1115 Value::Object(map)
1116 }
1117 Value::Array(mut union) => {
1118 if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1121 let original = std::mem::take(non_null);
1122 *non_null = merge_extras(original, extras);
1123 }
1124 Value::Array(union)
1125 }
1126 primitive => {
1127 let mut map = JsonMap::with_capacity(extras.len() + 1);
1128 map.insert("type".into(), primitive);
1129 map.extend(extras);
1130 Value::Object(map)
1131 }
1132 }
1133}
1134
1135#[inline]
1136fn is_avro_json_null(v: &Value) -> bool {
1137 matches!(v, Value::String(s) if s == "null")
1138}
1139
1140fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1141 let null = Value::String("null".into());
1142 match inner {
1143 Value::Array(mut union) => {
1144 union.retain(|v| !is_avro_json_null(v));
1145 match null_order {
1146 Nullability::NullFirst => union.insert(0, null),
1147 Nullability::NullSecond => union.push(null),
1148 }
1149 Value::Array(union)
1150 }
1151 other => match null_order {
1152 Nullability::NullFirst => Value::Array(vec![null, other]),
1153 Nullability::NullSecond => Value::Array(vec![other, null]),
1154 },
1155 }
1156}
1157
1158fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1159 match branch {
1160 Value::String(t) => Ok(format!("P:{t}")),
1161 Value::Object(map) => {
1162 let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1163 ArrowError::SchemaError("Union branch object missing string 'type'".into())
1164 })?;
1165 match t {
1166 "record" | "enum" | "fixed" => {
1167 let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1168 ArrowError::SchemaError(format!(
1169 "Union branch '{t}' missing required 'name'"
1170 ))
1171 })?;
1172 Ok(format!("N:{t}:{name}"))
1173 }
1174 "array" | "map" => Ok(format!("C:{t}")),
1175 other => Ok(format!("P:{other}")),
1176 }
1177 }
1178 Value::Array(_) => Err(ArrowError::SchemaError(
1179 "Avro union may not immediately contain another union".into(),
1180 )),
1181 _ => Err(ArrowError::SchemaError(
1182 "Invalid JSON for Avro union branch".into(),
1183 )),
1184 }
1185}
1186
1187fn datatype_to_avro(
1188 dt: &DataType,
1189 field_name: &str,
1190 metadata: &HashMap<String, String>,
1191 name_gen: &mut NameGenerator,
1192 null_order: Nullability,
1193) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1194 let mut extras = JsonMap::new();
1195 let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1196 if *scale < 0 {
1197 return Err(ArrowError::SchemaError(format!(
1198 "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1199 )));
1200 }
1201 if (*scale as usize) > (*precision as usize) {
1202 return Err(ArrowError::SchemaError(format!(
1203 "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1204 must be <= precision ({precision})"
1205 )));
1206 }
1207
1208 let mut meta = JsonMap::from_iter([
1209 ("logicalType".into(), json!("decimal")),
1210 ("precision".into(), json!(*precision)),
1211 ("scale".into(), json!(*scale)),
1212 ]);
1213 if let Some(size) = metadata
1214 .get("size")
1215 .and_then(|val| val.parse::<usize>().ok())
1216 {
1217 meta.insert("type".into(), json!("fixed"));
1218 meta.insert("size".into(), json!(size));
1219 meta.insert("name".into(), json!(name_gen.make_unique(field_name)));
1220 } else {
1221 meta.insert("type".into(), json!("bytes"));
1222 }
1223 Ok(Value::Object(meta))
1224 };
1225 let val = match dt {
1226 DataType::Null => Value::String("null".into()),
1227 DataType::Boolean => Value::String("boolean".into()),
1228 DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 | DataType::Int32 => {
1229 Value::String("int".into())
1230 }
1231 DataType::UInt32 | DataType::Int64 | DataType::UInt64 => Value::String("long".into()),
1232 DataType::Float16 | DataType::Float32 => Value::String("float".into()),
1233 DataType::Float64 => Value::String("double".into()),
1234 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1235 DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1236 DataType::BinaryView => {
1237 extras.insert("arrowBinaryView".into(), Value::Bool(true));
1238 Value::String("bytes".into())
1239 }
1240 DataType::FixedSizeBinary(len) => {
1241 let is_uuid = metadata
1242 .get("logicalType")
1243 .is_some_and(|value| value == "uuid")
1244 || (*len == 16
1245 && metadata
1246 .get("ARROW:extension:name")
1247 .is_some_and(|value| value == "uuid"));
1248 if is_uuid {
1249 json!({ "type": "string", "logicalType": "uuid" })
1250 } else {
1251 json!({
1252 "type": "fixed",
1253 "name": name_gen.make_unique(field_name),
1254 "size": len
1255 })
1256 }
1257 }
1258 #[cfg(feature = "small_decimals")]
1259 DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1260 handle_decimal(precision, scale)?
1261 }
1262 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1263 handle_decimal(precision, scale)?
1264 }
1265 DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1266 DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1267 DataType::Time32(unit) => match unit {
1268 TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1269 TimeUnit::Second => {
1270 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1271 Value::String("int".into())
1272 }
1273 _ => Value::String("int".into()),
1274 },
1275 DataType::Time64(unit) => match unit {
1276 TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1277 TimeUnit::Nanosecond => {
1278 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1279 Value::String("long".into())
1280 }
1281 _ => Value::String("long".into()),
1282 },
1283 DataType::Timestamp(unit, tz) => {
1284 let logical_type = match (unit, tz.is_some()) {
1285 (TimeUnit::Millisecond, true) => "timestamp-millis",
1286 (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1287 (TimeUnit::Microsecond, true) => "timestamp-micros",
1288 (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1289 (TimeUnit::Second, _) => {
1290 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1291 return Ok((Value::String("long".into()), extras));
1292 }
1293 (TimeUnit::Nanosecond, _) => {
1294 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1295 return Ok((Value::String("long".into()), extras));
1296 }
1297 };
1298 json!({ "type": "long", "logicalType": logical_type })
1299 }
1300 DataType::Duration(unit) => {
1301 #[cfg(feature = "avro_custom_types")]
1302 {
1303 let logical_type = match unit {
1306 TimeUnit::Second => "arrow.duration-seconds",
1307 TimeUnit::Millisecond => "arrow.duration-millis",
1308 TimeUnit::Microsecond => "arrow.duration-micros",
1309 TimeUnit::Nanosecond => "arrow.duration-nanos",
1310 };
1311 json!({ "type": "long", "logicalType": logical_type })
1312 }
1313 #[cfg(not(feature = "avro_custom_types"))]
1314 {
1315 Value::String("long".into())
1316 }
1317 }
1318 DataType::Interval(IntervalUnit::MonthDayNano) => json!({
1319 "type": "fixed",
1320 "name": name_gen.make_unique(&format!("{field_name}_duration")),
1321 "size": 12,
1322 "logicalType": "duration"
1323 }),
1324 DataType::Interval(IntervalUnit::YearMonth) => {
1325 extras.insert(
1326 "arrowIntervalUnit".into(),
1327 Value::String("yearmonth".into()),
1328 );
1329 Value::String("long".into())
1330 }
1331 DataType::Interval(IntervalUnit::DayTime) => {
1332 extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1333 Value::String("long".into())
1334 }
1335 DataType::List(child) | DataType::LargeList(child) => {
1336 if matches!(dt, DataType::LargeList(_)) {
1337 extras.insert("arrowLargeList".into(), Value::Bool(true));
1338 }
1339 let items_schema = process_datatype(
1340 child.data_type(),
1341 child.name(),
1342 child.metadata(),
1343 name_gen,
1344 null_order,
1345 child.is_nullable(),
1346 )?;
1347 json!({
1348 "type": "array",
1349 "items": items_schema
1350 })
1351 }
1352 DataType::ListView(child) | DataType::LargeListView(child) => {
1353 if matches!(dt, DataType::LargeListView(_)) {
1354 extras.insert("arrowLargeList".into(), Value::Bool(true));
1355 }
1356 extras.insert("arrowListView".into(), Value::Bool(true));
1357 let items_schema = process_datatype(
1358 child.data_type(),
1359 child.name(),
1360 child.metadata(),
1361 name_gen,
1362 null_order,
1363 child.is_nullable(),
1364 )?;
1365 json!({
1366 "type": "array",
1367 "items": items_schema
1368 })
1369 }
1370 DataType::FixedSizeList(child, len) => {
1371 extras.insert("arrowFixedSize".into(), json!(len));
1372 let items_schema = process_datatype(
1373 child.data_type(),
1374 child.name(),
1375 child.metadata(),
1376 name_gen,
1377 null_order,
1378 child.is_nullable(),
1379 )?;
1380 json!({
1381 "type": "array",
1382 "items": items_schema
1383 })
1384 }
1385 DataType::Map(entries, _) => {
1386 let value_field = match entries.data_type() {
1387 DataType::Struct(fs) => &fs[1],
1388 _ => {
1389 return Err(ArrowError::SchemaError(
1390 "Map 'entries' field must be Struct(key,value)".into(),
1391 ))
1392 }
1393 };
1394 let values_schema = process_datatype(
1395 value_field.data_type(),
1396 value_field.name(),
1397 value_field.metadata(),
1398 name_gen,
1399 null_order,
1400 value_field.is_nullable(),
1401 )?;
1402 json!({
1403 "type": "map",
1404 "values": values_schema
1405 })
1406 }
1407 DataType::Struct(fields) => {
1408 let avro_fields = fields
1409 .iter()
1410 .map(|field| arrow_field_to_avro(field, name_gen, null_order))
1411 .collect::<Result<Vec<_>, _>>()?;
1412 json!({
1413 "type": "record",
1414 "name": name_gen.make_unique(field_name),
1415 "fields": avro_fields
1416 })
1417 }
1418 DataType::Dictionary(_, value) => {
1419 if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1420 let symbols: Vec<&str> =
1421 serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1422 json!({
1423 "type": "enum",
1424 "name": name_gen.make_unique(field_name),
1425 "symbols": symbols
1426 })
1427 } else {
1428 process_datatype(
1429 value.as_ref(),
1430 field_name,
1431 metadata,
1432 name_gen,
1433 null_order,
1434 false,
1435 )?
1436 }
1437 }
1438 DataType::RunEndEncoded(_, values) => process_datatype(
1439 values.data_type(),
1440 values.name(),
1441 values.metadata(),
1442 name_gen,
1443 null_order,
1444 false,
1445 )?,
1446 DataType::Union(fields, mode) => {
1447 let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1448 let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1449 for (type_id, field_ref) in fields.iter() {
1450 let (branch_schema, _branch_extras) = datatype_to_avro(
1452 field_ref.data_type(),
1453 field_ref.name(),
1454 field_ref.metadata(),
1455 name_gen,
1456 null_order,
1457 )?;
1458 if matches!(branch_schema, Value::Array(_)) {
1460 return Err(ArrowError::SchemaError(
1461 "Avro union may not immediately contain another union".into(),
1462 ));
1463 }
1464 branches.push(branch_schema);
1465 type_ids.push(type_id as i32);
1466 }
1467 let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1468 for b in &branches {
1469 let sig = union_branch_signature(b)?;
1470 if !seen.insert(sig) {
1471 return Err(ArrowError::SchemaError(
1472 "Avro union contains duplicate branch types (disallowed by spec)".into(),
1473 ));
1474 }
1475 }
1476 extras.insert(
1477 "arrowUnionMode".into(),
1478 Value::String(
1479 match mode {
1480 UnionMode::Sparse => "sparse",
1481 UnionMode::Dense => "dense",
1482 }
1483 .to_string(),
1484 ),
1485 );
1486 extras.insert(
1487 "arrowUnionTypeIds".into(),
1488 Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1489 );
1490
1491 Value::Array(branches)
1492 }
1493 other => {
1494 return Err(ArrowError::NotYetImplemented(format!(
1495 "Arrow type {other:?} has no Avro representation"
1496 )))
1497 }
1498 };
1499 Ok((val, extras))
1500}
1501
1502fn process_datatype(
1503 dt: &DataType,
1504 field_name: &str,
1505 metadata: &HashMap<String, String>,
1506 name_gen: &mut NameGenerator,
1507 null_order: Nullability,
1508 is_nullable: bool,
1509) -> Result<Value, ArrowError> {
1510 let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order)?;
1511 let mut merged = merge_extras(schema, extras);
1512 if is_nullable {
1513 merged = wrap_nullable(merged, null_order)
1514 }
1515 Ok(merged)
1516}
1517
1518fn arrow_field_to_avro(
1519 field: &ArrowField,
1520 name_gen: &mut NameGenerator,
1521 null_order: Nullability,
1522) -> Result<Value, ArrowError> {
1523 let avro_name = sanitise_avro_name(field.name());
1524 let schema_value = process_datatype(
1525 field.data_type(),
1526 &avro_name,
1527 field.metadata(),
1528 name_gen,
1529 null_order,
1530 field.is_nullable(),
1531 )?;
1532 let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1534 map.insert("name".into(), Value::String(avro_name));
1535 map.insert("type".into(), schema_value);
1536 for (meta_key, meta_val) in field.metadata() {
1538 if is_internal_arrow_key(meta_key) {
1539 continue;
1540 }
1541 match meta_key.as_str() {
1542 AVRO_DOC_METADATA_KEY => {
1543 map.insert("doc".into(), Value::String(meta_val.clone()));
1544 }
1545 AVRO_FIELD_DEFAULT_METADATA_KEY => {
1546 let default_value = serde_json::from_str(meta_val)
1547 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1548 map.insert("default".into(), default_value);
1549 }
1550 _ => {
1551 let json_val = serde_json::from_str(meta_val)
1552 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1553 map.insert(meta_key.clone(), json_val);
1554 }
1555 }
1556 }
1557 Ok(Value::Object(map))
1558}
1559
1560#[cfg(test)]
1561mod tests {
1562 use super::*;
1563 use crate::codec::{AvroDataType, AvroField};
1564 use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1565 use serde_json::json;
1566 use std::sync::Arc;
1567
1568 fn int_schema() -> Schema<'static> {
1569 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1570 }
1571
1572 fn record_schema() -> Schema<'static> {
1573 Schema::Complex(ComplexType::Record(Record {
1574 name: "record1",
1575 namespace: Some("test.namespace"),
1576 doc: Some("A test record"),
1577 aliases: vec![],
1578 fields: vec![
1579 Field {
1580 name: "field1",
1581 doc: Some("An integer field"),
1582 r#type: int_schema(),
1583 default: None,
1584 },
1585 Field {
1586 name: "field2",
1587 doc: None,
1588 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1589 default: None,
1590 },
1591 ],
1592 attributes: Attributes::default(),
1593 }))
1594 }
1595
1596 fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
1597 let mut sb = SchemaBuilder::new();
1598 sb.push(field);
1599 sb.finish()
1600 }
1601
1602 fn assert_json_contains(avro_json: &str, needle: &str) {
1603 assert!(
1604 avro_json.contains(needle),
1605 "JSON did not contain `{needle}` : {avro_json}"
1606 )
1607 }
1608
    /// Deserializes several Avro schema JSON documents into the `Schema`,
    /// `Type` and `ComplexType` models and compares them with hand-built
    /// values. For two of the parsed records it also runs
    /// `AvroField::try_from` and checks the outcome.
    #[test]
    fn test_deserialize() {
        // A bare JSON string names a primitive type.
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(
            t,
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
        );

        // A JSON array is a union; the branches keep their declared order.
        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            ])
        );

        // An object around a primitive carries `logicalType` in its attributes.
        let t: Type = serde_json::from_str(
            r#"{
                   "type":"long",
                   "logicalType":"timestamp-micros"
                }"#,
        )
        .unwrap();

        // Reused further down as a union branch of `timestamp_col`.
        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        };

        assert_eq!(t, timestamp);

        // A `fixed` decimal: `precision` and `scale` are expected to land in
        // the `additional` attribute map.
        let t: ComplexType = serde_json::from_str(
            r#"{
                   "type":"fixed",
                   "name":"fixed",
                   "namespace":"topLevelRecord.value",
                   "size":11,
                   "logicalType":"decimal",
                   "precision":25,
                   "scale":2
                }"#,
        )
        .unwrap();

        // Reused (moved) below as a union branch of the `value` field.
        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        });

        assert_eq!(t, decimal);

        // A record whose single field is a union of the decimal above and null.
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"value",
                     "type":[
                        {
                           "type":"fixed",
                           "name":"fixed",
                           "namespace":"topLevelRecord.value",
                           "size":11,
                           "logicalType":"decimal",
                           "precision":25,
                           "scale":2
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal),
                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    ]),
                    default: None,
                },],
                attributes: Default::default(),
            }))
        );

        // A record that refers to itself by name in one of its own fields.
        let schema: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "LongList",
                  "aliases": ["LinkedLongs"],
                  "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                  ]
                }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                        default: None,
                    },
                    Field {
                        name: "next",
                        doc: None,
                        // The self-reference parses as a named reference, not a primitive.
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                    }
                ],
                attributes: Attributes::default(),
            }))
        );

        // Converting the self-referential record is expected to fail with a
        // name-resolution error.
        let err = AvroField::try_from(&schema).unwrap_err().to_string();
        assert_eq!(err, "Parser error: Failed to resolve .LongList");

        // A record with two nullable fields, one of them a logical timestamp.
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"id",
                     "type":[
                        "int",
                        "null"
                     ]
                  },
                  {
                     "name":"timestamp_col",
                     "type":[
                        {
                           "type":"long",
                           "logicalType":"timestamp-micros"
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );
        // The same record converted to Arrow: both unions with null become
        // nullable fields, and the record itself a non-nullable struct.
        let codec = AvroField::try_from(&schema).unwrap();
        assert_eq!(
            codec.field(),
            arrow_schema::Field::new(
                "topLevelRecord",
                DataType::Struct(Fields::from(vec![
                    arrow_schema::Field::new("id", DataType::Int32, true),
                    arrow_schema::Field::new(
                        "timestamp_col",
                        DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                        true
                    ),
                ])),
                false
            )
        );

        // A namespaced record that defines a named `fixed` type inline (`MD5`)
        // and then refers to it by name from a later field.
        let schema: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                  "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                  ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(Schema::TypeName(TypeName::Primitive(
                                    PrimitiveType::Bytes
                                ))),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );
    }
1904
1905 #[test]
1906 fn test_new_schema_store() {
1907 let store = SchemaStore::new();
1908 assert!(store.schemas.is_empty());
1909 }
1910
1911 #[test]
1912 fn test_try_from_schemas_rabin() {
1913 let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1914 let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
1915 let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
1916 schemas.insert(
1917 int_avro_schema
1918 .fingerprint(FingerprintAlgorithm::Rabin)
1919 .unwrap(),
1920 int_avro_schema.clone(),
1921 );
1922 schemas.insert(
1923 record_avro_schema
1924 .fingerprint(FingerprintAlgorithm::Rabin)
1925 .unwrap(),
1926 record_avro_schema.clone(),
1927 );
1928 let store = SchemaStore::try_from(schemas).unwrap();
1929 let int_fp = int_avro_schema
1930 .fingerprint(FingerprintAlgorithm::Rabin)
1931 .unwrap();
1932 assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
1933 let rec_fp = record_avro_schema
1934 .fingerprint(FingerprintAlgorithm::Rabin)
1935 .unwrap();
1936 assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
1937 }
1938
1939 #[test]
1940 fn test_try_from_with_duplicates() {
1941 let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1942 let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
1943 let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
1944 schemas.insert(
1945 int_avro_schema
1946 .fingerprint(FingerprintAlgorithm::Rabin)
1947 .unwrap(),
1948 int_avro_schema.clone(),
1949 );
1950 schemas.insert(
1951 record_avro_schema
1952 .fingerprint(FingerprintAlgorithm::Rabin)
1953 .unwrap(),
1954 record_avro_schema.clone(),
1955 );
1956 schemas.insert(
1958 int_avro_schema
1959 .fingerprint(FingerprintAlgorithm::Rabin)
1960 .unwrap(),
1961 int_avro_schema.clone(),
1962 );
1963 let store = SchemaStore::try_from(schemas).unwrap();
1964 assert_eq!(store.schemas.len(), 2);
1965 let int_fp = int_avro_schema
1966 .fingerprint(FingerprintAlgorithm::Rabin)
1967 .unwrap();
1968 assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
1969 }
1970
1971 #[test]
1972 fn test_register_and_lookup_rabin() {
1973 let mut store = SchemaStore::new();
1974 let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
1975 let fp_enum = store.register(schema.clone()).unwrap();
1976 match fp_enum {
1977 Fingerprint::Rabin(fp_val) => {
1978 assert_eq!(
1979 store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
1980 Some(schema.clone())
1981 );
1982 assert!(store
1983 .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
1984 .is_none());
1985 }
1986 Fingerprint::Id(id) => {
1987 unreachable!("This test should only generate Rabin fingerprints")
1988 }
1989 #[cfg(feature = "md5")]
1990 Fingerprint::MD5(id) => {
1991 unreachable!("This test should only generate Rabin fingerprints")
1992 }
1993 #[cfg(feature = "sha256")]
1994 Fingerprint::SHA256(id) => {
1995 unreachable!("This test should only generate Rabin fingerprints")
1996 }
1997 }
1998 }
1999
2000 #[test]
2001 fn test_set_and_lookup_id() {
2002 let mut store = SchemaStore::new();
2003 let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2004 let id = 42u32;
2005 let fp = Fingerprint::Id(id);
2006 let out_fp = store.set(fp, schema.clone()).unwrap();
2007 assert_eq!(out_fp, fp);
2008 assert_eq!(store.lookup(&fp).cloned(), Some(schema.clone()));
2009 assert!(store.lookup(&Fingerprint::Id(id.wrapping_add(1))).is_none());
2010 }
2011
2012 #[test]
2013 fn test_register_duplicate_schema() {
2014 let mut store = SchemaStore::new();
2015 let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2016 let schema2 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2017 let fingerprint1 = store.register(schema1).unwrap();
2018 let fingerprint2 = store.register(schema2).unwrap();
2019 assert_eq!(fingerprint1, fingerprint2);
2020 assert_eq!(store.schemas.len(), 1);
2021 }
2022
2023 #[test]
2024 fn test_set_and_lookup_with_provided_fingerprint() {
2025 let mut store = SchemaStore::new();
2026 let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2027 let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2028 let out_fp = store.set(fp, schema.clone()).unwrap();
2029 assert_eq!(out_fp, fp);
2030 assert_eq!(store.lookup(&fp).cloned(), Some(schema));
2031 }
2032
2033 #[test]
2034 fn test_set_duplicate_same_schema_ok() {
2035 let mut store = SchemaStore::new();
2036 let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2037 let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
2038 let _ = store.set(fp, schema.clone()).unwrap();
2039 let _ = store.set(fp, schema.clone()).unwrap();
2040 assert_eq!(store.schemas.len(), 1);
2041 }
2042
2043 #[test]
2044 fn test_set_duplicate_different_schema_collision_error() {
2045 let mut store = SchemaStore::new();
2046 let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2047 let schema2 = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2048 let fp = Fingerprint::Id(123);
2050 let _ = store.set(fp, schema1).unwrap();
2051 let err = store.set(fp, schema2).unwrap_err();
2052 let msg = format!("{err}");
2053 assert!(msg.contains("Schema fingerprint collision"));
2054 }
2055
2056 #[test]
2057 fn test_canonical_form_generation_primitive() {
2058 let schema = int_schema();
2059 let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2060 assert_eq!(canonical_form, r#""int""#);
2061 }
2062
2063 #[test]
2064 fn test_canonical_form_generation_record() {
2065 let schema = record_schema();
2066 let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2067 let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap();
2068 assert_eq!(canonical_form, expected_canonical_form);
2069 }
2070
2071 #[test]
2072 fn test_fingerprint_calculation() {
2073 let canonical_form = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
2074 let expected_fingerprint = 10505236152925314060;
2075 let fingerprint = compute_fingerprint_rabin(canonical_form);
2076 assert_eq!(fingerprint, expected_fingerprint);
2077 }
2078
2079 #[test]
2080 fn test_register_and_lookup_complex_schema() {
2081 let mut store = SchemaStore::new();
2082 let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2083 let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
2084 let expected_fingerprint =
2085 Fingerprint::Rabin(super::compute_fingerprint_rabin(canonical_form));
2086 let fingerprint = store.register(schema.clone()).unwrap();
2087 assert_eq!(fingerprint, expected_fingerprint);
2088 let looked_up = store.lookup(&fingerprint).cloned();
2089 assert_eq!(looked_up, Some(schema));
2090 }
2091
2092 #[test]
2093 fn test_fingerprints_returns_all_keys() {
2094 let mut store = SchemaStore::new();
2095 let fp_int = store
2096 .register(AvroSchema::new(
2097 serde_json::to_string(&int_schema()).unwrap(),
2098 ))
2099 .unwrap();
2100 let fp_record = store
2101 .register(AvroSchema::new(
2102 serde_json::to_string(&record_schema()).unwrap(),
2103 ))
2104 .unwrap();
2105 let fps = store.fingerprints();
2106 assert_eq!(fps.len(), 2);
2107 assert!(fps.contains(&fp_int));
2108 assert!(fps.contains(&fp_record));
2109 }
2110
2111 #[test]
2112 fn test_canonical_form_strips_attributes() {
2113 let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
2114 name: "record_with_attrs",
2115 namespace: None,
2116 doc: Some("This doc should be stripped"),
2117 aliases: vec!["alias1", "alias2"],
2118 fields: vec![Field {
2119 name: "f1",
2120 doc: Some("field doc"),
2121 r#type: Schema::Type(Type {
2122 r#type: TypeName::Primitive(PrimitiveType::Bytes),
2123 attributes: Attributes {
2124 logical_type: None,
2125 additional: HashMap::from([("precision", json!(4))]),
2126 },
2127 }),
2128 default: None,
2129 }],
2130 attributes: Attributes {
2131 logical_type: None,
2132 additional: HashMap::from([("custom_attr", json!("value"))]),
2133 },
2134 }));
2135 let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
2136 let canonical_form = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
2137 assert_eq!(canonical_form, expected_canonical_form);
2138 }
2139
2140 #[test]
2141 fn test_primitive_mappings() {
2142 let cases = vec![
2143 (DataType::Boolean, "\"boolean\""),
2144 (DataType::Int8, "\"int\""),
2145 (DataType::Int16, "\"int\""),
2146 (DataType::Int32, "\"int\""),
2147 (DataType::Int64, "\"long\""),
2148 (DataType::UInt8, "\"int\""),
2149 (DataType::UInt16, "\"int\""),
2150 (DataType::UInt32, "\"long\""),
2151 (DataType::UInt64, "\"long\""),
2152 (DataType::Float16, "\"float\""),
2153 (DataType::Float32, "\"float\""),
2154 (DataType::Float64, "\"double\""),
2155 (DataType::Utf8, "\"string\""),
2156 (DataType::Binary, "\"bytes\""),
2157 ];
2158 for (dt, avro_token) in cases {
2159 let field = ArrowField::new("col", dt.clone(), false);
2160 let arrow_schema = single_field_schema(field);
2161 let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2162 assert_json_contains(&avro.json_string, avro_token);
2163 }
2164 }
2165
2166 #[test]
2167 fn test_temporal_mappings() {
2168 let cases = vec![
2169 (DataType::Date32, "\"logicalType\":\"date\""),
2170 (
2171 DataType::Time32(TimeUnit::Millisecond),
2172 "\"logicalType\":\"time-millis\"",
2173 ),
2174 (
2175 DataType::Time64(TimeUnit::Microsecond),
2176 "\"logicalType\":\"time-micros\"",
2177 ),
2178 (
2179 DataType::Timestamp(TimeUnit::Millisecond, None),
2180 "\"logicalType\":\"local-timestamp-millis\"",
2181 ),
2182 (
2183 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2184 "\"logicalType\":\"timestamp-micros\"",
2185 ),
2186 ];
2187 for (dt, needle) in cases {
2188 let field = ArrowField::new("ts", dt.clone(), true);
2189 let arrow_schema = single_field_schema(field);
2190 let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2191 assert_json_contains(&avro.json_string, needle);
2192 }
2193 }
2194
2195 #[test]
2196 fn test_decimal_and_uuid() {
2197 let decimal_field = ArrowField::new("amount", DataType::Decimal128(25, 2), false);
2198 let dec_schema = single_field_schema(decimal_field);
2199 let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
2200 assert_json_contains(&avro_dec.json_string, "\"logicalType\":\"decimal\"");
2201 assert_json_contains(&avro_dec.json_string, "\"precision\":25");
2202 assert_json_contains(&avro_dec.json_string, "\"scale\":2");
2203 let mut md = HashMap::new();
2204 md.insert("logicalType".into(), "uuid".into());
2205 let uuid_field =
2206 ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(md);
2207 let uuid_schema = single_field_schema(uuid_field);
2208 let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
2209 assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
2210 }
2211
2212 #[test]
2213 fn test_interval_duration() {
2214 let interval_field = ArrowField::new(
2215 "span",
2216 DataType::Interval(IntervalUnit::MonthDayNano),
2217 false,
2218 );
2219 let s = single_field_schema(interval_field);
2220 let avro = AvroSchema::try_from(&s).unwrap();
2221 assert_json_contains(&avro.json_string, "\"logicalType\":\"duration\"");
2222 assert_json_contains(&avro.json_string, "\"size\":12");
2223 let dur_field = ArrowField::new("latency", DataType::Duration(TimeUnit::Nanosecond), false);
2224 let s2 = single_field_schema(dur_field);
2225 let avro2 = AvroSchema::try_from(&s2).unwrap();
2226 #[cfg(feature = "avro_custom_types")]
2227 assert_json_contains(
2228 &avro2.json_string,
2229 "\"logicalType\":\"arrow.duration-nanos\"",
2230 );
2231 }
2232
2233 #[test]
2234 fn test_complex_types() {
2235 let list_dt = DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true)));
2236 let list_schema = single_field_schema(ArrowField::new("numbers", list_dt, false));
2237 let avro_list = AvroSchema::try_from(&list_schema).unwrap();
2238 assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
2239 assert_json_contains(&avro_list.json_string, "\"items\"");
2240 let value_field = ArrowField::new("value", DataType::Boolean, true);
2241 let entries_struct = ArrowField::new(
2242 "entries",
2243 DataType::Struct(Fields::from(vec![
2244 ArrowField::new("key", DataType::Utf8, false),
2245 value_field.clone(),
2246 ])),
2247 false,
2248 );
2249 let map_dt = DataType::Map(Arc::new(entries_struct), false);
2250 let map_schema = single_field_schema(ArrowField::new("props", map_dt, false));
2251 let avro_map = AvroSchema::try_from(&map_schema).unwrap();
2252 assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
2253 assert_json_contains(&avro_map.json_string, "\"values\"");
2254 let struct_dt = DataType::Struct(Fields::from(vec![
2255 ArrowField::new("f1", DataType::Int64, false),
2256 ArrowField::new("f2", DataType::Utf8, true),
2257 ]));
2258 let struct_schema = single_field_schema(ArrowField::new("person", struct_dt, true));
2259 let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
2260 assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
2261 assert_json_contains(&avro_struct.json_string, "\"null\"");
2262 }
2263
2264 #[test]
2265 fn test_enum_dictionary() {
2266 let mut md = HashMap::new();
2267 md.insert(
2268 AVRO_ENUM_SYMBOLS_METADATA_KEY.into(),
2269 "[\"OPEN\",\"CLOSED\"]".into(),
2270 );
2271 let enum_dt = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
2272 let field = ArrowField::new("status", enum_dt, false).with_metadata(md);
2273 let schema = single_field_schema(field);
2274 let avro = AvroSchema::try_from(&schema).unwrap();
2275 assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
2276 assert_json_contains(&avro.json_string, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
2277 }
2278
2279 #[test]
2280 fn test_run_end_encoded() {
2281 let ree_dt = DataType::RunEndEncoded(
2282 Arc::new(ArrowField::new("run_ends", DataType::Int32, false)),
2283 Arc::new(ArrowField::new("values", DataType::Utf8, false)),
2284 );
2285 let s = single_field_schema(ArrowField::new("text", ree_dt, false));
2286 let avro = AvroSchema::try_from(&s).unwrap();
2287 assert_json_contains(&avro.json_string, "\"string\"");
2288 }
2289
2290 #[test]
2291 fn test_dense_union() {
2292 let uf: UnionFields = vec![
2293 (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
2294 (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
2295 ]
2296 .into_iter()
2297 .collect();
2298 let union_dt = DataType::Union(uf, UnionMode::Dense);
2299 let s = single_field_schema(ArrowField::new("u", union_dt, false));
2300 let avro =
2301 AvroSchema::try_from(&s).expect("Arrow Union -> Avro union conversion should succeed");
2302 let v: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
2303 let fields = v
2304 .get("fields")
2305 .and_then(|x| x.as_array())
2306 .expect("fields array");
2307 let u_field = fields
2308 .iter()
2309 .find(|f| f.get("name").and_then(|n| n.as_str()) == Some("u"))
2310 .expect("field 'u'");
2311 let union = u_field.get("type").expect("u.type");
2312 let arr = union.as_array().expect("u.type must be Avro union array");
2313 assert_eq!(arr.len(), 2, "expected two union branches");
2314 let first = &arr[0];
2315 let obj = first
2316 .as_object()
2317 .expect("first branch should be an object with metadata");
2318 assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
2319 assert_eq!(
2320 obj.get("arrowUnionMode").and_then(|m| m.as_str()),
2321 Some("dense")
2322 );
2323 let type_ids: Vec<i64> = obj
2324 .get("arrowUnionTypeIds")
2325 .and_then(|a| a.as_array())
2326 .expect("arrowUnionTypeIds array")
2327 .iter()
2328 .map(|n| n.as_i64().expect("i64"))
2329 .collect();
2330 assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");
2331 assert_eq!(arr[1], Value::String("string".into()));
2332 }
2333
2334 #[test]
2335 fn round_trip_primitive() {
2336 let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", DataType::Int32, false)]);
2337 let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
2338 let decoded = avro_schema.schema().unwrap();
2339 assert!(matches!(decoded, Schema::Complex(_)));
2340 }
2341
2342 #[test]
2343 fn test_name_generator_sanitization_and_uniqueness() {
2344 let f1 = ArrowField::new("weird-name", DataType::FixedSizeBinary(8), false);
2345 let f2 = ArrowField::new("weird name", DataType::FixedSizeBinary(8), false);
2346 let f3 = ArrowField::new("123bad", DataType::FixedSizeBinary(8), false);
2347 let arrow_schema = ArrowSchema::new(vec![f1, f2, f3]);
2348 let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2349 assert_json_contains(&avro.json_string, "\"name\":\"weird_name\"");
2350 assert_json_contains(&avro.json_string, "\"name\":\"weird_name_1\"");
2351 assert_json_contains(&avro.json_string, "\"name\":\"_123bad\"");
2352 }
2353
2354 #[test]
2355 fn test_date64_logical_type_mapping() {
2356 let field = ArrowField::new("d", DataType::Date64, true);
2357 let schema = single_field_schema(field);
2358 let avro = AvroSchema::try_from(&schema).unwrap();
2359 assert_json_contains(
2360 &avro.json_string,
2361 "\"logicalType\":\"local-timestamp-millis\"",
2362 );
2363 }
2364
2365 #[test]
2366 fn test_duration_list_extras_propagated() {
2367 let child = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
2368 let list_dt = DataType::List(Arc::new(child));
2369 let arrow_schema = single_field_schema(ArrowField::new("durations", list_dt, false));
2370 let avro = AvroSchema::try_from(&arrow_schema).unwrap();
2371 #[cfg(feature = "avro_custom_types")]
2372 assert_json_contains(
2373 &avro.json_string,
2374 "\"logicalType\":\"arrow.duration-micros\"",
2375 );
2376 }
2377
2378 #[test]
2379 fn test_interval_yearmonth_extra() {
2380 let field = ArrowField::new("iv", DataType::Interval(IntervalUnit::YearMonth), false);
2381 let schema = single_field_schema(field);
2382 let avro = AvroSchema::try_from(&schema).unwrap();
2383 assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"yearmonth\"");
2384 }
2385
2386 #[test]
2387 fn test_interval_daytime_extra() {
2388 let field = ArrowField::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false);
2389 let schema = single_field_schema(field);
2390 let avro = AvroSchema::try_from(&schema).unwrap();
2391 assert_json_contains(&avro.json_string, "\"arrowIntervalUnit\":\"daytime\"");
2392 }
2393
2394 #[test]
2395 fn test_fixed_size_list_extra() {
2396 let child = ArrowField::new("item", DataType::Int32, false);
2397 let dt = DataType::FixedSizeList(Arc::new(child), 3);
2398 let schema = single_field_schema(ArrowField::new("triples", dt, false));
2399 let avro = AvroSchema::try_from(&schema).unwrap();
2400 assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
2401 }
2402
2403 #[test]
2404 fn test_map_duration_value_extra() {
2405 let val_field = ArrowField::new("value", DataType::Duration(TimeUnit::Second), true);
2406 let entries_struct = ArrowField::new(
2407 "entries",
2408 DataType::Struct(Fields::from(vec![
2409 ArrowField::new("key", DataType::Utf8, false),
2410 val_field,
2411 ])),
2412 false,
2413 );
2414 let map_dt = DataType::Map(Arc::new(entries_struct), false);
2415 let schema = single_field_schema(ArrowField::new("metrics", map_dt, false));
2416 let avro = AvroSchema::try_from(&schema).unwrap();
2417 #[cfg(feature = "avro_custom_types")]
2418 assert_json_contains(
2419 &avro.json_string,
2420 "\"logicalType\":\"arrow.duration-seconds\"",
2421 );
2422 }
2423
#[test]
fn test_schema_with_non_string_defaults_decodes_successfully() {
    // Record whose field defaults are a number, an array, an object, a nested
    // record value, and a union default.
    let schema_json = r#"{
        "type": "record",
        "name": "R",
        "fields": [
            {"name": "a", "type": "int", "default": 0},
            {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
            {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
            {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
                {"name": "flag", "type": "boolean", "default": true},
                {"name": "name", "type": "string", "default": "hi"}
            ]}, "default": {"flag": false, "name": "d"}},
            {"name": "u", "type": ["int", "null"], "default": 42}
        ]
    }"#;

    // Step 1: the JSON must deserialize into a record schema.
    let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
    assert!(
        matches!(&schema, Schema::Complex(ComplexType::Record(_))),
        "expected record schema, got: {:?}",
        schema
    );

    // Step 2: the parsed schema must convert into an Arrow field.
    let converted = crate::codec::AvroField::try_from(&schema)
        .expect("Avro->Arrow conversion should succeed");
    let actual = converted.field();

    // Step 3: build the expected Arrow field. Everything except `u` is
    // non-nullable, so a small helper keeps the construction compact.
    let required = |name: &str, data_type: DataType| ArrowField::new(name, data_type, false);

    let b = required(
        "b",
        DataType::List(Arc::new(required(
            ArrowField::LIST_FIELD_DEFAULT_NAME,
            DataType::Int64,
        ))),
    );

    let c_entries = required(
        "entries",
        DataType::Struct(Fields::from(vec![
            required("key", DataType::Utf8),
            required("value", DataType::Float64),
        ])),
    );
    let c = required("c", DataType::Map(Arc::new(c_entries), false));

    let inner = required(
        "inner",
        DataType::Struct(Fields::from(vec![
            required("flag", DataType::Boolean),
            required("name", DataType::Utf8),
        ])),
    );

    let expected = required(
        "R",
        DataType::Struct(Fields::from(vec![
            required("a", DataType::Int32),
            b,
            c,
            inner,
            // The `["int", "null"]` union is expected as a nullable Int32.
            ArrowField::new("u", DataType::Int32, true),
        ])),
    );

    assert_eq!(actual, expected);
}
2494
#[test]
fn default_order_is_consistent() {
    // Converting through `TryFrom` and through `from_arrow_with_options` with
    // no options supplied must yield the same JSON string.
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
    let via_try_from = AvroSchema::try_from(&arrow_schema).unwrap();
    let via_options = AvroSchema::from_arrow_with_options(&arrow_schema, None).unwrap();
    assert_eq!(via_try_from.json_string, via_options.json_string);
}
2502
#[test]
fn test_union_branch_missing_name_errors() {
    // Each of these type tags is given without a `name`; the signature helper
    // must return an error mentioning the offending tag.
    for t in ["record", "enum", "fixed"] {
        let expected = format!("Union branch '{t}' missing required 'name'");
        let err = union_branch_signature(&json!({ "type": t }))
            .unwrap_err()
            .to_string();
        assert!(
            err.contains(&expected),
            "expected missing-name error for {t}, got: {err}"
        );
    }
}
2514
#[test]
fn test_union_branch_named_type_signature_includes_name() {
    // Pairs of (union branch, expected signature); every signature has the
    // form `N:<type>:<name>`.
    let cases = [
        (
            json!({ "type": "record", "name": "Foo" }),
            "N:record:Foo",
        ),
        (
            json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] }),
            "N:enum:Color",
        ),
        (
            json!({ "type": "fixed", "name": "Bytes16", "size": 16 }),
            "N:fixed:Bytes16",
        ),
    ];
    for (branch, expected) in &cases {
        assert_eq!(union_branch_signature(branch).unwrap(), *expected);
    }
}
2524}