1#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23 ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24 UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
/// Two-byte marker (`C3 01`) placed before single-object encoded data; `make_prefix`
/// uses it for the Rabin, MD5 and SHA-256 fingerprint kinds.
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// One-byte marker (`0x00`) placed before registry-ID prefixes (`Fingerprint::Id` / `Id64`).
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// Capacity of a prefix buffer: the 2-byte single-object magic plus the largest payload
/// currently written (a 32-byte SHA-256 digest).
pub const MAX_PREFIX_LEN: usize = 34;

/// Arrow schema metadata key holding a complete Avro schema as JSON text. When present
/// (and metadata is not being stripped), the Arrow → Avro conversion returns it verbatim.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Metadata key for Avro enum symbols.
/// NOTE(review): its reader/writer is not visible in this part of the file — confirm the
/// expected value encoding there.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key for an Avro field default value.
/// NOTE(review): consumer not visible here — confirm the expected value encoding.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Metadata key overriding the Avro name of a generated record/fixed type; the value is
/// passed through `sanitise_avro_name` before use.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";

/// Metadata key supplying the Avro `namespace` attribute (emitted without sanitising).
pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";

/// Metadata key supplying the Avro `doc` attribute (used for the top-level record).
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Name given to the generated top-level record when the Arrow schema has no `avro.name`.
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
/// Where the `"null"` branch is placed when a nullable type is wrapped in an Avro union.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) enum Nullability {
    /// Produce `["null", T]`. This is the default.
    #[default]
    NullFirst,
    /// Produce `[T, "null"]`.
    NullSecond,
}
80
/// A schema given purely by name: either an Avro primitive or a reference to a named type.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
// Untagged: serde tries the variants in declaration order, so a primitive keyword such as
// `"int"` becomes `Primitive`, and any other JSON string falls through to `Ref`.
#[serde(untagged)]
pub(crate) enum TypeName<'a> {
    /// One of the built-in Avro primitive type names.
    Primitive(PrimitiveType),
    /// A reference to a named type (record/enum/fixed), borrowed from the input JSON.
    /// It may be a simple name or a dotted full name (see `make_full_name`).
    Ref(&'a str),
}
95
/// The eight Avro primitive types.
///
/// Serde (de)serializes each variant as its lowercase name (`"null"`, `"boolean"`, `"int"`,
/// ...). The strum `AsRefStr` derive yields the same lowercase spelling, which
/// `build_canonical` emits directly into canonical form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub(crate) enum PrimitiveType {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
}
120
/// Extra attributes attached to a schema object beyond its structural keys.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Attributes<'a> {
    /// The `logicalType` attribute (e.g. `"date"`, `"decimal"`), if present.
    #[serde(default)]
    pub(crate) logical_type: Option<&'a str>,

    /// Every remaining key of the object, kept as raw JSON values. Because this struct is
    /// itself flattened into its owners, this map collects whatever keys the owning
    /// struct's own fields do not claim.
    #[serde(flatten)]
    pub(crate) additional: HashMap<&'a str, Value>,
}
137
138impl Attributes<'_> {
139 pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
141 self.additional
142 .iter()
143 .map(|(k, v)| (k.to_string(), v.to_string()))
144 .collect()
145 }
146}
147
/// A schema object of the form `{"type": <name>, ...attributes}`, e.g.
/// `{"type": "int", "logicalType": "date"}`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Type<'a> {
    /// The `type` key: a primitive or a named-type reference.
    #[serde(borrow)]
    pub(crate) r#type: TypeName<'a>,
    /// All other keys on the object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
159
/// Any Avro schema, borrowed from the JSON text it was parsed from.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
// Untagged: serde tries the variants in declaration order. A JSON string becomes
// `TypeName`, a JSON array becomes `Union`, an object whose `type` names a complex kind
// becomes `Complex`, and any other object falls back to `Type`. Reordering the variants
// changes how documents parse.
#[serde(untagged)]
pub(crate) enum Schema<'a> {
    /// A bare type name, e.g. `"int"` or `"com.example.Rec"`.
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A union, written as a JSON array of branch schemas.
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A record, enum, array, map or fixed definition.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// `{"type": <name>, ...}` with extra attributes such as `logicalType`.
    #[serde(borrow)]
    Type(Type<'a>),
}
180
/// The complex Avro schema kinds.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
// Internally tagged: the object's `type` key ("record", "enum", "array", "map" or
// "fixed") selects the variant; the remaining keys fill the variant's struct.
#[serde(tag = "type", rename_all = "camelCase")]
pub(crate) enum ComplexType<'a> {
    #[serde(borrow)]
    Record(Record<'a>),
    #[serde(borrow)]
    Enum(Enum<'a>),
    #[serde(borrow)]
    Array(Array<'a>),
    #[serde(borrow)]
    Map(Map<'a>),
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
203
/// An Avro `record` definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Record<'a> {
    /// Simple or dotted full name; resolved with `namespace` by `make_full_name`.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation. `Cow` lets serde borrow when possible and own the text
    /// when the JSON string contains escapes (a plain `&str` cannot hold unescaped text).
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this record.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The record's fields, in declaration order.
    #[serde(borrow)]
    pub(crate) fields: Vec<Field<'a>>,
    /// Any other keys on the record object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
228
229fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
230where
231 D: serde::Deserializer<'de>,
232{
233 Value::deserialize(deserializer).map(Some)
234}
235
/// One field of an Avro record.
///
/// Only the keys listed here are captured; other keys on a field object (e.g. `order`)
/// are not retained because this struct has no flattened attribute map.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Field<'a> {
    /// The field name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional documentation (owned when the JSON string contains escapes).
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// The field's schema.
    #[serde(borrow)]
    pub(crate) r#type: Schema<'a>,
    /// `None` when the key is absent; `Some(Value::Null)` for an explicit `null` default
    /// (see `deserialize_default`).
    #[serde(deserialize_with = "deserialize_default", default)]
    pub(crate) default: Option<Value>,
    /// Alternative names for this field.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
}
256
/// An Avro `enum` definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Enum<'a> {
    /// Simple or dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names for this enum.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The enum symbols, in order.
    #[serde(borrow)]
    pub(crate) symbols: Vec<&'a str>,
    /// Optional default symbol.
    #[serde(borrow, default)]
    pub(crate) default: Option<&'a str>,
    /// Any other keys on the enum object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
284
/// An Avro `array` schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Array<'a> {
    /// Schema of the array elements (boxed because `Schema` is recursive).
    #[serde(borrow)]
    pub(crate) items: Box<Schema<'a>>,
    /// Any other keys on the array object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
297
/// An Avro `map` schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Map<'a> {
    /// Schema of the map values (boxed because `Schema` is recursive).
    #[serde(borrow)]
    pub(crate) values: Box<Schema<'a>>,
    /// Any other keys on the map object.
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
310
/// An Avro `fixed` definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Fixed<'a> {
    /// Simple or dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Alternative names for this type.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// Number of bytes per value.
    pub(crate) size: usize,
    /// Any other keys on the fixed object (e.g. `logicalType`).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
331
/// Options controlling the Arrow → Avro schema conversion.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) struct AvroSchemaOptions {
    /// Placement of `"null"` in nullable unions; `None` means `Nullability::default()`
    /// (null first).
    pub(crate) null_order: Option<Nullability>,
    /// When `true`, an embedded `avro.schema` metadata value is ignored and the schema is
    /// regenerated. Arrow-specific hint attributes such as `arrowTimeUnit` are also
    /// omitted.
    pub(crate) strip_metadata: bool,
}
337
/// An Avro schema held as its JSON text.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The schema JSON. Equality compares this text byte-for-byte (this is what
    /// `SchemaStore::set` uses for its collision check). Schemas that differ only in
    /// formatting therefore compare unequal.
    pub json_string: String,
}
344
345impl TryFrom<&ArrowSchema> for AvroSchema {
346 type Error = ArrowError;
347
348 fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
352 AvroSchema::from_arrow_with_options(schema, None)
353 }
354}
355
356impl AvroSchema {
357 pub fn new(json_string: String) -> Self {
359 Self { json_string }
360 }
361
362 pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
363 serde_json::from_str(self.json_string.as_str())
364 .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
365 }
366
367 pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
395 Self::generate_fingerprint(&self.schema()?, hash_type)
396 }
397
398 pub(crate) fn generate_fingerprint(
399 schema: &Schema,
400 hash_type: FingerprintAlgorithm,
401 ) -> Result<Fingerprint, ArrowError> {
402 let canonical = Self::generate_canonical_form(schema).map_err(|e| {
403 ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
404 })?;
405 match hash_type {
406 FingerprintAlgorithm::Rabin => {
407 Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
408 }
409 FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
410 "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
411 if using Fingerprint::Id, pass the registry ID in instead using the set method."
412 .to_string(),
413 )),
414 #[cfg(feature = "md5")]
415 FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
416 #[cfg(feature = "sha256")]
417 FingerprintAlgorithm::SHA256 => {
418 Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
419 }
420 }
421 }
422
423 pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
434 build_canonical(schema, None)
435 }
436
437 pub(crate) fn from_arrow_with_options(
444 schema: &ArrowSchema,
445 options: Option<AvroSchemaOptions>,
446 ) -> Result<AvroSchema, ArrowError> {
447 let opts = options.unwrap_or_default();
448 let order = opts.null_order.unwrap_or_default();
449 let strip = opts.strip_metadata;
450 if !strip {
451 if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
452 return Ok(AvroSchema::new(json.clone()));
453 }
454 }
455 let mut name_gen = NameGenerator::default();
456 let fields_json = schema
457 .fields()
458 .iter()
459 .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
460 .collect::<Result<Vec<_>, _>>()?;
461 let record_name = schema
462 .metadata
463 .get(AVRO_NAME_METADATA_KEY)
464 .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
465 let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
466 record.insert("type".into(), Value::String("record".into()));
467 record.insert(
468 "name".into(),
469 Value::String(sanitise_avro_name(record_name)),
470 );
471 if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
472 record.insert("namespace".into(), Value::String(ns.clone()));
473 }
474 if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
475 record.insert("doc".into(), Value::String(doc.clone()));
476 }
477 record.insert("fields".into(), Value::Array(fields_json));
478 extend_with_passthrough_metadata(&mut record, &schema.metadata);
479 let json_string = serde_json::to_string(&Value::Object(record))
480 .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
481 Ok(AvroSchema::new(json_string))
482 }
483}
484
485#[derive(Debug, Copy, Clone)]
487pub(crate) struct Prefix {
488 buf: [u8; MAX_PREFIX_LEN],
489 len: u8,
490}
491
492impl Prefix {
493 #[inline]
494 pub(crate) fn as_slice(&self) -> &[u8] {
495 &self.buf[..self.len as usize]
496 }
497}
498
/// Which fingerprint kind to use. For `Id`/`Id64` it also carries the concrete registry ID;
/// the hash-based kinds carry no value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// 64-bit Rabin (CRC-64-AVRO) fingerprint of the canonical form (the default).
    #[default]
    Rabin,
    /// A 32-bit registry schema ID.
    Id(u32),
    /// A 64-bit registry schema ID.
    Id64(u64),
    /// MD5 digest of the canonical form (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest of the canonical form (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
516
517impl From<Fingerprint> for FingerprintStrategy {
518 fn from(f: Fingerprint) -> Self {
519 Self::from(&f)
520 }
521}
522
523impl From<FingerprintAlgorithm> for FingerprintStrategy {
524 fn from(f: FingerprintAlgorithm) -> Self {
525 match f {
526 FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
527 FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
528 FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
529 #[cfg(feature = "md5")]
530 FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
531 #[cfg(feature = "sha256")]
532 FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
533 }
534 }
535}
536
537impl From<&Fingerprint> for FingerprintStrategy {
538 fn from(f: &Fingerprint) -> Self {
539 match f {
540 Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
541 Fingerprint::Id(_) => FingerprintStrategy::Id(0),
542 Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
543 #[cfg(feature = "md5")]
544 Fingerprint::MD5(_) => FingerprintStrategy::MD5,
545 #[cfg(feature = "sha256")]
546 Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
547 }
548 }
549}
550
/// The kind of fingerprint, without any value attached.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit Rabin (CRC-64-AVRO) fingerprint of the canonical form (the default).
    #[default]
    Rabin,
    /// 32-bit registry ID; assigned externally and cannot be computed from a schema.
    Id,
    /// 64-bit registry ID; assigned externally and cannot be computed from a schema.
    Id64,
    /// MD5 digest of the canonical form (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest of the canonical form (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
569
570impl From<&Fingerprint> for FingerprintAlgorithm {
572 fn from(fp: &Fingerprint) -> Self {
573 match fp {
574 Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
575 Fingerprint::Id(_) => FingerprintAlgorithm::Id,
576 Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
577 #[cfg(feature = "md5")]
578 Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
579 #[cfg(feature = "sha256")]
580 Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
581 }
582 }
583}
584
585impl From<FingerprintStrategy> for FingerprintAlgorithm {
586 fn from(s: FingerprintStrategy) -> Self {
587 Self::from(&s)
588 }
589}
590
591impl From<&FingerprintStrategy> for FingerprintAlgorithm {
592 fn from(s: &FingerprintStrategy) -> Self {
593 match s {
594 FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
595 FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
596 FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
597 #[cfg(feature = "md5")]
598 FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
599 #[cfg(feature = "sha256")]
600 FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
601 }
602 }
603}
604
/// A schema fingerprint or registry ID identifying a writer schema.
///
/// `Eq + Hash` so it can key the `SchemaStore` map.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// Rabin (CRC-64-AVRO) fingerprint; written little-endian after the `C3 01` magic.
    Rabin(u64),
    /// 32-bit registry schema ID; written big-endian after the `0x00` magic byte.
    Id(u32),
    /// 64-bit registry schema ID; written big-endian after the `0x00` magic byte.
    Id64(u64),
    /// MD5 digest of the canonical form, written after the `C3 01` magic.
    #[cfg(feature = "md5")]
    MD5([u8; 16]),
    /// SHA-256 digest of the canonical form, written after the `C3 01` magic.
    #[cfg(feature = "sha256")]
    SHA256([u8; 32]),
}
628
629impl From<FingerprintStrategy> for Fingerprint {
630 fn from(s: FingerprintStrategy) -> Self {
631 Self::from(&s)
632 }
633}
634
635impl From<&FingerprintStrategy> for Fingerprint {
636 fn from(s: &FingerprintStrategy) -> Self {
637 match s {
638 FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
639 FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
640 FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
641 #[cfg(feature = "md5")]
642 FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
643 #[cfg(feature = "sha256")]
644 FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
645 }
646 }
647}
648
649impl From<FingerprintAlgorithm> for Fingerprint {
650 fn from(s: FingerprintAlgorithm) -> Self {
651 match s {
652 FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
653 FingerprintAlgorithm::Id => Fingerprint::Id(0),
654 FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
655 #[cfg(feature = "md5")]
656 FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
657 #[cfg(feature = "sha256")]
658 FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
659 }
660 }
661}
662
663impl Fingerprint {
664 pub fn load_fingerprint_id(id: u32) -> Self {
672 Fingerprint::Id(u32::from_be(id))
673 }
674
675 pub fn load_fingerprint_id64(id: u64) -> Self {
683 Fingerprint::Id64(u64::from_be(id))
684 }
685
686 pub(crate) fn make_prefix(&self) -> Prefix {
710 let mut buf = [0u8; MAX_PREFIX_LEN];
711 let len = match self {
712 Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
713 Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
714 Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
715 #[cfg(feature = "md5")]
716 Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
717 #[cfg(feature = "sha256")]
718 Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
719 };
720 Prefix { buf, len }
721 }
722}
723
724fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
725 buf: &mut [u8; MAX_PREFIX_LEN],
726 magic: &[u8; MAGIC_LEN],
727 payload: &[u8; PAYLOAD_LEN],
728) -> u8 {
729 debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
730 let total = MAGIC_LEN + PAYLOAD_LEN;
731 let prefix_slice = &mut buf[..total];
732 prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
733 prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
734 total as u8
735}
736
/// In-memory registry mapping fingerprints to Avro schemas.
///
/// `fingerprint_algorithm` determines how `register` computes fingerprints. The `Id`/`Id64`
/// algorithms cannot compute one, so stores using them are populated with `set`.
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// Algorithm used by `register`; Rabin by default (`FingerprintAlgorithm::default`).
    fingerprint_algorithm: FingerprintAlgorithm,
    /// Registered schemas keyed by fingerprint.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
770
771impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
772 type Error = ArrowError;
773
774 fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
777 Ok(Self {
778 schemas,
779 ..Self::default()
780 })
781 }
782}
783
784impl SchemaStore {
785 pub fn new() -> Self {
787 Self::default()
788 }
789
790 pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
792 Self {
793 fingerprint_algorithm,
794 ..Self::default()
795 }
796 }
797
798 pub fn set(
815 &mut self,
816 fingerprint: Fingerprint,
817 schema: AvroSchema,
818 ) -> Result<Fingerprint, ArrowError> {
819 match self.schemas.entry(fingerprint) {
820 Entry::Occupied(entry) => {
821 if entry.get() != &schema {
822 return Err(ArrowError::ComputeError(format!(
823 "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
824 )));
825 }
826 }
827 Entry::Vacant(entry) => {
828 entry.insert(schema);
829 }
830 }
831 Ok(fingerprint)
832 }
833
834 pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
852 if self.fingerprint_algorithm == FingerprintAlgorithm::Id
853 || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
854 {
855 return Err(ArrowError::SchemaError(
856 "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
857 Use the set method directly instead, providing a valid fingerprint"
858 .to_string(),
859 ));
860 }
861 let fingerprint =
862 AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
863 self.set(fingerprint, schema)?;
864 Ok(fingerprint)
865 }
866
867 pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
877 self.schemas.get(fingerprint)
878 }
879
880 pub fn fingerprints(&self) -> Vec<Fingerprint> {
886 self.schemas.keys().copied().collect()
887 }
888
889 pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
891 self.fingerprint_algorithm
892 }
893}
894
895fn quote(s: &str) -> Result<String, ArrowError> {
896 serde_json::to_string(s)
897 .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
898}
899
/// Resolves an Avro type name to its full name, following the Avro "Names" rules.
///
/// * A `name` containing a dot is already a full name. Its namespace is everything before
///   the last dot, and `namespace_attr` / `enclosing_ns` are ignored.
/// * Otherwise the namespace is `namespace_attr` if present, else `enclosing_ns`.
/// * An empty namespace is the Avro *null namespace*: the full name is just `name` (not
///   `".name"`). `Some("")` is still returned, so a caller that passes it down as
///   `enclosing_ns` keeps nested definitions in the null namespace.
///
/// Returns `(full_name, namespace)`; `namespace` is `None` when no namespace applies.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    if let Some((ns, _)) = name.rsplit_once('.') {
        return (name.to_string(), Some(ns.to_string()));
    }
    match namespace_attr.or(enclosing_ns) {
        Some("") => (name.to_string(), Some(String::new())),
        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
        None => (name.to_string(), None),
    }
}
930
/// Renders `schema` in Avro Parsing Canonical Form.
///
/// Per kind, only these keys are emitted, in this order: record `name,type,fields`; field
/// `name,type`; enum `name,type,symbols`; array `type,items`; map `type,values`; fixed
/// `name,type,size`. Named types are written with their full (namespace-qualified) names.
/// `doc`, `aliases`, `default`, logical types and other attributes are dropped.
/// `enclosing_ns` is the namespace inherited from the nearest enclosing named type.
///
/// The exact bytes produced here are hashed by the fingerprint functions, so any change
/// to spacing, key order or quoting changes every fingerprint.
fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
    Ok(match schema {
        // A bare name and `{"type": <name>, ...}` canonicalise identically: the object's
        // extra attributes (e.g. logicalType) are not part of canonical form.
        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
            TypeName::Primitive(pt) => quote(pt.as_ref())?,
            TypeName::Ref(name) => {
                // References are expanded against the enclosing namespace.
                let (full_name, _) = make_full_name(name, None, enclosing_ns);
                quote(&full_name)?
            }
        },
        // Union branches keep their order; no spaces after commas.
        Schema::Union(branches) => format!(
            "[{}]",
            branches
                .iter()
                .map(|b| build_canonical(b, enclosing_ns))
                .collect::<Result<Vec<_>, _>>()?
                .join(",")
        ),
        Schema::Complex(ct) => match ct {
            ComplexType::Record(r) => {
                // Field types resolve names against the record's own namespace.
                let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
                let fields = r
                    .fields
                    .iter()
                    .map(|f| {
                        let field_type =
                            build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
                        Ok(format!(
                            r#"{{"name":{},"type":{}}}"#,
                            quote(f.name)?,
                            field_type
                        ))
                    })
                    .collect::<Result<Vec<_>, ArrowError>>()?
                    .join(",");
                format!(
                    r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
                    quote(&full_name)?,
                )
            }
            ComplexType::Enum(e) => {
                let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
                let symbols = e
                    .symbols
                    .iter()
                    .map(|s| quote(s))
                    .collect::<Result<Vec<_>, _>>()?
                    .join(",");
                format!(
                    r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
                    quote(&full_name)?
                )
            }
            ComplexType::Array(arr) => format!(
                r#"{{"type":"array","items":{}}}"#,
                build_canonical(&arr.items, enclosing_ns)?
            ),
            ComplexType::Map(map) => format!(
                r#"{{"type":"map","values":{}}}"#,
                build_canonical(&map.values, enclosing_ns)?
            ),
            ComplexType::Fixed(f) => {
                let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
                format!(
                    r#"{{"name":{},"type":"fixed","size":{}}}"#,
                    quote(&full_name)?,
                    f.size
                )
            }
        },
    })
}
1006
/// CRC-64-AVRO constant from the Avro specification ("Schema Fingerprints").
/// It is both the fingerprint of empty input and the polynomial folded in by `one_entry`.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Computes lookup-table entry `i`. Runs eight rounds of "shift right one bit, and xor in
/// `EMPTY` when the bit shifted out was 1".
const fn one_entry(i: usize) -> u64 {
    let mut fp = i as u64;
    let mut round = 0;
    while round < 8 {
        // Negating the low bit yields an all-ones (bit set) or all-zeros mask.
        let mask = (fp & 1).wrapping_neg();
        fp = (fp >> 1) ^ (EMPTY & mask);
        round += 1;
    }
    fp
}

/// Builds the 256-entry byte-at-a-time table at compile time (`const fn` cannot use `for`).
const fn build_table() -> [u64; 256] {
    let mut table = [0u64; 256];
    let mut idx = 0;
    while idx < 256 {
        table[idx] = one_entry(idx);
        idx += 1;
    }
    table
}

/// Byte-indexed lookup table for `compute_fingerprint_rabin`, evaluated at compile time.
static FINGERPRINT_TABLE: [u64; 256] = build_table();

/// Returns the 64-bit Rabin (CRC-64-AVRO) fingerprint of the UTF-8 bytes of
/// `canonical_form`.
pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
    canonical_form.bytes().fold(EMPTY, |fp, byte| {
        let slot = usize::from(fp as u8 ^ byte);
        (fp >> 8) ^ FINGERPRINT_TABLE[slot]
    })
}
1055
/// Returns the MD5 digest of the UTF-8 bytes of `canonical_form`.
#[cfg(feature = "md5")]
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    let md5::Digest(bytes) = md5::compute(canonical_form.as_bytes());
    bytes
}
1066
/// Returns the SHA-256 digest of the UTF-8 bytes of `canonical_form`.
#[cfg(feature = "sha256")]
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    Sha256::digest(canonical_form.as_bytes()).into()
}
1078
1079#[inline]
1080fn is_internal_arrow_key(key: &str) -> bool {
1081 key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1082}
1083
1084fn extend_with_passthrough_metadata(
1089 target: &mut JsonMap<String, Value>,
1090 metadata: &HashMap<String, String>,
1091) {
1092 for (meta_key, meta_val) in metadata {
1093 if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1094 continue;
1095 }
1096 let json_val =
1097 serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1098 target.insert(meta_key.clone(), json_val);
1099 }
1100}
1101
/// Turns an arbitrary string into a valid Avro name.
///
/// Every character other than ASCII letters, digits and `_` becomes `_` (one `_` per
/// `char`). A leading digit is prefixed with `_`, and the empty string becomes `"_"`.
fn sanitise_avro_name(base_name: &str) -> String {
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    for ch in base_name.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        sanitised.push(if keep { ch } else { '_' });
    }
    match sanitised.chars().next() {
        None => "_".to_owned(),
        Some(first) if first.is_ascii_digit() => {
            sanitised.insert(0, '_');
            sanitised
        }
        Some(_) => sanitised,
    }
}
1122
1123#[derive(Default)]
1124struct NameGenerator {
1125 used: HashSet<String>,
1126 counters: HashMap<String, usize>,
1127}
1128
1129impl NameGenerator {
1130 fn make_unique(&mut self, field_name: &str) -> String {
1131 let field_name = sanitise_avro_name(field_name);
1132 if self.used.insert(field_name.clone()) {
1133 self.counters.insert(field_name.clone(), 1);
1134 return field_name;
1135 }
1136 let counter = self.counters.entry(field_name.clone()).or_insert(1);
1137 loop {
1138 let candidate = format!("{field_name}_{}", *counter);
1139 if self.used.insert(candidate.clone()) {
1140 return candidate;
1141 }
1142 *counter += 1;
1143 }
1144 }
1145}
1146
1147fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1148 if extras.is_empty() {
1149 return schema;
1150 }
1151 match schema {
1152 Value::Object(mut map) => {
1153 map.extend(extras);
1154 Value::Object(map)
1155 }
1156 Value::Array(mut union) => {
1157 if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1160 let original = std::mem::take(non_null);
1161 *non_null = merge_extras(original, extras);
1162 }
1163 Value::Array(union)
1164 }
1165 primitive => {
1166 let mut map = JsonMap::with_capacity(extras.len() + 1);
1167 map.insert("type".into(), primitive);
1168 map.extend(extras);
1169 Value::Object(map)
1170 }
1171 }
1172}
1173
1174#[inline]
1175fn is_avro_json_null(v: &Value) -> bool {
1176 matches!(v, Value::String(s) if s == "null")
1177}
1178
1179fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1180 let null = Value::String("null".into());
1181 match inner {
1182 Value::Array(mut union) => {
1183 if union.iter().any(is_avro_json_null) {
1188 return Value::Array(union);
1189 }
1190 match null_order {
1192 Nullability::NullFirst => union.insert(0, null),
1193 Nullability::NullSecond => union.push(null),
1194 }
1195 Value::Array(union)
1196 }
1197 other => match null_order {
1198 Nullability::NullFirst => Value::Array(vec![null, other]),
1199 Nullability::NullSecond => Value::Array(vec![other, null]),
1200 },
1201 }
1202}
1203
/// Smallest `fixed` size (in bytes) that can hold a signed two's-complement decimal of `p`
/// digits. Precisions beyond 76 digits are capped at 32 bytes.
fn min_fixed_bytes_for_precision(p: usize) -> usize {
    // MAX_P[n - 1] is the largest number of decimal digits an n-byte signed integer can
    // always hold: floor(log10(2^(8n - 1) - 1)).
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    MAX_P
        .iter()
        .position(|&max_digits| p <= max_digits)
        .map_or(MAX_P.len(), |idx| idx + 1)
}
1218
1219fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1220 match branch {
1221 Value::String(t) => Ok(format!("P:{t}")),
1222 Value::Object(map) => {
1223 let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1224 ArrowError::SchemaError("Union branch object missing string 'type'".into())
1225 })?;
1226 match t {
1227 "record" | "enum" | "fixed" => {
1228 let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1229 ArrowError::SchemaError(format!(
1230 "Union branch '{t}' missing required 'name'"
1231 ))
1232 })?;
1233 Ok(format!("N:{t}:{name}"))
1234 }
1235 "array" | "map" => Ok(format!("C:{t}")),
1236 other => Ok(format!("P:{other}")),
1237 }
1238 }
1239 Value::Array(_) => Err(ArrowError::SchemaError(
1240 "Avro union may not immediately contain another union".into(),
1241 )),
1242 _ => Err(ArrowError::SchemaError(
1243 "Invalid JSON for Avro union branch".into(),
1244 )),
1245 }
1246}
1247
1248fn datatype_to_avro(
1249 dt: &DataType,
1250 field_name: &str,
1251 metadata: &HashMap<String, String>,
1252 name_gen: &mut NameGenerator,
1253 null_order: Nullability,
1254 strip: bool,
1255) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1256 let mut extras = JsonMap::new();
1257 let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1258 if *scale < 0 {
1259 return Err(ArrowError::SchemaError(format!(
1260 "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1261 )));
1262 }
1263 if (*scale as usize) > (*precision as usize) {
1264 return Err(ArrowError::SchemaError(format!(
1265 "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1266 must be <= precision ({precision})"
1267 )));
1268 }
1269 let mut meta = JsonMap::from_iter([
1270 ("logicalType".into(), json!("decimal")),
1271 ("precision".into(), json!(*precision)),
1272 ("scale".into(), json!(*scale)),
1273 ]);
1274 let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1275 let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1276 || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1277 if fixed_size.is_none() && carries_name {
1278 fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1279 }
1280 if let Some(size) = fixed_size {
1281 meta.insert("type".into(), json!("fixed"));
1282 meta.insert("size".into(), json!(size));
1283 let chosen_name = metadata
1284 .get(AVRO_NAME_METADATA_KEY)
1285 .map(|s| sanitise_avro_name(s))
1286 .unwrap_or_else(|| name_gen.make_unique(field_name));
1287 meta.insert("name".into(), json!(chosen_name));
1288 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1289 meta.insert("namespace".into(), json!(ns));
1290 }
1291 } else {
1292 meta.insert("type".into(), json!("bytes"));
1294 }
1295 Ok(Value::Object(meta))
1296 };
1297 let val = match dt {
1298 DataType::Null => Value::String("null".into()),
1299 DataType::Boolean => Value::String("boolean".into()),
1300 DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 | DataType::Int32 => {
1301 Value::String("int".into())
1302 }
1303 DataType::UInt32 | DataType::Int64 | DataType::UInt64 => Value::String("long".into()),
1304 DataType::Float16 | DataType::Float32 => Value::String("float".into()),
1305 DataType::Float64 => Value::String("double".into()),
1306 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1307 DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1308 DataType::BinaryView => {
1309 if !strip {
1310 extras.insert("arrowBinaryView".into(), Value::Bool(true));
1311 }
1312 Value::String("bytes".into())
1313 }
1314 DataType::FixedSizeBinary(len) => {
1315 let md_is_uuid = metadata
1316 .get("logicalType")
1317 .map(|s| s.trim_matches('"') == "uuid")
1318 .unwrap_or(false);
1319 #[cfg(feature = "canonical_extension_types")]
1320 let ext_is_uuid = metadata
1321 .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1322 .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1323 .unwrap_or(false);
1324 #[cfg(not(feature = "canonical_extension_types"))]
1325 let ext_is_uuid = false;
1326 let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1327 if is_uuid {
1328 json!({ "type": "string", "logicalType": "uuid" })
1329 } else {
1330 let chosen_name = metadata
1331 .get(AVRO_NAME_METADATA_KEY)
1332 .map(|s| sanitise_avro_name(s))
1333 .unwrap_or_else(|| name_gen.make_unique(field_name));
1334 let mut obj = JsonMap::from_iter([
1335 ("type".into(), json!("fixed")),
1336 ("name".into(), json!(chosen_name)),
1337 ("size".into(), json!(len)),
1338 ]);
1339 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1340 obj.insert("namespace".into(), json!(ns));
1341 }
1342 Value::Object(obj)
1343 }
1344 }
1345 #[cfg(feature = "small_decimals")]
1346 DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1347 handle_decimal(precision, scale)?
1348 }
1349 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1350 handle_decimal(precision, scale)?
1351 }
1352 DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1353 DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1354 DataType::Time32(unit) => match unit {
1355 TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1356 TimeUnit::Second => {
1357 if !strip {
1358 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1359 }
1360 Value::String("int".into())
1361 }
1362 _ => Value::String("int".into()),
1363 },
1364 DataType::Time64(unit) => match unit {
1365 TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1366 TimeUnit::Nanosecond => {
1367 if !strip {
1368 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1369 }
1370 Value::String("long".into())
1371 }
1372 _ => Value::String("long".into()),
1373 },
1374 DataType::Timestamp(unit, tz) => {
1375 let logical_type = match (unit, tz.is_some()) {
1376 (TimeUnit::Millisecond, true) => "timestamp-millis",
1377 (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1378 (TimeUnit::Microsecond, true) => "timestamp-micros",
1379 (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1380 (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1381 (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1382 (TimeUnit::Second, _) => {
1383 if !strip {
1384 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1385 }
1386 return Ok((Value::String("long".into()), extras));
1387 }
1388 };
1389 if !strip && matches!(unit, TimeUnit::Nanosecond) {
1390 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1391 }
1392 json!({ "type": "long", "logicalType": logical_type })
1393 }
1394 #[cfg(not(feature = "avro_custom_types"))]
1395 DataType::Duration(_unit) => Value::String("long".into()),
1396 #[cfg(feature = "avro_custom_types")]
1397 DataType::Duration(unit) => {
1398 let logical_type = match unit {
1401 TimeUnit::Second => "arrow.duration-seconds",
1402 TimeUnit::Millisecond => "arrow.duration-millis",
1403 TimeUnit::Microsecond => "arrow.duration-micros",
1404 TimeUnit::Nanosecond => "arrow.duration-nanos",
1405 };
1406 json!({ "type": "long", "logicalType": logical_type })
1407 }
1408 DataType::Interval(IntervalUnit::MonthDayNano) => {
1409 let chosen_name = metadata
1411 .get(AVRO_NAME_METADATA_KEY)
1412 .map(|s| sanitise_avro_name(s))
1413 .unwrap_or_else(|| name_gen.make_unique(field_name));
1414 let mut obj = JsonMap::from_iter([
1415 ("type".into(), json!("fixed")),
1416 ("name".into(), json!(chosen_name)),
1417 ("size".into(), json!(12)),
1418 ("logicalType".into(), json!("duration")),
1419 ]);
1420 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1421 obj.insert("namespace".into(), json!(ns));
1422 }
1423 json!(obj)
1424 }
1425 DataType::Interval(IntervalUnit::YearMonth) => {
1426 if !strip {
1427 extras.insert(
1428 "arrowIntervalUnit".into(),
1429 Value::String("yearmonth".into()),
1430 );
1431 }
1432 Value::String("long".into())
1433 }
1434 DataType::Interval(IntervalUnit::DayTime) => {
1435 if !strip {
1436 extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1437 }
1438 Value::String("long".into())
1439 }
1440 DataType::List(child) | DataType::LargeList(child) => {
1441 if matches!(dt, DataType::LargeList(_)) && !strip {
1442 extras.insert("arrowLargeList".into(), Value::Bool(true));
1443 }
1444 let items_schema = process_datatype(
1445 child.data_type(),
1446 child.name(),
1447 child.metadata(),
1448 name_gen,
1449 null_order,
1450 child.is_nullable(),
1451 strip,
1452 )?;
1453 json!({
1454 "type": "array",
1455 "items": items_schema
1456 })
1457 }
1458 DataType::ListView(child) | DataType::LargeListView(child) => {
1459 if matches!(dt, DataType::LargeListView(_)) && !strip {
1460 extras.insert("arrowLargeList".into(), Value::Bool(true));
1461 }
1462 if !strip {
1463 extras.insert("arrowListView".into(), Value::Bool(true));
1464 }
1465 let items_schema = process_datatype(
1466 child.data_type(),
1467 child.name(),
1468 child.metadata(),
1469 name_gen,
1470 null_order,
1471 child.is_nullable(),
1472 strip,
1473 )?;
1474 json!({
1475 "type": "array",
1476 "items": items_schema
1477 })
1478 }
1479 DataType::FixedSizeList(child, len) => {
1480 if !strip {
1481 extras.insert("arrowFixedSize".into(), json!(len));
1482 }
1483 let items_schema = process_datatype(
1484 child.data_type(),
1485 child.name(),
1486 child.metadata(),
1487 name_gen,
1488 null_order,
1489 child.is_nullable(),
1490 strip,
1491 )?;
1492 json!({
1493 "type": "array",
1494 "items": items_schema
1495 })
1496 }
1497 DataType::Map(entries, _) => {
1498 let value_field = match entries.data_type() {
1499 DataType::Struct(fs) => &fs[1],
1500 _ => {
1501 return Err(ArrowError::SchemaError(
1502 "Map 'entries' field must be Struct(key,value)".into(),
1503 ));
1504 }
1505 };
1506 let values_schema = process_datatype(
1507 value_field.data_type(),
1508 value_field.name(),
1509 value_field.metadata(),
1510 name_gen,
1511 null_order,
1512 value_field.is_nullable(),
1513 strip,
1514 )?;
1515 json!({
1516 "type": "map",
1517 "values": values_schema
1518 })
1519 }
1520 DataType::Struct(fields) => {
1521 let avro_fields = fields
1522 .iter()
1523 .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1524 .collect::<Result<Vec<_>, _>>()?;
1525 let chosen_name = metadata
1527 .get(AVRO_NAME_METADATA_KEY)
1528 .map(|s| sanitise_avro_name(s))
1529 .unwrap_or_else(|| name_gen.make_unique(field_name));
1530 let mut obj = JsonMap::from_iter([
1531 ("type".into(), json!("record")),
1532 ("name".into(), json!(chosen_name)),
1533 ("fields".into(), Value::Array(avro_fields)),
1534 ]);
1535 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1536 obj.insert("namespace".into(), json!(ns));
1537 }
1538 Value::Object(obj)
1539 }
1540 DataType::Dictionary(_, value) => {
1541 if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1542 let symbols: Vec<&str> =
1543 serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1544 let chosen_name = metadata
1546 .get(AVRO_NAME_METADATA_KEY)
1547 .map(|s| sanitise_avro_name(s))
1548 .unwrap_or_else(|| name_gen.make_unique(field_name));
1549 let mut obj = JsonMap::from_iter([
1550 ("type".into(), json!("enum")),
1551 ("name".into(), json!(chosen_name)),
1552 ("symbols".into(), json!(symbols)),
1553 ]);
1554 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1555 obj.insert("namespace".into(), json!(ns));
1556 }
1557 Value::Object(obj)
1558 } else {
1559 process_datatype(
1560 value.as_ref(),
1561 field_name,
1562 metadata,
1563 name_gen,
1564 null_order,
1565 false,
1566 strip,
1567 )?
1568 }
1569 }
1570 #[cfg(feature = "avro_custom_types")]
1571 DataType::RunEndEncoded(run_ends, values) => {
1572 let bits = match run_ends.data_type() {
1573 DataType::Int16 => 16,
1574 DataType::Int32 => 32,
1575 DataType::Int64 => 64,
1576 other => {
1577 return Err(ArrowError::SchemaError(format!(
1578 "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1579 )));
1580 }
1581 };
1582 let (value_schema, value_extras) = datatype_to_avro(
1584 values.data_type(),
1585 values.name(),
1586 values.metadata(),
1587 name_gen,
1588 null_order,
1589 strip,
1590 )?;
1591 let mut merged = merge_extras(value_schema, value_extras);
1592 if values.is_nullable() {
1593 merged = wrap_nullable(merged, null_order);
1594 }
1595 let mut extras = JsonMap::new();
1596 extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1597 extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1598 return Ok((merged, extras));
1599 }
1600 #[cfg(not(feature = "avro_custom_types"))]
1601 DataType::RunEndEncoded(_run_ends, values) => {
1602 let (value_schema, _extras) = datatype_to_avro(
1603 values.data_type(),
1604 values.name(),
1605 values.metadata(),
1606 name_gen,
1607 null_order,
1608 strip,
1609 )?;
1610 return Ok((value_schema, JsonMap::new()));
1611 }
1612 DataType::Union(fields, mode) => {
1613 let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1614 let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1615 for (type_id, field_ref) in fields.iter() {
1616 let (branch_schema, _branch_extras) = datatype_to_avro(
1618 field_ref.data_type(),
1619 field_ref.name(),
1620 field_ref.metadata(),
1621 name_gen,
1622 null_order,
1623 strip,
1624 )?;
1625 if matches!(branch_schema, Value::Array(_)) {
1627 return Err(ArrowError::SchemaError(
1628 "Avro union may not immediately contain another union".into(),
1629 ));
1630 }
1631 branches.push(branch_schema);
1632 type_ids.push(type_id as i32);
1633 }
1634 let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1635 for b in &branches {
1636 let sig = union_branch_signature(b)?;
1637 if !seen.insert(sig) {
1638 return Err(ArrowError::SchemaError(
1639 "Avro union contains duplicate branch types (disallowed by spec)".into(),
1640 ));
1641 }
1642 }
1643 if !strip {
1644 extras.insert(
1645 "arrowUnionMode".into(),
1646 Value::String(
1647 match mode {
1648 UnionMode::Sparse => "sparse",
1649 UnionMode::Dense => "dense",
1650 }
1651 .to_string(),
1652 ),
1653 );
1654 extras.insert(
1655 "arrowUnionTypeIds".into(),
1656 Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1657 );
1658 }
1659 Value::Array(branches)
1660 }
1661 #[cfg(not(feature = "small_decimals"))]
1662 other => {
1663 return Err(ArrowError::NotYetImplemented(format!(
1664 "Arrow type {other:?} has no Avro representation"
1665 )));
1666 }
1667 };
1668 Ok((val, extras))
1669}
1670
1671fn process_datatype(
1672 dt: &DataType,
1673 field_name: &str,
1674 metadata: &HashMap<String, String>,
1675 name_gen: &mut NameGenerator,
1676 null_order: Nullability,
1677 is_nullable: bool,
1678 strip: bool,
1679) -> Result<Value, ArrowError> {
1680 let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1681 let mut merged = merge_extras(schema, extras);
1682 if is_nullable {
1683 merged = wrap_nullable(merged, null_order)
1684 }
1685 Ok(merged)
1686}
1687
1688fn arrow_field_to_avro(
1689 field: &ArrowField,
1690 name_gen: &mut NameGenerator,
1691 null_order: Nullability,
1692 strip: bool,
1693) -> Result<Value, ArrowError> {
1694 let avro_name = sanitise_avro_name(field.name());
1695 let schema_value = process_datatype(
1696 field.data_type(),
1697 &avro_name,
1698 field.metadata(),
1699 name_gen,
1700 null_order,
1701 field.is_nullable(),
1702 strip,
1703 )?;
1704 let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1706 map.insert("name".into(), Value::String(avro_name));
1707 map.insert("type".into(), schema_value);
1708 for (meta_key, meta_val) in field.metadata() {
1710 if is_internal_arrow_key(meta_key) {
1711 continue;
1712 }
1713 match meta_key.as_str() {
1714 AVRO_DOC_METADATA_KEY => {
1715 map.insert("doc".into(), Value::String(meta_val.clone()));
1716 }
1717 AVRO_FIELD_DEFAULT_METADATA_KEY => {
1718 let default_value = serde_json::from_str(meta_val)
1719 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1720 map.insert("default".into(), default_value);
1721 }
1722 _ => {
1723 let json_val = serde_json::from_str(meta_val)
1724 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1725 map.insert(meta_key.clone(), json_val);
1726 }
1727 }
1728 }
1729 Ok(Value::Object(map))
1730}
1731
1732#[cfg(test)]
1733mod tests {
1734 use super::*;
1735 use crate::codec::{AvroField, AvroFieldBuilder};
1736 use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1737 use serde_json::json;
1738 use std::sync::Arc;
1739
1740 fn int_schema() -> Schema<'static> {
1741 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1742 }
1743
1744 fn record_schema() -> Schema<'static> {
1745 Schema::Complex(ComplexType::Record(Record {
1746 name: "record1",
1747 namespace: Some("test.namespace"),
1748 doc: Some(Cow::from("A test record")),
1749 aliases: vec![],
1750 fields: vec![
1751 Field {
1752 name: "field1",
1753 doc: Some(Cow::from("An integer field")),
1754 r#type: int_schema(),
1755 default: None,
1756 aliases: vec![],
1757 },
1758 Field {
1759 name: "field2",
1760 doc: None,
1761 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1762 default: None,
1763 aliases: vec![],
1764 },
1765 ],
1766 attributes: Attributes::default(),
1767 }))
1768 }
1769
1770 fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
1771 let mut sb = SchemaBuilder::new();
1772 sb.push(field);
1773 sb.finish()
1774 }
1775
1776 fn assert_json_contains(avro_json: &str, needle: &str) {
1777 assert!(
1778 avro_json.contains(needle),
1779 "JSON did not contain `{needle}` : {avro_json}"
1780 )
1781 }
1782
    /// Deserializes a series of Avro schema JSON snippets into the borrowed
    /// `Schema` / `Type` / `ComplexType` model and compares each against a
    /// hand-built value. Two of the parsed records are also fed to
    /// `AvroField::try_from`.
    #[test]
    fn test_deserialize() {
        // A bare primitive type name.
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(
            t,
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
        );

        // A JSON array deserializes as a union, keeping branch order.
        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            ])
        );

        // A primitive with a logical type: `logicalType` is captured in
        // `Attributes::logical_type`, leaving `additional` empty.
        let t: Type = serde_json::from_str(
            r#"{
                "type":"long",
                "logicalType":"timestamp-micros"
            }"#,
        )
        .unwrap();

        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        };

        assert_eq!(t, timestamp);

        // A named `fixed` carrying the decimal logical type; the extra
        // `precision` / `scale` keys end up in `Attributes::additional`.
        let t: ComplexType = serde_json::from_str(
            r#"{
                "type":"fixed",
                "name":"fixed",
                "namespace":"topLevelRecord.value",
                "size":11,
                "logicalType":"decimal",
                "precision":25,
                "scale":2
            }"#,
        )
        .unwrap();

        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        });

        assert_eq!(t, decimal);

        // The same decimal `fixed` used as the first branch of a nullable
        // record field.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type":"record",
                "name":"topLevelRecord",
                "fields":[
                    {
                        "name":"value",
                        "type":[
                            {
                                "type":"fixed",
                                "name":"fixed",
                                "namespace":"topLevelRecord.value",
                                "size":11,
                                "logicalType":"decimal",
                                "precision":25,
                                "scale":2
                            },
                            "null"
                        ]
                    }
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal),
                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    ]),
                    default: None,
                    aliases: vec![],
                },],
                attributes: Default::default(),
            }))
        );

        // A self-referential record: the `next` union names `LongList`, which
        // deserializes as `TypeName::Ref("LongList")`.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type": "record",
                "name": "LongList",
                "aliases": ["LinkedLongs"],
                "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "next",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                        aliases: vec![],
                    }
                ],
                attributes: Attributes::default(),
            }))
        );

        // Converting the recursive schema to an `AvroField` is expected to
        // fail; the assertion pins the exact error text.
        let err = AvroField::try_from(&schema).unwrap_err().to_string();
        assert_eq!(err, "Parser error: Failed to resolve .LongList");

        // A record whose fields are `[T, "null"]` unions, one of them a
        // `timestamp-micros` logical type.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type":"record",
                "name":"topLevelRecord",
                "fields":[
                    {
                        "name":"id",
                        "type":[
                            "int",
                            "null"
                        ]
                    },
                    {
                        "name":"timestamp_col",
                        "type":[
                            {
                                "type":"long",
                                "logicalType":"timestamp-micros"
                            },
                            "null"
                        ]
                    }
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                        aliases: vec![],
                    }
                ],
                attributes: Default::default(),
            }))
        );
        // Arrow conversion of the record above:
        // - the null unions become nullable Arrow fields;
        // - `timestamp-micros` maps to a microsecond timestamp with "+00:00";
        // - the record name is stored under `AVRO_NAME_METADATA_KEY`.
        let codec = AvroField::try_from(&schema).unwrap();
        let expected_arrow_field = arrow_schema::Field::new(
            "topLevelRecord",
            DataType::Struct(Fields::from(vec![
                arrow_schema::Field::new("id", DataType::Int32, true),
                arrow_schema::Field::new(
                    "timestamp_col",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                    true,
                ),
            ])),
            false,
        )
        .with_metadata(std::collections::HashMap::from([(
            AVRO_NAME_METADATA_KEY.to_string(),
            "topLevelRecord".to_string(),
        )]));

        assert_eq!(codec.field(), expected_arrow_field);

        // Named `fixed` (`MD5`) defined inline, then referenced by name from
        // `serverHash` (parsed as `TypeName::Ref`), plus a nullable map.
        // The inline `MD5` keeps `namespace: None` even though the enclosing
        // record declares one.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type": "record",
                "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                        ]),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                        aliases: vec![],
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(Schema::TypeName(TypeName::Primitive(
                                    PrimitiveType::Bytes
                                ))),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                        aliases: vec![],
                    }
                ],
                attributes: Default::default(),
            }))
        );
    }
2090
    /// Parses a wide writer schema and checks that
    /// `AvroSchema::generate_canonical_form` produces exactly the expected
    /// Parsing Canonical Form string.
    ///
    /// The writer schema covers records, enums, fixed, unions, arrays, maps,
    /// logical types, docs, defaults and aliases.
    ///
    /// Comparing input and expected output shows what the canonical form does:
    /// - `doc`, `default`, `aliases` and logical-type attributes are dropped;
    /// - named types are fully qualified, e.g. `Fx4` (declared without a
    ///   namespace) becomes `org.apache.arrow.avrotests.v1.Fx4`;
    /// - keys are emitted as `name`, `type`, then the type-specific keys.
    #[test]
    fn test_canonical_form_generation_comprehensive_record() {
        let json_str = r#"{
            "type": "record",
            "name": "E2eComprehensive",
            "namespace": "org.apache.arrow.avrotests.v1",
            "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
            "fields": [
                {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
                {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
                {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
                {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
                {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
                {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
                {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
                {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
                {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
                {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
                {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
                {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
                {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
                {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
                {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
                {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
                {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
                {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
                {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
                {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
                {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
                {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
                {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
                {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
                {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
                {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
                {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
                    {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
                    {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
                    {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
                ]}, "doc": "Embedded Address record"},
                {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
                    {"name": "user", "type": "string", "doc": "Username"},
                    {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
                ]}},
                {"name": "union_enum_record_array_map", "type": [
                    {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
                    {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
                    {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
                    {"type": "array", "items": "long"},
                    {"type": "map", "values": "string"}
                ], "doc": "Union of enum, two records, array, and map"},
                {"name": "union_date_or_fixed4", "type": [
                    {"type": "int", "logicalType": "date"},
                    {"type": "fixed", "name": "Fx4", "size": 4}
                ], "doc": "Union of date(int) or fixed(4)"},
                {"name": "union_interval_or_string", "type": [
                    {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
                    "string"
                ], "doc": "Union of duration(fixed12) or string"},
                {"name": "union_uuid_or_fixed10", "type": [
                    {"type": "string", "logicalType": "uuid"},
                    {"type": "fixed", "name": "Fx10", "size": 10}
                ], "doc": "Union of UUID string or fixed(10)"},
                {"name": "array_records_with_union", "type": {"type": "array", "items": {
                    "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
                    "fields": [
                        {"name": "key", "type": "string"},
                        {"name": "val", "type": ["null", "int", "long"], "default": null}
                    ]
                }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
                {"name": "union_map_or_array_int", "type": [
                    {"type": "map", "values": "int"},
                    {"type": "array", "items": "int"}
                ], "doc": "Union[map<string,int>, array<int>]"},
                {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
                {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
                    {"name": "name", "type": "string"},
                    {"name": "age", "type": "int", "default": 0}
                ]}, "doc": "Record using type alias for schema evolution tests"}
            ]
        }"#;
        let avro = AvroSchema::new(json_str.to_string());
        let parsed = avro.schema().expect("schema should deserialize");
        // NOTE(review): in this copy of the file the raw string below is split
        // across two physical lines (mid-way through the `Color` enum name).
        // Inside a raw literal that split would embed a newline in the expected
        // value. Confirm it is a single line upstream.
        let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrotests.v1.
types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
        let canonical_form =
            AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
        assert_eq!(
            canonical_form, expected_canonical_form,
            "Canonical form must match Avro spec PCF exactly"
        );
    }
2182
2183 #[test]
2184 fn test_new_schema_store() {
2185 let store = SchemaStore::new();
2186 assert!(store.schemas.is_empty());
2187 }
2188
2189 #[test]
2190 fn test_try_from_schemas_rabin() {
2191 let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2192 let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2193 let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2194 schemas.insert(
2195 int_avro_schema
2196 .fingerprint(FingerprintAlgorithm::Rabin)
2197 .unwrap(),
2198 int_avro_schema.clone(),
2199 );
2200 schemas.insert(
2201 record_avro_schema
2202 .fingerprint(FingerprintAlgorithm::Rabin)
2203 .unwrap(),
2204 record_avro_schema.clone(),
2205 );
2206 let store = SchemaStore::try_from(schemas).unwrap();
2207 let int_fp = int_avro_schema
2208 .fingerprint(FingerprintAlgorithm::Rabin)
2209 .unwrap();
2210 assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2211 let rec_fp = record_avro_schema
2212 .fingerprint(FingerprintAlgorithm::Rabin)
2213 .unwrap();
2214 assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
2215 }
2216
2217 #[test]
2218 fn test_try_from_with_duplicates() {
2219 let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2220 let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2221 let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2222 schemas.insert(
2223 int_avro_schema
2224 .fingerprint(FingerprintAlgorithm::Rabin)
2225 .unwrap(),
2226 int_avro_schema.clone(),
2227 );
2228 schemas.insert(
2229 record_avro_schema
2230 .fingerprint(FingerprintAlgorithm::Rabin)
2231 .unwrap(),
2232 record_avro_schema.clone(),
2233 );
2234 schemas.insert(
2236 int_avro_schema
2237 .fingerprint(FingerprintAlgorithm::Rabin)
2238 .unwrap(),
2239 int_avro_schema.clone(),
2240 );
2241 let store = SchemaStore::try_from(schemas).unwrap();
2242 assert_eq!(store.schemas.len(), 2);
2243 let int_fp = int_avro_schema
2244 .fingerprint(FingerprintAlgorithm::Rabin)
2245 .unwrap();
2246 assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2247 }
2248
2249 #[test]
2250 fn test_register_and_lookup_rabin() {
2251 let mut store = SchemaStore::new();
2252 let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2253 let fp_enum = store.register(schema.clone()).unwrap();
2254 match fp_enum {
2255 Fingerprint::Rabin(fp_val) => {
2256 assert_eq!(
2257 store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
2258 Some(schema.clone())
2259 );
2260 assert!(
2261 store
2262 .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
2263 .is_none()
2264 );
2265 }
2266 Fingerprint::Id(_id) => {
2267 unreachable!("This test should only generate Rabin fingerprints")
2268 }
2269 Fingerprint::Id64(_id) => {
2270 unreachable!("This test should only generate Rabin fingerprints")
2271 }
2272 #[cfg(feature = "md5")]
2273 Fingerprint::MD5(_id) => {
2274 unreachable!("This test should only generate Rabin fingerprints")
2275 }
2276 #[cfg(feature = "sha256")]
2277 Fingerprint::SHA256(_id) => {
2278 unreachable!("This test should only generate Rabin fingerprints")
2279 }
2280 }
2281 }
2282
#[test]
fn test_set_and_lookup_id() {
    const ID: u32 = 42;
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let key = Fingerprint::Id(ID);
    let mut store = SchemaStore::new();
    // `set` echoes back the caller-supplied key.
    assert_eq!(store.set(key, schema.clone()).unwrap(), key);
    assert_eq!(store.lookup(&key).cloned(), Some(schema));
    // Any other id must miss.
    let other = Fingerprint::Id(ID.wrapping_add(1));
    assert!(store.lookup(&other).is_none());
}
2294
#[test]
fn test_set_and_lookup_id64() {
    const ID64: u64 = 0xDEAD_BEEF_DEAD_BEEF;
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let key = Fingerprint::Id64(ID64);
    let mut store = SchemaStore::new();
    let returned = store.set(key, schema.clone()).unwrap();
    assert_eq!(returned, key, "set should return the same Id64 fingerprint");
    let found = store.lookup(&key).cloned();
    assert_eq!(found, Some(schema), "lookup should find the schema by Id64");
    let miss = Fingerprint::Id64(ID64.wrapping_add(1));
    assert!(
        store.lookup(&miss).is_none(),
        "lookup with a different Id64 must return None"
    );
}
2315
#[test]
fn test_fingerprint_id64_conversions() {
    // Fingerprint -> algorithm keeps the variant kind.
    assert_eq!(
        FingerprintAlgorithm::from(&Fingerprint::Id64(123)),
        FingerprintAlgorithm::Id64
    );
    // Algorithm -> fingerprint produces a zero-valued Id64.
    let placeholder = Fingerprint::from(FingerprintAlgorithm::Id64);
    assert!(matches!(placeholder, Fingerprint::Id64(0)));
    // Fingerprint -> strategy: the concrete id 5 is not carried over (yields 0).
    let strategy = FingerprintStrategy::from(Fingerprint::Id64(5));
    assert!(matches!(strategy, FingerprintStrategy::Id64(0)));
    assert_eq!(
        FingerprintAlgorithm::from(strategy),
        FingerprintAlgorithm::Id64
    );
}
2327
#[test]
fn test_register_duplicate_schema() {
    // Two independently built but identical schemas must map to one entry.
    let make_schema = || AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();
    let first = store.register(make_schema()).unwrap();
    let second = store.register(make_schema()).unwrap();
    assert_eq!(first, second);
    assert_eq!(store.schemas.len(), 1);
}
2338
#[test]
fn test_set_and_lookup_with_provided_fingerprint() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    // Pre-compute the key ourselves instead of letting the store derive it.
    let rabin = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let mut store = SchemaStore::new();
    let returned = store.set(rabin, schema.clone()).unwrap();
    assert_eq!(returned, rabin);
    let found = store.lookup(&rabin).cloned();
    assert_eq!(found, Some(schema));
}
2348
#[test]
fn test_set_duplicate_same_schema_ok() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let mut store = SchemaStore::new();
    // Re-setting the identical schema under the same key is accepted and
    // does not create a second entry.
    for _ in 0..2 {
        let _ = store.set(fp, schema.clone()).unwrap();
    }
    assert_eq!(store.schemas.len(), 1);
}
2358
#[test]
fn test_set_duplicate_different_schema_collision_error() {
    let int_json = serde_json::to_string(&int_schema()).unwrap();
    let record_json = serde_json::to_string(&record_schema()).unwrap();
    let shared_key = Fingerprint::Id(123);
    let mut store = SchemaStore::new();
    let _ = store.set(shared_key, AvroSchema::new(int_json)).unwrap();
    // Re-using the key for a *different* schema must be rejected.
    let err = store
        .set(shared_key, AvroSchema::new(record_json))
        .unwrap_err();
    assert!(err.to_string().contains("Schema fingerprint collision"));
}
2371
#[test]
fn test_canonical_form_generation_primitive() {
    // A primitive's Parsing Canonical Form is just its quoted type name.
    let primitive = int_schema();
    let actual = AvroSchema::generate_canonical_form(&primitive).unwrap();
    assert_eq!(actual, r#""int""#);
}
2378
#[test]
fn test_canonical_form_generation_record() {
    // Records are emitted with a fully qualified name and keys in the order
    // name, type, fields.
    let record = record_schema();
    let actual = AvroSchema::generate_canonical_form(&record).unwrap();
    assert_eq!(
        actual,
        r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#
    );
}
2386
#[test]
fn test_fingerprint_calculation() {
    // Known-answer check for the Rabin (CRC-64-AVRO) fingerprint.
    let canonical = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
    assert_eq!(compute_fingerprint_rabin(canonical), 10505236152925314060);
}
2394
#[test]
fn test_register_and_lookup_complex_schema() {
    let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    // The key produced by `register` must equal the Rabin hash of the
    // record's canonical form.
    let canonical = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
    let expected = Fingerprint::Rabin(compute_fingerprint_rabin(canonical));
    let mut store = SchemaStore::new();
    let fingerprint = store.register(schema.clone()).unwrap();
    assert_eq!(fingerprint, expected);
    assert_eq!(store.lookup(&fingerprint).cloned(), Some(schema));
}
2406
#[test]
fn test_fingerprints_returns_all_keys() {
    let mut store = SchemaStore::new();
    let mut registered = Vec::new();
    for schema_json in [
        serde_json::to_string(&int_schema()).unwrap(),
        serde_json::to_string(&record_schema()).unwrap(),
    ] {
        registered.push(store.register(AvroSchema::new(schema_json)).unwrap());
    }
    // Every registered key, and nothing else, is reported.
    let fps = store.fingerprints();
    assert_eq!(fps.len(), 2);
    for fp in &registered {
        assert!(fps.contains(fp));
    }
}
2425
#[test]
fn test_canonical_form_strips_attributes() {
    // A bytes type carrying an extra `precision` attribute (no logical type).
    let annotated_bytes = Schema::Type(Type {
        r#type: TypeName::Primitive(PrimitiveType::Bytes),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("precision", json!(4))]),
        },
    });
    let only_field = Field {
        name: "f1",
        doc: Some(Cow::from("field doc")),
        r#type: annotated_bytes,
        default: None,
        aliases: vec![],
    };
    let record = Record {
        name: "record_with_attrs",
        namespace: None,
        doc: Some(Cow::from("This doc should be stripped")),
        aliases: vec!["alias1", "alias2"],
        fields: vec![only_field],
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("custom_attr", json!("value"))]),
        },
    };
    let schema_with_attrs = Schema::Complex(ComplexType::Record(record));
    // Docs, aliases and custom attributes must all be dropped.
    let canonical = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();
    assert_eq!(
        canonical,
        r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#
    );
}
2455
#[test]
fn test_primitive_mappings() {
    // Arrow primitive -> expected Avro type token, for non-nullable columns.
    // Narrow integers widen to int; UInt32/UInt64 map to long; Float16 maps to float.
    let table = [
        (DataType::Boolean, "\"boolean\""),
        (DataType::Int8, "\"int\""),
        (DataType::Int16, "\"int\""),
        (DataType::Int32, "\"int\""),
        (DataType::Int64, "\"long\""),
        (DataType::UInt8, "\"int\""),
        (DataType::UInt16, "\"int\""),
        (DataType::UInt32, "\"long\""),
        (DataType::UInt64, "\"long\""),
        (DataType::Float16, "\"float\""),
        (DataType::Float32, "\"float\""),
        (DataType::Float64, "\"double\""),
        (DataType::Utf8, "\"string\""),
        (DataType::Binary, "\"bytes\""),
    ];
    for (data_type, token) in table {
        let arrow_schema = single_field_schema(ArrowField::new("col", data_type, false));
        let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
        assert_json_contains(&avro_json, token);
    }
}
2481
#[test]
fn test_temporal_mappings() {
    // Arrow temporal type -> expected Avro logicalType fragment. A timestamp
    // without a timezone maps to the `local-` variant.
    let table = [
        (DataType::Date32, "\"logicalType\":\"date\""),
        (
            DataType::Time32(TimeUnit::Millisecond),
            "\"logicalType\":\"time-millis\"",
        ),
        (
            DataType::Time64(TimeUnit::Microsecond),
            "\"logicalType\":\"time-micros\"",
        ),
        (
            DataType::Timestamp(TimeUnit::Millisecond, None),
            "\"logicalType\":\"local-timestamp-millis\"",
        ),
        (
            DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
            "\"logicalType\":\"timestamp-micros\"",
        ),
    ];
    for (data_type, fragment) in table {
        let arrow_schema = single_field_schema(ArrowField::new("ts", data_type, true));
        let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
        assert_json_contains(&avro_json, fragment);
    }
}
2510
#[test]
fn test_decimal_and_uuid() {
    // Decimal128(25, 2) keeps its precision and scale as Avro attributes.
    let decimal_schema = single_field_schema(ArrowField::new(
        "amount",
        DataType::Decimal128(25, 2),
        false,
    ));
    let decimal_json = AvroSchema::try_from(&decimal_schema).unwrap().json_string;
    for fragment in [
        "\"logicalType\":\"decimal\"",
        "\"precision\":25",
        "\"scale\":2",
    ] {
        assert_json_contains(&decimal_json, fragment);
    }

    // A 16-byte fixed column tagged with logicalType=uuid metadata.
    let uuid_metadata = HashMap::from([("logicalType".to_string(), "uuid".to_string())]);
    let uuid_schema = single_field_schema(
        ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(uuid_metadata),
    );
    let uuid_json = AvroSchema::try_from(&uuid_schema).unwrap().json_string;
    assert_json_contains(&uuid_json, "\"logicalType\":\"uuid\"");
}
2527
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_duration() {
    // MonthDayNano interval -> Avro `duration` logical type on a 12-byte fixed.
    let interval_schema = single_field_schema(ArrowField::new(
        "span",
        DataType::Interval(IntervalUnit::MonthDayNano),
        false,
    ));
    let interval_json = AvroSchema::try_from(&interval_schema).unwrap().json_string;
    for fragment in ["\"logicalType\":\"duration\"", "\"size\":12"] {
        assert_json_contains(&interval_json, fragment);
    }

    // Arrow Duration uses the crate's custom `arrow.duration-*` logical type.
    let duration_schema = single_field_schema(ArrowField::new(
        "latency",
        DataType::Duration(TimeUnit::Nanosecond),
        false,
    ));
    let duration_json = AvroSchema::try_from(&duration_schema).unwrap().json_string;
    assert_json_contains(&duration_json, "\"logicalType\":\"arrow.duration-nanos\"");
}
2548
#[test]
fn test_complex_types() {
    // List<Int32> -> Avro array.
    let list_item = ArrowField::new("item", DataType::Int32, true);
    let list_schema = single_field_schema(ArrowField::new(
        "numbers",
        DataType::List(Arc::new(list_item)),
        false,
    ));
    let list_json = AvroSchema::try_from(&list_schema).unwrap().json_string;
    assert_json_contains(&list_json, "\"type\":\"array\"");
    assert_json_contains(&list_json, "\"items\"");

    // Map<Utf8, Boolean> -> Avro map.
    let entries = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("key", DataType::Utf8, false),
            ArrowField::new("value", DataType::Boolean, true),
        ])),
        false,
    );
    let map_schema = single_field_schema(ArrowField::new(
        "props",
        DataType::Map(Arc::new(entries), false),
        false,
    ));
    let map_json = AvroSchema::try_from(&map_schema).unwrap().json_string;
    assert_json_contains(&map_json, "\"type\":\"map\"");
    assert_json_contains(&map_json, "\"values\"");

    // A nullable Struct -> Avro record wrapped in a union with "null".
    let person = DataType::Struct(Fields::from(vec![
        ArrowField::new("f1", DataType::Int64, false),
        ArrowField::new("f2", DataType::Utf8, true),
    ]));
    let struct_schema = single_field_schema(ArrowField::new("person", person, true));
    let struct_json = AvroSchema::try_from(&struct_schema).unwrap().json_string;
    assert_json_contains(&struct_json, "\"type\":\"record\"");
    assert_json_contains(&struct_json, "\"null\"");
}
2579
#[test]
fn test_enum_dictionary() {
    // A Utf8 dictionary column with enum-symbol metadata becomes an Avro enum.
    let symbols_metadata = HashMap::from([(
        AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
        "[\"OPEN\",\"CLOSED\"]".to_string(),
    )]);
    let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let schema = single_field_schema(
        ArrowField::new("status", dictionary, false).with_metadata(symbols_metadata),
    );
    let avro_json = AvroSchema::try_from(&schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"type\":\"enum\"");
    assert_json_contains(&avro_json, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
}
2594
#[test]
fn test_run_end_encoded() {
    // Run-end encoding is a physical layout; the Avro type follows the values.
    let run_ends = Arc::new(ArrowField::new("run_ends", DataType::Int32, false));
    let values = Arc::new(ArrowField::new("values", DataType::Utf8, false));
    let schema = single_field_schema(ArrowField::new(
        "text",
        DataType::RunEndEncoded(run_ends, values),
        false,
    ));
    let avro_json = AvroSchema::try_from(&schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"string\"");
}
2605
#[test]
fn test_dense_union() {
    // Sparse, non-contiguous type ids (2, 7) must be preserved in the extras.
    let union_fields: UnionFields = [
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let arrow_schema = single_field_schema(ArrowField::new(
        "u",
        DataType::Union(union_fields, UnionMode::Dense),
        false,
    ));
    let avro = AvroSchema::try_from(&arrow_schema)
        .expect("Arrow Union -> Avro union conversion should succeed");
    let root: Value = serde_json::from_str(&avro.json_string).unwrap();

    let u_field = root
        .get("fields")
        .and_then(Value::as_array)
        .expect("fields array")
        .iter()
        .find(|f| f.get("name").and_then(Value::as_str) == Some("u"))
        .expect("field 'u'");
    let branches = u_field
        .get("type")
        .expect("u.type")
        .as_array()
        .expect("u.type must be Avro union array");
    assert_eq!(branches.len(), 2, "expected two union branches");

    // The first branch carries the Arrow union metadata alongside its type.
    let head = branches[0]
        .as_object()
        .expect("first branch should be an object with metadata");
    assert_eq!(head.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        head.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let type_ids = head
        .get("arrowUnionTypeIds")
        .and_then(Value::as_array)
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|n| n.as_i64().expect("i64"))
        .collect::<Vec<i64>>();
    assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");
    assert_eq!(branches[1], Value::String("string".into()));
}
2649
#[test]
fn round_trip_primitive() {
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", DataType::Int32, false)]);
    let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
    let decoded = avro_schema.schema().unwrap();
    // Decoding the generated JSON must give back a record whose only field is
    // the original Arrow column. Checking only for `Schema::Complex` would also
    // accept an array/map/enum or a record with the wrong fields.
    let Schema::Complex(ComplexType::Record(record)) = &decoded else {
        panic!("expected a record schema, got: {decoded:?}");
    };
    assert_eq!(record.fields.len(), 1, "expected exactly one field");
    assert_eq!(record.fields[0].name, "f1");
}
2657
#[test]
fn test_name_generator_sanitization_and_uniqueness() {
    // Fixed types need generated names. Invalid characters become `_`,
    // collisions get a numeric suffix, and a leading digit gets a `_` prefix.
    let arrow_schema = ArrowSchema::new(
        ["weird-name", "weird name", "123bad"]
            .into_iter()
            .map(|column| ArrowField::new(column, DataType::FixedSizeBinary(8), false))
            .collect::<Vec<_>>(),
    );
    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    for fragment in [
        "\"name\":\"weird_name\"",
        "\"name\":\"weird_name_1\"",
        "\"name\":\"_123bad\"",
    ] {
        assert_json_contains(&avro_json, fragment);
    }
}
2669
#[test]
fn test_date64_logical_type_mapping() {
    // Date64 (milliseconds) is expressed as a local millisecond timestamp.
    let schema = single_field_schema(ArrowField::new("d", DataType::Date64, true));
    let avro_json = AvroSchema::try_from(&schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"logicalType\":\"local-timestamp-millis\"");
}
2680
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_list_extras_propagated() {
    // The duration logical type must survive being nested as a list item.
    let item = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
    let arrow_schema = single_field_schema(ArrowField::new(
        "durations",
        DataType::List(Arc::new(item)),
        false,
    ));
    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"logicalType\":\"arrow.duration-micros\"");
}
2693
#[test]
fn test_interval_yearmonth_extra() {
    // The Arrow interval unit is recorded as an extra attribute.
    let schema = single_field_schema(ArrowField::new(
        "iv",
        DataType::Interval(IntervalUnit::YearMonth),
        false,
    ));
    let avro_json = AvroSchema::try_from(&schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"arrowIntervalUnit\":\"yearmonth\"");
}
2701
#[test]
fn test_interval_daytime_extra() {
    // The Arrow interval unit is recorded as an extra attribute.
    let schema = single_field_schema(ArrowField::new(
        "iv_dt",
        DataType::Interval(IntervalUnit::DayTime),
        false,
    ));
    let avro_json = AvroSchema::try_from(&schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"arrowIntervalUnit\":\"daytime\"");
}
2709
#[test]
fn test_fixed_size_list_extra() {
    // The fixed list length is carried as an `arrowFixedSize` extra.
    let item = Arc::new(ArrowField::new("item", DataType::Int32, false));
    let schema = single_field_schema(ArrowField::new(
        "triples",
        DataType::FixedSizeList(item, 3),
        false,
    ));
    let avro_json = AvroSchema::try_from(&schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"arrowFixedSize\":3");
}
2718
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_map_duration_value_extra() {
    // The duration logical type must survive as a map value type.
    let entries = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("key", DataType::Utf8, false),
            ArrowField::new("value", DataType::Duration(TimeUnit::Second), true),
        ])),
        false,
    );
    let schema = single_field_schema(ArrowField::new(
        "metrics",
        DataType::Map(Arc::new(entries), false),
        false,
    ));
    let avro_json = AvroSchema::try_from(&schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"logicalType\":\"arrow.duration-seconds\"");
}
2739
#[test]
fn test_schema_with_non_string_defaults_decodes_successfully() {
    // Field defaults of every JSON kind (number, array, object, bool, string,
    // and a non-null default on an ["int","null"] union) must parse and convert.
    let schema_json = r#"{
        "type": "record",
        "name": "R",
        "fields": [
            {"name": "a", "type": "int", "default": 0},
            {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
            {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
            {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
                {"name": "flag", "type": "boolean", "default": true},
                {"name": "name", "type": "string", "default": "hi"}
            ]}, "default": {"flag": false, "name": "d"}},
            {"name": "u", "type": ["int", "null"], "default": 42}
        ]
    }"#;
    let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
    if !matches!(&schema, Schema::Complex(ComplexType::Record(_))) {
        panic!("expected record schema, got: {:?}", &schema);
    }
    let arrow_field = crate::codec::AvroField::try_from(&schema)
        .expect("Avro->Arrow conversion should succeed")
        .field();

    // Named records carry their Avro name in field metadata.
    let avro_name = |name: &str| {
        std::collections::HashMap::from([(AVRO_NAME_METADATA_KEY.to_string(), name.to_string())])
    };
    let list_item = ArrowField::new(
        arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
        DataType::Int64,
        false,
    );
    let map_entries = ArrowField::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("key", DataType::Utf8, false),
            ArrowField::new("value", DataType::Float64, false),
        ])),
        false,
    );
    let inner = ArrowField::new(
        "inner",
        DataType::Struct(Fields::from(vec![
            ArrowField::new("flag", DataType::Boolean, false),
            ArrowField::new("name", DataType::Utf8, false),
        ])),
        false,
    )
    .with_metadata(avro_name("Inner"));
    let root_fields = Fields::from(vec![
        ArrowField::new("a", DataType::Int32, false),
        ArrowField::new("b", DataType::List(Arc::new(list_item)), false),
        ArrowField::new("c", DataType::Map(Arc::new(map_entries), false), false),
        inner,
        // The ["int","null"] union surfaces as a nullable Int32.
        ArrowField::new("u", DataType::Int32, true),
    ]);
    let expected =
        ArrowField::new("R", DataType::Struct(root_fields), false).with_metadata(avro_name("R"));
    assert_eq!(arrow_field, expected);
}
2811
#[test]
fn default_order_is_consistent() {
    // `try_from` must agree with `from_arrow_with_options` when no options are given.
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
    let via_try_from = AvroSchema::try_from(&arrow_schema).unwrap();
    let via_options = AvroSchema::from_arrow_with_options(&arrow_schema, None).unwrap();
    assert_eq!(via_try_from.json_string, via_options.json_string);
}
2819
#[test]
fn test_union_branch_missing_name_errors() {
    // Named Avro types cannot be signed as union branches without a `name`.
    ["record", "enum", "fixed"].into_iter().for_each(|t| {
        let nameless = json!({ "type": t });
        let err = union_branch_signature(&nameless)
            .unwrap_err()
            .to_string();
        assert!(
            err.contains(&format!("Union branch '{t}' missing required 'name'")),
            "expected missing-name error for {t}, got: {err}"
        );
    });
}
2831
#[test]
fn test_union_branch_named_type_signature_includes_name() {
    // Signatures of named types take the form `N:<kind>:<name>`.
    let cases = [
        (json!({ "type": "record", "name": "Foo" }), "N:record:Foo"),
        (
            json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] }),
            "N:enum:Color",
        ),
        (
            json!({ "type": "fixed", "name": "Bytes16", "size": 16 }),
            "N:fixed:Bytes16",
        ),
    ];
    for (branch, expected) in cases {
        assert_eq!(union_branch_signature(&branch).unwrap(), expected);
    }
}
2841
#[test]
fn test_record_field_alias_resolution_without_default() {
    // The writer still uses the old name. The reader renamed the field and lists
    // the old name as an alias, so resolution must succeed without a default.
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"old","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"new","aliases":["old"],"type":"int"}]
        }"#,
    )
    .unwrap();
    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    // The resolved Arrow field uses the reader's name.
    let reader_fields = Fields::from(vec![ArrowField::new("new", DataType::Int32, false)]);
    assert_eq!(
        resolved.field(),
        ArrowField::new("R", DataType::Struct(reader_fields), false)
    );
}
2873
#[test]
fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
    // Two writer fields share the alias `old`, which the reader also declares.
    // In strict mode this must be reported as ambiguous.
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int","aliases":["old"]},
                {"name":"b","type":"int","aliases":["old"]}
            ]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"target","type":"int","aliases":["old"]}]
        }"#,
    )
    .unwrap();
    let build_result = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(true)
        .build();
    let err = build_result.unwrap_err().to_string();
    assert!(
        err.contains("Ambiguous alias 'old'"),
        "expected ambiguous-alias error, got: {err}"
    );
}
2903
#[test]
fn test_pragmatic_writer_field_alias_mapping_non_strict() {
    // Here the alias lives on the *writer* field ("before" aka "now"). In
    // non-strict mode the reader field "now" must still be resolved.
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"before","type":"int","aliases":["now"]}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"now","type":"int"}]
        }"#,
    )
    .unwrap();
    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    let reader_fields = Fields::from(vec![ArrowField::new("now", DataType::Int32, false)]);
    assert_eq!(
        resolved.field(),
        ArrowField::new("R", DataType::Struct(reader_fields), false)
    );
}
2935
#[test]
fn test_missing_reader_field_null_first_no_default_is_ok() {
    // Reader-only field `b` is a ["null","int"] union without a default.
    // Resolution succeeds and records a "null" default in the field metadata.
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"a","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int"},
                {"name":"b","type":["null","int"]}
            ]
        }"#,
    )
    .unwrap();
    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    let null_default = HashMap::from([(
        AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
        "null".to_string(),
    )]);
    let expected_fields = Fields::from(vec![
        ArrowField::new("a", DataType::Int32, false),
        ArrowField::new("b", DataType::Int32, true).with_metadata(null_default),
    ]);
    assert_eq!(
        resolved.field(),
        ArrowField::new("R", DataType::Struct(expected_fields), false)
    );
}
2972
#[test]
fn test_missing_reader_field_null_second_without_default_errors() {
    // Same shape as the null-first case, but with ["int","null"] ordering.
    // Without an explicit default, resolution must fail.
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"a","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int"},
                {"name":"b","type":["int","null"]}
            ]
        }"#,
    )
    .unwrap();
    let build_result = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    let err = build_result.unwrap_err().to_string();
    assert!(
        err.contains("must have a default value"),
        "expected missing-default error, got: {err}"
    );
}
3002
#[test]
fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
    let injected_json =
        r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#;
    let metadata = HashMap::from([
        (SCHEMA_METADATA_KEY.to_string(), injected_json.to_string()),
        ("custom".to_string(), "123".to_string()),
    ]);
    let arrow_schema = ArrowSchema::new_with_metadata(
        vec![ArrowField::new("x", DataType::Int32, true)],
        metadata,
    );
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: false,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    // The embedded schema wins over the actual Arrow fields.
    assert_eq!(
        out.json_string, injected_json,
        "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
    );
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    assert_eq!(parsed["type"].as_str(), Some("record"));
    assert_eq!(parsed["name"].as_str(), Some("Injected"));
}
3026
#[test]
fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
    let injected_json =
        r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#;
    let metadata = HashMap::from([
        (SCHEMA_METADATA_KEY.to_string(), injected_json.to_string()),
        ("custom_meta".to_string(), "7".to_string()),
    ]);
    let arrow_schema = ArrowSchema::new_with_metadata(
        vec![ArrowField::new("x", DataType::Int32, true)],
        metadata,
    );
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    // The embedded avro.schema is ignored. A record is generated under the
    // default root name, and the unrelated `custom_meta` entry still appears.
    for fragment in [
        "\"type\":\"record\"",
        "\"name\":\"topLevelRecord\"",
        "\"custom_meta\":7",
    ] {
        assert_json_contains(&out.json_string, fragment);
    }
}
3046
#[test]
fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let root: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = root["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");
    // NullFirst: "null" precedes the value type.
    assert_eq!(
        branches[..2],
        [Value::String("null".into()), Value::String("string".into())]
    );
}
3063
#[test]
fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
    let options = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: true,
    };
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(options)).unwrap();
    let root: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = root["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");
    // NullSecond: the value type precedes "null".
    assert_eq!(
        branches[..2],
        [Value::String("string".into()), Value::String("null".into())]
    );
}
3080
#[test]
fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
    let union_fields: UnionFields = [
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();
    let arrow_schema = single_field_schema(ArrowField::new(
        "u",
        DataType::Union(union_fields, UnionMode::Dense),
        true,
    ));
    // Converts with NullFirst ordering and returns the union branches of the
    // single column.
    let union_branches = |strip_metadata: bool| -> Vec<Value> {
        let avro = AvroSchema::from_arrow_with_options(
            &arrow_schema,
            Some(AvroSchemaOptions {
                null_order: Some(Nullability::NullFirst),
                strip_metadata,
            }),
        )
        .unwrap();
        let root: Value = serde_json::from_str(&avro.json_string).unwrap();
        root["fields"][0]["type"]
            .as_array()
            .expect("union array")
            .clone()
    };

    // Without stripping, an object branch carries the Arrow union extras.
    let kept = union_branches(false);
    let annotated = kept
        .iter()
        .find_map(Value::as_object)
        .expect("expected an object branch with extras");
    assert_eq!(annotated.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        annotated.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let type_ids = annotated
        .get("arrowUnionTypeIds")
        .and_then(Value::as_array)
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|n| n.as_i64().expect("i64"))
        .collect::<Vec<i64>>();
    assert_eq!(type_ids, vec![2, 7]);

    // With stripping, no branch keeps the extras and the leading branches are
    // plain type names.
    let stripped = union_branches(true);
    let has_extras = stripped
        .iter()
        .filter_map(Value::as_object)
        .any(|m| m.contains_key("arrowUnionMode"));
    assert!(!has_extras, "extras must be removed when strip_metadata=true");
    assert_eq!(
        stripped[..3],
        [
            Value::String("null".into()),
            Value::String("int".into()),
            Value::String("string".into()),
        ]
    );
}
3140}