1use crate::schema::{
21 AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22 AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23 PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27 IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
/// Resolution details that can be attached to an [`AvroDataType`] through its
/// `resolution` field.
///
/// Each variant wraps the payload for one kind of resolution.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolutionInfo {
    /// A primitive type promotion; see [`Promotion`].
    Promotion(Promotion),
    /// A literal default value; see [`AvroLiteral`].
    DefaultValue(AvroLiteral),
    /// An enum symbol mapping; see [`EnumMapping`].
    EnumMapping(EnumMapping),
    /// Record-level resolution data; see [`ResolvedRecord`].
    Record(ResolvedRecord),
    /// Union-level resolution data; see [`ResolvedUnion`].
    Union(ResolvedUnion),
}
54
/// A parsed Avro default value.
///
/// Produced by `AvroDataType::parse_default_literal` from the JSON form of a
/// field default.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum AvroLiteral {
    /// The JSON `null` default.
    Null,
    /// A boolean default.
    Boolean(bool),
    /// A 32-bit integer default (also used for date and time-millis codecs).
    Int(i32),
    /// A 64-bit integer default (also used for time-micros, timestamp and
    /// duration codecs).
    Long(i64),
    /// A 32-bit floating point default.
    Float(f32),
    /// A 64-bit floating point default.
    Double(f64),
    /// Raw bytes (used for binary, fixed, decimal and interval codecs).
    Bytes(Vec<u8>),
    /// A string default (also used for UUID codecs).
    String(String),
    /// An enum default, stored as the symbol name.
    Enum(String),
    /// An array default, one literal per item.
    Array(Vec<AvroLiteral>),
    /// A keyed default in insertion order; used for both map defaults and
    /// record defaults (keyed by field name).
    Map(IndexMap<String, AvroLiteral>),
}
83
/// Resolution data for a record, carried by [`ResolutionInfo::Record`].
// NOTE(review): the code that fills these slices is not visible here; the
// field descriptions below are inferred from names and types — confirm
// against the record resolver.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedRecord {
    /// Presumably one entry per writer field: `Some(reader_index)` when the
    /// writer field maps to a reader field, `None` otherwise — TODO confirm.
    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
    /// Presumably the reader field indices that are filled from defaults —
    /// TODO confirm.
    pub(crate) default_fields: Arc<[usize]>,
    /// Presumably one entry per writer field: `Some(type)` when the writer
    /// value has to be skipped using that type — TODO confirm.
    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
}
96
/// The primitive conversions that can be recorded for a resolved type.
///
/// The `Display` implementation renders each variant as `From->To`
/// (or `Direct`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No conversion.
    Direct,
    /// `int` to `long`.
    IntToLong,
    /// `int` to `float`.
    IntToFloat,
    /// `int` to `double`.
    IntToDouble,
    /// `long` to `float`.
    LongToFloat,
    /// `long` to `double`.
    LongToDouble,
    /// `float` to `double`.
    FloatToDouble,
    /// `string` to `bytes`.
    StringToBytes,
    /// `bytes` to `string`.
    BytesToString,
}
122
123impl Display for Promotion {
124 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
125 match self {
126 Self::Direct => write!(formatter, "Direct"),
127 Self::IntToLong => write!(formatter, "Int->Long"),
128 Self::IntToFloat => write!(formatter, "Int->Float"),
129 Self::IntToDouble => write!(formatter, "Int->Double"),
130 Self::LongToFloat => write!(formatter, "Long->Float"),
131 Self::LongToDouble => write!(formatter, "Long->Double"),
132 Self::FloatToDouble => write!(formatter, "Float->Double"),
133 Self::StringToBytes => write!(formatter, "String->Bytes"),
134 Self::BytesToString => write!(formatter, "Bytes->String"),
135 }
136 }
137}
138
/// Resolution data for a union, carried by [`ResolutionInfo::Union`].
// NOTE(review): the code that fills this struct is not visible here; the
// field descriptions are inferred from names and types — confirm.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedUnion {
    /// Presumably one entry per writer branch: the reader branch index and the
    /// [`Promotion`] to apply, or `None` when there is no match — TODO confirm.
    pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
    /// Presumably whether the writer schema is a union — TODO confirm.
    pub(crate) writer_is_union: bool,
    /// Presumably whether the reader schema is a union — TODO confirm.
    pub(crate) reader_is_union: bool,
}
150
/// Symbol mapping for an enum, carried by [`ResolutionInfo::EnumMapping`].
// NOTE(review): the code that fills this struct is not visible here; the
// field descriptions are inferred from names and types — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Presumably indexed by writer symbol position, giving the reader symbol
    /// index — TODO confirm (including the meaning of negative values).
    pub(crate) mapping: Arc<[i32]>,
    /// Presumably the reader index of the default symbol — TODO confirm.
    pub(crate) default_index: i32,
}
162
/// Tags `field` with the canonical Arrow extension type matching `codec`.
///
/// Only `Codec::Uuid` has an extension type here; every other codec returns
/// the field untouched.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
170
/// An Avro data type: a [`Codec`] plus nullability, string metadata and
/// optional resolution information.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroDataType {
    /// `Some` when the type is nullable; the value records the position of
    /// `null` in the originating two-branch union.
    nullability: Option<Nullability>,
    /// Key/value metadata copied onto the Arrow field built from this type.
    metadata: HashMap<String, String>,
    /// The codec describing how values of this type are represented.
    codec: Codec,
    /// Optional resolution information; `None` when built through
    /// [`AvroDataType::new`].
    pub(crate) resolution: Option<ResolutionInfo>,
}
179
impl AvroDataType {
    /// Creates a data type from its codec, metadata and nullability, with no
    /// resolution information attached.
    pub(crate) fn new(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
    ) -> Self {
        AvroDataType {
            codec,
            metadata,
            nullability,
            resolution: None,
        }
    }

    /// Same as [`AvroDataType::new`], but also sets the `resolution` field.
    #[inline]
    fn new_with_resolution(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
        resolution: Option<ResolutionInfo>,
    ) -> Self {
        Self {
            codec,
            metadata,
            nullability,
            resolution,
        }
    }

    /// Builds an Arrow [`Field`] called `name` for this data type.
    ///
    /// The field is nullable when `nullability` is set, or when the codec is a
    /// union that has a `Codec::Null` child. The metadata map is cloned onto
    /// the field. With the `canonical_extension_types` feature the field is
    /// additionally passed through `with_extension_type`.
    pub(crate) fn field_with_name(&self, name: &str) -> Field {
        let mut nullable = self.nullability.is_some();
        if !nullable {
            // A general union is nullable if any of its branches is `null`.
            if let Codec::Union(children, _, _) = self.codec() {
                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
                    nullable = true;
                }
            }
        }
        let data_type = self.codec.data_type();
        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
        #[cfg(feature = "canonical_extension_types")]
        return with_extension_type(&self.codec, field);
        #[cfg(not(feature = "canonical_extension_types"))]
        field
    }

    /// Returns the codec of this data type.
    pub(crate) fn codec(&self) -> &Codec {
        &self.codec
    }

    /// Returns the nullability of this data type, if any.
    pub(crate) fn nullability(&self) -> Option<Nullability> {
        self.nullability
    }

    /// Converts the JSON form of a default value into an [`AvroLiteral`],
    /// validating it against this type's codec.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::SchemaError` when the JSON value does not have the
    /// shape required by the codec (wrong JSON type, out-of-range number,
    /// unknown enum symbol, wrong fixed length, missing record subfield, ...).
    #[inline]
    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        // Returns the string payload of `default_json`, or a schema error
        // mentioning `data_type`.
        fn expect_string<'v>(
            default_json: &'v Value,
            data_type: &str,
        ) -> Result<&'v str, ArrowError> {
            match default_json {
                Value::String(s) => Ok(s.as_str()),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default value must be a JSON string for {data_type}"
                ))),
            }
        }

        // Decodes a bytes/fixed default: each character of the JSON string is
        // one byte, so code points above 0xFF are rejected. When
        // `expected_len` is given the decoded length must match it exactly.
        fn parse_bytes_default(
            default_json: &Value,
            expected_len: Option<usize>,
        ) -> Result<Vec<u8>, ArrowError> {
            let s = expect_string(default_json, "bytes/fixed logical types")?;
            let mut out = Vec::with_capacity(s.len());
            for ch in s.chars() {
                let cp = ch as u32;
                if cp > 0xFF {
                    return Err(ArrowError::SchemaError(format!(
                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
                    )));
                }
                out.push(cp as u8);
            }
            if let Some(len) = expected_len {
                if out.len() != len {
                    return Err(ArrowError::SchemaError(format!(
                        "Default length {} does not match expected fixed size {len}",
                        out.len(),
                    )));
                }
            }
            Ok(out)
        }

        // Extracts an `i64` from a JSON number; non-integers and non-numbers
        // are schema errors.
        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_i64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON integer"
                ))),
            }
        }

        // Extracts an `f64` from a JSON number; non-numbers are schema errors.
        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_f64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON number"
                ))),
            }
        }

        // JSON null is handled up front: it is accepted for the `null` codec,
        // for a union whose first branch is `null`, and for types whose
        // nullability is `NullFirst`.
        if default_json.is_null() {
            return match self.codec() {
                Codec::Null => Ok(AvroLiteral::Null),
                Codec::Union(encodings, _, _) if !encodings.is_empty()
                    && matches!(encodings[0].codec(), Codec::Null) =>
                {
                    Ok(AvroLiteral::Null)
                }
                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
                _ => Err(ArrowError::SchemaError(
                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
                        .to_string(),
                )),
            };
        }
        // From here on `default_json` is known to be non-null.
        let lit = match self.codec() {
            Codec::Null => {
                return Err(ArrowError::SchemaError(
                    "Default for `null` type must be JSON null".to_string(),
                ));
            }
            Codec::Boolean => match default_json {
                Value::Bool(b) => AvroLiteral::Boolean(*b),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Boolean default must be a JSON boolean".to_string(),
                    ));
                }
            },
            // 32-bit integer codecs: parse as i64, then range-check.
            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            Codec::Int64
            | Codec::TimeMicros
            | Codec::TimestampMillis(_)
            | Codec::TimestampMicros(_)
            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            Codec::Float32 => {
                let f = parse_json_f64(default_json, "float")?;
                // Reject values that cannot be represented as a finite f32.
                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default float {f} out of f32 range or not finite"
                    )));
                }
                AvroLiteral::Float(f as f32)
            }
            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
            }
            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
            Codec::Fixed(sz) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
            }
            // Decimals are length-checked only when they have a fixed size.
            Codec::Decimal(_, _, fixed_size) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
            }
            Codec::Enum(symbols) => {
                let s = expect_string(default_json, "enum")?;
                if symbols.iter().any(|sym| sym == s) {
                    AvroLiteral::Enum(s.to_string())
                } else {
                    return Err(ArrowError::SchemaError(format!(
                        "Default enum symbol {s:?} not found in reader enum symbols"
                    )));
                }
            }
            // Interval defaults must be exactly 12 bytes long.
            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
            Codec::List(item_dt) => match default_json {
                Value::Array(items) => AvroLiteral::Array(
                    items
                        .iter()
                        .map(|v| item_dt.parse_default_literal(v))
                        .collect::<Result<_, _>>()?,
                ),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON array for Avro array type".to_string(),
                    ));
                }
            },
            Codec::Map(val_dt) => match default_json {
                Value::Object(map) => {
                    let mut out = IndexMap::with_capacity(map.len());
                    for (k, v) in map {
                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON object for Avro map type".to_string(),
                    ));
                }
            },
            // Record defaults are emitted as a map keyed by field name, in
            // the record's field order.
            Codec::Struct(fields) => match default_json {
                Value::Object(obj) => {
                    let mut out: IndexMap<String, AvroLiteral> =
                        IndexMap::with_capacity(fields.len());
                    for f in fields.as_ref() {
                        let name = f.name().to_string();
                        if let Some(sub) = obj.get(&name) {
                            out.insert(name, f.data_type().parse_default_literal(sub)?);
                        } else {
                            // The subfield is absent from the JSON object:
                            // fall back to null for default-nullability
                            // fields without a stored default, then to the
                            // subfield's own stored default, else fail.
                            let stored_default =
                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
                            if stored_default.is_none()
                                && f.data_type().nullability() == Some(Nullability::default())
                            {
                                out.insert(name, AvroLiteral::Null);
                            } else if let Some(default_json) = stored_default {
                                let v: Value =
                                    serde_json::from_str(default_json).map_err(|e| {
                                        ArrowError::SchemaError(format!(
                                            "Failed to parse stored subfield default JSON for '{}': {e}",
                                            f.name(),
                                        ))
                                    })?;
                                out.insert(name, f.data_type().parse_default_literal(&v)?);
                            } else {
                                return Err(ArrowError::SchemaError(format!(
                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
                                    f.name(),
                                    f.data_type().codec()
                                )));
                            }
                        }
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value for record/struct must be a JSON object".to_string(),
                    ));
                }
            },
            // A union default is validated against the union's first branch.
            Codec::Union(encodings, _, _) => {
                let Some(default_encoding) = encodings.first() else {
                    return Err(ArrowError::SchemaError(
                        "Union with no branches cannot have a default".to_string(),
                    ));
                };
                default_encoding.parse_default_literal(default_json)?
            }
            // Run-end encoded types delegate to their values type.
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
        };
        Ok(lit)
    }

    /// Serializes `default_json` to text and stores it in the metadata map
    /// under `AVRO_FIELD_DEFAULT_METADATA_KEY`, replacing any previous entry.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::ParseError` if the value cannot be serialized.
    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
        let json_text = serde_json::to_string(default_json).map_err(|e| {
            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
        })?;
        self.metadata
            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
        Ok(())
    }

    /// Validates `default_json` against this type and, only if that succeeds,
    /// stores its JSON text in the metadata. Returns the parsed literal.
    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        let lit = self.parse_default_literal(default_json)?;
        self.store_default(default_json)?;
        Ok(lit)
    }
}
488
/// A named Avro field: a field name paired with its [`AvroDataType`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroField {
    /// The field name, used as the Arrow field name.
    name: String,
    /// The Avro data type of the field.
    data_type: AvroDataType,
}
495
496impl AvroField {
497 pub(crate) fn field(&self) -> Field {
499 self.data_type.field_with_name(&self.name)
500 }
501
502 pub(crate) fn data_type(&self) -> &AvroDataType {
504 &self.data_type
505 }
506
507 pub(crate) fn with_utf8view(&self) -> Self {
516 let mut field = self.clone();
517 if let Codec::Utf8 = field.data_type.codec {
518 field.data_type.codec = Codec::Utf8View;
519 }
520 field
521 }
522
523 pub(crate) fn name(&self) -> &str {
528 &self.name
529 }
530}
531
532impl<'a> TryFrom<&Schema<'a>> for AvroField {
533 type Error = ArrowError;
534
535 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
536 match schema {
537 Schema::Complex(ComplexType::Record(r)) => {
538 let mut resolver = Maker::new(false, false);
539 let data_type = resolver.make_data_type(schema, None, None)?;
540 Ok(AvroField {
541 data_type,
542 name: r.name.to_string(),
543 })
544 }
545 _ => Err(ArrowError::ParseError(format!(
546 "Expected record got {schema:?}"
547 ))),
548 }
549 }
550}
551
/// Builder that turns a writer schema (and optionally a reader schema) into
/// an [`AvroField`].
#[derive(Debug)]
pub(crate) struct AvroFieldBuilder<'a> {
    /// The writer schema; `build` requires it to be a record.
    writer_schema: &'a Schema<'a>,
    /// Optional reader schema to resolve the writer schema against.
    reader_schema: Option<&'a Schema<'a>>,
    /// Whether string types should use `Utf8View` instead of `Utf8`.
    use_utf8view: bool,
    /// Whether strict mode is enabled (in the visible code it rejects unions
    /// of the form `['T','null']`).
    strict_mode: bool,
}
560
561impl<'a> AvroFieldBuilder<'a> {
562 pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
564 Self {
565 writer_schema,
566 reader_schema: None,
567 use_utf8view: false,
568 strict_mode: false,
569 }
570 }
571
572 #[inline]
577 pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
578 self.reader_schema = Some(reader_schema);
579 self
580 }
581
582 pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
584 self.use_utf8view = use_utf8view;
585 self
586 }
587
588 pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
590 self.strict_mode = strict_mode;
591 self
592 }
593
594 pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
596 match self.writer_schema {
597 Schema::Complex(ComplexType::Record(r)) => {
598 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
599 let data_type =
600 resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
601 Ok(AvroField {
602 name: r.name.to_string(),
603 data_type,
604 })
605 }
606 _ => Err(ArrowError::ParseError(format!(
607 "Expected a Record schema to build an AvroField, but got {:?}",
608 self.writer_schema
609 ))),
610 }
611 }
612}
613
/// The representation of an Avro type, together with the data needed to map
/// it to an Arrow `DataType` (see `Codec::data_type`).
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Codec {
    /// Maps to `DataType::Null`.
    Null,
    /// Maps to `DataType::Boolean`.
    Boolean,
    /// Maps to `DataType::Int32`.
    Int32,
    /// Maps to `DataType::Int64`.
    Int64,
    /// Maps to `DataType::Float32`.
    Float32,
    /// Maps to `DataType::Float64`.
    Float64,
    /// Maps to `DataType::Binary`.
    Binary,
    /// Maps to `DataType::Utf8`.
    Utf8,
    /// Maps to `DataType::Utf8View`.
    Utf8View,
    /// Maps to `DataType::Date32`.
    Date32,
    /// Maps to `DataType::Time32` in milliseconds.
    TimeMillis,
    /// Maps to `DataType::Time64` in microseconds.
    TimeMicros,
    /// Millisecond timestamp; `true` means UTC (time zone `"+00:00"`),
    /// `false` means no time zone.
    TimestampMillis(bool),
    /// Microsecond timestamp; `true` means UTC (time zone `"+00:00"`),
    /// `false` means no time zone.
    TimestampMicros(bool),
    /// Nanosecond timestamp; `true` means UTC (time zone `"+00:00"`),
    /// `false` means no time zone.
    TimestampNanos(bool),
    /// Fixed-size binary of the given byte size.
    Fixed(i32),
    /// Decimal as `(precision, scale, fixed size)`; a missing scale is
    /// treated as 0 and the size is only present for fixed-backed decimals.
    Decimal(usize, Option<usize>, Option<usize>),
    /// UUID; maps to `DataType::FixedSizeBinary(16)`.
    Uuid,
    /// Enum with its symbols; maps to an `Int32`-keyed `Utf8` dictionary.
    Enum(Arc<[String]>),
    /// List with the given item type.
    List(Arc<AvroDataType>),
    /// Struct (record) with the given fields.
    Struct(Arc<[AvroField]>),
    /// Map with `Utf8` keys and the given value type.
    Map(Arc<AvroDataType>),
    /// Maps to `DataType::Interval` with `MonthDayNano` units.
    Interval,
    /// Union: the branch data types, the matching Arrow union fields and the
    /// Arrow union mode.
    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
    /// Maps to `DataType::Duration` in nanoseconds.
    #[cfg(feature = "avro_custom_types")]
    DurationNanos,
    /// Maps to `DataType::Duration` in microseconds.
    #[cfg(feature = "avro_custom_types")]
    DurationMicros,
    /// Maps to `DataType::Duration` in milliseconds.
    #[cfg(feature = "avro_custom_types")]
    DurationMillis,
    /// Maps to `DataType::Duration` in seconds.
    #[cfg(feature = "avro_custom_types")]
    DurationSeconds,
    /// Run-end encoded values: the values type and the bit width of the
    /// run-end index, which `Codec::data_type` expects to be 16, 32 or 64.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Arc<AvroDataType>, u8),
}
702
703impl Codec {
704 fn data_type(&self) -> DataType {
705 match self {
706 Self::Null => DataType::Null,
707 Self::Boolean => DataType::Boolean,
708 Self::Int32 => DataType::Int32,
709 Self::Int64 => DataType::Int64,
710 Self::Float32 => DataType::Float32,
711 Self::Float64 => DataType::Float64,
712 Self::Binary => DataType::Binary,
713 Self::Utf8 => DataType::Utf8,
714 Self::Utf8View => DataType::Utf8View,
715 Self::Date32 => DataType::Date32,
716 Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
717 Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
718 Self::TimestampMillis(is_utc) => {
719 DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
720 }
721 Self::TimestampMicros(is_utc) => {
722 DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
723 }
724 Self::TimestampNanos(is_utc) => {
725 DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
726 }
727 Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
728 Self::Fixed(size) => DataType::FixedSizeBinary(*size),
729 Self::Decimal(precision, scale, _size) => {
730 let p = *precision as u8;
731 let s = scale.unwrap_or(0) as i8;
732 #[cfg(feature = "small_decimals")]
733 {
734 if *precision <= DECIMAL32_MAX_PRECISION as usize {
735 DataType::Decimal32(p, s)
736 } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
737 DataType::Decimal64(p, s)
738 } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
739 DataType::Decimal128(p, s)
740 } else {
741 DataType::Decimal256(p, s)
742 }
743 }
744 #[cfg(not(feature = "small_decimals"))]
745 {
746 if *precision <= DECIMAL128_MAX_PRECISION as usize {
747 DataType::Decimal128(p, s)
748 } else {
749 DataType::Decimal256(p, s)
750 }
751 }
752 }
753 Self::Uuid => DataType::FixedSizeBinary(16),
754 Self::Enum(_) => {
755 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
756 }
757 Self::List(f) => {
758 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
759 }
760 Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
761 Self::Map(value_type) => {
762 let val_field = value_type.field_with_name("value");
763 DataType::Map(
764 Arc::new(Field::new(
765 "entries",
766 DataType::Struct(Fields::from(vec![
767 Field::new("key", DataType::Utf8, false),
768 val_field,
769 ])),
770 false,
771 )),
772 false,
773 )
774 }
775 Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
776 #[cfg(feature = "avro_custom_types")]
777 Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
778 #[cfg(feature = "avro_custom_types")]
779 Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
780 #[cfg(feature = "avro_custom_types")]
781 Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
782 #[cfg(feature = "avro_custom_types")]
783 Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
784 #[cfg(feature = "avro_custom_types")]
785 Self::RunEndEncoded(values, bits) => {
786 let run_ends_dt = match *bits {
787 16 => DataType::Int16,
788 32 => DataType::Int32,
789 64 => DataType::Int64,
790 _ => unreachable!(),
791 };
792 DataType::RunEndEncoded(
793 Arc::new(Field::new("run_ends", run_ends_dt, false)),
794 Arc::new(Field::new("values", values.codec().data_type(), true)),
795 )
796 }
797 }
798 }
799
800 pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
806 if use_utf8view && matches!(self, Self::Utf8) {
807 Self::Utf8View
808 } else {
809 self
810 }
811 }
812
813 #[inline]
814 fn union_field_name(&self) -> String {
815 UnionFieldKind::from(self).as_ref().to_owned()
816 }
817}
818
819impl From<PrimitiveType> for Codec {
820 fn from(value: PrimitiveType) -> Self {
821 match value {
822 PrimitiveType::Null => Self::Null,
823 PrimitiveType::Boolean => Self::Boolean,
824 PrimitiveType::Int => Self::Int32,
825 PrimitiveType::Long => Self::Int64,
826 PrimitiveType::Float => Self::Float32,
827 PrimitiveType::Double => Self::Float64,
828 PrimitiveType::Bytes => Self::Binary,
829 PrimitiveType::String => Self::Utf8,
830 }
831 }
832}
833
/// Looks up the largest decimal precision accepted for a fixed of `n` bytes.
///
/// Returns `None` when `n` is outside `1..=32`.
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    // Entry `i` holds the limit for a fixed of `i + 1` bytes.
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    if n == 0 || n > MAX_P.len() {
        None
    } else {
        Some(MAX_P[n - 1])
    }
}
856
857fn parse_decimal_attributes(
858 attributes: &Attributes,
859 fallback_size: Option<usize>,
860 precision_required: bool,
861) -> Result<(usize, usize, Option<usize>), ArrowError> {
862 let precision = attributes
863 .additional
864 .get("precision")
865 .and_then(|v| v.as_u64())
866 .or(if precision_required { None } else { Some(10) })
867 .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
868 as usize;
869 let scale = attributes
870 .additional
871 .get("scale")
872 .and_then(|v| v.as_u64())
873 .unwrap_or(0) as usize;
874 let size = attributes
875 .additional
876 .get("size")
877 .and_then(|v| v.as_u64())
878 .map(|s| s as usize)
879 .or(fallback_size);
880 if precision == 0 {
881 return Err(ArrowError::ParseError(
882 "Decimal requires precision > 0".to_string(),
883 ));
884 }
885 if scale > precision {
886 return Err(ArrowError::ParseError(format!(
887 "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
888 )));
889 }
890 if precision > DECIMAL256_MAX_PRECISION as usize {
891 return Err(ArrowError::ParseError(format!(
892 "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
893 DECIMAL256_MAX_PRECISION
894 )));
895 }
896 if let Some(sz) = size {
897 let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
898 ArrowError::ParseError(format!(
899 "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
900 ))
901 })?;
902 if precision > max_p {
903 return Err(ArrowError::ParseError(format!(
904 "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
905 )));
906 }
907 }
908 Ok((precision, scale, size))
909}
910
/// The kind label of a union branch, derived from its [`Codec`].
///
/// `AsRefStr` with `serialize_all = "snake_case"` renders each variant as a
/// snake_case string (e.g. `TimestampMillisUtc` becomes
/// `timestamp_millis_utc`); `Codec::union_field_name` returns that string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Date,
    TimeMillis,
    TimeMicros,
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    TimestampNanosUtc,
    TimestampNanosLocal,
    /// Used for `Codec::Interval` and, with `avro_custom_types`, for the
    /// `Codec::Duration*` variants.
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
941
942impl From<&Codec> for UnionFieldKind {
943 fn from(c: &Codec) -> Self {
944 match c {
945 Codec::Null => Self::Null,
946 Codec::Boolean => Self::Boolean,
947 Codec::Int32 => Self::Int,
948 Codec::Int64 => Self::Long,
949 Codec::Float32 => Self::Float,
950 Codec::Float64 => Self::Double,
951 Codec::Binary => Self::Bytes,
952 Codec::Utf8 | Codec::Utf8View => Self::String,
953 Codec::Date32 => Self::Date,
954 Codec::TimeMillis => Self::TimeMillis,
955 Codec::TimeMicros => Self::TimeMicros,
956 Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
957 Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
958 Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
959 Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
960 Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
961 Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
962 Codec::Interval => Self::Duration,
963 Codec::Fixed(_) => Self::Fixed,
964 Codec::Decimal(..) => Self::Decimal,
965 Codec::Enum(_) => Self::Enum,
966 Codec::List(_) => Self::Array,
967 Codec::Struct(_) => Self::Record,
968 Codec::Map(_) => Self::Map,
969 Codec::Uuid => Self::Uuid,
970 Codec::Union(..) => Self::Union,
971 #[cfg(feature = "avro_custom_types")]
972 Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
973 #[cfg(feature = "avro_custom_types")]
974 Codec::DurationNanos
975 | Codec::DurationMicros
976 | Codec::DurationMillis
977 | Codec::DurationSeconds => Self::Duration,
978 }
979 }
980}
981
982fn union_branch_name(dt: &AvroDataType) -> String {
983 if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
984 if name.contains(".") {
985 return name.to_string();
987 }
988 if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
989 return format!("{ns}.{name}");
990 }
991 return name.to_string();
992 }
993 dt.codec.union_field_name()
994}
995
996fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
997 let arrow_fields: Vec<Field> = encodings
998 .iter()
999 .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1000 .collect();
1001 let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1002 UnionFields::try_new(type_ids, arrow_fields)
1003}
1004
/// Registry of named types, used to look up type references by name.
#[derive(Debug, Default)]
struct Resolver<'a> {
    /// Registered types keyed by `(namespace, name)`; a missing namespace is
    /// stored as the empty string.
    map: HashMap<(&'a str, &'a str), AvroDataType>,
}
1012
1013impl<'a> Resolver<'a> {
1014 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1015 self.map.insert((namespace.unwrap_or(""), name), schema);
1016 }
1017
1018 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1019 let (namespace, name) = name
1020 .rsplit_once('.')
1021 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1022 self.map
1023 .get(&(namespace, name))
1024 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1025 .cloned()
1026 }
1027}
1028
1029fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1030 let mut out = HashSet::with_capacity(1 + aliases.len());
1031 let (full, _) = make_full_name(name, ns, None);
1032 out.insert(full);
1033 for a in aliases {
1034 let (fa, _) = make_full_name(a, None, ns);
1035 out.insert(fa);
1036 }
1037 out
1038}
1039
1040fn names_match(
1041 writer_name: &str,
1042 writer_namespace: Option<&str>,
1043 writer_aliases: &[&str],
1044 reader_name: &str,
1045 reader_namespace: Option<&str>,
1046 reader_aliases: &[&str],
1047) -> bool {
1048 let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1049 let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1050 !writer_set.is_disjoint(&reader_set)
1052}
1053
1054fn ensure_names_match(
1055 data_type: &str,
1056 writer_name: &str,
1057 writer_namespace: Option<&str>,
1058 writer_aliases: &[&str],
1059 reader_name: &str,
1060 reader_namespace: Option<&str>,
1061 reader_aliases: &[&str],
1062) -> Result<(), ArrowError> {
1063 if names_match(
1064 writer_name,
1065 writer_namespace,
1066 writer_aliases,
1067 reader_name,
1068 reader_namespace,
1069 reader_aliases,
1070 ) {
1071 Ok(())
1072 } else {
1073 Err(ArrowError::ParseError(format!(
1074 "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1075 )))
1076 }
1077}
1078
1079fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1080 match schema {
1081 Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1082 Schema::Type(Type {
1083 r#type: TypeName::Primitive(primitive),
1084 ..
1085 }) => Some(*primitive),
1086 _ => None,
1087 }
1088}
1089
1090fn nullable_union_variants<'x, 'y>(
1091 variant: &'y [Schema<'x>],
1092) -> Option<(Nullability, &'y Schema<'x>)> {
1093 if variant.len() != 2 {
1094 return None;
1095 }
1096 let is_null = |schema: &Schema<'x>| {
1097 matches!(
1098 schema,
1099 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1100 )
1101 };
1102 match (is_null(&variant[0]), is_null(&variant[1])) {
1103 (true, false) => Some((Nullability::NullFirst, &variant[1])),
1104 (false, true) => Some((Nullability::NullSecond, &variant[0])),
1105 _ => None,
1106 }
1107}
1108
/// Identity of a union branch, used to detect duplicate branches
/// (see `union_first_duplicate`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    /// A named type or type reference, identified by its full name.
    Named(String),
    /// A primitive type.
    Primitive(PrimitiveType),
    /// Any array type.
    Array,
    /// Any map type.
    Map,
}
1116
1117fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1118 let (name, namespace) = match s {
1119 Schema::TypeName(TypeName::Primitive(p))
1120 | Schema::Type(Type {
1121 r#type: TypeName::Primitive(p),
1122 ..
1123 }) => return Some(UnionBranchKey::Primitive(*p)),
1124 Schema::TypeName(TypeName::Ref(name))
1125 | Schema::Type(Type {
1126 r#type: TypeName::Ref(name),
1127 ..
1128 }) => (name, None),
1129 Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1130 Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1131 Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1132 Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1133 Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1134 Schema::Union(_) => return None,
1135 };
1136 let (full, _) = make_full_name(name, namespace, enclosing_ns);
1137 Some(UnionBranchKey::Named(full))
1138}
1139
1140fn union_first_duplicate<'a>(
1141 branches: &'a [Schema<'a>],
1142 enclosing_ns: Option<&'a str>,
1143) -> Option<String> {
1144 let mut seen = HashSet::with_capacity(branches.len());
1145 for schema in branches {
1146 if let Some(key) = branch_key_of(schema, enclosing_ns) {
1147 if !seen.insert(key.clone()) {
1148 let msg = match key {
1149 UnionBranchKey::Named(full) => format!("named type {full}"),
1150 UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1151 UnionBranchKey::Array => "array".to_string(),
1152 UnionBranchKey::Map => "map".to_string(),
1153 };
1154 return Some(msg);
1155 }
1156 }
1157 }
1158 None
1159}
1160
/// Builds [`AvroDataType`] values from schemas, keeping track of the named
/// types seen so far.
struct Maker<'a> {
    /// Registry of named types, consulted when a type reference is parsed.
    resolver: Resolver<'a>,
    /// Whether string primitives are mapped to `Utf8View` instead of `Utf8`.
    use_utf8view: bool,
    /// Whether strict mode is on; in the visible code it rejects unions of
    /// the form `['T','null']`.
    strict_mode: bool,
}
1169
1170impl<'a> Maker<'a> {
1171 fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1172 Self {
1173 resolver: Default::default(),
1174 use_utf8view,
1175 strict_mode,
1176 }
1177 }
1178
1179 #[cfg(feature = "avro_custom_types")]
1180 #[inline]
1181 fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1182 if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1183 let mut inner = (*values).clone();
1184 inner.nullability = Some(nb);
1185 dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1186 }
1187 }
1188
1189 fn make_data_type<'s>(
1190 &mut self,
1191 writer_schema: &'s Schema<'a>,
1192 reader_schema: Option<&'s Schema<'a>>,
1193 namespace: Option<&'a str>,
1194 ) -> Result<AvroDataType, ArrowError> {
1195 match reader_schema {
1196 Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1197 None => self.parse_type(writer_schema, namespace),
1198 }
1199 }
1200
1201 fn parse_type<'s>(
1214 &mut self,
1215 schema: &'s Schema<'a>,
1216 namespace: Option<&'a str>,
1217 ) -> Result<AvroDataType, ArrowError> {
1218 match schema {
1219 Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1220 Codec::from(*p).with_utf8view(self.use_utf8view),
1221 Default::default(),
1222 None,
1223 )),
1224 Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1225 Schema::Union(f) => {
1226 let null = f
1227 .iter()
1228 .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1229 match (f.len() == 2, null) {
1230 (true, Some(0)) => {
1231 let mut field = self.parse_type(&f[1], namespace)?;
1232 field.nullability = Some(Nullability::NullFirst);
1233 #[cfg(feature = "avro_custom_types")]
1234 Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1235 return Ok(field);
1236 }
1237 (true, Some(1)) => {
1238 if self.strict_mode {
1239 return Err(ArrowError::SchemaError(
1240 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1241 .to_string(),
1242 ));
1243 }
1244 let mut field = self.parse_type(&f[0], namespace)?;
1245 field.nullability = Some(Nullability::NullSecond);
1246 #[cfg(feature = "avro_custom_types")]
1247 Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1248 return Ok(field);
1249 }
1250 _ => {}
1251 }
1252 if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1254 return Err(ArrowError::SchemaError(
1255 "Avro unions may not immediately contain other unions".to_string(),
1256 ));
1257 }
1258 if let Some(dup) = union_first_duplicate(f, namespace) {
1260 return Err(ArrowError::SchemaError(format!(
1261 "Avro union contains duplicate branch type: {dup}"
1262 )));
1263 }
1264 let children: Vec<AvroDataType> = f
1266 .iter()
1267 .map(|s| self.parse_type(s, namespace))
1268 .collect::<Result<_, _>>()?;
1269 let union_fields = build_union_fields(&children)?;
1271 Ok(AvroDataType::new(
1272 Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1273 Default::default(),
1274 None,
1275 ))
1276 }
1277 Schema::Complex(c) => match c {
1278 ComplexType::Record(r) => {
1279 let namespace = r.namespace.or(namespace);
1280 let mut metadata = r.attributes.field_metadata();
1281 let fields = r
1282 .fields
1283 .iter()
1284 .map(|field| {
1285 Ok(AvroField {
1286 name: field.name.to_string(),
1287 data_type: self.parse_type(&field.r#type, namespace)?,
1288 })
1289 })
1290 .collect::<Result<_, ArrowError>>()?;
1291 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1292 if let Some(ns) = namespace {
1293 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1294 }
1295 let field = AvroDataType {
1296 nullability: None,
1297 codec: Codec::Struct(fields),
1298 metadata,
1299 resolution: None,
1300 };
1301 self.resolver.register(r.name, namespace, field.clone());
1302 Ok(field)
1303 }
1304 ComplexType::Array(a) => {
1305 let field = self.parse_type(a.items.as_ref(), namespace)?;
1306 Ok(AvroDataType {
1307 nullability: None,
1308 metadata: a.attributes.field_metadata(),
1309 codec: Codec::List(Arc::new(field)),
1310 resolution: None,
1311 })
1312 }
1313 ComplexType::Fixed(f) => {
1314 let size = f.size.try_into().map_err(|e| {
1315 ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1316 })?;
1317 let namespace = f.namespace.or(namespace);
1318 let mut metadata = f.attributes.field_metadata();
1319 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1320 if let Some(ns) = namespace {
1321 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1322 }
1323 let field = match f.attributes.logical_type {
1324 Some("decimal") => {
1325 let (precision, scale, _) =
1326 parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1327 AvroDataType {
1328 nullability: None,
1329 metadata,
1330 codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1331 resolution: None,
1332 }
1333 }
1334 Some("duration") => {
1335 if size != 12 {
1336 return Err(ArrowError::ParseError(format!(
1337 "Invalid fixed size for Duration: {size}, must be 12"
1338 )));
1339 };
1340 AvroDataType {
1341 nullability: None,
1342 metadata,
1343 codec: Codec::Interval,
1344 resolution: None,
1345 }
1346 }
1347 _ => AvroDataType {
1348 nullability: None,
1349 metadata,
1350 codec: Codec::Fixed(size),
1351 resolution: None,
1352 },
1353 };
1354 self.resolver.register(f.name, namespace, field.clone());
1355 Ok(field)
1356 }
1357 ComplexType::Enum(e) => {
1358 let namespace = e.namespace.or(namespace);
1359 let symbols = e
1360 .symbols
1361 .iter()
1362 .map(|s| s.to_string())
1363 .collect::<Arc<[String]>>();
1364 let mut metadata = e.attributes.field_metadata();
1365 let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1366 ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1367 })?;
1368 metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1369 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1370 if let Some(ns) = namespace {
1371 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1372 }
1373 let field = AvroDataType {
1374 nullability: None,
1375 metadata,
1376 codec: Codec::Enum(symbols),
1377 resolution: None,
1378 };
1379 self.resolver.register(e.name, namespace, field.clone());
1380 Ok(field)
1381 }
1382 ComplexType::Map(m) => {
1383 let val = self.parse_type(&m.values, namespace)?;
1384 Ok(AvroDataType {
1385 nullability: None,
1386 metadata: m.attributes.field_metadata(),
1387 codec: Codec::Map(Arc::new(val)),
1388 resolution: None,
1389 })
1390 }
1391 },
1392 Schema::Type(t) => {
1393 let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1394 match (t.attributes.logical_type, &mut field.codec) {
1396 (Some("decimal"), c @ Codec::Binary) => {
1397 let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1398 *c = Codec::Decimal(prec, Some(sc), None);
1399 }
1400 (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1401 (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1402 (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1403 (Some("timestamp-millis"), c @ Codec::Int64) => {
1404 *c = Codec::TimestampMillis(true)
1405 }
1406 (Some("timestamp-micros"), c @ Codec::Int64) => {
1407 *c = Codec::TimestampMicros(true)
1408 }
1409 (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1410 *c = Codec::TimestampMillis(false)
1411 }
1412 (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1413 *c = Codec::TimestampMicros(false)
1414 }
1415 (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
1416 (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1417 *c = Codec::TimestampNanos(false)
1418 }
1419 (Some("uuid"), c @ Codec::Utf8) => {
1420 *c = Codec::Uuid;
1424 field.metadata.insert("logicalType".into(), "uuid".into());
1425 }
1426 #[cfg(feature = "avro_custom_types")]
1427 (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1428 #[cfg(feature = "avro_custom_types")]
1429 (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1430 #[cfg(feature = "avro_custom_types")]
1431 (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1432 #[cfg(feature = "avro_custom_types")]
1433 (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1434 *c = Codec::DurationSeconds
1435 }
1436 #[cfg(feature = "avro_custom_types")]
1437 (Some("arrow.run-end-encoded"), _) => {
1438 let bits_u8: u8 = t
1439 .attributes
1440 .additional
1441 .get("arrow.runEndIndexBits")
1442 .and_then(|v| v.as_u64())
1443 .and_then(|n| u8::try_from(n).ok())
1444 .ok_or_else(|| ArrowError::ParseError(
1445 "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1446 .to_string(),
1447 ))?;
1448 if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1449 return Err(ArrowError::ParseError(format!(
1450 "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1451 )));
1452 }
1453 let values_site = field.clone();
1455 field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1456 }
1457 (Some(logical), _) => {
1458 field.metadata.insert("logicalType".into(), logical.into());
1460 }
1461 (None, _) => {}
1462 }
1463 if matches!(field.codec, Codec::Int64) {
1464 if let Some(unit) = t
1465 .attributes
1466 .additional
1467 .get("arrowTimeUnit")
1468 .and_then(|v| v.as_str())
1469 {
1470 if unit == "nanosecond" {
1471 field.codec = Codec::TimestampNanos(false);
1472 }
1473 }
1474 }
1475 if !t.attributes.additional.is_empty() {
1476 for (k, v) in &t.attributes.additional {
1477 field.metadata.insert(k.to_string(), v.to_string());
1478 }
1479 }
1480 Ok(field)
1481 }
1482 }
1483 }
1484
1485 fn resolve_type<'s>(
1486 &mut self,
1487 writer_schema: &'s Schema<'a>,
1488 reader_schema: &'s Schema<'a>,
1489 namespace: Option<&'a str>,
1490 ) -> Result<AvroDataType, ArrowError> {
1491 if let (Some(write_primitive), Some(read_primitive)) =
1492 (primitive_of(writer_schema), primitive_of(reader_schema))
1493 {
1494 return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1495 }
1496 match (writer_schema, reader_schema) {
1497 (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1498 let writer_variants = writer_variants.as_slice();
1499 let reader_variants = reader_variants.as_slice();
1500 match (
1501 nullable_union_variants(writer_variants),
1502 nullable_union_variants(reader_variants),
1503 ) {
1504 (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
1505 let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?;
1506 dt.nullability = Some(w_nb);
1507 #[cfg(feature = "avro_custom_types")]
1508 Self::propagate_nullability_into_ree(&mut dt, w_nb);
1509 Ok(dt)
1510 }
1511 _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1512 }
1513 }
1514 (Schema::Union(writer_variants), reader_non_union) => {
1515 let writer_to_reader: Vec<Option<(usize, Promotion)>> = writer_variants
1516 .iter()
1517 .map(|writer| {
1518 self.resolve_type(writer, reader_non_union, namespace)
1519 .ok()
1520 .map(|tmp| (0usize, Self::coercion_from(&tmp)))
1521 })
1522 .collect();
1523 let mut dt = self.parse_type(reader_non_union, namespace)?;
1524 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1525 writer_to_reader: Arc::from(writer_to_reader),
1526 writer_is_union: true,
1527 reader_is_union: false,
1528 }));
1529 Ok(dt)
1530 }
1531 (writer_non_union, Schema::Union(reader_variants)) => {
1532 if let Some((nullability, non_null_branch)) =
1533 nullable_union_variants(reader_variants)
1534 {
1535 let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
1536 let non_null_idx = match nullability {
1537 Nullability::NullFirst => 1,
1538 Nullability::NullSecond => 0,
1539 };
1540 #[cfg(feature = "avro_custom_types")]
1541 Self::propagate_nullability_into_ree(&mut dt, nullability);
1542 dt.nullability = Some(nullability);
1543 let promotion = Self::coercion_from(&dt);
1544 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1545 writer_to_reader: Arc::from(vec![Some((non_null_idx, promotion))]),
1546 writer_is_union: false,
1547 reader_is_union: true,
1548 }));
1549 Ok(dt)
1550 } else {
1551 let mut best_match: Option<(usize, AvroDataType, Promotion)> = None;
1552 for (i, variant) in reader_variants.iter().enumerate() {
1553 if let Ok(resolved_dt) =
1554 self.resolve_type(writer_non_union, variant, namespace)
1555 {
1556 let promotion = Self::coercion_from(&resolved_dt);
1557 if promotion == Promotion::Direct {
1558 best_match = Some((i, resolved_dt, promotion));
1559 break;
1560 } else if best_match.is_none() {
1561 best_match = Some((i, resolved_dt, promotion));
1562 }
1563 }
1564 }
1565 let Some((match_idx, match_dt, promotion)) = best_match else {
1566 return Err(ArrowError::SchemaError(
1567 "Writer schema does not match any reader union branch".to_string(),
1568 ));
1569 };
1570 let mut children = Vec::with_capacity(reader_variants.len());
1571 let mut match_dt = Some(match_dt);
1572 for (i, variant) in reader_variants.iter().enumerate() {
1573 if i == match_idx {
1574 if let Some(mut dt) = match_dt.take() {
1575 if matches!(dt.resolution, Some(ResolutionInfo::Promotion(_))) {
1576 dt.resolution = None;
1577 }
1578 children.push(dt);
1579 }
1580 } else {
1581 children.push(self.parse_type(variant, namespace)?);
1582 }
1583 }
1584 let union_fields = build_union_fields(&children)?;
1585 let mut dt = AvroDataType::new(
1586 Codec::Union(children.into(), union_fields, UnionMode::Dense),
1587 Default::default(),
1588 None,
1589 );
1590 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1591 writer_to_reader: Arc::from(vec![Some((match_idx, promotion))]),
1592 writer_is_union: false,
1593 reader_is_union: true,
1594 }));
1595 Ok(dt)
1596 }
1597 }
1598 (
1599 Schema::Complex(ComplexType::Array(writer_array)),
1600 Schema::Complex(ComplexType::Array(reader_array)),
1601 ) => self.resolve_array(writer_array, reader_array, namespace),
1602 (
1603 Schema::Complex(ComplexType::Map(writer_map)),
1604 Schema::Complex(ComplexType::Map(reader_map)),
1605 ) => self.resolve_map(writer_map, reader_map, namespace),
1606 (
1607 Schema::Complex(ComplexType::Fixed(writer_fixed)),
1608 Schema::Complex(ComplexType::Fixed(reader_fixed)),
1609 ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1610 (
1611 Schema::Complex(ComplexType::Record(writer_record)),
1612 Schema::Complex(ComplexType::Record(reader_record)),
1613 ) => self.resolve_records(writer_record, reader_record, namespace),
1614 (
1615 Schema::Complex(ComplexType::Enum(writer_enum)),
1616 Schema::Complex(ComplexType::Enum(reader_enum)),
1617 ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1618 (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1619 (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1620 _ => Err(ArrowError::NotYetImplemented(
1621 "Other resolutions not yet implemented".to_string(),
1622 )),
1623 }
1624 }
1625
1626 #[inline]
1627 fn coercion_from(dt: &AvroDataType) -> Promotion {
1628 match dt.resolution.as_ref() {
1629 Some(ResolutionInfo::Promotion(promotion)) => *promotion,
1630 _ => Promotion::Direct,
1631 }
1632 }
1633
1634 fn find_best_promotion(
1635 &mut self,
1636 writer: &Schema<'a>,
1637 reader_variants: &[Schema<'a>],
1638 namespace: Option<&'a str>,
1639 ) -> Option<(usize, Promotion)> {
1640 let mut first_promotion: Option<(usize, Promotion)> = None;
1641 for (reader_index, reader) in reader_variants.iter().enumerate() {
1642 if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
1643 let promotion = Self::coercion_from(&tmp);
1644 if promotion == Promotion::Direct {
1645 return Some((reader_index, promotion));
1647 } else if first_promotion.is_none() {
1648 first_promotion = Some((reader_index, promotion));
1650 }
1651 }
1652 }
1653 first_promotion
1654 }
1655
1656 fn resolve_unions<'s>(
1657 &mut self,
1658 writer_variants: &'s [Schema<'a>],
1659 reader_variants: &'s [Schema<'a>],
1660 namespace: Option<&'a str>,
1661 ) -> Result<AvroDataType, ArrowError> {
1662 let reader_encodings: Vec<AvroDataType> = reader_variants
1663 .iter()
1664 .map(|reader_schema| self.parse_type(reader_schema, namespace))
1665 .collect::<Result<_, _>>()?;
1666 let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
1667 Vec::with_capacity(writer_variants.len());
1668 for writer in writer_variants {
1669 writer_to_reader.push(self.find_best_promotion(writer, reader_variants, namespace));
1670 }
1671 let union_fields = build_union_fields(&reader_encodings)?;
1672 let mut dt = AvroDataType::new(
1673 Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1674 Default::default(),
1675 None,
1676 );
1677 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1678 writer_to_reader: Arc::from(writer_to_reader),
1679 writer_is_union: true,
1680 reader_is_union: true,
1681 }));
1682 Ok(dt)
1683 }
1684
1685 fn resolve_array(
1686 &mut self,
1687 writer_array: &Array<'a>,
1688 reader_array: &Array<'a>,
1689 namespace: Option<&'a str>,
1690 ) -> Result<AvroDataType, ArrowError> {
1691 Ok(AvroDataType {
1692 nullability: None,
1693 metadata: reader_array.attributes.field_metadata(),
1694 codec: Codec::List(Arc::new(self.make_data_type(
1695 writer_array.items.as_ref(),
1696 Some(reader_array.items.as_ref()),
1697 namespace,
1698 )?)),
1699 resolution: None,
1700 })
1701 }
1702
1703 fn resolve_map(
1704 &mut self,
1705 writer_map: &Map<'a>,
1706 reader_map: &Map<'a>,
1707 namespace: Option<&'a str>,
1708 ) -> Result<AvroDataType, ArrowError> {
1709 Ok(AvroDataType {
1710 nullability: None,
1711 metadata: reader_map.attributes.field_metadata(),
1712 codec: Codec::Map(Arc::new(self.make_data_type(
1713 &writer_map.values,
1714 Some(&reader_map.values),
1715 namespace,
1716 )?)),
1717 resolution: None,
1718 })
1719 }
1720
1721 fn resolve_fixed<'s>(
1722 &mut self,
1723 writer_fixed: &Fixed<'a>,
1724 reader_fixed: &Fixed<'a>,
1725 reader_schema: &'s Schema<'a>,
1726 namespace: Option<&'a str>,
1727 ) -> Result<AvroDataType, ArrowError> {
1728 ensure_names_match(
1729 "Fixed",
1730 writer_fixed.name,
1731 writer_fixed.namespace,
1732 &writer_fixed.aliases,
1733 reader_fixed.name,
1734 reader_fixed.namespace,
1735 &reader_fixed.aliases,
1736 )?;
1737 if writer_fixed.size != reader_fixed.size {
1738 return Err(ArrowError::SchemaError(format!(
1739 "Fixed size mismatch for {}: writer={}, reader={}",
1740 reader_fixed.name, writer_fixed.size, reader_fixed.size
1741 )));
1742 }
1743 self.parse_type(reader_schema, namespace)
1744 }
1745
1746 fn resolve_primitives(
1747 &mut self,
1748 write_primitive: PrimitiveType,
1749 read_primitive: PrimitiveType,
1750 reader_schema: &Schema<'a>,
1751 ) -> Result<AvroDataType, ArrowError> {
1752 if write_primitive == read_primitive {
1753 return self.parse_type(reader_schema, None);
1754 }
1755 let promotion = match (write_primitive, read_primitive) {
1756 (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
1757 (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
1758 (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
1759 (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
1760 (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
1761 (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
1762 (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
1763 (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
1764 _ => {
1765 return Err(ArrowError::ParseError(format!(
1766 "Illegal promotion {write_primitive:?} to {read_primitive:?}"
1767 )));
1768 }
1769 };
1770 let mut datatype = self.parse_type(reader_schema, None)?;
1771 datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
1772 Ok(datatype)
1773 }
1774
1775 fn resolve_enums(
1831 &mut self,
1832 writer_enum: &Enum<'a>,
1833 reader_enum: &Enum<'a>,
1834 reader_schema: &Schema<'a>,
1835 namespace: Option<&'a str>,
1836 ) -> Result<AvroDataType, ArrowError> {
1837 ensure_names_match(
1838 "Enum",
1839 writer_enum.name,
1840 writer_enum.namespace,
1841 &writer_enum.aliases,
1842 reader_enum.name,
1843 reader_enum.namespace,
1844 &reader_enum.aliases,
1845 )?;
1846 if writer_enum.symbols == reader_enum.symbols {
1847 return self.parse_type(reader_schema, namespace);
1848 }
1849 let reader_index: HashMap<&str, i32> = reader_enum
1850 .symbols
1851 .iter()
1852 .enumerate()
1853 .map(|(index, &symbol)| (symbol, index as i32))
1854 .collect();
1855 let default_index: i32 = match reader_enum.default {
1856 Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
1857 ArrowError::SchemaError(format!(
1858 "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
1859 reader_enum.name,
1860 ))
1861 })?,
1862 None => -1,
1863 };
1864 let mapping: Vec<i32> = writer_enum
1865 .symbols
1866 .iter()
1867 .map(|&write_symbol| {
1868 reader_index
1869 .get(write_symbol)
1870 .copied()
1871 .unwrap_or(default_index)
1872 })
1873 .collect();
1874 if self.strict_mode && mapping.iter().any(|&m| m < 0) {
1875 return Err(ArrowError::SchemaError(format!(
1876 "Reader enum '{}' does not cover all writer symbols and no default is provided",
1877 reader_enum.name
1878 )));
1879 }
1880 let mut dt = self.parse_type(reader_schema, namespace)?;
1881 dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
1882 mapping: Arc::from(mapping),
1883 default_index,
1884 }));
1885 let reader_ns = reader_enum.namespace.or(namespace);
1886 self.resolver
1887 .register(reader_enum.name, reader_ns, dt.clone());
1888 Ok(dt)
1889 }
1890
1891 #[inline]
1892 fn build_writer_lookup(
1893 writer_record: &Record<'a>,
1894 ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
1895 let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
1896 for (idx, wf) in writer_record.fields.iter().enumerate() {
1897 map.insert(wf.name, idx);
1899 }
1900 let mut ambiguous: HashSet<&str> = HashSet::new();
1902 for (idx, wf) in writer_record.fields.iter().enumerate() {
1903 for &alias in &wf.aliases {
1904 match map.entry(alias) {
1905 Entry::Occupied(e) if *e.get() != idx => {
1906 ambiguous.insert(alias);
1907 }
1908 Entry::Vacant(e) => {
1909 e.insert(idx);
1910 }
1911 _ => {}
1912 }
1913 }
1914 }
1915 (map, ambiguous)
1916 }
1917
1918 fn resolve_records(
1919 &mut self,
1920 writer_record: &Record<'a>,
1921 reader_record: &Record<'a>,
1922 namespace: Option<&'a str>,
1923 ) -> Result<AvroDataType, ArrowError> {
1924 ensure_names_match(
1925 "Record",
1926 writer_record.name,
1927 writer_record.namespace,
1928 &writer_record.aliases,
1929 reader_record.name,
1930 reader_record.namespace,
1931 &reader_record.aliases,
1932 )?;
1933 let writer_ns = writer_record.namespace.or(namespace);
1934 let reader_ns = reader_record.namespace.or(namespace);
1935 let reader_md = reader_record.attributes.field_metadata();
1936 let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
1938 let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
1939 let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
1940 let mut default_fields: Vec<usize> = Vec::new();
1942 for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
1943 let mut match_idx = writer_lookup.get(r_field.name).copied();
1945 let mut matched_via_alias: Option<&str> = None;
1946 if match_idx.is_none() {
1947 for &alias in &r_field.aliases {
1948 if let Some(i) = writer_lookup.get(alias).copied() {
1949 if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
1950 return Err(ArrowError::SchemaError(format!(
1951 "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
1952 r_field.name
1953 )));
1954 }
1955 match_idx = Some(i);
1956 matched_via_alias = Some(alias);
1957 break;
1958 }
1959 }
1960 }
1961 if let Some(wi) = match_idx {
1962 if writer_to_reader[wi].is_none() {
1963 let w_schema = &writer_record.fields[wi].r#type;
1964 let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
1965 writer_to_reader[wi] = Some(reader_idx);
1966 reader_fields.push(AvroField {
1967 name: r_field.name.to_owned(),
1968 data_type: dt,
1969 });
1970 continue;
1971 } else if self.strict_mode {
1972 let existing_reader = writer_to_reader[wi].unwrap();
1974 let via = matched_via_alias
1975 .map(|a| format!("alias '{a}'"))
1976 .unwrap_or_else(|| "name match".to_string());
1977 return Err(ArrowError::SchemaError(format!(
1978 "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
1979 writer_record.fields[wi].name
1980 )));
1981 }
1982 }
1984 let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
1986 if let Some(default_json) = r_field.default.as_ref() {
1987 dt.resolution = Some(ResolutionInfo::DefaultValue(
1988 dt.parse_and_store_default(default_json)?,
1989 ));
1990 default_fields.push(reader_idx);
1991 } else if dt.nullability() == Some(Nullability::NullFirst) {
1992 dt.resolution = Some(ResolutionInfo::DefaultValue(
1994 dt.parse_and_store_default(&Value::Null)?,
1995 ));
1996 default_fields.push(reader_idx);
1997 } else {
1998 return Err(ArrowError::SchemaError(format!(
1999 "Reader field '{}' not present in writer schema must have a default value",
2000 r_field.name
2001 )));
2002 }
2003 reader_fields.push(AvroField {
2004 name: r_field.name.to_owned(),
2005 data_type: dt,
2006 });
2007 }
2008 let mut skip_fields: Vec<Option<AvroDataType>> =
2010 Vec::with_capacity(writer_record.fields.len());
2011 for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
2012 if writer_to_reader[writer_index].is_some() {
2013 skip_fields.push(None);
2014 } else {
2015 skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
2016 }
2017 }
2018 let resolved = AvroDataType::new_with_resolution(
2019 Codec::Struct(Arc::from(reader_fields)),
2020 reader_md,
2021 None,
2022 Some(ResolutionInfo::Record(ResolvedRecord {
2023 writer_to_reader: Arc::from(writer_to_reader),
2024 default_fields: Arc::from(default_fields),
2025 skip_fields: Arc::from(skip_fields),
2026 })),
2027 );
2028 self.resolver
2030 .register(reader_record.name, reader_ns, resolved.clone());
2031 Ok(resolved)
2032 }
2033}
2034
2035#[cfg(test)]
2036mod tests {
2037 use super::*;
2038 use crate::schema::{
2039 AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2040 Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2041 };
2042 use indexmap::IndexMap;
2043 use serde_json::{self, Value};
2044
2045 fn create_schema_with_logical_type(
2046 primitive_type: PrimitiveType,
2047 logical_type: &'static str,
2048 ) -> Schema<'static> {
2049 let attributes = Attributes {
2050 logical_type: Some(logical_type),
2051 additional: Default::default(),
2052 };
2053
2054 Schema::Type(Type {
2055 r#type: TypeName::Primitive(primitive_type),
2056 attributes,
2057 })
2058 }
2059
2060 fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2061 let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2062 let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2063 let mut maker = Maker::new(false, false);
2064 maker
2065 .make_data_type(&writer_schema, Some(&reader_schema), None)
2066 .expect("promotion should resolve")
2067 }
2068
2069 fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2070 Schema::TypeName(TypeName::Primitive(pt))
2071 }
2072 fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2073 Schema::Union(branches)
2074 }
2075
/// An `int` annotated with the `date` logical type parses to `Codec::Date32`.
#[test]
fn test_date_logical_type() {
    let date_schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&date_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::Date32));
}
2085
/// An `int` annotated with `time-millis` parses to `Codec::TimeMillis`.
#[test]
fn test_time_millis_logical_type() {
    let time_schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&time_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::TimeMillis));
}
2095
/// A `long` annotated with `time-micros` parses to `Codec::TimeMicros`.
#[test]
fn test_time_micros_logical_type() {
    let time_schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&time_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::TimeMicros));
}
2105
/// A `long` annotated with `timestamp-millis` parses to `Codec::TimestampMillis(true)`.
#[test]
fn test_timestamp_millis_logical_type() {
    let ts_schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&ts_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::TimestampMillis(true)));
}
2115
/// A `long` annotated with `timestamp-micros` parses to `Codec::TimestampMicros(true)`.
#[test]
fn test_timestamp_micros_logical_type() {
    let ts_schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&ts_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::TimestampMicros(true)));
}
2125
/// A `long` annotated with `local-timestamp-millis` parses to `Codec::TimestampMillis(false)`.
#[test]
fn test_local_timestamp_millis_logical_type() {
    let ts_schema =
        create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&ts_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::TimestampMillis(false)));
}
2135
/// A `long` annotated with `local-timestamp-micros` parses to `Codec::TimestampMicros(false)`.
#[test]
fn test_local_timestamp_micros_logical_type() {
    let ts_schema =
        create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&ts_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::TimestampMicros(false)));
}
2145
/// Replacing a 16-byte fixed codec in place yields `Codec::Uuid`.
///
/// NOTE(review): this only exercises a local codec value, not the schema parser.
#[test]
fn test_uuid_type() {
    let mut codec = Codec::Fixed(16);
    if matches!(codec, Codec::Fixed(16)) {
        codec = Codec::Uuid;
    }
    assert!(matches!(codec, Codec::Uuid));
}
2154
/// Replacing a 12-byte fixed codec in place yields `Codec::Interval`.
///
/// NOTE(review): this only exercises a local codec value, not the schema parser.
#[test]
fn test_duration_logical_type() {
    let mut codec = Codec::Fixed(12);
    if matches!(codec, Codec::Fixed(12)) {
        codec = Codec::Interval;
    }
    assert!(matches!(codec, Codec::Interval));
}
2165
/// A local closure that rejects fixed codecs returns `NotYetImplemented` with the expected
/// message.
///
/// NOTE(review): the closure is defined inside the test, so no production code path is
/// exercised here.
#[test]
fn test_decimal_logical_type_not_implemented() {
    let codec = Codec::Fixed(16);
    let process_decimal = || -> Result<(), ArrowError> {
        match codec {
            Codec::Fixed(_) => Err(ArrowError::NotYetImplemented(
                "Decimals are not currently supported".to_string(),
            )),
            _ => Ok(()),
        }
    };
    let result = process_decimal();
    assert!(result.is_err());
    match result {
        Err(ArrowError::NotYetImplemented(msg)) => {
            assert!(msg.contains("Decimals are not currently supported"));
        }
        _ => panic!("Expected NotYetImplemented error"),
    }
}
/// An unrecognised logical type is recorded under the `logicalType` metadata key.
#[test]
fn test_unknown_logical_type_added_to_metadata() {
    let custom_schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&custom_schema, None, None).unwrap();
    let expected = "custom-type".to_string();
    assert_eq!(parsed.metadata.get("logicalType"), Some(&expected));
}
2200
/// With the first `Maker::new` flag set, a `string` parses to `Codec::Utf8View`.
#[test]
fn test_string_with_utf8view_enabled() {
    let string_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut maker = Maker::new(true, false);
    let parsed = maker.make_data_type(&string_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::Utf8View));
}
2210
/// With the first `Maker::new` flag cleared, a `string` parses to `Codec::Utf8`.
#[test]
fn test_string_without_utf8view_enabled() {
    let string_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut maker = Maker::new(false, false);
    let parsed = maker.make_data_type(&string_schema, None, None).unwrap();
    assert!(matches!(parsed.codec, Codec::Utf8));
}
2220
/// The utf8view setting reaches record fields: a record with one `string` field parses to a
/// struct whose first field uses `Codec::Utf8View`.
#[test]
fn test_record_with_string_and_utf8view_enabled() {
    let string_field = crate::schema::Field {
        name: "string_field",
        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
        default: None,
        doc: None,
        aliases: vec![],
    };
    let schema = Schema::Complex(ComplexType::Record(Record {
        name: "test_record",
        namespace: None,
        aliases: vec![],
        doc: None,
        fields: vec![string_field],
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(true, false);
    let parsed = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = &parsed.codec else {
        panic!("Expected Struct codec");
    };
    assert!(matches!(fields[0].data_type().codec, Codec::Utf8View));
}
2254
/// With the second `Maker::new` flag set, a `[T, "null"]` union is rejected with a
/// `SchemaError` carrying the strict-mode message.
#[test]
fn test_union_with_strict_mode() {
    let branches = vec![
        Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
    ];
    let schema = Schema::Union(branches);

    let mut maker = Maker::new(false, true);
    let result = maker.make_data_type(&schema, None, None);

    assert!(result.is_err());
    let Err(ArrowError::SchemaError(msg)) = result else {
        panic!("Expected SchemaError");
    };
    assert!(msg.contains(
        "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
    ));
}
2275
/// Writer `int` read as `float` yields a Float32 codec tagged `IntToFloat`.
#[test]
fn test_resolve_int_to_float_promotion() {
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToFloat));
    let resolved = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
    assert!(matches!(resolved.codec, Codec::Float32));
    assert_eq!(resolved.resolution, expected);
}
2285
/// Writer `int` read as `double` yields a Float64 codec tagged `IntToDouble`.
#[test]
fn test_resolve_int_to_double_promotion() {
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToDouble));
    let resolved = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
    assert!(matches!(resolved.codec, Codec::Float64));
    assert_eq!(resolved.resolution, expected);
}
2295
2296 #[test]
2297 fn test_resolve_long_to_float_promotion() {
2298 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2299 assert!(matches!(result.codec, Codec::Float32));
2300 assert_eq!(
2301 result.resolution,
2302 Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2303 );
2304 }
2305
2306 #[test]
2307 fn test_resolve_long_to_double_promotion() {
2308 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2309 assert!(matches!(result.codec, Codec::Float64));
2310 assert_eq!(
2311 result.resolution,
2312 Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2313 );
2314 }
2315
2316 #[test]
2317 fn test_resolve_float_to_double_promotion() {
2318 let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2319 assert!(matches!(result.codec, Codec::Float64));
2320 assert_eq!(
2321 result.resolution,
2322 Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2323 );
2324 }
2325
2326 #[test]
2327 fn test_resolve_string_to_bytes_promotion() {
2328 let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2329 assert!(matches!(result.codec, Codec::Binary));
2330 assert_eq!(
2331 result.resolution,
2332 Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2333 );
2334 }
2335
2336 #[test]
2337 fn test_resolve_bytes_to_string_promotion() {
2338 let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2339 assert!(matches!(result.codec, Codec::Utf8));
2340 assert_eq!(
2341 result.resolution,
2342 Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2343 );
2344 }
2345
2346 #[test]
2347 fn test_resolve_illegal_promotion_double_to_float_errors() {
2348 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2349 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2350 let mut maker = Maker::new(false, false);
2351 let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2352 assert!(result.is_err());
2353 match result {
2354 Err(ArrowError::ParseError(msg)) => {
2355 assert!(msg.contains("Illegal promotion"));
2356 }
2357 _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2358 }
2359 }
2360
2361 #[test]
2362 fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2363 let writer = Schema::Union(vec![
2364 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2365 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2366 ]);
2367 let reader = Schema::Union(vec![
2368 Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2369 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2370 ]);
2371 let mut maker = Maker::new(false, false);
2372 let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2373 assert!(matches!(result.codec, Codec::Float64));
2374 assert_eq!(
2375 result.resolution,
2376 Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2377 );
2378 assert_eq!(result.nullability, Some(Nullability::NullFirst));
2379 }
2380
2381 #[test]
2382 fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2383 let writer = mk_union(vec![
2384 mk_primitive(PrimitiveType::String),
2385 mk_primitive(PrimitiveType::Long),
2386 ]);
2387 let reader = mk_primitive(PrimitiveType::Bytes);
2388 let mut maker = Maker::new(false, false);
2389 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2390 assert!(matches!(dt.codec(), Codec::Binary));
2391 let resolved = match dt.resolution {
2392 Some(ResolutionInfo::Union(u)) => u,
2393 other => panic!("expected union resolution info, got {other:?}"),
2394 };
2395 assert!(resolved.writer_is_union && !resolved.reader_is_union);
2396 assert_eq!(
2397 resolved.writer_to_reader.as_ref(),
2398 &[Some((0, Promotion::StringToBytes)), None]
2399 );
2400 }
2401
2402 #[test]
2403 fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2404 let writer = mk_primitive(PrimitiveType::Long);
2405 let reader = mk_union(vec![
2406 mk_primitive(PrimitiveType::Long),
2407 mk_primitive(PrimitiveType::Double),
2408 ]);
2409 let mut maker = Maker::new(false, false);
2410 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2411 let resolved = match dt.resolution {
2412 Some(ResolutionInfo::Union(u)) => u,
2413 other => panic!("expected union resolution info, got {other:?}"),
2414 };
2415 assert!(!resolved.writer_is_union && resolved.reader_is_union);
2416 assert_eq!(
2417 resolved.writer_to_reader.as_ref(),
2418 &[Some((0, Promotion::Direct))]
2419 );
2420 }
2421
2422 #[test]
2423 fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2424 let writer = mk_primitive(PrimitiveType::Int);
2425 let reader = mk_union(vec![
2426 mk_primitive(PrimitiveType::Null),
2427 mk_primitive(PrimitiveType::Long),
2428 mk_primitive(PrimitiveType::String),
2429 ]);
2430 let mut maker = Maker::new(false, false);
2431 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2432 let resolved = match dt.resolution {
2433 Some(ResolutionInfo::Union(u)) => u,
2434 other => panic!("expected union resolution info, got {other:?}"),
2435 };
2436 assert_eq!(
2437 resolved.writer_to_reader.as_ref(),
2438 &[Some((1, Promotion::IntToLong))]
2439 );
2440 }
2441
2442 #[test]
2443 fn test_resolve_both_nullable_unions_direct_match() {
2444 let writer = mk_union(vec![
2445 mk_primitive(PrimitiveType::Null),
2446 mk_primitive(PrimitiveType::String),
2447 ]);
2448 let reader = mk_union(vec![
2449 mk_primitive(PrimitiveType::String),
2450 mk_primitive(PrimitiveType::Null),
2451 ]);
2452 let mut maker = Maker::new(false, false);
2453 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2454 assert!(matches!(dt.codec(), Codec::Utf8));
2455 assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2456 assert!(dt.resolution.is_none());
2457 }
2458
2459 #[test]
2460 fn test_resolve_both_nullable_unions_with_promotion() {
2461 let writer = mk_union(vec![
2462 mk_primitive(PrimitiveType::Null),
2463 mk_primitive(PrimitiveType::Int),
2464 ]);
2465 let reader = mk_union(vec![
2466 mk_primitive(PrimitiveType::Double),
2467 mk_primitive(PrimitiveType::Null),
2468 ]);
2469 let mut maker = Maker::new(false, false);
2470 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2471 assert!(matches!(dt.codec(), Codec::Float64));
2472 assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2473 assert_eq!(
2474 dt.resolution,
2475 Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2476 );
2477 }
2478
2479 #[test]
2480 fn test_resolve_type_promotion() {
2481 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2482 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2483 let mut maker = Maker::new(false, false);
2484 let result = maker
2485 .make_data_type(&writer_schema, Some(&reader_schema), None)
2486 .unwrap();
2487 assert!(matches!(result.codec, Codec::Int64));
2488 assert_eq!(
2489 result.resolution,
2490 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2491 );
2492 }
2493
2494 #[test]
2495 fn test_nested_record_type_reuse_without_namespace() {
2496 let schema_str = r#"
2497 {
2498 "type": "record",
2499 "name": "Record",
2500 "fields": [
2501 {
2502 "name": "nested",
2503 "type": {
2504 "type": "record",
2505 "name": "Nested",
2506 "fields": [
2507 { "name": "nested_int", "type": "int" }
2508 ]
2509 }
2510 },
2511 { "name": "nestedRecord", "type": "Nested" },
2512 { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
2513 { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
2514 ]
2515 }
2516 "#;
2517
2518 let schema: Schema = serde_json::from_str(schema_str).unwrap();
2519
2520 let mut maker = Maker::new(false, false);
2521 let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2522
2523 if let Codec::Struct(fields) = avro_data_type.codec() {
2524 assert_eq!(fields.len(), 4);
2525
2526 assert_eq!(fields[0].name(), "nested");
2528 let nested_data_type = fields[0].data_type();
2529 if let Codec::Struct(nested_fields) = nested_data_type.codec() {
2530 assert_eq!(nested_fields.len(), 1);
2531 assert_eq!(nested_fields[0].name(), "nested_int");
2532 assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
2533 } else {
2534 panic!(
2535 "'nested' field is not a struct but {:?}",
2536 nested_data_type.codec()
2537 );
2538 }
2539
2540 assert_eq!(fields[1].name(), "nestedRecord");
2542 let nested_record_data_type = fields[1].data_type();
2543 assert_eq!(
2544 nested_record_data_type.codec().data_type(),
2545 nested_data_type.codec().data_type()
2546 );
2547
2548 assert_eq!(fields[2].name(), "nestedArray");
2550 if let Codec::List(item_type) = fields[2].data_type().codec() {
2551 assert_eq!(
2552 item_type.codec().data_type(),
2553 nested_data_type.codec().data_type()
2554 );
2555 } else {
2556 panic!("'nestedArray' field is not a list");
2557 }
2558
2559 assert_eq!(fields[3].name(), "nestedMap");
2561 if let Codec::Map(value_type) = fields[3].data_type().codec() {
2562 assert_eq!(
2563 value_type.codec().data_type(),
2564 nested_data_type.codec().data_type()
2565 );
2566 } else {
2567 panic!("'nestedMap' field is not a map");
2568 }
2569 } else {
2570 panic!("Top-level schema is not a struct");
2571 }
2572 }
2573
2574 #[test]
2575 fn test_nested_enum_type_reuse_with_namespace() {
2576 let schema_str = r#"
2577 {
2578 "type": "record",
2579 "name": "Record",
2580 "namespace": "record_ns",
2581 "fields": [
2582 {
2583 "name": "status",
2584 "type": {
2585 "type": "enum",
2586 "name": "Status",
2587 "namespace": "enum_ns",
2588 "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
2589 }
2590 },
2591 { "name": "backupStatus", "type": "enum_ns.Status" },
2592 { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
2593 { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
2594 ]
2595 }
2596 "#;
2597
2598 let schema: Schema = serde_json::from_str(schema_str).unwrap();
2599
2600 let mut maker = Maker::new(false, false);
2601 let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
2602
2603 if let Codec::Struct(fields) = avro_data_type.codec() {
2604 assert_eq!(fields.len(), 4);
2605
2606 assert_eq!(fields[0].name(), "status");
2608 let status_data_type = fields[0].data_type();
2609 if let Codec::Enum(symbols) = status_data_type.codec() {
2610 assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
2611 } else {
2612 panic!(
2613 "'status' field is not an enum but {:?}",
2614 status_data_type.codec()
2615 );
2616 }
2617
2618 assert_eq!(fields[1].name(), "backupStatus");
2620 let backup_status_data_type = fields[1].data_type();
2621 assert_eq!(
2622 backup_status_data_type.codec().data_type(),
2623 status_data_type.codec().data_type()
2624 );
2625
2626 assert_eq!(fields[2].name(), "statusHistory");
2628 if let Codec::List(item_type) = fields[2].data_type().codec() {
2629 assert_eq!(
2630 item_type.codec().data_type(),
2631 status_data_type.codec().data_type()
2632 );
2633 } else {
2634 panic!("'statusHistory' field is not a list");
2635 }
2636
2637 assert_eq!(fields[3].name(), "statusMap");
2639 if let Codec::Map(value_type) = fields[3].data_type().codec() {
2640 assert_eq!(
2641 value_type.codec().data_type(),
2642 status_data_type.codec().data_type()
2643 );
2644 } else {
2645 panic!("'statusMap' field is not a map");
2646 }
2647 } else {
2648 panic!("Top-level schema is not a struct");
2649 }
2650 }
2651
2652 #[test]
2653 fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
2654 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2655 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2656 let mut maker = Maker::new(false, false);
2657 let data_type = maker
2658 .make_data_type(&writer_schema, Some(&reader_schema), None)
2659 .expect("resolution should succeed");
2660 let field = AvroField {
2661 name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
2662 data_type,
2663 };
2664 assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
2665 assert!(matches!(field.data_type().codec(), Codec::Utf8));
2666 }
2667
2668 fn json_string(s: &str) -> Value {
2669 Value::String(s.to_string())
2670 }
2671
2672 fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
2673 let stored = dt
2674 .metadata
2675 .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
2676 .cloned()
2677 .unwrap_or_default();
2678 let expected = serde_json::to_string(default_json).unwrap();
2679 assert_eq!(stored, expected, "stored default metadata should match");
2680 }
2681
2682 #[test]
2683 fn test_validate_and_store_default_null_and_nullability_rules() {
2684 let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
2685 let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
2686 assert_eq!(lit, AvroLiteral::Null);
2687 assert_default_stored(&dt_null, &Value::Null);
2688 let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
2689 let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
2690 assert!(
2691 err.to_string()
2692 .contains("JSON null default is only valid for `null` type"),
2693 "unexpected error: {err}"
2694 );
2695 let mut dt_int_nf =
2696 AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
2697 let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
2698 assert_eq!(lit2, AvroLiteral::Null);
2699 assert_default_stored(&dt_int_nf, &Value::Null);
2700 let mut dt_int_ns =
2701 AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
2702 let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
2703 assert!(
2704 err2.to_string()
2705 .contains("JSON null default is only valid for `null` type"),
2706 "unexpected error: {err2}"
2707 );
2708 }
2709
2710 #[test]
2711 fn test_validate_and_store_default_primitives_and_temporal() {
2712 let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
2713 let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
2714 assert_eq!(lit, AvroLiteral::Boolean(true));
2715 assert_default_stored(&dt_bool, &Value::Bool(true));
2716 let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
2717 let lit = dt_i32
2718 .parse_and_store_default(&serde_json::json!(123))
2719 .unwrap();
2720 assert_eq!(lit, AvroLiteral::Int(123));
2721 assert_default_stored(&dt_i32, &serde_json::json!(123));
2722 let err = dt_i32
2723 .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
2724 .unwrap_err();
2725 assert!(format!("{err}").contains("out of i32 range"));
2726 let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
2727 let lit = dt_i64
2728 .parse_and_store_default(&serde_json::json!(1234567890))
2729 .unwrap();
2730 assert_eq!(lit, AvroLiteral::Long(1234567890));
2731 assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
2732 let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
2733 let lit = dt_f32
2734 .parse_and_store_default(&serde_json::json!(1.25))
2735 .unwrap();
2736 assert_eq!(lit, AvroLiteral::Float(1.25));
2737 assert_default_stored(&dt_f32, &serde_json::json!(1.25));
2738 let err = dt_f32
2739 .parse_and_store_default(&serde_json::json!(1e39))
2740 .unwrap_err();
2741 assert!(format!("{err}").contains("out of f32 range"));
2742 let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
2743 let lit = dt_f64
2744 .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
2745 .unwrap();
2746 assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
2747 assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
2748 let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
2749 let l = dt_str
2750 .parse_and_store_default(&json_string("hello"))
2751 .unwrap();
2752 assert_eq!(l, AvroLiteral::String("hello".into()));
2753 assert_default_stored(&dt_str, &json_string("hello"));
2754 let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
2755 let l = dt_strv
2756 .parse_and_store_default(&json_string("view"))
2757 .unwrap();
2758 assert_eq!(l, AvroLiteral::String("view".into()));
2759 assert_default_stored(&dt_strv, &json_string("view"));
2760 let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
2761 let l = dt_uuid
2762 .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
2763 .unwrap();
2764 assert_eq!(
2765 l,
2766 AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
2767 );
2768 let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
2769 let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
2770 assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
2771 let err = dt_bin
2772 .parse_and_store_default(&json_string("€")) .unwrap_err();
2774 assert!(format!("{err}").contains("Invalid codepoint"));
2775 let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
2776 let ld = dt_date
2777 .parse_and_store_default(&serde_json::json!(1))
2778 .unwrap();
2779 assert_eq!(ld, AvroLiteral::Int(1));
2780 let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
2781 let lt = dt_tmill
2782 .parse_and_store_default(&serde_json::json!(86_400_000))
2783 .unwrap();
2784 assert_eq!(lt, AvroLiteral::Int(86_400_000));
2785 let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
2786 let ltm = dt_tmicros
2787 .parse_and_store_default(&serde_json::json!(1_000_000))
2788 .unwrap();
2789 assert_eq!(ltm, AvroLiteral::Long(1_000_000));
2790 let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
2791 let l1 = dt_ts_milli
2792 .parse_and_store_default(&serde_json::json!(123))
2793 .unwrap();
2794 assert_eq!(l1, AvroLiteral::Long(123));
2795 let mut dt_ts_micro =
2796 AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
2797 let l2 = dt_ts_micro
2798 .parse_and_store_default(&serde_json::json!(456))
2799 .unwrap();
2800 assert_eq!(l2, AvroLiteral::Long(456));
2801 }
2802
2803 #[test]
2804 fn test_validate_and_store_default_fixed_decimal_interval() {
2805 let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
2806 let l = dt_fixed
2807 .parse_and_store_default(&json_string("WXYZ"))
2808 .unwrap();
2809 assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
2810 let err = dt_fixed
2811 .parse_and_store_default(&json_string("TOO LONG"))
2812 .unwrap_err();
2813 assert!(err.to_string().contains("Default length"));
2814 let mut dt_dec_fixed =
2815 AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
2816 let l = dt_dec_fixed
2817 .parse_and_store_default(&json_string("abc"))
2818 .unwrap();
2819 assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
2820 let err = dt_dec_fixed
2821 .parse_and_store_default(&json_string("toolong"))
2822 .unwrap_err();
2823 assert!(err.to_string().contains("Default length"));
2824 let mut dt_dec_bytes =
2825 AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
2826 let l = dt_dec_bytes
2827 .parse_and_store_default(&json_string("freeform"))
2828 .unwrap();
2829 assert_eq!(
2830 l,
2831 AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
2832 );
2833 let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
2834 let l = dt_interval
2835 .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
2836 .unwrap();
2837 assert_eq!(
2838 l,
2839 AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
2840 );
2841 let err = dt_interval
2842 .parse_and_store_default(&json_string("short"))
2843 .unwrap_err();
2844 assert!(err.to_string().contains("Default length"));
2845 }
2846
2847 #[test]
2848 fn test_validate_and_store_default_enum_list_map_struct() {
2849 let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
2850 .into_iter()
2851 .collect();
2852 let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
2853 let l = dt_enum
2854 .parse_and_store_default(&json_string("GREEN"))
2855 .unwrap();
2856 assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
2857 let err = dt_enum
2858 .parse_and_store_default(&json_string("YELLOW"))
2859 .unwrap_err();
2860 assert!(err.to_string().contains("Default enum symbol"));
2861 let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
2862 let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
2863 let val = serde_json::json!([1, 2, 3]);
2864 let l = dt_list.parse_and_store_default(&val).unwrap();
2865 assert_eq!(
2866 l,
2867 AvroLiteral::Array(vec![
2868 AvroLiteral::Long(1),
2869 AvroLiteral::Long(2),
2870 AvroLiteral::Long(3)
2871 ])
2872 );
2873 let err = dt_list
2874 .parse_and_store_default(&serde_json::json!({"not":"array"}))
2875 .unwrap_err();
2876 assert!(err.to_string().contains("JSON array"));
2877 let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
2878 let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
2879 let mv = serde_json::json!({"x": 1.5, "y": 2.5});
2880 let l = dt_map.parse_and_store_default(&mv).unwrap();
2881 let mut expected = IndexMap::new();
2882 expected.insert("x".into(), AvroLiteral::Double(1.5));
2883 expected.insert("y".into(), AvroLiteral::Double(2.5));
2884 assert_eq!(l, AvroLiteral::Map(expected));
2885 let err = dt_map
2887 .parse_and_store_default(&serde_json::json!(123))
2888 .unwrap_err();
2889 assert!(err.to_string().contains("JSON object"));
2890 let mut field_a = AvroField {
2891 name: "a".into(),
2892 data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
2893 };
2894 let field_b = AvroField {
2895 name: "b".into(),
2896 data_type: AvroDataType::new(
2897 Codec::Int64,
2898 HashMap::new(),
2899 Some(Nullability::NullFirst),
2900 ),
2901 };
2902 let mut c_md = HashMap::new();
2903 c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
2904 let field_c = AvroField {
2905 name: "c".into(),
2906 data_type: AvroDataType::new(Codec::Utf8, c_md, None),
2907 };
2908 field_a.data_type.metadata.insert("doc".into(), "na".into());
2909 let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
2910 let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
2911 let default_obj = serde_json::json!({"a": 7});
2912 let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
2913 let mut expected = IndexMap::new();
2914 expected.insert("a".into(), AvroLiteral::Int(7));
2915 expected.insert("b".into(), AvroLiteral::Null);
2916 expected.insert("c".into(), AvroLiteral::String("xyz".into()));
2917 assert_eq!(l, AvroLiteral::Map(expected));
2918 assert_default_stored(&dt_struct, &default_obj);
2919 let req_field = AvroField {
2920 name: "req".into(),
2921 data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
2922 };
2923 let mut dt_bad = AvroDataType::new(
2924 Codec::Struct(Arc::from(vec![req_field])),
2925 HashMap::new(),
2926 None,
2927 );
2928 let err = dt_bad
2929 .parse_and_store_default(&serde_json::json!({}))
2930 .unwrap_err();
2931 assert!(
2932 err.to_string().contains("missing required subfield 'req'"),
2933 "unexpected error: {err}"
2934 );
2935 let err = dt_struct
2936 .parse_and_store_default(&serde_json::json!(10))
2937 .unwrap_err();
2938 err.to_string().contains("must be a JSON object");
2939 }
2940
2941 #[test]
2942 fn test_resolve_array_promotion_and_reader_metadata() {
2943 let mut w_add: HashMap<&str, Value> = HashMap::new();
2944 w_add.insert("who", json_string("writer"));
2945 let mut r_add: HashMap<&str, Value> = HashMap::new();
2946 r_add.insert("who", json_string("reader"));
2947 let writer_schema = Schema::Complex(ComplexType::Array(Array {
2948 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2949 attributes: Attributes {
2950 logical_type: None,
2951 additional: w_add,
2952 },
2953 }));
2954 let reader_schema = Schema::Complex(ComplexType::Array(Array {
2955 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
2956 attributes: Attributes {
2957 logical_type: None,
2958 additional: r_add,
2959 },
2960 }));
2961 let mut maker = Maker::new(false, false);
2962 let dt = maker
2963 .make_data_type(&writer_schema, Some(&reader_schema), None)
2964 .unwrap();
2965 assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
2966 if let Codec::List(inner) = dt.codec() {
2967 assert!(matches!(inner.codec(), Codec::Int64));
2968 assert_eq!(
2969 inner.resolution,
2970 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2971 );
2972 } else {
2973 panic!("expected list codec");
2974 }
2975 }
2976
2977 #[test]
2978 fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
2979 let writer_schema = Schema::Complex(ComplexType::Array(Array {
2980 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2981 attributes: Attributes::default(),
2982 }));
2983 let reader_schema = Schema::Complex(ComplexType::Array(Array {
2984 items: Box::new(mk_union(vec![
2985 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2986 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2987 ])),
2988 attributes: Attributes::default(),
2989 }));
2990 let mut maker = Maker::new(false, false);
2991 let dt = maker
2992 .make_data_type(&writer_schema, Some(&reader_schema), None)
2993 .unwrap();
2994 if let Codec::List(inner) = dt.codec() {
2995 assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
2996 assert!(matches!(inner.codec(), Codec::Int32));
2997 match inner.resolution.as_ref() {
2998 Some(ResolutionInfo::Union(info)) => {
2999 assert!(!info.writer_is_union, "writer should be non-union");
3000 assert!(info.reader_is_union, "reader should be union");
3001 assert_eq!(
3002 info.writer_to_reader.as_ref(),
3003 &[Some((1, Promotion::Direct))]
3004 );
3005 }
3006 other => panic!("expected Union resolution, got {other:?}"),
3007 }
3008 } else {
3009 panic!("expected List codec");
3010 }
3011 }
3012
3013 #[test]
3014 fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3015 let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3016 name: "MD5",
3017 namespace: None,
3018 aliases: vec!["Hash16"],
3019 size: 16,
3020 attributes: Attributes::default(),
3021 }));
3022 let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3023 name: "Hash16",
3024 namespace: None,
3025 aliases: vec![],
3026 size: 16,
3027 attributes: Attributes::default(),
3028 }));
3029 let mut maker = Maker::new(false, false);
3030 let dt = maker
3031 .make_data_type(&writer_schema, Some(&reader_schema), None)
3032 .unwrap();
3033 assert!(matches!(dt.codec(), Codec::Fixed(16)));
3034 }
3035
3036 #[test]
3037 fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3038 let writer = Schema::Complex(ComplexType::Record(Record {
3039 name: "R",
3040 namespace: None,
3041 doc: None,
3042 aliases: vec![],
3043 fields: vec![
3044 crate::schema::Field {
3045 name: "a",
3046 doc: None,
3047 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3048 default: None,
3049 aliases: vec![],
3050 },
3051 crate::schema::Field {
3052 name: "skipme",
3053 doc: None,
3054 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3055 default: None,
3056 aliases: vec![],
3057 },
3058 crate::schema::Field {
3059 name: "b",
3060 doc: None,
3061 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3062 default: None,
3063 aliases: vec![],
3064 },
3065 ],
3066 attributes: Attributes::default(),
3067 }));
3068 let reader = Schema::Complex(ComplexType::Record(Record {
3069 name: "R",
3070 namespace: None,
3071 doc: None,
3072 aliases: vec![],
3073 fields: vec![
3074 crate::schema::Field {
3075 name: "b",
3076 doc: None,
3077 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3078 default: None,
3079 aliases: vec![],
3080 },
3081 crate::schema::Field {
3082 name: "a",
3083 doc: None,
3084 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3085 default: None,
3086 aliases: vec![],
3087 },
3088 crate::schema::Field {
3089 name: "name",
3090 doc: None,
3091 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3092 default: Some(json_string("anon")),
3093 aliases: vec![],
3094 },
3095 crate::schema::Field {
3096 name: "opt",
3097 doc: None,
3098 r#type: Schema::Union(vec![
3099 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3100 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3101 ]),
3102 default: None, aliases: vec![],
3104 },
3105 ],
3106 attributes: Attributes::default(),
3107 }));
3108 let mut maker = Maker::new(false, false);
3109 let dt = maker
3110 .make_data_type(&writer, Some(&reader), None)
3111 .expect("record resolution");
3112 let fields = match dt.codec() {
3113 Codec::Struct(f) => f,
3114 other => panic!("expected struct, got {other:?}"),
3115 };
3116 assert_eq!(fields.len(), 4);
3117 assert_eq!(fields[0].name(), "b");
3118 assert_eq!(fields[1].name(), "a");
3119 assert_eq!(fields[2].name(), "name");
3120 assert_eq!(fields[3].name(), "opt");
3121 assert!(matches!(
3122 fields[1].data_type().resolution,
3123 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3124 ));
3125 let rec = match dt.resolution {
3126 Some(ResolutionInfo::Record(ref r)) => r.clone(),
3127 other => panic!("expected record resolution, got {other:?}"),
3128 };
3129 assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
3130 assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3131 assert!(rec.skip_fields[0].is_none());
3132 assert!(rec.skip_fields[2].is_none());
3133 let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
3134 assert!(matches!(skip1.codec(), Codec::Utf8));
3135 let name_md = &fields[2].data_type().metadata;
3136 assert_eq!(
3137 name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3138 Some(&"\"anon\"".to_string())
3139 );
3140 let opt_md = &fields[3].data_type().metadata;
3141 assert_eq!(
3142 opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3143 Some(&"null".to_string())
3144 );
3145 }
3146
3147 #[test]
3148 fn test_named_type_alias_resolution_record_cross_namespace() {
3149 let writer_record = Record {
3150 name: "PersonV2",
3151 namespace: Some("com.example.v2"),
3152 doc: None,
3153 aliases: vec!["com.example.Person"],
3154 fields: vec![
3155 AvroFieldSchema {
3156 name: "name",
3157 doc: None,
3158 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3159 default: None,
3160 aliases: vec![],
3161 },
3162 AvroFieldSchema {
3163 name: "age",
3164 doc: None,
3165 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3166 default: None,
3167 aliases: vec![],
3168 },
3169 ],
3170 attributes: Attributes::default(),
3171 };
3172 let reader_record = Record {
3173 name: "Person",
3174 namespace: Some("com.example"),
3175 doc: None,
3176 aliases: vec![],
3177 fields: writer_record.fields.clone(),
3178 attributes: Attributes::default(),
3179 };
3180 let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3181 let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3182 let mut maker = Maker::new(false, false);
3183 let result = maker
3184 .make_data_type(&writer_schema, Some(&reader_schema), None)
3185 .expect("record alias resolution should succeed");
3186 match result.codec {
3187 Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3188 other => panic!("expected struct, got {other:?}"),
3189 }
3190 }
3191
3192 #[test]
3193 fn test_named_type_alias_resolution_enum_cross_namespace() {
3194 let writer_enum = Enum {
3195 name: "ColorV2",
3196 namespace: Some("org.example.v2"),
3197 doc: None,
3198 aliases: vec!["org.example.Color"],
3199 symbols: vec!["RED", "GREEN", "BLUE"],
3200 default: None,
3201 attributes: Attributes::default(),
3202 };
3203 let reader_enum = Enum {
3204 name: "Color",
3205 namespace: Some("org.example"),
3206 doc: None,
3207 aliases: vec![],
3208 symbols: vec!["RED", "GREEN", "BLUE"],
3209 default: None,
3210 attributes: Attributes::default(),
3211 };
3212 let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3213 let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3214 let mut maker = Maker::new(false, false);
3215 maker
3216 .make_data_type(&writer_schema, Some(&reader_schema), None)
3217 .expect("enum alias resolution should succeed");
3218 }
3219
3220 #[test]
3221 fn test_named_type_alias_resolution_fixed_cross_namespace() {
3222 let writer_fixed = Fixed {
3223 name: "Fx10V2",
3224 namespace: Some("ns.v2"),
3225 aliases: vec!["ns.Fx10"],
3226 size: 10,
3227 attributes: Attributes::default(),
3228 };
3229 let reader_fixed = Fixed {
3230 name: "Fx10",
3231 namespace: Some("ns"),
3232 aliases: vec![],
3233 size: 10,
3234 attributes: Attributes::default(),
3235 };
3236 let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3237 let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3238 let mut maker = Maker::new(false, false);
3239 maker
3240 .make_data_type(&writer_schema, Some(&reader_schema), None)
3241 .expect("fixed alias resolution should succeed");
3242 }
3243}