1#[cfg(feature = "canonical_extension_types")]
21use arrow_schema::extension::ExtensionType;
22use arrow_schema::{
23 ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
24 UnionMode,
25};
26use serde::{Deserialize, Serialize};
27use serde_json::{Map as JsonMap, Value, json};
28#[cfg(feature = "sha256")]
29use sha2::{Digest, Sha256};
30use std::borrow::Cow;
31use std::cmp::PartialEq;
32use std::collections::hash_map::Entry;
33use std::collections::{HashMap, HashSet};
34use strum_macros::AsRefStr;
35
/// Two-byte marker written ahead of the fingerprint bytes when a prefix is built
/// for a `Rabin`, `MD5` or `SHA256` fingerprint (see `Fingerprint::make_prefix`).
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// One-byte marker written ahead of the schema id when a prefix is built for an
/// `Id` or `Id64` fingerprint (see `Fingerprint::make_prefix`).
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// Capacity, in bytes, of the fixed buffer inside `Prefix`.
///
/// 34 is enough for the largest prefix built in this file: the 2-byte
/// `SINGLE_OBJECT_MAGIC` followed by a 32-byte SHA-256 digest.
pub const MAX_PREFIX_LEN: usize = 34;

/// Arrow schema metadata key under which a complete Avro schema JSON string may
/// be stored; when present (and metadata is not stripped) it is reused verbatim
/// by `AvroSchema::from_arrow_with_options`.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Metadata key for Avro enum symbols.
///
/// NOTE(review): not referenced in this part of the file; presumably it carries
/// the symbol list of an enum-typed field — confirm against its users.
pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";

/// Metadata key for an Avro field default value.
///
/// NOTE(review): not referenced in this part of the file; presumably it carries
/// the JSON-encoded field default — confirm against its users.
pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";

/// Metadata key holding the Avro name to use for a record or named type.
pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";

/// Metadata key holding the Avro namespace to emit for a record or named type.
pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";

/// Metadata key holding the Avro `doc` string to emit for the root record.
pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";

/// Name given to the root record when the Arrow schema metadata carries no
/// `AVRO_NAME_METADATA_KEY` entry.
pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
66
/// Position of the `"null"` branch when a type is wrapped into a nullable union
/// (see `wrap_nullable`).
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) enum Nullability {
    /// `"null"` is placed first, e.g. `["null", T]`. This is the default.
    #[default]
    NullFirst,
    /// `"null"` is placed last, e.g. `[T, "null"]`.
    NullSecond,
}
80
/// A bare type name appearing in a schema: either a primitive or a reference to
/// a named type.
///
/// The enum is `untagged`, so serde tries the variants in declaration order:
/// a string that is not a known primitive name falls through to `Ref`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum TypeName<'a> {
    /// One of the built-in primitive type names.
    Primitive(PrimitiveType),
    /// Any other name, borrowed from the schema JSON; resolved against the
    /// enclosing namespace by `make_full_name` when canonicalising.
    Ref(&'a str),
}
95
/// The primitive type names recognised in a schema.
///
/// Serde (de)serialises the variants in `camelCase` and strum's `AsRefStr`
/// yields them in lowercase; every variant is a single word, so both spellings
/// are the lowercase variant name (e.g. `"null"`, `"boolean"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "lowercase")]
pub(crate) enum PrimitiveType {
    /// `"null"`
    Null,
    /// `"boolean"`
    Boolean,
    /// `"int"`
    Int,
    /// `"long"`
    Long,
    /// `"float"`
    Float,
    /// `"double"`
    Double,
    /// `"bytes"`
    Bytes,
    /// `"string"`
    String,
}
120
/// Extra attributes attached to a schema node beyond its structural fields.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Attributes<'a> {
    /// Value of the `logicalType` key, or `None` when the key is absent.
    #[serde(default)]
    pub(crate) logical_type: Option<&'a str>,

    /// Every remaining key of the JSON object (flattened), kept as raw JSON values.
    #[serde(flatten)]
    pub(crate) additional: HashMap<&'a str, Value>,
}
137
138impl Attributes<'_> {
139 pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
141 self.additional
142 .iter()
143 .map(|(k, v)| (k.to_string(), v.to_string()))
144 .collect()
145 }
146}
147
/// A JSON object of the form `{"type": <name>, ...attributes}` whose `type` is
/// a bare type name rather than a complex type.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Type<'a> {
    /// The `type` key: a primitive name or a reference to a named type.
    #[serde(borrow)]
    pub(crate) r#type: TypeName<'a>,
    /// All other keys of the object (flattened).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
159
/// A parsed Avro schema node that borrows its strings from the source JSON.
///
/// The enum is `untagged`: serde tries the variants in the order they are
/// declared here, so the order is significant and must not be changed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum Schema<'a> {
    /// A bare JSON string naming a primitive or a previously defined type.
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A JSON array of branch schemas.
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A JSON object describing a record, enum, array, map or fixed.
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// A JSON object whose `type` is a bare type name, plus attributes.
    #[serde(borrow)]
    Type(Type<'a>),
}
180
/// The complex schema kinds, selected by the JSON object's `type` key
/// (`"record"`, `"enum"`, `"array"`, `"map"`, `"fixed"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub(crate) enum ComplexType<'a> {
    /// `"type": "record"`
    #[serde(borrow)]
    Record(Record<'a>),
    /// `"type": "enum"`
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// `"type": "array"`
    #[serde(borrow)]
    Array(Array<'a>),
    /// `"type": "map"`
    #[serde(borrow)]
    Map(Map<'a>),
    /// `"type": "fixed"`
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
203
/// A record schema: a named type with an ordered list of fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Record<'a> {
    /// Record name (required); may already be a dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute; `None` when absent.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string; `Cow` so escaped JSON text can be owned.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The record's fields, in declaration order (required).
    #[serde(borrow)]
    pub(crate) fields: Vec<Field<'a>>,
    /// Any remaining keys of the JSON object (flattened).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
228
229fn deserialize_default<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
230where
231 D: serde::Deserializer<'de>,
232{
233 Value::deserialize(deserializer).map(Some)
234}
235
/// A single field of a [`Record`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Field<'a> {
    /// Field name (required).
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional documentation string.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Schema of the field's value (required).
    #[serde(borrow)]
    pub(crate) r#type: Schema<'a>,
    /// Default value: `None` when the key is absent, `Some(value)` when it is
    /// present — including `Some(Value::Null)` for an explicit JSON `null`.
    #[serde(deserialize_with = "deserialize_default", default)]
    pub(crate) default: Option<Value>,
    /// Alternative names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
}
256
/// An enum schema: a named type with a fixed list of symbols.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Enum<'a> {
    /// Enum name (required); may already be a dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Optional documentation string.
    #[serde(borrow, default)]
    pub(crate) doc: Option<Cow<'a, str>>,
    /// Alternative names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// The enum's symbols, in declaration order (required).
    #[serde(borrow)]
    pub(crate) symbols: Vec<&'a str>,
    /// Optional default symbol.
    #[serde(borrow, default)]
    pub(crate) default: Option<&'a str>,
    /// Any remaining keys of the JSON object (flattened).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
284
/// An array schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Array<'a> {
    /// Schema of the array's elements (the `items` key).
    #[serde(borrow)]
    pub(crate) items: Box<Schema<'a>>,
    /// Any remaining keys of the JSON object (flattened).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
297
/// A map schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Map<'a> {
    /// Schema of the map's values (the `values` key).
    #[serde(borrow)]
    pub(crate) values: Box<Schema<'a>>,
    /// Any remaining keys of the JSON object (flattened).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
310
/// A fixed schema: a named type of a fixed number of bytes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct Fixed<'a> {
    /// Type name (required); may already be a dotted full name.
    #[serde(borrow)]
    pub(crate) name: &'a str,
    /// Optional `namespace` attribute.
    #[serde(borrow, default)]
    pub(crate) namespace: Option<&'a str>,
    /// Alternative names; empty when the key is absent.
    #[serde(borrow, default)]
    pub(crate) aliases: Vec<&'a str>,
    /// Number of bytes per value (required).
    pub(crate) size: usize,
    /// Any remaining keys of the JSON object (flattened).
    #[serde(flatten)]
    pub(crate) attributes: Attributes<'a>,
}
331
/// Options for `AvroSchema::from_arrow_with_options`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub(crate) struct AvroSchemaOptions {
    /// Where to put `"null"` in nullable unions; `None` uses `Nullability::default()`.
    pub(crate) null_order: Option<Nullability>,
    /// When `true`, an Avro schema embedded under `SCHEMA_METADATA_KEY` is ignored
    /// and the flag is forwarded to the per-field conversion, which then omits
    /// its Arrow-specific extra attributes.
    pub(crate) strip_metadata: bool,
}
337
/// An Avro schema held as its JSON text.
///
/// The text is parsed on demand by `AvroSchema::schema`; equality compares the
/// raw strings, not the parsed schemas.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
    /// The schema as a JSON string.
    pub json_string: String,
}
344
345impl TryFrom<&ArrowSchema> for AvroSchema {
346 type Error = ArrowError;
347
348 fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
352 AvroSchema::from_arrow_with_options(schema, None)
353 }
354}
355
356impl AvroSchema {
357 pub fn new(json_string: String) -> Self {
359 Self { json_string }
360 }
361
362 pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
363 serde_json::from_str(self.json_string.as_str())
364 .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
365 }
366
367 pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
395 Self::generate_fingerprint(&self.schema()?, hash_type)
396 }
397
398 pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
399 let mut value: Value = serde_json::from_str(&self.json_string)
400 .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
401 let obj = value.as_object_mut().ok_or_else(|| {
402 ArrowError::AvroError(
403 "Projected schema must be a JSON object Avro record schema".to_string(),
404 )
405 })?;
406 match obj.get("type").and_then(|v| v.as_str()) {
407 Some("record") => {}
408 Some(other) => {
409 return Err(ArrowError::AvroError(format!(
410 "Projected schema must be an Avro record, found type '{other}'"
411 )));
412 }
413 None => {
414 return Err(ArrowError::AvroError(
415 "Projected schema missing required 'type' field".to_string(),
416 ));
417 }
418 }
419 let fields_val = obj.get_mut("fields").ok_or_else(|| {
420 ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
421 })?;
422 let projected_fields = {
423 let mut original_fields = match fields_val {
424 Value::Array(arr) => std::mem::take(arr),
425 _ => {
426 return Err(ArrowError::AvroError(
427 "Avro record schema 'fields' must be an array".to_string(),
428 ));
429 }
430 };
431 let len = original_fields.len();
432 let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
433 let mut out: Vec<Value> = Vec::with_capacity(projection.len());
434 for &i in projection {
435 if i >= len {
436 return Err(ArrowError::AvroError(format!(
437 "Projection index {i} out of bounds for record with {len} fields"
438 )));
439 }
440 if !seen.insert(i) {
441 return Err(ArrowError::AvroError(format!(
442 "Duplicate projection index {i}"
443 )));
444 }
445 out.push(std::mem::replace(&mut original_fields[i], Value::Null));
446 }
447 out
448 };
449 *fields_val = Value::Array(projected_fields);
450 let json_string = serde_json::to_string(&value).map_err(|e| {
451 ArrowError::AvroError(format!(
452 "Failed to serialize projected Avro schema JSON: {e}"
453 ))
454 })?;
455 Ok(Self::new(json_string))
456 }
457
458 pub(crate) fn generate_fingerprint(
459 schema: &Schema,
460 hash_type: FingerprintAlgorithm,
461 ) -> Result<Fingerprint, ArrowError> {
462 let canonical = Self::generate_canonical_form(schema).map_err(|e| {
463 ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}"))
464 })?;
465 match hash_type {
466 FingerprintAlgorithm::Rabin => {
467 Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
468 }
469 FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 => Err(ArrowError::SchemaError(
470 "FingerprintAlgorithm of Id or Id64 cannot be used to generate a fingerprint; \
471 if using Fingerprint::Id, pass the registry ID in instead using the set method."
472 .to_string(),
473 )),
474 #[cfg(feature = "md5")]
475 FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))),
476 #[cfg(feature = "sha256")]
477 FingerprintAlgorithm::SHA256 => {
478 Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical)))
479 }
480 }
481 }
482
483 pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
494 build_canonical(schema, None)
495 }
496
497 pub(crate) fn from_arrow_with_options(
504 schema: &ArrowSchema,
505 options: Option<AvroSchemaOptions>,
506 ) -> Result<AvroSchema, ArrowError> {
507 let opts = options.unwrap_or_default();
508 let order = opts.null_order.unwrap_or_default();
509 let strip = opts.strip_metadata;
510 if !strip {
511 if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
512 return Ok(AvroSchema::new(json.clone()));
513 }
514 }
515 let mut name_gen = NameGenerator::default();
516 let fields_json = schema
517 .fields()
518 .iter()
519 .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
520 .collect::<Result<Vec<_>, _>>()?;
521 let record_name = schema
522 .metadata
523 .get(AVRO_NAME_METADATA_KEY)
524 .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
525 let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
526 record.insert("type".into(), Value::String("record".into()));
527 record.insert(
528 "name".into(),
529 Value::String(sanitise_avro_name(record_name)),
530 );
531 if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
532 record.insert("namespace".into(), Value::String(ns.clone()));
533 }
534 if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
535 record.insert("doc".into(), Value::String(doc.clone()));
536 }
537 record.insert("fields".into(), Value::Array(fields_json));
538 extend_with_passthrough_metadata(&mut record, &schema.metadata);
539 let json_string = serde_json::to_string(&Value::Object(record))
540 .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
541 Ok(AvroSchema::new(json_string))
542 }
543}
544
/// A message prefix (magic bytes followed by fingerprint bytes) stored inline
/// in a fixed-size buffer.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Prefix {
    /// Backing storage; only the first `len` bytes are meaningful.
    buf: [u8; MAX_PREFIX_LEN],
    /// Number of valid bytes at the start of `buf`.
    len: u8,
}
551
552impl Prefix {
553 #[inline]
554 pub(crate) fn as_slice(&self) -> &[u8] {
555 &self.buf[..self.len as usize]
556 }
557}
558
/// How a fingerprint is obtained: computed by a hash algorithm, or supplied as
/// an explicit identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
    /// Rabin hash of the canonical form (the default).
    #[default]
    Rabin,
    /// Explicit 32-bit identifier.
    Id(u32),
    /// Explicit 64-bit identifier.
    Id64(u64),
    /// MD5 digest of the canonical form (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest of the canonical form (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
576
577impl From<Fingerprint> for FingerprintStrategy {
578 fn from(f: Fingerprint) -> Self {
579 Self::from(&f)
580 }
581}
582
583impl From<FingerprintAlgorithm> for FingerprintStrategy {
584 fn from(f: FingerprintAlgorithm) -> Self {
585 match f {
586 FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
587 FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
588 FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
589 #[cfg(feature = "md5")]
590 FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
591 #[cfg(feature = "sha256")]
592 FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
593 }
594 }
595}
596
597impl From<&Fingerprint> for FingerprintStrategy {
598 fn from(f: &Fingerprint) -> Self {
599 match f {
600 Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
601 Fingerprint::Id(_) => FingerprintStrategy::Id(0),
602 Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
603 #[cfg(feature = "md5")]
604 Fingerprint::MD5(_) => FingerprintStrategy::MD5,
605 #[cfg(feature = "sha256")]
606 Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
607 }
608 }
609}
610
/// The kind of fingerprint, without any payload.
///
/// `Rabin`, `MD5` and `SHA256` can be computed from a schema; `Id` and `Id64`
/// cannot (see `AvroSchema::generate_fingerprint`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum FingerprintAlgorithm {
    /// 64-bit Rabin fingerprint (the default).
    #[default]
    Rabin,
    /// Externally assigned 32-bit identifier.
    Id,
    /// Externally assigned 64-bit identifier.
    Id64,
    /// MD5 digest (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5,
    /// SHA-256 digest (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256,
}
629
630impl From<&Fingerprint> for FingerprintAlgorithm {
632 fn from(fp: &Fingerprint) -> Self {
633 match fp {
634 Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
635 Fingerprint::Id(_) => FingerprintAlgorithm::Id,
636 Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
637 #[cfg(feature = "md5")]
638 Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
639 #[cfg(feature = "sha256")]
640 Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256,
641 }
642 }
643}
644
645impl From<FingerprintStrategy> for FingerprintAlgorithm {
646 fn from(s: FingerprintStrategy) -> Self {
647 Self::from(&s)
648 }
649}
650
651impl From<&FingerprintStrategy> for FingerprintAlgorithm {
652 fn from(s: &FingerprintStrategy) -> Self {
653 match s {
654 FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
655 FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
656 FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
657 #[cfg(feature = "md5")]
658 FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
659 #[cfg(feature = "sha256")]
660 FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
661 }
662 }
663}
664
/// A schema fingerprint together with its value; used as the key of `SchemaStore`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Fingerprint {
    /// 64-bit Rabin fingerprint of the canonical form.
    Rabin(u64),
    /// Externally assigned 32-bit identifier.
    Id(u32),
    /// Externally assigned 64-bit identifier.
    Id64(u64),
    /// 16-byte MD5 digest of the canonical form (requires the `md5` feature).
    #[cfg(feature = "md5")]
    MD5([u8; 16]),
    /// 32-byte SHA-256 digest of the canonical form (requires the `sha256` feature).
    #[cfg(feature = "sha256")]
    SHA256([u8; 32]),
}
688
689impl From<FingerprintStrategy> for Fingerprint {
690 fn from(s: FingerprintStrategy) -> Self {
691 Self::from(&s)
692 }
693}
694
695impl From<&FingerprintStrategy> for Fingerprint {
696 fn from(s: &FingerprintStrategy) -> Self {
697 match s {
698 FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
699 FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
700 FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
701 #[cfg(feature = "md5")]
702 FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
703 #[cfg(feature = "sha256")]
704 FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
705 }
706 }
707}
708
709impl From<FingerprintAlgorithm> for Fingerprint {
710 fn from(s: FingerprintAlgorithm) -> Self {
711 match s {
712 FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
713 FingerprintAlgorithm::Id => Fingerprint::Id(0),
714 FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
715 #[cfg(feature = "md5")]
716 FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
717 #[cfg(feature = "sha256")]
718 FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
719 }
720 }
721}
722
723impl Fingerprint {
724 pub fn load_fingerprint_id(id: u32) -> Self {
732 Fingerprint::Id(u32::from_be(id))
733 }
734
735 pub fn load_fingerprint_id64(id: u64) -> Self {
743 Fingerprint::Id64(u64::from_be(id))
744 }
745
746 pub(crate) fn make_prefix(&self) -> Prefix {
770 let mut buf = [0u8; MAX_PREFIX_LEN];
771 let len = match self {
772 Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
773 Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
774 Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
775 #[cfg(feature = "md5")]
776 Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
777 #[cfg(feature = "sha256")]
778 Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
779 };
780 Prefix { buf, len }
781 }
782}
783
784fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
785 buf: &mut [u8; MAX_PREFIX_LEN],
786 magic: &[u8; MAGIC_LEN],
787 payload: &[u8; PAYLOAD_LEN],
788) -> u8 {
789 debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
790 let total = MAGIC_LEN + PAYLOAD_LEN;
791 let prefix_slice = &mut buf[..total];
792 prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
793 prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
794 total as u8
795}
796
/// An in-memory collection of Avro schemas keyed by fingerprint.
#[derive(Debug, Clone, Default)]
pub struct SchemaStore {
    /// Algorithm used by `register` to fingerprint new schemas.
    fingerprint_algorithm: FingerprintAlgorithm,
    /// The stored schemas, keyed by their fingerprint.
    schemas: HashMap<Fingerprint, AvroSchema>,
}
830
831impl TryFrom<HashMap<Fingerprint, AvroSchema>> for SchemaStore {
832 type Error = ArrowError;
833
834 fn try_from(schemas: HashMap<Fingerprint, AvroSchema>) -> Result<Self, Self::Error> {
837 Ok(Self {
838 schemas,
839 ..Self::default()
840 })
841 }
842}
843
844impl SchemaStore {
845 pub fn new() -> Self {
847 Self::default()
848 }
849
850 pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self {
852 Self {
853 fingerprint_algorithm,
854 ..Self::default()
855 }
856 }
857
858 pub fn set(
875 &mut self,
876 fingerprint: Fingerprint,
877 schema: AvroSchema,
878 ) -> Result<Fingerprint, ArrowError> {
879 match self.schemas.entry(fingerprint) {
880 Entry::Occupied(entry) => {
881 if entry.get() != &schema {
882 return Err(ArrowError::ComputeError(format!(
883 "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
884 )));
885 }
886 }
887 Entry::Vacant(entry) => {
888 entry.insert(schema);
889 }
890 }
891 Ok(fingerprint)
892 }
893
894 pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
912 if self.fingerprint_algorithm == FingerprintAlgorithm::Id
913 || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
914 {
915 return Err(ArrowError::SchemaError(
916 "Invalid FingerprintAlgorithm; unable to generate fingerprint. \
917 Use the set method directly instead, providing a valid fingerprint"
918 .to_string(),
919 ));
920 }
921 let fingerprint =
922 AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
923 self.set(fingerprint, schema)?;
924 Ok(fingerprint)
925 }
926
    /// Returns the schema stored under `fingerprint`, or `None` if there is none.
    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
        self.schemas.get(fingerprint)
    }
939
940 pub fn fingerprints(&self) -> Vec<Fingerprint> {
946 self.schemas.keys().copied().collect()
947 }
948
    /// Returns the algorithm this store uses when registering schemas.
    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
        self.fingerprint_algorithm
    }
953}
954
955fn quote(s: &str) -> Result<String, ArrowError> {
956 serde_json::to_string(s)
957 .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}")))
958}
959
/// Resolves an Avro name to its full name and the namespace it lives in.
///
/// Resolution order:
/// 1. If `name` contains a dot it is already a full name; the `namespace`
///    attribute and the enclosing namespace are ignored, and the namespace is
///    everything before the last dot.
/// 2. Otherwise the `namespace_attr` is used if present, else `enclosing_ns`.
/// 3. With neither, the name is unqualified and the namespace is `None`.
///
/// An empty namespace denotes the Avro "null namespace": the name is left
/// unqualified (no leading dot), and `Some("")` is returned as the namespace so
/// that nested names resolved against it stay unqualified as well.
///
/// Returns `(full_name, namespace)`.
pub(crate) fn make_full_name(
    name: &str,
    namespace_attr: Option<&str>,
    enclosing_ns: Option<&str>,
) -> (String, Option<String>) {
    if let Some((ns, _)) = name.rsplit_once('.') {
        return (name.to_string(), Some(ns.to_string()));
    }
    match namespace_attr.or(enclosing_ns) {
        // Null namespace: joining with "." would yield an invalid name like ".Foo".
        Some("") => (name.to_string(), Some(String::new())),
        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
        None => (name.to_string(), None),
    }
}
990
991fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
992 Ok(match schema {
993 Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
994 TypeName::Primitive(pt) => quote(pt.as_ref())?,
995 TypeName::Ref(name) => {
996 let (full_name, _) = make_full_name(name, None, enclosing_ns);
997 quote(&full_name)?
998 }
999 },
1000 Schema::Union(branches) => format!(
1001 "[{}]",
1002 branches
1003 .iter()
1004 .map(|b| build_canonical(b, enclosing_ns))
1005 .collect::<Result<Vec<_>, _>>()?
1006 .join(",")
1007 ),
1008 Schema::Complex(ct) => match ct {
1009 ComplexType::Record(r) => {
1010 let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns);
1011 let fields = r
1012 .fields
1013 .iter()
1014 .map(|f| {
1015 let field_type =
1020 build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
1021 Ok(format!(
1022 r#"{{"name":{},"type":{}}}"#,
1023 quote(f.name)?,
1024 field_type
1025 ))
1026 })
1027 .collect::<Result<Vec<_>, ArrowError>>()?
1028 .join(",");
1029 format!(
1030 r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
1031 quote(&full_name)?,
1032 )
1033 }
1034 ComplexType::Enum(e) => {
1035 let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns);
1036 let symbols = e
1037 .symbols
1038 .iter()
1039 .map(|s| quote(s))
1040 .collect::<Result<Vec<_>, _>>()?
1041 .join(",");
1042 format!(
1043 r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
1044 quote(&full_name)?
1045 )
1046 }
1047 ComplexType::Array(arr) => format!(
1048 r#"{{"type":"array","items":{}}}"#,
1049 build_canonical(&arr.items, enclosing_ns)?
1050 ),
1051 ComplexType::Map(map) => format!(
1052 r#"{{"type":"map","values":{}}}"#,
1053 build_canonical(&map.values, enclosing_ns)?
1054 ),
1055 ComplexType::Fixed(f) => {
1056 let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns);
1057 format!(
1058 r#"{{"name":{},"type":"fixed","size":{}}}"#,
1059 quote(&full_name)?,
1060 f.size
1061 )
1062 }
1063 },
1064 })
1065}
1066
/// Seed of the Rabin fingerprint (the fingerprint of the empty input); also
/// used as the polynomial mask when building the lookup table.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Computes one lookup-table entry: the value `i` after eight shift-and-reduce steps.
///
/// In each step the value is shifted right by one bit and, if the bit shifted
/// out was set, XOR-ed with `EMPTY`.
const fn one_entry(i: usize) -> u64 {
    let mut fp = i as u64;
    let mut remaining_bits = 8;
    while remaining_bits > 0 {
        let mask = if fp & 1 == 1 { EMPTY } else { 0 };
        fp = (fp >> 1) ^ mask;
        remaining_bits -= 1;
    }
    fp
}

/// Builds the 256-entry table used to process the input one byte at a time.
///
/// Written with `while` because `for` loops are not allowed in `const fn`.
const fn build_table() -> [u64; 256] {
    let mut table = [0u64; 256];
    let mut byte = 0;
    while byte < 256 {
        table[byte] = one_entry(byte);
        byte += 1;
    }
    table
}

/// Lookup table for `compute_fingerprint_rabin`, evaluated at compile time.
static FINGERPRINT_TABLE: [u64; 256] = build_table();

/// Computes the 64-bit Rabin fingerprint of `canonical_form`'s UTF-8 bytes.
///
/// Starts from `EMPTY` and folds in one byte per step through `FINGERPRINT_TABLE`.
pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
    canonical_form.bytes().fold(EMPTY, |fp, byte| {
        // Low byte of the running value selects the table entry.
        let idx = usize::from((fp as u8) ^ byte);
        (fp >> 8) ^ FINGERPRINT_TABLE[idx]
    })
}
1115
/// Computes the 16-byte MD5 digest of `canonical_form`'s UTF-8 bytes.
#[cfg(feature = "md5")]
#[inline]
pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] {
    let md5::Digest(bytes) = md5::compute(canonical_form.as_bytes());
    bytes
}
1126
/// Computes the 32-byte SHA-256 digest of `canonical_form`'s UTF-8 bytes.
#[cfg(feature = "sha256")]
#[inline]
pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] {
    // One-shot form of `new` + `update` + `finalize`.
    Sha256::digest(canonical_form.as_bytes()).into()
}
1138
1139#[inline]
1140fn is_internal_arrow_key(key: &str) -> bool {
1141 key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
1142}
1143
1144fn extend_with_passthrough_metadata(
1149 target: &mut JsonMap<String, Value>,
1150 metadata: &HashMap<String, String>,
1151) {
1152 for (meta_key, meta_val) in metadata {
1153 if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
1154 continue;
1155 }
1156 let json_val =
1157 serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
1158 target.insert(meta_key.clone(), json_val);
1159 }
1160}
1161
/// Turns an arbitrary string into a name made only of ASCII letters, digits
/// and underscores that does not start with a digit.
///
/// * An empty input becomes `"_"`.
/// * Every character other than an ASCII letter, digit or `_` is replaced by `_`.
/// * A leading digit gets an extra `_` in front.
fn sanitise_avro_name(base_name: &str) -> String {
    let first = match base_name.chars().next() {
        Some(ch) => ch,
        None => return "_".to_owned(),
    };
    let mut sanitised = String::with_capacity(base_name.len() + 1);
    // A digit is kept as-is by the mapping below, so checking the input's first
    // character is the same as checking the output's.
    if first.is_ascii_digit() {
        sanitised.push('_');
    }
    for ch in base_name.chars() {
        let keep = ch == '_' || ch.is_ascii_alphanumeric();
        sanitised.push(if keep { ch } else { '_' });
    }
    sanitised
}
1182
1183#[derive(Default)]
1184struct NameGenerator {
1185 used: HashSet<String>,
1186 counters: HashMap<String, usize>,
1187}
1188
1189impl NameGenerator {
1190 fn make_unique(&mut self, field_name: &str) -> String {
1191 let field_name = sanitise_avro_name(field_name);
1192 if self.used.insert(field_name.clone()) {
1193 self.counters.insert(field_name.clone(), 1);
1194 return field_name;
1195 }
1196 let counter = self.counters.entry(field_name.clone()).or_insert(1);
1197 loop {
1198 let candidate = format!("{field_name}_{}", *counter);
1199 if self.used.insert(candidate.clone()) {
1200 return candidate;
1201 }
1202 *counter += 1;
1203 }
1204 }
1205}
1206
1207fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
1208 if extras.is_empty() {
1209 return schema;
1210 }
1211 match schema {
1212 Value::Object(mut map) => {
1213 map.extend(extras);
1214 Value::Object(map)
1215 }
1216 Value::Array(mut union) => {
1217 if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
1220 let original = std::mem::take(non_null);
1221 *non_null = merge_extras(original, extras);
1222 }
1223 Value::Array(union)
1224 }
1225 primitive => {
1226 let mut map = JsonMap::with_capacity(extras.len() + 1);
1227 map.insert("type".into(), primitive);
1228 map.extend(extras);
1229 Value::Object(map)
1230 }
1231 }
1232}
1233
1234#[inline]
1235fn is_avro_json_null(v: &Value) -> bool {
1236 matches!(v, Value::String(s) if s == "null")
1237}
1238
1239fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
1240 let null = Value::String("null".into());
1241 match inner {
1242 Value::Array(mut union) => {
1243 if union.iter().any(is_avro_json_null) {
1248 return Value::Array(union);
1249 }
1250 match null_order {
1252 Nullability::NullFirst => union.insert(0, null),
1253 Nullability::NullSecond => union.push(null),
1254 }
1255 Value::Array(union)
1256 }
1257 other => match null_order {
1258 Nullability::NullFirst => Value::Array(vec![null, other]),
1259 Nullability::NullSecond => Value::Array(vec![other, null]),
1260 },
1261 }
1262}
1263
/// Returns the smallest byte width whose maximum decimal precision is at least `p`.
///
/// `MAX_P[i]` is the largest precision that fits in `i + 1` bytes. Precisions
/// beyond the last table entry are clamped to 32 bytes.
fn min_fixed_bytes_for_precision(p: usize) -> usize {
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    MAX_P
        .iter()
        .position(|&max_p| p <= max_p)
        .map_or(32, |index| index + 1)
}
1278
1279fn union_branch_signature(branch: &Value) -> Result<String, ArrowError> {
1280 match branch {
1281 Value::String(t) => Ok(format!("P:{t}")),
1282 Value::Object(map) => {
1283 let t = map.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
1284 ArrowError::SchemaError("Union branch object missing string 'type'".into())
1285 })?;
1286 match t {
1287 "record" | "enum" | "fixed" => {
1288 let name = map.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
1289 ArrowError::SchemaError(format!(
1290 "Union branch '{t}' missing required 'name'"
1291 ))
1292 })?;
1293 Ok(format!("N:{t}:{name}"))
1294 }
1295 "array" | "map" => Ok(format!("C:{t}")),
1296 other => Ok(format!("P:{other}")),
1297 }
1298 }
1299 Value::Array(_) => Err(ArrowError::SchemaError(
1300 "Avro union may not immediately contain another union".into(),
1301 )),
1302 _ => Err(ArrowError::SchemaError(
1303 "Invalid JSON for Avro union branch".into(),
1304 )),
1305 }
1306}
1307
1308fn datatype_to_avro(
1309 dt: &DataType,
1310 field_name: &str,
1311 metadata: &HashMap<String, String>,
1312 name_gen: &mut NameGenerator,
1313 null_order: Nullability,
1314 strip: bool,
1315) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
1316 let mut extras = JsonMap::new();
1317 let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
1318 if *scale < 0 {
1319 return Err(ArrowError::SchemaError(format!(
1320 "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
1321 )));
1322 }
1323 if (*scale as usize) > (*precision as usize) {
1324 return Err(ArrowError::SchemaError(format!(
1325 "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
1326 must be <= precision ({precision})"
1327 )));
1328 }
1329 let mut meta = JsonMap::from_iter([
1330 ("logicalType".into(), json!("decimal")),
1331 ("precision".into(), json!(*precision)),
1332 ("scale".into(), json!(*scale)),
1333 ]);
1334 let mut fixed_size = metadata.get("size").and_then(|v| v.parse::<usize>().ok());
1335 let carries_name = metadata.contains_key(AVRO_NAME_METADATA_KEY)
1336 || metadata.contains_key(AVRO_NAMESPACE_METADATA_KEY);
1337 if fixed_size.is_none() && carries_name {
1338 fixed_size = Some(min_fixed_bytes_for_precision(*precision as usize));
1339 }
1340 if let Some(size) = fixed_size {
1341 meta.insert("type".into(), json!("fixed"));
1342 meta.insert("size".into(), json!(size));
1343 let chosen_name = metadata
1344 .get(AVRO_NAME_METADATA_KEY)
1345 .map(|s| sanitise_avro_name(s))
1346 .unwrap_or_else(|| name_gen.make_unique(field_name));
1347 meta.insert("name".into(), json!(chosen_name));
1348 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1349 meta.insert("namespace".into(), json!(ns));
1350 }
1351 } else {
1352 meta.insert("type".into(), json!("bytes"));
1354 }
1355 Ok(Value::Object(meta))
1356 };
1357 let val = match dt {
1358 DataType::Null => Value::String("null".into()),
1359 DataType::Boolean => Value::String("boolean".into()),
1360 DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 | DataType::Int32 => {
1361 Value::String("int".into())
1362 }
1363 DataType::UInt32 | DataType::Int64 | DataType::UInt64 => Value::String("long".into()),
1364 DataType::Float16 | DataType::Float32 => Value::String("float".into()),
1365 DataType::Float64 => Value::String("double".into()),
1366 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
1367 DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
1368 DataType::BinaryView => {
1369 if !strip {
1370 extras.insert("arrowBinaryView".into(), Value::Bool(true));
1371 }
1372 Value::String("bytes".into())
1373 }
1374 DataType::FixedSizeBinary(len) => {
1375 let md_is_uuid = metadata
1376 .get("logicalType")
1377 .map(|s| s.trim_matches('"') == "uuid")
1378 .unwrap_or(false);
1379 #[cfg(feature = "canonical_extension_types")]
1380 let ext_is_uuid = metadata
1381 .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
1382 .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
1383 .unwrap_or(false);
1384 #[cfg(not(feature = "canonical_extension_types"))]
1385 let ext_is_uuid = false;
1386 let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
1387 if is_uuid {
1388 json!({ "type": "string", "logicalType": "uuid" })
1389 } else {
1390 let chosen_name = metadata
1391 .get(AVRO_NAME_METADATA_KEY)
1392 .map(|s| sanitise_avro_name(s))
1393 .unwrap_or_else(|| name_gen.make_unique(field_name));
1394 let mut obj = JsonMap::from_iter([
1395 ("type".into(), json!("fixed")),
1396 ("name".into(), json!(chosen_name)),
1397 ("size".into(), json!(len)),
1398 ]);
1399 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1400 obj.insert("namespace".into(), json!(ns));
1401 }
1402 Value::Object(obj)
1403 }
1404 }
1405 #[cfg(feature = "small_decimals")]
1406 DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
1407 handle_decimal(precision, scale)?
1408 }
1409 DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
1410 handle_decimal(precision, scale)?
1411 }
1412 DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
1413 DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
1414 DataType::Time32(unit) => match unit {
1415 TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
1416 TimeUnit::Second => {
1417 if !strip {
1418 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1419 }
1420 Value::String("int".into())
1421 }
1422 _ => Value::String("int".into()),
1423 },
1424 DataType::Time64(unit) => match unit {
1425 TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
1426 TimeUnit::Nanosecond => {
1427 if !strip {
1428 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1429 }
1430 Value::String("long".into())
1431 }
1432 _ => Value::String("long".into()),
1433 },
1434 DataType::Timestamp(unit, tz) => {
1435 let logical_type = match (unit, tz.is_some()) {
1436 (TimeUnit::Millisecond, true) => "timestamp-millis",
1437 (TimeUnit::Millisecond, false) => "local-timestamp-millis",
1438 (TimeUnit::Microsecond, true) => "timestamp-micros",
1439 (TimeUnit::Microsecond, false) => "local-timestamp-micros",
1440 (TimeUnit::Nanosecond, true) => "timestamp-nanos",
1441 (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
1442 (TimeUnit::Second, _) => {
1443 if !strip {
1444 extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
1445 }
1446 return Ok((Value::String("long".into()), extras));
1447 }
1448 };
1449 if !strip && matches!(unit, TimeUnit::Nanosecond) {
1450 extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
1451 }
1452 json!({ "type": "long", "logicalType": logical_type })
1453 }
1454 #[cfg(not(feature = "avro_custom_types"))]
1455 DataType::Duration(_unit) => Value::String("long".into()),
1456 #[cfg(feature = "avro_custom_types")]
1457 DataType::Duration(unit) => {
1458 let logical_type = match unit {
1461 TimeUnit::Second => "arrow.duration-seconds",
1462 TimeUnit::Millisecond => "arrow.duration-millis",
1463 TimeUnit::Microsecond => "arrow.duration-micros",
1464 TimeUnit::Nanosecond => "arrow.duration-nanos",
1465 };
1466 json!({ "type": "long", "logicalType": logical_type })
1467 }
1468 DataType::Interval(IntervalUnit::MonthDayNano) => {
1469 let chosen_name = metadata
1471 .get(AVRO_NAME_METADATA_KEY)
1472 .map(|s| sanitise_avro_name(s))
1473 .unwrap_or_else(|| name_gen.make_unique(field_name));
1474 let mut obj = JsonMap::from_iter([
1475 ("type".into(), json!("fixed")),
1476 ("name".into(), json!(chosen_name)),
1477 ("size".into(), json!(12)),
1478 ("logicalType".into(), json!("duration")),
1479 ]);
1480 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1481 obj.insert("namespace".into(), json!(ns));
1482 }
1483 json!(obj)
1484 }
1485 DataType::Interval(IntervalUnit::YearMonth) => {
1486 if !strip {
1487 extras.insert(
1488 "arrowIntervalUnit".into(),
1489 Value::String("yearmonth".into()),
1490 );
1491 }
1492 Value::String("long".into())
1493 }
1494 DataType::Interval(IntervalUnit::DayTime) => {
1495 if !strip {
1496 extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
1497 }
1498 Value::String("long".into())
1499 }
1500 DataType::List(child) | DataType::LargeList(child) => {
1501 if matches!(dt, DataType::LargeList(_)) && !strip {
1502 extras.insert("arrowLargeList".into(), Value::Bool(true));
1503 }
1504 let items_schema = process_datatype(
1505 child.data_type(),
1506 child.name(),
1507 child.metadata(),
1508 name_gen,
1509 null_order,
1510 child.is_nullable(),
1511 strip,
1512 )?;
1513 json!({
1514 "type": "array",
1515 "items": items_schema
1516 })
1517 }
1518 DataType::ListView(child) | DataType::LargeListView(child) => {
1519 if matches!(dt, DataType::LargeListView(_)) && !strip {
1520 extras.insert("arrowLargeList".into(), Value::Bool(true));
1521 }
1522 if !strip {
1523 extras.insert("arrowListView".into(), Value::Bool(true));
1524 }
1525 let items_schema = process_datatype(
1526 child.data_type(),
1527 child.name(),
1528 child.metadata(),
1529 name_gen,
1530 null_order,
1531 child.is_nullable(),
1532 strip,
1533 )?;
1534 json!({
1535 "type": "array",
1536 "items": items_schema
1537 })
1538 }
1539 DataType::FixedSizeList(child, len) => {
1540 if !strip {
1541 extras.insert("arrowFixedSize".into(), json!(len));
1542 }
1543 let items_schema = process_datatype(
1544 child.data_type(),
1545 child.name(),
1546 child.metadata(),
1547 name_gen,
1548 null_order,
1549 child.is_nullable(),
1550 strip,
1551 )?;
1552 json!({
1553 "type": "array",
1554 "items": items_schema
1555 })
1556 }
1557 DataType::Map(entries, _) => {
1558 let value_field = match entries.data_type() {
1559 DataType::Struct(fs) => &fs[1],
1560 _ => {
1561 return Err(ArrowError::SchemaError(
1562 "Map 'entries' field must be Struct(key,value)".into(),
1563 ));
1564 }
1565 };
1566 let values_schema = process_datatype(
1567 value_field.data_type(),
1568 value_field.name(),
1569 value_field.metadata(),
1570 name_gen,
1571 null_order,
1572 value_field.is_nullable(),
1573 strip,
1574 )?;
1575 json!({
1576 "type": "map",
1577 "values": values_schema
1578 })
1579 }
1580 DataType::Struct(fields) => {
1581 let avro_fields = fields
1582 .iter()
1583 .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
1584 .collect::<Result<Vec<_>, _>>()?;
1585 let chosen_name = metadata
1587 .get(AVRO_NAME_METADATA_KEY)
1588 .map(|s| sanitise_avro_name(s))
1589 .unwrap_or_else(|| name_gen.make_unique(field_name));
1590 let mut obj = JsonMap::from_iter([
1591 ("type".into(), json!("record")),
1592 ("name".into(), json!(chosen_name)),
1593 ("fields".into(), Value::Array(avro_fields)),
1594 ]);
1595 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1596 obj.insert("namespace".into(), json!(ns));
1597 }
1598 Value::Object(obj)
1599 }
1600 DataType::Dictionary(_, value) => {
1601 if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
1602 let symbols: Vec<&str> =
1603 serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
1604 let chosen_name = metadata
1606 .get(AVRO_NAME_METADATA_KEY)
1607 .map(|s| sanitise_avro_name(s))
1608 .unwrap_or_else(|| name_gen.make_unique(field_name));
1609 let mut obj = JsonMap::from_iter([
1610 ("type".into(), json!("enum")),
1611 ("name".into(), json!(chosen_name)),
1612 ("symbols".into(), json!(symbols)),
1613 ]);
1614 if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1615 obj.insert("namespace".into(), json!(ns));
1616 }
1617 Value::Object(obj)
1618 } else {
1619 process_datatype(
1620 value.as_ref(),
1621 field_name,
1622 metadata,
1623 name_gen,
1624 null_order,
1625 false,
1626 strip,
1627 )?
1628 }
1629 }
1630 #[cfg(feature = "avro_custom_types")]
1631 DataType::RunEndEncoded(run_ends, values) => {
1632 let bits = match run_ends.data_type() {
1633 DataType::Int16 => 16,
1634 DataType::Int32 => 32,
1635 DataType::Int64 => 64,
1636 other => {
1637 return Err(ArrowError::SchemaError(format!(
1638 "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}"
1639 )));
1640 }
1641 };
1642 let (value_schema, value_extras) = datatype_to_avro(
1644 values.data_type(),
1645 values.name(),
1646 values.metadata(),
1647 name_gen,
1648 null_order,
1649 strip,
1650 )?;
1651 let mut merged = merge_extras(value_schema, value_extras);
1652 if values.is_nullable() {
1653 merged = wrap_nullable(merged, null_order);
1654 }
1655 let mut extras = JsonMap::new();
1656 extras.insert("logicalType".into(), json!("arrow.run-end-encoded"));
1657 extras.insert("arrow.runEndIndexBits".into(), json!(bits));
1658 return Ok((merged, extras));
1659 }
1660 #[cfg(not(feature = "avro_custom_types"))]
1661 DataType::RunEndEncoded(_run_ends, values) => {
1662 let (value_schema, _extras) = datatype_to_avro(
1663 values.data_type(),
1664 values.name(),
1665 values.metadata(),
1666 name_gen,
1667 null_order,
1668 strip,
1669 )?;
1670 return Ok((value_schema, JsonMap::new()));
1671 }
1672 DataType::Union(fields, mode) => {
1673 let mut branches: Vec<Value> = Vec::with_capacity(fields.len());
1674 let mut type_ids: Vec<i32> = Vec::with_capacity(fields.len());
1675 for (type_id, field_ref) in fields.iter() {
1676 let (branch_schema, _branch_extras) = datatype_to_avro(
1678 field_ref.data_type(),
1679 field_ref.name(),
1680 field_ref.metadata(),
1681 name_gen,
1682 null_order,
1683 strip,
1684 )?;
1685 if matches!(branch_schema, Value::Array(_)) {
1687 return Err(ArrowError::SchemaError(
1688 "Avro union may not immediately contain another union".into(),
1689 ));
1690 }
1691 branches.push(branch_schema);
1692 type_ids.push(type_id as i32);
1693 }
1694 let mut seen: HashSet<String> = HashSet::with_capacity(branches.len());
1695 for b in &branches {
1696 let sig = union_branch_signature(b)?;
1697 if !seen.insert(sig) {
1698 return Err(ArrowError::SchemaError(
1699 "Avro union contains duplicate branch types (disallowed by spec)".into(),
1700 ));
1701 }
1702 }
1703 if !strip {
1704 extras.insert(
1705 "arrowUnionMode".into(),
1706 Value::String(
1707 match mode {
1708 UnionMode::Sparse => "sparse",
1709 UnionMode::Dense => "dense",
1710 }
1711 .to_string(),
1712 ),
1713 );
1714 extras.insert(
1715 "arrowUnionTypeIds".into(),
1716 Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
1717 );
1718 }
1719 Value::Array(branches)
1720 }
1721 #[cfg(not(feature = "small_decimals"))]
1722 other => {
1723 return Err(ArrowError::NotYetImplemented(format!(
1724 "Arrow type {other:?} has no Avro representation"
1725 )));
1726 }
1727 };
1728 Ok((val, extras))
1729}
1730
1731fn process_datatype(
1732 dt: &DataType,
1733 field_name: &str,
1734 metadata: &HashMap<String, String>,
1735 name_gen: &mut NameGenerator,
1736 null_order: Nullability,
1737 is_nullable: bool,
1738 strip: bool,
1739) -> Result<Value, ArrowError> {
1740 let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
1741 let mut merged = merge_extras(schema, extras);
1742 if is_nullable {
1743 merged = wrap_nullable(merged, null_order)
1744 }
1745 Ok(merged)
1746}
1747
1748fn arrow_field_to_avro(
1749 field: &ArrowField,
1750 name_gen: &mut NameGenerator,
1751 null_order: Nullability,
1752 strip: bool,
1753) -> Result<Value, ArrowError> {
1754 let avro_name = sanitise_avro_name(field.name());
1755 let schema_value = process_datatype(
1756 field.data_type(),
1757 &avro_name,
1758 field.metadata(),
1759 name_gen,
1760 null_order,
1761 field.is_nullable(),
1762 strip,
1763 )?;
1764 let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
1766 map.insert("name".into(), Value::String(avro_name));
1767 map.insert("type".into(), schema_value);
1768 for (meta_key, meta_val) in field.metadata() {
1770 if is_internal_arrow_key(meta_key) {
1771 continue;
1772 }
1773 match meta_key.as_str() {
1774 AVRO_DOC_METADATA_KEY => {
1775 map.insert("doc".into(), Value::String(meta_val.clone()));
1776 }
1777 AVRO_FIELD_DEFAULT_METADATA_KEY => {
1778 let default_value = serde_json::from_str(meta_val)
1779 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1780 map.insert("default".into(), default_value);
1781 }
1782 _ => {
1783 let json_val = serde_json::from_str(meta_val)
1784 .unwrap_or_else(|_| Value::String(meta_val.clone()));
1785 map.insert(meta_key.clone(), json_val);
1786 }
1787 }
1788 }
1789 Ok(Value::Object(map))
1790}
1791
1792#[cfg(test)]
1793mod tests {
1794 use super::*;
1795 use crate::codec::{AvroField, AvroFieldBuilder};
1796 use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
1797 use serde_json::json;
1798 use std::sync::Arc;
1799
1800 fn int_schema() -> Schema<'static> {
1801 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
1802 }
1803
1804 fn record_schema() -> Schema<'static> {
1805 Schema::Complex(ComplexType::Record(Record {
1806 name: "record1",
1807 namespace: Some("test.namespace"),
1808 doc: Some(Cow::from("A test record")),
1809 aliases: vec![],
1810 fields: vec![
1811 Field {
1812 name: "field1",
1813 doc: Some(Cow::from("An integer field")),
1814 r#type: int_schema(),
1815 default: None,
1816 aliases: vec![],
1817 },
1818 Field {
1819 name: "field2",
1820 doc: None,
1821 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
1822 default: None,
1823 aliases: vec![],
1824 },
1825 ],
1826 attributes: Attributes::default(),
1827 }))
1828 }
1829
1830 fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
1831 let mut sb = SchemaBuilder::new();
1832 sb.push(field);
1833 sb.finish()
1834 }
1835
1836 fn assert_json_contains(avro_json: &str, needle: &str) {
1837 assert!(
1838 avro_json.contains(needle),
1839 "JSON did not contain `{needle}` : {avro_json}"
1840 )
1841 }
1842
1843 #[test]
1844 fn test_deserialize() {
1845 let t: Schema = serde_json::from_str("\"string\"").unwrap();
1846 assert_eq!(
1847 t,
1848 Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
1849 );
1850
1851 let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
1852 assert_eq!(
1853 t,
1854 Schema::Union(vec![
1855 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
1856 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1857 ])
1858 );
1859
1860 let t: Type = serde_json::from_str(
1861 r#"{
1862 "type":"long",
1863 "logicalType":"timestamp-micros"
1864 }"#,
1865 )
1866 .unwrap();
1867
1868 let timestamp = Type {
1869 r#type: TypeName::Primitive(PrimitiveType::Long),
1870 attributes: Attributes {
1871 logical_type: Some("timestamp-micros"),
1872 additional: Default::default(),
1873 },
1874 };
1875
1876 assert_eq!(t, timestamp);
1877
1878 let t: ComplexType = serde_json::from_str(
1879 r#"{
1880 "type":"fixed",
1881 "name":"fixed",
1882 "namespace":"topLevelRecord.value",
1883 "size":11,
1884 "logicalType":"decimal",
1885 "precision":25,
1886 "scale":2
1887 }"#,
1888 )
1889 .unwrap();
1890
1891 let decimal = ComplexType::Fixed(Fixed {
1892 name: "fixed",
1893 namespace: Some("topLevelRecord.value"),
1894 aliases: vec![],
1895 size: 11,
1896 attributes: Attributes {
1897 logical_type: Some("decimal"),
1898 additional: vec![("precision", json!(25)), ("scale", json!(2))]
1899 .into_iter()
1900 .collect(),
1901 },
1902 });
1903
1904 assert_eq!(t, decimal);
1905
1906 let schema: Schema = serde_json::from_str(
1907 r#"{
1908 "type":"record",
1909 "name":"topLevelRecord",
1910 "fields":[
1911 {
1912 "name":"value",
1913 "type":[
1914 {
1915 "type":"fixed",
1916 "name":"fixed",
1917 "namespace":"topLevelRecord.value",
1918 "size":11,
1919 "logicalType":"decimal",
1920 "precision":25,
1921 "scale":2
1922 },
1923 "null"
1924 ]
1925 }
1926 ]
1927 }"#,
1928 )
1929 .unwrap();
1930
1931 assert_eq!(
1932 schema,
1933 Schema::Complex(ComplexType::Record(Record {
1934 name: "topLevelRecord",
1935 namespace: None,
1936 doc: None,
1937 aliases: vec![],
1938 fields: vec![Field {
1939 name: "value",
1940 doc: None,
1941 r#type: Schema::Union(vec![
1942 Schema::Complex(decimal),
1943 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1944 ]),
1945 default: None,
1946 aliases: vec![],
1947 },],
1948 attributes: Default::default(),
1949 }))
1950 );
1951
1952 let schema: Schema = serde_json::from_str(
1953 r#"{
1954 "type": "record",
1955 "name": "LongList",
1956 "aliases": ["LinkedLongs"],
1957 "fields" : [
1958 {"name": "value", "type": "long"},
1959 {"name": "next", "type": ["null", "LongList"]}
1960 ]
1961 }"#,
1962 )
1963 .unwrap();
1964
1965 assert_eq!(
1966 schema,
1967 Schema::Complex(ComplexType::Record(Record {
1968 name: "LongList",
1969 namespace: None,
1970 doc: None,
1971 aliases: vec!["LinkedLongs"],
1972 fields: vec![
1973 Field {
1974 name: "value",
1975 doc: None,
1976 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
1977 default: None,
1978 aliases: vec![],
1979 },
1980 Field {
1981 name: "next",
1982 doc: None,
1983 r#type: Schema::Union(vec![
1984 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
1985 Schema::TypeName(TypeName::Ref("LongList")),
1986 ]),
1987 default: None,
1988 aliases: vec![],
1989 }
1990 ],
1991 attributes: Attributes::default(),
1992 }))
1993 );
1994
1995 let err = AvroField::try_from(&schema).unwrap_err().to_string();
1997 assert_eq!(err, "Parser error: Failed to resolve .LongList");
1998
1999 let schema: Schema = serde_json::from_str(
2000 r#"{
2001 "type":"record",
2002 "name":"topLevelRecord",
2003 "fields":[
2004 {
2005 "name":"id",
2006 "type":[
2007 "int",
2008 "null"
2009 ]
2010 },
2011 {
2012 "name":"timestamp_col",
2013 "type":[
2014 {
2015 "type":"long",
2016 "logicalType":"timestamp-micros"
2017 },
2018 "null"
2019 ]
2020 }
2021 ]
2022 }"#,
2023 )
2024 .unwrap();
2025
2026 assert_eq!(
2027 schema,
2028 Schema::Complex(ComplexType::Record(Record {
2029 name: "topLevelRecord",
2030 namespace: None,
2031 doc: None,
2032 aliases: vec![],
2033 fields: vec![
2034 Field {
2035 name: "id",
2036 doc: None,
2037 r#type: Schema::Union(vec![
2038 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2039 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2040 ]),
2041 default: None,
2042 aliases: vec![],
2043 },
2044 Field {
2045 name: "timestamp_col",
2046 doc: None,
2047 r#type: Schema::Union(vec![
2048 Schema::Type(timestamp),
2049 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2050 ]),
2051 default: None,
2052 aliases: vec![],
2053 }
2054 ],
2055 attributes: Default::default(),
2056 }))
2057 );
2058 let codec = AvroField::try_from(&schema).unwrap();
2059 let expected_arrow_field = arrow_schema::Field::new(
2060 "topLevelRecord",
2061 DataType::Struct(Fields::from(vec![
2062 arrow_schema::Field::new("id", DataType::Int32, true),
2063 arrow_schema::Field::new(
2064 "timestamp_col",
2065 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2066 true,
2067 ),
2068 ])),
2069 false,
2070 )
2071 .with_metadata(std::collections::HashMap::from([(
2072 AVRO_NAME_METADATA_KEY.to_string(),
2073 "topLevelRecord".to_string(),
2074 )]));
2075
2076 assert_eq!(codec.field(), expected_arrow_field);
2077
2078 let schema: Schema = serde_json::from_str(
2079 r#"{
2080 "type": "record",
2081 "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
2082 "fields": [
2083 {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
2084 {"name": "clientProtocol", "type": ["null", "string"]},
2085 {"name": "serverHash", "type": "MD5"},
2086 {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
2087 ]
2088 }"#,
2089 )
2090 .unwrap();
2091
2092 assert_eq!(
2093 schema,
2094 Schema::Complex(ComplexType::Record(Record {
2095 name: "HandshakeRequest",
2096 namespace: Some("org.apache.avro.ipc"),
2097 doc: None,
2098 aliases: vec![],
2099 fields: vec![
2100 Field {
2101 name: "clientHash",
2102 doc: None,
2103 r#type: Schema::Complex(ComplexType::Fixed(Fixed {
2104 name: "MD5",
2105 namespace: None,
2106 aliases: vec![],
2107 size: 16,
2108 attributes: Default::default(),
2109 })),
2110 default: None,
2111 aliases: vec![],
2112 },
2113 Field {
2114 name: "clientProtocol",
2115 doc: None,
2116 r#type: Schema::Union(vec![
2117 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2118 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2119 ]),
2120 default: None,
2121 aliases: vec![],
2122 },
2123 Field {
2124 name: "serverHash",
2125 doc: None,
2126 r#type: Schema::TypeName(TypeName::Ref("MD5")),
2127 default: None,
2128 aliases: vec![],
2129 },
2130 Field {
2131 name: "meta",
2132 doc: None,
2133 r#type: Schema::Union(vec![
2134 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2135 Schema::Complex(ComplexType::Map(Map {
2136 values: Box::new(Schema::TypeName(TypeName::Primitive(
2137 PrimitiveType::Bytes
2138 ))),
2139 attributes: Default::default(),
2140 })),
2141 ]),
2142 default: None,
2143 aliases: vec![],
2144 }
2145 ],
2146 attributes: Default::default(),
2147 }))
2148 );
2149 }
2150
2151 #[test]
2152 fn test_canonical_form_generation_comprehensive_record() {
2153 let json_str = r#"{
2155 "type": "record",
2156 "name": "E2eComprehensive",
2157 "namespace": "org.apache.arrow.avrotests.v1",
2158 "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.",
2159 "fields": [
2160 {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]},
2161 {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"},
2162 {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"},
2163 {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"},
2164 {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"},
2165 {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"},
2166 {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"},
2167 {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."},
2168 {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."},
2169 {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."},
2170 {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"},
2171 {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"},
2172 {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"},
2173 {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"},
2174 {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"},
2175 {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"},
2176 {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"},
2177 {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"},
2178 {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"},
2179 {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"},
2180 {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"},
2181 {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"},
2182 {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"},
2183 {"name": "status", "type": {"type": "enum", "name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"},
2184 {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"},
2185 {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"},
2186 {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [
2187 {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"},
2188 {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"},
2189 {"name": "country", "type": "string", "default": "US", "doc": "Country code"}
2190 ]}, "doc": "Embedded Address record"},
2191 {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [
2192 {"name": "user", "type": "string", "doc": "Username"},
2193 {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"}
2194 ]}},
2195 {"name": "union_enum_record_array_map", "type": [
2196 {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"},
2197 {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]},
2198 {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]},
2199 {"type": "array", "items": "long"},
2200 {"type": "map", "values": "string"}
2201 ], "doc": "Union of enum, two records, array, and map"},
2202 {"name": "union_date_or_fixed4", "type": [
2203 {"type": "int", "logicalType": "date"},
2204 {"type": "fixed", "name": "Fx4", "size": 4}
2205 ], "doc": "Union of date(int) or fixed(4)"},
2206 {"name": "union_interval_or_string", "type": [
2207 {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"},
2208 "string"
2209 ], "doc": "Union of duration(fixed12) or string"},
2210 {"name": "union_uuid_or_fixed10", "type": [
2211 {"type": "string", "logicalType": "uuid"},
2212 {"type": "fixed", "name": "Fx10", "size": 10}
2213 ], "doc": "Union of UUID string or fixed(10)"},
2214 {"name": "array_records_with_union", "type": {"type": "array", "items": {
2215 "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types",
2216 "fields": [
2217 {"name": "key", "type": "string"},
2218 {"name": "val", "type": ["null", "int", "long"], "default": null}
2219 ]
2220 }}, "doc": "Array<record{key, val: union[null,int,long]}>", "default": []},
2221 {"name": "union_map_or_array_int", "type": [
2222 {"type": "map", "values": "int"},
2223 {"type": "array", "items": "int"}
2224 ], "doc": "Union[map<string,int>, array<int>]"},
2225 {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"},
2226 {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [
2227 {"name": "name", "type": "string"},
2228 {"name": "age", "type": "int", "default": 0}
2229 ]}, "doc": "Record using type alias for schema evolution tests"}
2230 ]
2231 }"#;
2232 let avro = AvroSchema::new(json_str.to_string());
2233 let parsed = avro.schema().expect("schema should deserialize");
2234 let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrotests.v1.
types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#;
2235 let canonical_form =
2236 AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built");
2237 assert_eq!(
2238 canonical_form, expected_canonical_form,
2239 "Canonical form must match Avro spec PCF exactly"
2240 );
2241 }
2242
2243 #[test]
2244 fn test_new_schema_store() {
2245 let store = SchemaStore::new();
2246 assert!(store.schemas.is_empty());
2247 }
2248
2249 #[test]
2250 fn test_try_from_schemas_rabin() {
2251 let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
2252 let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
2253 let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
2254 schemas.insert(
2255 int_avro_schema
2256 .fingerprint(FingerprintAlgorithm::Rabin)
2257 .unwrap(),
2258 int_avro_schema.clone(),
2259 );
2260 schemas.insert(
2261 record_avro_schema
2262 .fingerprint(FingerprintAlgorithm::Rabin)
2263 .unwrap(),
2264 record_avro_schema.clone(),
2265 );
2266 let store = SchemaStore::try_from(schemas).unwrap();
2267 let int_fp = int_avro_schema
2268 .fingerprint(FingerprintAlgorithm::Rabin)
2269 .unwrap();
2270 assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
2271 let rec_fp = record_avro_schema
2272 .fingerprint(FingerprintAlgorithm::Rabin)
2273 .unwrap();
2274 assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
2275 }
2276
/// Inserts the int schema twice (same key) plus the record schema, then checks that the
/// resulting store holds exactly two entries and still resolves the int schema.
#[test]
fn test_try_from_with_duplicates() {
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let int_fp = int_avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let rec_fp = record_avro
        .fingerprint(FingerprintAlgorithm::Rabin)
        .unwrap();

    let mut schemas = HashMap::<Fingerprint, AvroSchema>::new();
    // The third entry repeats the first key; `HashMap::insert` replaces the existing value.
    for (fp, schema) in [
        (int_fp, &int_avro),
        (rec_fp, &record_avro),
        (int_fp, &int_avro),
    ] {
        schemas.insert(fp, schema.clone());
    }

    let store = SchemaStore::try_from(schemas).unwrap();
    assert_eq!(store.schemas.len(), 2);
    assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro));
}
2308
/// Registers a schema, expects a Rabin fingerprint back, and verifies that only that exact
/// fingerprint value resolves to the schema.
#[test]
fn test_register_and_lookup_rabin() {
    let mut store = SchemaStore::new();
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());

    // Every variant is listed explicitly so the match stays exhaustive without a catch-all.
    let fp_val = match store.register(schema.clone()).unwrap() {
        Fingerprint::Rabin(value) => value,
        Fingerprint::Id(_) | Fingerprint::Id64(_) => {
            unreachable!("This test should only generate Rabin fingerprints")
        }
        #[cfg(feature = "md5")]
        Fingerprint::MD5(_) => {
            unreachable!("This test should only generate Rabin fingerprints")
        }
        #[cfg(feature = "sha256")]
        Fingerprint::SHA256(_) => {
            unreachable!("This test should only generate Rabin fingerprints")
        }
    };

    let hit = store.lookup(&Fingerprint::Rabin(fp_val)).cloned();
    assert_eq!(hit, Some(schema));
    let miss = store.lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)));
    assert!(miss.is_none());
}
2342
/// Stores a schema under an explicit `Fingerprint::Id` and verifies that `set` echoes the key
/// and that only that exact id resolves to the schema.
#[test]
fn test_set_and_lookup_id() {
    let raw_id = 42u32;
    let key = Fingerprint::Id(raw_id);
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();

    let returned = store.set(key, schema.clone()).unwrap();
    assert_eq!(returned, key);
    assert_eq!(store.lookup(&key).cloned(), Some(schema));

    let neighbour = Fingerprint::Id(raw_id.wrapping_add(1));
    assert!(store.lookup(&neighbour).is_none());
}
2354
/// Stores a schema under an explicit `Fingerprint::Id64` and verifies that `set` echoes the
/// key and that only that exact 64-bit id resolves to the schema.
#[test]
fn test_set_and_lookup_id64() {
    let raw_id: u64 = 0xDEAD_BEEF_DEAD_BEEF;
    let key = Fingerprint::Id64(raw_id);
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();

    let returned = store.set(key, schema.clone()).unwrap();
    assert_eq!(returned, key, "set should return the same Id64 fingerprint");

    let found = store.lookup(&key).cloned();
    assert_eq!(found, Some(schema), "lookup should find the schema by Id64");

    let neighbour = Fingerprint::Id64(raw_id.wrapping_add(1));
    assert!(
        store.lookup(&neighbour).is_none(),
        "lookup with a different Id64 must return None"
    );
}
2375
/// Exercises the `From` conversions between `Fingerprint`, `FingerprintAlgorithm` and
/// `FingerprintStrategy` for the `Id64` variant. The assertions expect a zero payload after
/// converting from an algorithm and after converting a fingerprint into a strategy.
#[test]
fn test_fingerprint_id64_conversions() {
    assert_eq!(
        FingerprintAlgorithm::from(&Fingerprint::Id64(123)),
        FingerprintAlgorithm::Id64
    );
    assert!(matches!(
        Fingerprint::from(FingerprintAlgorithm::Id64),
        Fingerprint::Id64(0)
    ));

    let strategy = FingerprintStrategy::from(Fingerprint::Id64(5));
    assert!(matches!(strategy, FingerprintStrategy::Id64(0)));
    assert_eq!(
        FingerprintAlgorithm::from(strategy),
        FingerprintAlgorithm::Id64
    );
}
2387
/// Registering two equal schemas must yield the same fingerprint and a single store entry.
#[test]
fn test_register_duplicate_schema() {
    let make_int = || AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let mut store = SchemaStore::new();

    let first = store.register(make_int()).unwrap();
    let second = store.register(make_int()).unwrap();

    assert_eq!(first, second);
    assert_eq!(store.schemas.len(), 1);
}
2398
/// Stores a schema under its own precomputed Rabin fingerprint and looks it up again.
#[test]
fn test_set_and_lookup_with_provided_fingerprint() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rabin_fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let mut store = SchemaStore::new();

    assert_eq!(store.set(rabin_fp, schema.clone()).unwrap(), rabin_fp);
    assert_eq!(store.lookup(&rabin_fp).cloned(), Some(schema));
}
2408
/// Setting the same fingerprint/schema pair twice must succeed and leave one entry.
#[test]
fn test_set_duplicate_same_schema_ok() {
    let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let rabin_fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
    let mut store = SchemaStore::new();

    for _ in 0..2 {
        let _ = store.set(rabin_fp, schema.clone()).unwrap();
    }

    assert_eq!(store.schemas.len(), 1);
}
2418
/// Setting a different schema under an already-used fingerprint must fail with a
/// collision error message.
#[test]
fn test_set_duplicate_different_schema_collision_error() {
    let shared_key = Fingerprint::Id(123);
    let int_avro = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
    let record_avro = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let mut store = SchemaStore::new();

    let _ = store.set(shared_key, int_avro).unwrap();
    let message = store.set(shared_key, record_avro).unwrap_err().to_string();

    assert!(message.contains("Schema fingerprint collision"));
}
2431
/// The canonical form of the primitive int schema is the quoted type name.
#[test]
fn test_canonical_form_generation_primitive() {
    let canonical = AvroSchema::generate_canonical_form(&int_schema()).unwrap();
    assert_eq!(canonical, r#""int""#);
}
2438
/// The canonical form of the shared record fixture uses the fully qualified name and the
/// `name`, `type`, `fields` key order.
#[test]
fn test_canonical_form_generation_record() {
    let canonical = AvroSchema::generate_canonical_form(&record_schema()).unwrap();
    assert_eq!(
        canonical,
        r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#
    );
}
2446
/// Pins the Rabin fingerprint of a fixed input string to a known value.
#[test]
fn test_fingerprint_calculation() {
    let input = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
    assert_eq!(compute_fingerprint_rabin(input), 10505236152925314060);
}
2454
/// Registering the record fixture must return the Rabin fingerprint of its canonical form,
/// and that fingerprint must resolve back to the schema.
#[test]
fn test_register_and_lookup_complex_schema() {
    let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
    let expected = Fingerprint::Rabin(compute_fingerprint_rabin(
        r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#,
    ));
    let mut store = SchemaStore::new();

    let actual = store.register(schema.clone()).unwrap();

    assert_eq!(actual, expected);
    assert_eq!(store.lookup(&actual).cloned(), Some(schema));
}
2466
/// `fingerprints()` must report exactly the fingerprints returned by `register`.
#[test]
fn test_fingerprints_returns_all_keys() {
    let int_json = serde_json::to_string(&int_schema()).unwrap();
    let record_json = serde_json::to_string(&record_schema()).unwrap();
    let mut store = SchemaStore::new();
    let fp_int = store.register(AvroSchema::new(int_json)).unwrap();
    let fp_record = store.register(AvroSchema::new(record_json)).unwrap();

    let fps = store.fingerprints();

    assert_eq!(fps.len(), 2);
    for registered in [fp_int, fp_record] {
        assert!(fps.contains(&registered));
    }
}
2485
/// Canonical form generation must drop docs, aliases and extra attributes, keeping only the
/// name, type and fields of the record.
#[test]
fn test_canonical_form_strips_attributes() {
    // A bytes type carrying an extra attribute that is expected to disappear.
    let bytes_with_precision = Schema::Type(Type {
        r#type: TypeName::Primitive(PrimitiveType::Bytes),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("precision", json!(4))]),
        },
    });
    let documented_field = Field {
        name: "f1",
        doc: Some(Cow::Borrowed("field doc")),
        r#type: bytes_with_precision,
        default: None,
        aliases: vec![],
    };
    let record = Record {
        name: "record_with_attrs",
        namespace: None,
        doc: Some(Cow::Borrowed("This doc should be stripped")),
        aliases: vec!["alias1", "alias2"],
        fields: vec![documented_field],
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("custom_attr", json!("value"))]),
        },
    };
    let schema_with_attrs = Schema::Complex(ComplexType::Record(record));

    let canonical = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap();

    assert_eq!(
        canonical,
        r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#
    );
}
2515
/// For each Arrow primitive `DataType`, converts a schema with one non-nullable column named
/// `col` to Avro and checks (via `assert_json_contains`) that the JSON contains the expected
/// quoted Avro primitive token.
#[test]
fn test_primitive_mappings() {
    // A fixed-size array is enough for a static case table; no heap allocation is needed.
    let cases = [
        (DataType::Boolean, "\"boolean\""),
        (DataType::Int8, "\"int\""),
        (DataType::Int16, "\"int\""),
        (DataType::Int32, "\"int\""),
        (DataType::Int64, "\"long\""),
        (DataType::UInt8, "\"int\""),
        (DataType::UInt16, "\"int\""),
        (DataType::UInt32, "\"long\""),
        (DataType::UInt64, "\"long\""),
        (DataType::Float16, "\"float\""),
        (DataType::Float32, "\"float\""),
        (DataType::Float64, "\"double\""),
        (DataType::Utf8, "\"string\""),
        (DataType::Binary, "\"bytes\""),
    ];
    for (dt, avro_token) in cases {
        // `dt` is owned by this iteration and not used again, so it is moved, not cloned.
        let arrow_schema = single_field_schema(ArrowField::new("col", dt, false));
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, avro_token);
    }
}
2541
/// For each Arrow temporal `DataType`, converts a schema with one nullable column named `ts`
/// to Avro and checks (via `assert_json_contains`) that the JSON carries the expected
/// `logicalType` annotation.
#[test]
fn test_temporal_mappings() {
    // A fixed-size array is enough for a static case table; no heap allocation is needed.
    let cases = [
        (DataType::Date32, "\"logicalType\":\"date\""),
        (
            DataType::Time32(TimeUnit::Millisecond),
            "\"logicalType\":\"time-millis\"",
        ),
        (
            DataType::Time64(TimeUnit::Microsecond),
            "\"logicalType\":\"time-micros\"",
        ),
        (
            // No time zone: the expected logical type is the "local" timestamp flavour.
            DataType::Timestamp(TimeUnit::Millisecond, None),
            "\"logicalType\":\"local-timestamp-millis\"",
        ),
        (
            DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
            "\"logicalType\":\"timestamp-micros\"",
        ),
    ];
    for (dt, needle) in cases {
        // `dt` is owned by this iteration and not used again, so it is moved, not cloned.
        let arrow_schema = single_field_schema(ArrowField::new("ts", dt, true));
        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
        assert_json_contains(&avro.json_string, needle);
    }
}
2570
/// Checks the Avro JSON produced for a `Decimal128(25, 2)` column and for a 16-byte fixed
/// column whose field metadata declares `logicalType = uuid`.
#[test]
fn test_decimal_and_uuid() {
    // Decimal: logical type plus precision and scale must be present.
    let dec_schema = single_field_schema(ArrowField::new(
        "amount",
        DataType::Decimal128(25, 2),
        false,
    ));
    let dec_json = AvroSchema::try_from(&dec_schema).unwrap().json_string;
    for needle in [
        "\"logicalType\":\"decimal\"",
        "\"precision\":25",
        "\"scale\":2",
    ] {
        assert_json_contains(&dec_json, needle);
    }

    // UUID: requested through field metadata on a FixedSizeBinary(16) column.
    let uuid_md = HashMap::from([("logicalType".to_string(), "uuid".to_string())]);
    let uuid_schema = single_field_schema(
        ArrowField::new("id", DataType::FixedSizeBinary(16), false).with_metadata(uuid_md),
    );
    let uuid_json = AvroSchema::try_from(&uuid_schema).unwrap().json_string;
    assert_json_contains(&uuid_json, "\"logicalType\":\"uuid\"");
}
2587
/// With `avro_custom_types` enabled, checks the Avro JSON for a month-day-nano interval
/// column and for a nanosecond duration column.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_duration() {
    let interval_schema = single_field_schema(ArrowField::new(
        "span",
        DataType::Interval(IntervalUnit::MonthDayNano),
        false,
    ));
    let interval_json = AvroSchema::try_from(&interval_schema).unwrap().json_string;
    assert_json_contains(&interval_json, "\"logicalType\":\"duration\"");
    assert_json_contains(&interval_json, "\"size\":12");

    let duration_schema = single_field_schema(ArrowField::new(
        "latency",
        DataType::Duration(TimeUnit::Nanosecond),
        false,
    ));
    let duration_json = AvroSchema::try_from(&duration_schema).unwrap().json_string;
    assert_json_contains(&duration_json, "\"logicalType\":\"arrow.duration-nanos\"");
}
2608
/// Converts a list column, a map column and a nullable struct column to Avro and checks the
/// JSON for the corresponding `array`, `map` and `record` markers.
#[test]
fn test_complex_types() {
    // List column.
    let list_schema = single_field_schema(ArrowField::new(
        "numbers",
        DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
        false,
    ));
    let list_json = AvroSchema::try_from(&list_schema).unwrap().json_string;
    assert_json_contains(&list_json, "\"type\":\"array\"");
    assert_json_contains(&list_json, "\"items\"");

    // Map column: an "entries" struct of key/value wrapped in `DataType::Map`.
    let entry_fields = Fields::from(vec![
        ArrowField::new("key", DataType::Utf8, false),
        ArrowField::new("value", DataType::Boolean, true),
    ]);
    let entries = ArrowField::new("entries", DataType::Struct(entry_fields), false);
    let map_schema = single_field_schema(ArrowField::new(
        "props",
        DataType::Map(Arc::new(entries), false),
        false,
    ));
    let map_json = AvroSchema::try_from(&map_schema).unwrap().json_string;
    assert_json_contains(&map_json, "\"type\":\"map\"");
    assert_json_contains(&map_json, "\"values\"");

    // Nullable struct column.
    let person_fields = Fields::from(vec![
        ArrowField::new("f1", DataType::Int64, false),
        ArrowField::new("f2", DataType::Utf8, true),
    ]);
    let struct_schema =
        single_field_schema(ArrowField::new("person", DataType::Struct(person_fields), true));
    let struct_json = AvroSchema::try_from(&struct_schema).unwrap().json_string;
    assert_json_contains(&struct_json, "\"type\":\"record\"");
    assert_json_contains(&struct_json, "\"null\"");
}
2639
/// A dictionary column carrying enum symbols in its field metadata must convert to an Avro
/// enum listing those symbols.
#[test]
fn test_enum_dictionary() {
    let symbols_md = HashMap::from([(
        AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
        "[\"OPEN\",\"CLOSED\"]".to_string(),
    )]);
    let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let arrow_schema =
        single_field_schema(ArrowField::new("status", dictionary, false).with_metadata(symbols_md));

    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;

    assert_json_contains(&avro_json, "\"type\":\"enum\"");
    assert_json_contains(&avro_json, "\"symbols\":[\"OPEN\",\"CLOSED\"]");
}
2654
/// A run-end-encoded column with Utf8 values must produce Avro JSON containing `"string"`.
#[test]
fn test_run_end_encoded() {
    let run_ends = Arc::new(ArrowField::new("run_ends", DataType::Int32, false));
    let values = Arc::new(ArrowField::new("values", DataType::Utf8, false));
    let arrow_schema = single_field_schema(ArrowField::new(
        "text",
        DataType::RunEndEncoded(run_ends, values),
        false,
    ));

    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;

    assert_json_contains(&avro_json, "\"string\"");
}
2665
/// Converts a dense Arrow union (type ids 2 and 7) to Avro and inspects the emitted union:
/// the first branch must be an object carrying the Arrow union mode and type ids, the second
/// branch the plain `"string"` type.
#[test]
fn test_dense_union() {
    let branches = vec![
        (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
        (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
    ];
    let union_fields: UnionFields = branches.into_iter().collect();
    let arrow_schema = single_field_schema(ArrowField::new(
        "u",
        DataType::Union(union_fields, UnionMode::Dense),
        false,
    ));
    let avro = AvroSchema::try_from(&arrow_schema)
        .expect("Arrow Union -> Avro union conversion should succeed");

    // Locate the field named "u" inside the generated record.
    let root: serde_json::Value = serde_json::from_str(&avro.json_string).unwrap();
    let u_field = root
        .get("fields")
        .and_then(Value::as_array)
        .expect("fields array")
        .iter()
        .find(|candidate| candidate.get("name").and_then(Value::as_str) == Some("u"))
        .expect("field 'u'");
    let arr = u_field
        .get("type")
        .expect("u.type")
        .as_array()
        .expect("u.type must be Avro union array");
    assert_eq!(arr.len(), 2, "expected two union branches");

    // First branch: object form with the Arrow-specific extras.
    let obj = arr[0]
        .as_object()
        .expect("first branch should be an object with metadata");
    assert_eq!(obj.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        obj.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let type_ids: Vec<i64> = obj
        .get("arrowUnionTypeIds")
        .and_then(Value::as_array)
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|id| id.as_i64().expect("i64"))
        .collect();
    assert_eq!(type_ids, vec![2, 7], "type id ordering should be preserved");

    // Second branch: bare primitive name.
    assert_eq!(arr[1], Value::String("string".into()));
}
2709
/// An Arrow schema converted to Avro must parse back as a complex (non-primitive) schema.
#[test]
fn round_trip_primitive() {
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", DataType::Int32, false)]);
    let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
    assert!(matches!(
        avro_schema.schema().unwrap(),
        Schema::Complex(_)
    ));
}
2717
/// Column names that are not valid as-is ("weird-name", "weird name", "123bad") must appear
/// in the Avro JSON as sanitized, distinct names.
#[test]
fn test_name_generator_sanitization_and_uniqueness() {
    let columns: Vec<ArrowField> = ["weird-name", "weird name", "123bad"]
        .into_iter()
        .map(|name| ArrowField::new(name, DataType::FixedSizeBinary(8), false))
        .collect();
    let arrow_schema = ArrowSchema::new(columns);

    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;

    for needle in [
        "\"name\":\"weird_name\"",
        "\"name\":\"weird_name_1\"",
        "\"name\":\"_123bad\"",
    ] {
        assert_json_contains(&avro_json, needle);
    }
}
2729
/// A nullable `Date64` column must be emitted with the `local-timestamp-millis` logical type.
#[test]
fn test_date64_logical_type_mapping() {
    let arrow_schema = single_field_schema(ArrowField::new("d", DataType::Date64, true));
    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"logicalType\":\"local-timestamp-millis\"");
}
2740
/// With `avro_custom_types` enabled, a list of microsecond durations must still expose the
/// `arrow.duration-micros` logical type in the Avro JSON.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_list_extras_propagated() {
    let item = ArrowField::new("lat", DataType::Duration(TimeUnit::Microsecond), false);
    let arrow_schema = single_field_schema(ArrowField::new(
        "durations",
        DataType::List(Arc::new(item)),
        false,
    ));

    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;

    assert_json_contains(&avro_json, "\"logicalType\":\"arrow.duration-micros\"");
}
2753
/// A year-month interval column must record `arrowIntervalUnit = yearmonth` in the Avro JSON.
#[test]
fn test_interval_yearmonth_extra() {
    let arrow_schema = single_field_schema(ArrowField::new(
        "iv",
        DataType::Interval(IntervalUnit::YearMonth),
        false,
    ));
    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"arrowIntervalUnit\":\"yearmonth\"");
}
2761
/// A day-time interval column must record `arrowIntervalUnit = daytime` in the Avro JSON.
#[test]
fn test_interval_daytime_extra() {
    let arrow_schema = single_field_schema(ArrowField::new(
        "iv_dt",
        DataType::Interval(IntervalUnit::DayTime),
        false,
    ));
    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"arrowIntervalUnit\":\"daytime\"");
}
2769
/// A fixed-size list of length 3 must record `arrowFixedSize = 3` in the Avro JSON.
#[test]
fn test_fixed_size_list_extra() {
    let item = Arc::new(ArrowField::new("item", DataType::Int32, false));
    let arrow_schema = single_field_schema(ArrowField::new(
        "triples",
        DataType::FixedSizeList(item, 3),
        false,
    ));
    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
    assert_json_contains(&avro_json, "\"arrowFixedSize\":3");
}
2778
/// With `avro_custom_types` enabled, a map whose values are second-resolution durations must
/// expose the `arrow.duration-seconds` logical type in the Avro JSON.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_map_duration_value_extra() {
    let entry_fields = Fields::from(vec![
        ArrowField::new("key", DataType::Utf8, false),
        ArrowField::new("value", DataType::Duration(TimeUnit::Second), true),
    ]);
    let entries = ArrowField::new("entries", DataType::Struct(entry_fields), false);
    let arrow_schema = single_field_schema(ArrowField::new(
        "metrics",
        DataType::Map(Arc::new(entries), false),
        false,
    ));

    let avro_json = AvroSchema::try_from(&arrow_schema).unwrap().json_string;

    assert_json_contains(&avro_json, "\"logicalType\":\"arrow.duration-seconds\"");
}
2799
/// Parses a record schema whose field defaults are numbers, arrays, objects and booleans
/// (not only strings), converts it to an Arrow field, and compares against the expected
/// struct layout.
#[test]
fn test_schema_with_non_string_defaults_decodes_successfully() {
    let schema_json = r#"{
        "type": "record",
        "name": "R",
        "fields": [
            {"name": "a", "type": "int", "default": 0},
            {"name": "b", "type": {"type": "array", "items": "long"}, "default": [1, 2, 3]},
            {"name": "c", "type": {"type": "map", "values": "double"}, "default": {"x": 1.5, "y": 2.5}},
            {"name": "inner", "type": {"type": "record", "name": "Inner", "fields": [
                {"name": "flag", "type": "boolean", "default": true},
                {"name": "name", "type": "string", "default": "hi"}
            ]}, "default": {"flag": false, "name": "d"}},
            {"name": "u", "type": ["int", "null"], "default": 42}
        ]
    }"#;
    let schema: Schema = serde_json::from_str(schema_json).expect("schema should parse");
    assert!(
        matches!(schema, Schema::Complex(ComplexType::Record(_))),
        "expected record schema, got: {:?}",
        schema
    );
    let avro_field = crate::codec::AvroField::try_from(&schema)
        .expect("Avro->Arrow conversion should succeed");

    // Small helper for the many non-nullable fields in the expected layout.
    let non_null = |name: &str, dt: DataType| ArrowField::new(name, dt, false);

    // "b": list of non-nullable Int64 items.
    let expected_b = non_null(
        "b",
        DataType::List(Arc::new(non_null(
            arrow_schema::Field::LIST_FIELD_DEFAULT_NAME,
            DataType::Int64,
        ))),
    );

    // "c": map with Utf8 keys and Float64 values.
    let expected_entries = non_null(
        "entries",
        DataType::Struct(Fields::from(vec![
            non_null("key", DataType::Utf8),
            non_null("value", DataType::Float64),
        ])),
    );
    let expected_c = non_null("c", DataType::Map(Arc::new(expected_entries), false));

    // "inner": nested record, tagged with its Avro name in the field metadata.
    let expected_inner = non_null(
        "inner",
        DataType::Struct(Fields::from(vec![
            non_null("flag", DataType::Boolean),
            non_null("name", DataType::Utf8),
        ])),
    )
    .with_metadata(HashMap::from([(
        AVRO_NAME_METADATA_KEY.to_string(),
        "Inner".to_string(),
    )]));

    // Root record "R"; the `["int", "null"]` union is expected as a nullable Int32.
    let expected = non_null(
        "R",
        DataType::Struct(Fields::from(vec![
            non_null("a", DataType::Int32),
            expected_b,
            expected_c,
            expected_inner,
            ArrowField::new("u", DataType::Int32, true),
        ])),
    )
    .with_metadata(HashMap::from([(
        AVRO_NAME_METADATA_KEY.to_string(),
        "R".to_string(),
    )]));

    assert_eq!(avro_field.field(), expected);
}
2871
/// `try_from` and `from_arrow_with_options(.., None)` must generate the same Avro JSON.
#[test]
fn default_order_is_consistent() {
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", DataType::Utf8, true)]);
    let via_try_from = AvroSchema::try_from(&arrow_schema).unwrap();
    let via_options = AvroSchema::from_arrow_with_options(&arrow_schema, None).unwrap();
    assert_eq!(via_try_from.json_string, via_options.json_string);
}
2879
/// Named-type union branches (`record`, `enum`, `fixed`) without a `name` must be rejected
/// with a message that identifies the branch type.
#[test]
fn test_union_branch_missing_name_errors() {
    for t in ["record", "enum", "fixed"] {
        let nameless_branch = json!({ "type": t });
        let err = union_branch_signature(&nameless_branch)
            .unwrap_err()
            .to_string();
        let expected = format!("Union branch '{t}' missing required 'name'");
        assert!(
            err.contains(expected.as_str()),
            "expected missing-name error for {t}, got: {err}"
        );
    }
}
2891
/// The signature of a named union branch must have the form `N:<type>:<name>`.
#[test]
fn test_union_branch_named_type_signature_includes_name() {
    let cases = [
        (json!({ "type": "record", "name": "Foo" }), "N:record:Foo"),
        (
            json!({ "type": "enum", "name": "Color", "symbols": ["R", "G", "B"] }),
            "N:enum:Color",
        ),
        (
            json!({ "type": "fixed", "name": "Bytes16", "size": 16 }),
            "N:fixed:Bytes16",
        ),
    ];
    for (branch, expected_signature) in cases {
        assert_eq!(
            union_branch_signature(&branch).unwrap(),
            expected_signature
        );
    }
}
2901
/// A reader field `new` with alias `old` must resolve against the writer field `old` in
/// non-strict mode, yielding a struct with the reader's field name.
#[test]
fn test_record_field_alias_resolution_without_default() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"old","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"new","aliases":["old"],"type":"int"}]
        }"#,
    )
    .unwrap();
    let reader_fields = Fields::from(vec![ArrowField::new("new", DataType::Int32, false)]);
    let expected = ArrowField::new("R", DataType::Struct(reader_fields), false);

    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();

    assert_eq!(resolved.field(), expected);
}
2933
/// In strict mode, two writer fields sharing the alias `old` must make resolution fail with
/// an ambiguous-alias error.
#[test]
fn test_record_field_alias_ambiguous_in_strict_mode_errors() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int","aliases":["old"]},
                {"name":"b","type":"int","aliases":["old"]}
            ]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"target","type":"int","aliases":["old"]}]
        }"#,
    )
    .unwrap();

    let build_result = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(true)
        .build();
    let err = build_result.unwrap_err().to_string();

    assert!(
        err.contains("Ambiguous alias 'old'"),
        "expected ambiguous-alias error, got: {err}"
    );
}
2963
/// In non-strict mode, a writer field `before` with alias `now` must map onto the reader
/// field named `now`.
#[test]
fn test_pragmatic_writer_field_alias_mapping_non_strict() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"before","type":"int","aliases":["now"]}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"now","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader_fields = Fields::from(vec![ArrowField::new("now", DataType::Int32, false)]);
    let expected = ArrowField::new("R", DataType::Struct(reader_fields), false);

    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();

    assert_eq!(resolved.field(), expected);
}
2995
/// A reader-only field typed `["null","int"]` with no default must resolve successfully; the
/// expected Arrow field is nullable and carries `"null"` as its recorded default.
#[test]
fn test_missing_reader_field_null_first_no_default_is_ok() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"a","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int"},
                {"name":"b","type":["null","int"]}
            ]
        }"#,
    )
    .unwrap();

    // Field "b" exists only in the reader schema.
    let default_md = HashMap::from([(
        AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(),
        "null".to_string(),
    )]);
    let expected_fields = Fields::from(vec![
        ArrowField::new("a", DataType::Int32, false),
        ArrowField::new("b", DataType::Int32, true).with_metadata(default_md),
    ]);
    let expected = ArrowField::new("R", DataType::Struct(expected_fields), false);

    let resolved = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();

    assert_eq!(resolved.field(), expected);
}
3032
/// A reader-only field typed `["int","null"]` with no default must make resolution fail with
/// a missing-default error.
#[test]
fn test_missing_reader_field_null_second_without_default_errors() {
    let writer: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[{"name":"a","type":"int"}]
        }"#,
    )
    .unwrap();
    let reader: Schema = serde_json::from_str(
        r#"{
            "type":"record",
            "name":"R",
            "fields":[
                {"name":"a","type":"int"},
                {"name":"b","type":["int","null"]}
            ]
        }"#,
    )
    .unwrap();

    let build_result = AvroFieldBuilder::new(&writer)
        .with_reader_schema(&reader)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    let err = build_result.unwrap_err().to_string();

    assert!(
        err.contains("must have a default value"),
        "expected missing-default error, got: {err}"
    );
}
3062
/// With `strip_metadata = false`, an Avro schema embedded in the Arrow schema metadata must
/// be returned unchanged.
#[test]
fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
    let injected_json =
        r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#;
    let md = HashMap::from([
        (SCHEMA_METADATA_KEY.to_string(), injected_json.to_string()),
        ("custom".to_string(), "123".to_string()),
    ]);
    let arrow_schema =
        ArrowSchema::new_with_metadata(vec![ArrowField::new("x", DataType::Int32, true)], md);
    let opts = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: false,
    };

    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();

    assert_eq!(
        out.json_string, injected_json,
        "When strip_metadata=false and avro.schema is present, return the embedded JSON verbatim"
    );
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    assert_eq!(parsed.get("type").and_then(Value::as_str), Some("record"));
    assert_eq!(parsed.get("name").and_then(Value::as_str), Some("Injected"));
}
3086
/// With `strip_metadata = true`, the embedded Avro schema must be ignored (the default root
/// record name appears instead) while the unrelated `custom_meta` entry still shows up in
/// the output.
#[test]
fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
    let md = HashMap::from([
        (
            SCHEMA_METADATA_KEY.to_string(),
            r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
                .to_string(),
        ),
        ("custom_meta".to_string(), "7".to_string()),
    ]);
    let arrow_schema =
        ArrowSchema::new_with_metadata(vec![ArrowField::new("x", DataType::Int32, true)], md);
    let opts = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };

    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();

    for needle in [
        "\"type\":\"record\"",
        "\"name\":\"topLevelRecord\"",
        "\"custom_meta\":7",
    ] {
        assert_json_contains(&out.json_string, needle);
    }
}
3106
/// With `Nullability::NullFirst`, a nullable Utf8 column must become the union
/// `["null", "string"]`.
#[test]
fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let opts = AvroSchemaOptions {
        null_order: Some(Nullability::NullFirst),
        strip_metadata: true,
    };

    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = parsed["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");

    assert_eq!(branches[0], json!("null"));
    assert_eq!(branches[1], json!("string"));
}
3123
/// With `Nullability::NullSecond`, a nullable Utf8 column must become the union
/// `["string", "null"]`.
#[test]
fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
    let arrow_schema = single_field_schema(ArrowField::new("s", DataType::Utf8, true));
    let opts = AvroSchemaOptions {
        null_order: Some(Nullability::NullSecond),
        strip_metadata: true,
    };

    let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
    let parsed: Value = serde_json::from_str(&out.json_string).unwrap();
    let branches = parsed["fields"][0]["type"]
        .as_array()
        .expect("nullable primitive should be Avro union array");

    assert_eq!(branches[0], json!("string"));
    assert_eq!(branches[1], json!("null"));
}
3140
// Converts the same nullable dense union twice: once keeping metadata (the
// first object branch must carry `arrowUnionMode` / `arrowUnionTypeIds`) and
// once stripping it (no branch may carry `arrowUnionMode`, and the branches
// collapse to plain type-name strings).
#[test]
fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
    let int_branch = Arc::new(ArrowField::new("a", DataType::Int32, false));
    let str_branch = Arc::new(ArrowField::new("b", DataType::Utf8, false));
    let union_fields: UnionFields = vec![(2i8, int_branch), (7i8, str_branch)]
        .into_iter()
        .collect();
    let arrow_schema = single_field_schema(ArrowField::new(
        "u",
        DataType::Union(union_fields, UnionMode::Dense),
        true,
    ));
    // Both conversions differ only in the `strip_metadata` flag.
    let convert = |strip_metadata: bool| {
        AvroSchema::from_arrow_with_options(
            &arrow_schema,
            Some(AvroSchemaOptions {
                null_order: Some(Nullability::NullFirst),
                strip_metadata,
            }),
        )
        .unwrap()
    };

    // Metadata kept: the first object branch carries the Arrow union extras.
    let kept = convert(false);
    let kept_json: Value = serde_json::from_str(&kept.json_string).unwrap();
    let kept_branches = kept_json["fields"][0]["type"]
        .as_array()
        .expect("union array");
    let extras = kept_branches
        .iter()
        .find_map(Value::as_object)
        .expect("expected an object branch with extras");
    assert_eq!(extras.get("type").and_then(Value::as_str), Some("int"));
    assert_eq!(
        extras.get("arrowUnionMode").and_then(Value::as_str),
        Some("dense")
    );
    let ids: Vec<i64> = extras["arrowUnionTypeIds"]
        .as_array()
        .expect("arrowUnionTypeIds array")
        .iter()
        .map(|id| id.as_i64().expect("i64"))
        .collect();
    assert_eq!(ids, vec![2, 7]);

    // Metadata stripped: no branch may mention `arrowUnionMode`.
    let bare = convert(true);
    let bare_json: Value = serde_json::from_str(&bare.json_string).unwrap();
    let bare_branches = bare_json["fields"][0]["type"]
        .as_array()
        .expect("union array");
    assert!(
        bare_branches
            .iter()
            .all(|branch| branch.get("arrowUnionMode").is_none()),
        "extras must be removed when strip_metadata=true"
    );
    assert_eq!(bare_branches[0].as_str(), Some("null"));
    assert_eq!(bare_branches[1].as_str(), Some("int"));
    assert_eq!(bare_branches[2].as_str(), Some("string"));
}
3200
// Projecting with no indices must succeed and leave the record with an empty
// `fields` array.
#[test]
fn test_project_empty_projection() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#,
    ));
    let projected = source.project(&[]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let remaining = parsed["fields"].as_array().unwrap();
    assert!(
        remaining.is_empty(),
        "Empty projection should yield empty fields"
    );
}
3220
// Selecting only index 1 of a three-field record must keep just field "b".
#[test]
fn test_project_single_field() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"}
            ]
        }"#,
    ));
    let projected = source.project(&[1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0]["name"].as_str(), Some("b"));
}
3239
// Selecting indices 0, 2 and 3 must keep fields "a", "c" and "d" in that order.
#[test]
fn test_project_multiple_fields() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"},
                {"name": "d", "type": "boolean"}
            ]
        }"#,
    ));
    let projected = source.project(&[0, 2, 3]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 3);
    let names: Vec<Option<&str>> = kept.iter().map(|field| field["name"].as_str()).collect();
    assert_eq!(names, vec![Some("a"), Some("c"), Some("d")]);
}
3261
// Selecting every index in original order must reproduce both fields unchanged
// in name and position.
#[test]
fn test_project_all_fields() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#,
    ));
    let projected = source.project(&[0, 1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 2);
    let names: Vec<Option<&str>> = kept.iter().map(|field| field["name"].as_str()).collect();
    assert_eq!(names, vec![Some("a"), Some("b")]);
}
3280
// The projection order [2, 0, 1] must be reflected in the output: "c", "a", "b".
#[test]
fn test_project_reorder_fields() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"}
            ]
        }"#,
    ));
    let projected = source.project(&[2, 0, 1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 3);
    let names: Vec<Option<&str>> = kept.iter().map(|field| field["name"].as_str()).collect();
    assert_eq!(names, vec![Some("c"), Some("a"), Some("b")]);
}
3302
// Record-level attributes (name, namespace, doc, aliases) must still be present
// on the projected schema.
#[test]
fn test_project_preserves_record_metadata() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "MyRecord",
            "namespace": "com.example",
            "doc": "A test record",
            "aliases": ["OldRecord"],
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#,
    ));
    let projected = source.project(&[0]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    assert_eq!(parsed["name"].as_str(), Some("MyRecord"));
    assert_eq!(parsed["namespace"].as_str(), Some("com.example"));
    assert_eq!(parsed["doc"].as_str(), Some("A test record"));
    // Only the presence of the key is checked, not its value.
    assert!(
        parsed
            .as_object()
            .is_some_and(|record| record.contains_key("aliases"))
    );
}
3327
// Field-level attributes ("doc" and "default") must survive projection on the
// selected field.
#[test]
fn test_project_preserves_field_metadata() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int", "doc": "Field A", "default": 0},
                {"name": "b", "type": "string"}
            ]
        }"#,
    ));
    let projected = source.project(&[0]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    let first = &kept[0];
    assert_eq!(first["doc"].as_str(), Some("Field A"));
    assert_eq!(first["default"].as_i64(), Some(0));
}
3348
// Projecting a field whose type is itself a record must keep that nested
// record definition (type "record", name "Inner") on the selected field.
#[test]
fn test_project_with_nested_record() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Outer",
            "fields": [
                {"name": "id", "type": "int"},
                {"name": "inner", "type": {
                    "type": "record",
                    "name": "Inner",
                    "fields": [
                        {"name": "x", "type": "int"},
                        {"name": "y", "type": "string"}
                    ]
                }},
                {"name": "value", "type": "double"}
            ]
        }"#,
    ));
    let projected = source.project(&[1]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0]["name"].as_str(), Some("inner"));
    let nested = &kept[0]["type"];
    assert_eq!(nested["type"].as_str(), Some("record"));
    assert_eq!(nested["name"].as_str(), Some("Inner"));
}
3387
// Array- and union-typed fields must keep their type definitions when they are
// selected by the projection.
#[test]
fn test_project_with_complex_field_types() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "arr", "type": {"type": "array", "items": "int"}},
                {"name": "map", "type": {"type": "map", "values": "string"}},
                {"name": "union", "type": ["null", "int"]}
            ]
        }"#,
    ));
    let projected = source.project(&[0, 2]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 2);
    // First kept field is the array; its type object must still say "array".
    assert_eq!(kept[0]["type"]["type"].as_str(), Some("array"));
    // Second kept field is the union; its type must still be a JSON array.
    assert!(kept[1]["type"].is_array());
}
3411
// A schema string that is not JSON at all must make `project` fail with an
// "Invalid Avro schema JSON" error.
#[test]
fn test_project_error_invalid_json() {
    let msg = AvroSchema::new(String::from("not valid json"))
        .project(&[0])
        .unwrap_err()
        .to_string();
    assert!(
        msg.contains("Invalid Avro schema JSON"),
        "Expected parse error, got: {msg}"
    );
}
3422
// A schema that is a bare JSON string must be rejected because the top level
// is not a JSON object.
#[test]
fn test_project_error_not_object() {
    let msg = AvroSchema::new(String::from(r#""string""#))
        .project(&[0])
        .unwrap_err()
        .to_string();
    assert!(
        msg.contains("must be a JSON object"),
        "Expected object error, got: {msg}"
    );
}
3434
// A top-level JSON array (an Avro union) must be rejected because the top level
// is not a JSON object.
#[test]
fn test_project_error_array_schema() {
    let msg = AvroSchema::new(String::from(r#"["null", "int"]"#))
        .project(&[0])
        .unwrap_err()
        .to_string();
    assert!(
        msg.contains("must be a JSON object"),
        "Expected object error for array schema, got: {msg}"
    );
}
3446
// Projecting an enum schema must fail with an error that names the offending
// type ('enum').
#[test]
fn test_project_error_type_not_record() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "enum",
            "name": "Color",
            "symbols": ["RED", "GREEN", "BLUE"]
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    let expected_parts = ["must be an Avro record", "'enum'"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected type mismatch error, got: {msg}"
    );
}
3462
// Projecting an array schema must fail with an error that names the offending
// type ('array').
#[test]
fn test_project_error_type_array() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "array",
            "items": "int"
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    let expected_parts = ["must be an Avro record", "'array'"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected type mismatch error for array type, got: {msg}"
    );
}
3477
// Projecting a fixed schema must fail with an error that names the offending
// type ('fixed').
#[test]
fn test_project_error_type_fixed() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "fixed",
            "name": "MD5",
            "size": 16
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    let expected_parts = ["must be an Avro record", "'fixed'"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected type mismatch error for fixed type, got: {msg}"
    );
}
3493
// Projecting a map schema must fail with an error that names the offending
// type ('map').
#[test]
fn test_project_error_type_map() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "map",
            "values": "string"
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    let expected_parts = ["must be an Avro record", "'map'"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected type mismatch error for map type, got: {msg}"
    );
}
3508
// A JSON object without a "type" key must be rejected with a
// "missing required 'type' field" error.
#[test]
fn test_project_error_missing_type_field() {
    let source = AvroSchema::new(String::from(
        r#"{
            "name": "Test",
            "fields": [{"name": "a", "type": "int"}]
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    assert!(
        msg.contains("missing required 'type' field"),
        "Expected missing type error, got: {msg}"
    );
}
3523
// A record object without a "fields" key must be rejected with a
// "missing required 'fields'" error.
#[test]
fn test_project_error_missing_fields() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test"
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    assert!(
        msg.contains("missing required 'fields'"),
        "Expected missing fields error, got: {msg}"
    );
}
3538
// A record whose "fields" value is a string rather than an array must be
// rejected with a "'fields' must be an array" error.
#[test]
fn test_project_error_fields_not_array() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": "not an array"
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    assert!(
        msg.contains("'fields' must be an array"),
        "Expected fields array error, got: {msg}"
    );
}
3554
// Index 5 against a two-field record must produce an "out of bounds" error
// whose text contains both "5" and "2".
#[test]
fn test_project_error_index_out_of_bounds() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#,
    ));
    let msg = source.project(&[5]).unwrap_err().to_string();
    let expected_parts = ["out of bounds", "5", "2"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected out of bounds error, got: {msg}"
    );
}
3573
// Boundary case: index 1 against a one-field record (index == field count)
// must still be reported as out of bounds.
#[test]
fn test_project_error_index_out_of_bounds_edge() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"}
            ]
        }"#,
    ));
    let msg = source.project(&[1]).unwrap_err().to_string();
    let expected_parts = ["out of bounds", "1"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected out of bounds error for edge case, got: {msg}"
    );
}
3592
// Repeating index 0 with another index in between must be rejected with a
// "Duplicate projection index" error.
#[test]
fn test_project_error_duplicate_index() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"},
                {"name": "c", "type": "long"}
            ]
        }"#,
    ));
    let msg = source.project(&[0, 1, 0]).unwrap_err().to_string();
    let expected_parts = ["Duplicate projection index", "0"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected duplicate index error, got: {msg}"
    );
}
3612
// Repeating index 1 back-to-back must be rejected with a
// "Duplicate projection index" error.
#[test]
fn test_project_error_duplicate_index_consecutive() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "string"}
            ]
        }"#,
    ));
    let msg = source.project(&[1, 1]).unwrap_err().to_string();
    let expected_parts = ["Duplicate projection index", "1"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected duplicate index error for consecutive duplicates, got: {msg}"
    );
}
3631
// An empty projection of a record that already has no fields must succeed and
// still yield an empty `fields` array.
#[test]
fn test_project_with_empty_fields() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "EmptyRecord",
            "fields": []
        }"#,
    ));
    let projected = source.project(&[]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let remaining = parsed["fields"].as_array().unwrap();
    assert!(remaining.is_empty());
}
3646
// Any index against a record with no fields is out of bounds; the error text
// must mention "0 fields".
#[test]
fn test_project_empty_fields_index_out_of_bounds() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "EmptyRecord",
            "fields": []
        }"#,
    ));
    let msg = source.project(&[0]).unwrap_err().to_string();
    let expected_parts = ["out of bounds", "0 fields"];
    assert!(
        expected_parts.iter().all(|part| msg.contains(*part)),
        "Expected out of bounds error for empty record, got: {msg}"
    );
}
3662
// The JSON produced by `project` must parse back through `AvroSchema::schema`
// into a record that keeps the original name and namespace and contains only
// the selected fields ("id" and "active").
#[test]
fn test_project_result_is_valid_avro_schema() {
    let schema_json = r#"{
        "type": "record",
        "name": "Test",
        "namespace": "com.example",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"},
            {"name": "active", "type": "boolean"}
        ]
    }"#;
    let schema = AvroSchema::new(schema_json.to_string());
    let projected = schema.project(&[0, 2]).unwrap();
    // `expect` replaces the former `assert!(is_ok())` + `unwrap()` pair so that
    // a parse failure reports the underlying error instead of hiding it.
    let parsed = projected
        .schema()
        .expect("Projected schema should be valid Avro");
    let Schema::Complex(ComplexType::Record(record)) = parsed else {
        panic!("Expected Record schema");
    };
    assert_eq!(record.name, "Test");
    assert_eq!(record.namespace, Some("com.example"));
    assert_eq!(record.fields.len(), 2);
    assert_eq!(record.fields[0].name, "id");
    assert_eq!(record.fields[1].name, "active");
}
3691
// Selecting every other field (indices 0, 2, 4) must keep "f0", "f2" and "f4"
// in that order.
#[test]
fn test_project_non_contiguous_indices() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "f0", "type": "int"},
                {"name": "f1", "type": "int"},
                {"name": "f2", "type": "int"},
                {"name": "f3", "type": "int"},
                {"name": "f4", "type": "int"}
            ]
        }"#,
    ));
    let projected = source.project(&[0, 2, 4]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 3);
    let names: Vec<Option<&str>> = kept.iter().map(|field| field["name"].as_str()).collect();
    assert_eq!(names, vec![Some("f0"), Some("f2"), Some("f4")]);
}
3715
// Selecting only the last of ten fields (index 9) must keep just "f9".
#[test]
fn test_project_single_field_from_many() {
    let source = AvroSchema::new(String::from(
        r#"{
            "type": "record",
            "name": "BigRecord",
            "fields": [
                {"name": "f0", "type": "int"},
                {"name": "f1", "type": "int"},
                {"name": "f2", "type": "int"},
                {"name": "f3", "type": "int"},
                {"name": "f4", "type": "int"},
                {"name": "f5", "type": "int"},
                {"name": "f6", "type": "int"},
                {"name": "f7", "type": "int"},
                {"name": "f8", "type": "int"},
                {"name": "f9", "type": "int"}
            ]
        }"#,
    ));
    let projected = source.project(&[9]).unwrap();
    let parsed: Value = serde_json::from_str(&projected.json_string).unwrap();
    let kept = parsed["fields"].as_array().unwrap();
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0]["name"].as_str(), Some("f9"));
}
3742}