1use crate::schema::{
21 AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22 AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23 PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27 IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
/// Describes how a value written with a writer schema is adapted to a reader
/// schema, attached to an [`AvroDataType`] produced by schema resolution.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolutionInfo {
    /// A writer→reader type promotion (see [`Promotion`]).
    Promotion(Promotion),
    /// A parsed default value; presumably used when the writer does not
    /// provide the value — confirm against the decoder.
    DefaultValue(AvroLiteral),
    /// Mapping between writer and reader enum symbols.
    EnumMapping(EnumMapping),
    /// Field-level resolution between writer and reader records.
    Record(ResolvedRecord),
    /// Branch-level resolution between writer and reader unions.
    Union(ResolvedUnion),
}
54
/// A parsed Avro default value.
///
/// Produced by `AvroDataType::parse_default_literal` from a JSON `default`
/// value, validated against the type it belongs to.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum AvroLiteral {
    /// JSON `null`.
    Null,
    /// Boolean default.
    Boolean(bool),
    /// 32-bit integer default (also used for `Date32` and `TimeMillis`).
    Int(i32),
    /// 64-bit integer default (also used for time/timestamp/duration codecs).
    Long(i64),
    /// 32-bit float default (range-checked and finite).
    Float(f32),
    /// 64-bit float default.
    Double(f64),
    /// Raw bytes; used for `bytes`, `fixed`, decimals and intervals.
    Bytes(Vec<u8>),
    /// String default (also used for UUIDs).
    String(String),
    /// Enum symbol name (checked to be one of the enum's symbols).
    Enum(String),
    /// Array default; one literal per item.
    Array(Vec<AvroLiteral>),
    /// Map default, and also record defaults (field name → value), kept in
    /// insertion order.
    Map(IndexMap<String, AvroLiteral>),
}
83
/// Field-level resolution between a writer record and a reader record.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedRecord {
    /// Indexed by writer field position; presumably the matching reader
    /// field index, or `None` when the reader has no such field — confirm
    /// against the resolver that builds this.
    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
    /// Presumably indices of reader fields that are absent from the writer
    /// and must be filled from their defaults — confirm against the resolver.
    pub(crate) default_fields: Arc<[usize]>,
    /// Indexed by writer field position; presumably `Some(type)` for writer
    /// fields that must be skipped (not read) and `None` otherwise — confirm.
    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
}
96
/// A conversion applied when a writer value is read as a different, compatible
/// reader type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No conversion; the value is read as-is.
    Direct,
    /// Writer `int` read as reader `long`.
    IntToLong,
    /// Writer `int` read as reader `float`.
    IntToFloat,
    /// Writer `int` read as reader `double`.
    IntToDouble,
    /// Writer `long` read as reader `float`.
    LongToFloat,
    /// Writer `long` read as reader `double`.
    LongToDouble,
    /// Writer `float` read as reader `double`.
    FloatToDouble,
    /// Writer `string` read as reader `bytes`.
    StringToBytes,
    /// Writer `bytes` read as reader `string`.
    BytesToString,
}

impl Display for Promotion {
    /// Writes a short `Writer->Reader` label (or `Direct`). Width and fill
    /// options are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Promotion::Direct => "Direct",
            Promotion::IntToLong => "Int->Long",
            Promotion::IntToFloat => "Int->Float",
            Promotion::IntToDouble => "Int->Double",
            Promotion::LongToFloat => "Long->Float",
            Promotion::LongToDouble => "Long->Double",
            Promotion::FloatToDouble => "Float->Double",
            Promotion::StringToBytes => "String->Bytes",
            Promotion::BytesToString => "Bytes->String",
        };
        formatter.write_str(label)
    }
}
138
/// Branch-level resolution between a writer union and a reader union (either
/// side may also be a non-union type).
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedUnion {
    /// Indexed by writer branch; presumably the reader branch index together
    /// with the promotion to apply, or `None` when no reader branch accepts
    /// that writer branch — confirm against the resolver.
    pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
    /// Whether the writer type is a union.
    pub(crate) writer_is_union: bool,
    /// Whether the reader type is a union.
    pub(crate) reader_is_union: bool,
}
150
/// Symbol mapping used to read a writer enum as a reader enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Presumably indexed by writer symbol position, giving the reader symbol
    /// index (unmapped symbols encoded by some sentinel) — confirm against the
    /// code that builds this mapping.
    pub(crate) mapping: Arc<[i32]>,
    /// Presumably the reader symbol index used for writer symbols that have
    /// no match (the reader enum's `default`) — confirm.
    pub(crate) default_index: i32,
}
162
/// Attaches the Arrow canonical extension type matching `codec`, if there is one.
///
/// Only [`Codec::Uuid`] has an extension type; any other field is returned
/// untouched.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
170
/// An Avro type as seen by the reader: how it is decoded ([`Codec`]), the
/// metadata copied onto its Arrow field, its nullability, and any schema
/// resolution details.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroDataType {
    /// `Some` when this type came from a two-branch union with `null`,
    /// recording whether `null` was the first or second branch.
    nullability: Option<Nullability>,
    /// Metadata attached to the Arrow `Field` (e.g. the Avro name/namespace
    /// and a serialized default value).
    metadata: HashMap<String, String>,
    /// Decoding strategy, which also determines the Arrow data type.
    codec: Codec,
    /// Writer→reader resolution details, when this type was built by
    /// resolving a writer schema against a reader schema.
    pub(crate) resolution: Option<ResolutionInfo>,
}
179
impl AvroDataType {
    /// Creates an `AvroDataType` without resolution information.
    pub(crate) fn new(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
    ) -> Self {
        AvroDataType {
            codec,
            metadata,
            nullability,
            resolution: None,
        }
    }

    /// Creates an `AvroDataType` carrying the given resolution information.
    #[inline]
    fn new_with_resolution(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
        resolution: Option<ResolutionInfo>,
    ) -> Self {
        Self {
            codec,
            metadata,
            nullability,
            resolution,
        }
    }

    /// Returns an Arrow [`Field`] named `name` for this type.
    ///
    /// The field is nullable when this type has a [`Nullability`], or when it
    /// is a general union containing a `null` branch. This type's metadata is
    /// attached. With the `canonical_extension_types` feature, a matching
    /// extension type is added as well.
    pub(crate) fn field_with_name(&self, name: &str) -> Field {
        let mut nullable = self.nullability.is_some();
        if !nullable {
            // A non-collapsed union is still nullable if any branch is `null`.
            if let Codec::Union(children, _, _) = self.codec() {
                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
                    nullable = true;
                }
            }
        }
        let data_type = self.codec.data_type();
        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
        #[cfg(feature = "canonical_extension_types")]
        return with_extension_type(&self.codec, field);
        #[cfg(not(feature = "canonical_extension_types"))]
        field
    }

    /// Returns the [`Codec`] used to decode this type.
    pub(crate) fn codec(&self) -> &Codec {
        &self.codec
    }

    /// Returns this type's nullability (`Some` when it was collapsed from a
    /// two-branch union with `null`).
    pub(crate) fn nullability(&self) -> Option<Nullability> {
        self.nullability
    }

    /// Parses a JSON `default` value into an [`AvroLiteral`] for this type.
    ///
    /// Record defaults recurse into each field. A missing subfield falls back
    /// to a stored default or to `null`, as described in the `Struct` arm.
    /// Union defaults are parsed against the first branch.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::SchemaError`] when the JSON value does not fit
    /// this type. Examples: the wrong JSON kind, an `int`/`float` out of
    /// range, an unknown enum symbol, a bytes code point above `0xFF`, or a
    /// fixed default of the wrong length.
    #[inline]
    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        // Returns the JSON string, or a `SchemaError` that mentions `data_type`.
        fn expect_string<'v>(
            default_json: &'v Value,
            data_type: &str,
        ) -> Result<&'v str, ArrowError> {
            match default_json {
                Value::String(s) => Ok(s.as_str()),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default value must be a JSON string for {data_type}"
                ))),
            }
        }

        // Decodes a bytes/fixed default string. Each char's code point (which
        // must be <= 0xFF) becomes one byte. When `expected_len` is `Some`,
        // the resulting length must match it exactly.
        fn parse_bytes_default(
            default_json: &Value,
            expected_len: Option<usize>,
        ) -> Result<Vec<u8>, ArrowError> {
            let s = expect_string(default_json, "bytes/fixed logical types")?;
            let mut out = Vec::with_capacity(s.len());
            for ch in s.chars() {
                let cp = ch as u32;
                if cp > 0xFF {
                    return Err(ArrowError::SchemaError(format!(
                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
                    )));
                }
                out.push(cp as u8);
            }
            if let Some(len) = expected_len {
                if out.len() != len {
                    return Err(ArrowError::SchemaError(format!(
                        "Default length {} does not match expected fixed size {len}",
                        out.len(),
                    )));
                }
            }
            Ok(out)
        }

        // Reads a JSON number as i64. Numbers that `as_i64` cannot represent
        // are rejected, as are non-number values.
        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_i64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON integer"
                ))),
            }
        }

        // Reads a JSON number as f64. Non-number values are rejected.
        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_f64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON number"
                ))),
            }
        }

        // A JSON `null` default is accepted in three cases: the `null` type,
        // a union whose first branch is `null`, or a type collapsed from a
        // `["null", T]` union (NullFirst).
        if default_json.is_null() {
            return match self.codec() {
                Codec::Null => Ok(AvroLiteral::Null),
                Codec::Union(encodings, _, _) if !encodings.is_empty()
                    && matches!(encodings[0].codec(), Codec::Null) =>
                {
                    Ok(AvroLiteral::Null)
                }
                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
                _ => Err(ArrowError::SchemaError(
                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
                        .to_string(),
                )),
            };
        }
        let lit = match self.codec() {
            // Non-null JSON can never be a default for the `null` type.
            Codec::Null => {
                return Err(ArrowError::SchemaError(
                    "Default for `null` type must be JSON null".to_string(),
                ));
            }
            Codec::Boolean => match default_json {
                Value::Bool(b) => AvroLiteral::Boolean(*b),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Boolean default must be a JSON boolean".to_string(),
                    ));
                }
            },
            // 32-bit integer-backed codecs: range-check before narrowing.
            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            // 64-bit integer-backed codecs.
            Codec::Int64
            | Codec::TimeMicros
            | Codec::TimestampMillis(_)
            | Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            // Reject non-finite values and values outside the finite f32 range.
            Codec::Float32 => {
                let f = parse_json_f64(default_json, "float")?;
                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default float {f} out of f32 range or not finite"
                    )));
                }
                AvroLiteral::Float(f as f32)
            }
            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
            }
            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
            Codec::Fixed(sz) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
            }
            // Fixed-backed decimals check the byte length; `None` skips that check.
            Codec::Decimal(_, _, fixed_size) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
            }
            // The symbol must be one of this (reader) enum's symbols.
            Codec::Enum(symbols) => {
                let s = expect_string(default_json, "enum")?;
                if symbols.iter().any(|sym| sym == s) {
                    AvroLiteral::Enum(s.to_string())
                } else {
                    return Err(ArrowError::SchemaError(format!(
                        "Default enum symbol {s:?} not found in reader enum symbols"
                    )));
                }
            }
            // Interval defaults must decode to exactly 12 bytes.
            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
            // Array defaults: each element is parsed against the item type.
            Codec::List(item_dt) => match default_json {
                Value::Array(items) => AvroLiteral::Array(
                    items
                        .iter()
                        .map(|v| item_dt.parse_default_literal(v))
                        .collect::<Result<_, _>>()?,
                ),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON array for Avro array type".to_string(),
                    ));
                }
            },
            // Map defaults: each value is parsed against the value type. Entries
            // are inserted in the JSON object's iteration order.
            Codec::Map(val_dt) => match default_json {
                Value::Object(map) => {
                    let mut out = IndexMap::with_capacity(map.len());
                    for (k, v) in map {
                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON object for Avro map type".to_string(),
                    ));
                }
            },
            // Record defaults: one literal per field, in field order. A field
            // missing from the JSON object resolves as follows:
            //   1. `null` if it has no stored default and has the default
            //      nullability;
            //   2. otherwise its stored default, re-parsed from metadata;
            //   3. otherwise an error.
            Codec::Struct(fields) => match default_json {
                Value::Object(obj) => {
                    let mut out: IndexMap<String, AvroLiteral> =
                        IndexMap::with_capacity(fields.len());
                    for f in fields.as_ref() {
                        let name = f.name().to_string();
                        if let Some(sub) = obj.get(&name) {
                            out.insert(name, f.data_type().parse_default_literal(sub)?);
                        } else {
                            let stored_default =
                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
                            if stored_default.is_none()
                                && f.data_type().nullability() == Some(Nullability::default())
                            {
                                out.insert(name, AvroLiteral::Null);
                            } else if let Some(default_json) = stored_default {
                                let v: Value =
                                    serde_json::from_str(default_json).map_err(|e| {
                                        ArrowError::SchemaError(format!(
                                            "Failed to parse stored subfield default JSON for '{}': {e}",
                                            f.name(),
                                        ))
                                    })?;
                                out.insert(name, f.data_type().parse_default_literal(&v)?);
                            } else {
                                return Err(ArrowError::SchemaError(format!(
                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
                                    f.name(),
                                    f.data_type().codec()
                                )));
                            }
                        }
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value for record/struct must be a JSON object".to_string(),
                    ));
                }
            },
            // A (non-null) union default is parsed against the first branch.
            Codec::Union(encodings, _, _) => {
                let Some(default_encoding) = encodings.first() else {
                    return Err(ArrowError::SchemaError(
                        "Union with no branches cannot have a default".to_string(),
                    ));
                };
                default_encoding.parse_default_literal(default_json)?
            }
            // Run-end encoded values use the default of their value type.
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
        };
        Ok(lit)
    }

    /// Serializes `default_json` and stores it in this type's metadata under
    /// `AVRO_FIELD_DEFAULT_METADATA_KEY`.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::ParseError`] if serialization fails.
    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
        let json_text = serde_json::to_string(default_json).map_err(|e| {
            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
        })?;
        self.metadata
            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
        Ok(())
    }

    /// Validates `default_json` against this type and then records it in
    /// metadata, returning the parsed literal. Nothing is stored if parsing
    /// fails.
    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        let lit = self.parse_default_literal(default_json)?;
        self.store_default(default_json)?;
        Ok(lit)
    }
}
487
488#[derive(Debug, Clone, PartialEq)]
490pub(crate) struct AvroField {
491 name: String,
492 data_type: AvroDataType,
493}
494
495impl AvroField {
496 pub(crate) fn field(&self) -> Field {
498 self.data_type.field_with_name(&self.name)
499 }
500
501 pub(crate) fn data_type(&self) -> &AvroDataType {
503 &self.data_type
504 }
505
506 pub(crate) fn with_utf8view(&self) -> Self {
515 let mut field = self.clone();
516 if let Codec::Utf8 = field.data_type.codec {
517 field.data_type.codec = Codec::Utf8View;
518 }
519 field
520 }
521
522 pub(crate) fn name(&self) -> &str {
527 &self.name
528 }
529}
530
531impl<'a> TryFrom<&Schema<'a>> for AvroField {
532 type Error = ArrowError;
533
534 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
535 match schema {
536 Schema::Complex(ComplexType::Record(r)) => {
537 let mut resolver = Maker::new(false, false);
538 let data_type = resolver.make_data_type(schema, None, None)?;
539 Ok(AvroField {
540 data_type,
541 name: r.name.to_string(),
542 })
543 }
544 _ => Err(ArrowError::ParseError(format!(
545 "Expected record got {schema:?}"
546 ))),
547 }
548 }
549}
550
551#[derive(Debug)]
553pub(crate) struct AvroFieldBuilder<'a> {
554 writer_schema: &'a Schema<'a>,
555 reader_schema: Option<&'a Schema<'a>>,
556 use_utf8view: bool,
557 strict_mode: bool,
558}
559
560impl<'a> AvroFieldBuilder<'a> {
561 pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
563 Self {
564 writer_schema,
565 reader_schema: None,
566 use_utf8view: false,
567 strict_mode: false,
568 }
569 }
570
571 #[inline]
576 pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
577 self.reader_schema = Some(reader_schema);
578 self
579 }
580
581 pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
583 self.use_utf8view = use_utf8view;
584 self
585 }
586
587 pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
589 self.strict_mode = strict_mode;
590 self
591 }
592
593 pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
595 match self.writer_schema {
596 Schema::Complex(ComplexType::Record(r)) => {
597 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
598 let data_type =
599 resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
600 Ok(AvroField {
601 name: r.name.to_string(),
602 data_type,
603 })
604 }
605 _ => Err(ArrowError::ParseError(format!(
606 "Expected a Record schema to build an AvroField, but got {:?}",
607 self.writer_schema
608 ))),
609 }
610 }
611}
612
/// Decoding strategy for an Avro type. It also determines the Arrow
/// [`DataType`] produced (see `Codec::data_type`).
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Codec {
    /// Avro `null`; Arrow `Null`.
    Null,
    /// Avro `boolean`; Arrow `Boolean`.
    Boolean,
    /// Avro `int`; Arrow `Int32`.
    Int32,
    /// Avro `long`; Arrow `Int64`.
    Int64,
    /// Avro `float`; Arrow `Float32`.
    Float32,
    /// Avro `double`; Arrow `Float64`.
    Float64,
    /// Avro `bytes`; Arrow `Binary`.
    Binary,
    /// Avro `string`; Arrow `Utf8`.
    Utf8,
    /// Avro `string` decoded as Arrow `Utf8View` (chosen via `with_utf8view`).
    Utf8View,
    /// Arrow `Date32`; its default is read as an `int`.
    Date32,
    /// Arrow `Time32(Millisecond)`; its default is read as an `int`.
    TimeMillis,
    /// Arrow `Time64(Microsecond)`; its default is read as a `long`.
    TimeMicros,
    /// Millisecond timestamp. `true` means UTC (Arrow time zone `"+00:00"`);
    /// `false` means no time zone.
    TimestampMillis(bool),
    /// Microsecond timestamp. `true` means UTC (Arrow time zone `"+00:00"`);
    /// `false` means no time zone.
    TimestampMicros(bool),
    /// Avro `fixed` of the given byte size; Arrow `FixedSizeBinary(size)`.
    Fixed(i32),
    /// Decimal as `(precision, scale, fixed_size)`. `fixed_size` is `Some(n)`
    /// for fixed-backed decimals and presumably `None` for bytes-backed ones.
    /// The Arrow decimal width is picked from the precision.
    Decimal(usize, Option<usize>, Option<usize>),
    /// UUID; Arrow `FixedSizeBinary(16)`, with the default read as a string.
    Uuid,
    /// Enum with the given symbols; Arrow `Dictionary(Int32, Utf8)`.
    Enum(Arc<[String]>),
    /// Avro `array` of the given item type; Arrow `List`.
    List(Arc<AvroDataType>),
    /// Avro `record` with the given fields; Arrow `Struct`.
    Struct(Arc<[AvroField]>),
    /// Avro `map` with the given value type; Arrow `Map` with non-null `Utf8` keys.
    Map(Arc<AvroDataType>),
    /// Arrow `Interval(MonthDayNano)`; its default must be 12 bytes.
    Interval,
    /// General union: the branch types, the Arrow union fields, and the union mode.
    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
    /// Custom duration type; Arrow `Duration(Nanosecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationNanos,
    /// Custom duration type; Arrow `Duration(Microsecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMicros,
    /// Custom duration type; Arrow `Duration(Millisecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMillis,
    /// Custom duration type; Arrow `Duration(Second)`.
    #[cfg(feature = "avro_custom_types")]
    DurationSeconds,
    /// Run-end encoding of the given value type. The `u8` is the run-end
    /// integer width in bits (16, 32 or 64).
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Arc<AvroDataType>, u8),
}
696
697impl Codec {
698 fn data_type(&self) -> DataType {
699 match self {
700 Self::Null => DataType::Null,
701 Self::Boolean => DataType::Boolean,
702 Self::Int32 => DataType::Int32,
703 Self::Int64 => DataType::Int64,
704 Self::Float32 => DataType::Float32,
705 Self::Float64 => DataType::Float64,
706 Self::Binary => DataType::Binary,
707 Self::Utf8 => DataType::Utf8,
708 Self::Utf8View => DataType::Utf8View,
709 Self::Date32 => DataType::Date32,
710 Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
711 Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
712 Self::TimestampMillis(is_utc) => {
713 DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
714 }
715 Self::TimestampMicros(is_utc) => {
716 DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
717 }
718 Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
719 Self::Fixed(size) => DataType::FixedSizeBinary(*size),
720 Self::Decimal(precision, scale, _size) => {
721 let p = *precision as u8;
722 let s = scale.unwrap_or(0) as i8;
723 #[cfg(feature = "small_decimals")]
724 {
725 if *precision <= DECIMAL32_MAX_PRECISION as usize {
726 DataType::Decimal32(p, s)
727 } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
728 DataType::Decimal64(p, s)
729 } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
730 DataType::Decimal128(p, s)
731 } else {
732 DataType::Decimal256(p, s)
733 }
734 }
735 #[cfg(not(feature = "small_decimals"))]
736 {
737 if *precision <= DECIMAL128_MAX_PRECISION as usize {
738 DataType::Decimal128(p, s)
739 } else {
740 DataType::Decimal256(p, s)
741 }
742 }
743 }
744 Self::Uuid => DataType::FixedSizeBinary(16),
745 Self::Enum(_) => {
746 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
747 }
748 Self::List(f) => {
749 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
750 }
751 Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
752 Self::Map(value_type) => {
753 let val_field = value_type.field_with_name("value");
754 DataType::Map(
755 Arc::new(Field::new(
756 "entries",
757 DataType::Struct(Fields::from(vec![
758 Field::new("key", DataType::Utf8, false),
759 val_field,
760 ])),
761 false,
762 )),
763 false,
764 )
765 }
766 Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
767 #[cfg(feature = "avro_custom_types")]
768 Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
769 #[cfg(feature = "avro_custom_types")]
770 Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
771 #[cfg(feature = "avro_custom_types")]
772 Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
773 #[cfg(feature = "avro_custom_types")]
774 Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
775 #[cfg(feature = "avro_custom_types")]
776 Self::RunEndEncoded(values, bits) => {
777 let run_ends_dt = match *bits {
778 16 => DataType::Int16,
779 32 => DataType::Int32,
780 64 => DataType::Int64,
781 _ => unreachable!(),
782 };
783 DataType::RunEndEncoded(
784 Arc::new(Field::new("run_ends", run_ends_dt, false)),
785 Arc::new(Field::new("values", values.codec().data_type(), true)),
786 )
787 }
788 }
789 }
790
791 pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
797 if use_utf8view && matches!(self, Self::Utf8) {
798 Self::Utf8View
799 } else {
800 self
801 }
802 }
803
804 #[inline]
805 fn union_field_name(&self) -> String {
806 UnionFieldKind::from(self).as_ref().to_owned()
807 }
808}
809
810impl From<PrimitiveType> for Codec {
811 fn from(value: PrimitiveType) -> Self {
812 match value {
813 PrimitiveType::Null => Self::Null,
814 PrimitiveType::Boolean => Self::Boolean,
815 PrimitiveType::Int => Self::Int32,
816 PrimitiveType::Long => Self::Int64,
817 PrimitiveType::Float => Self::Float32,
818 PrimitiveType::Double => Self::Float64,
819 PrimitiveType::Bytes => Self::Binary,
820 PrimitiveType::String => Self::Utf8,
821 }
822 }
823}
824
/// Returns the largest decimal precision that a signed two's-complement value
/// of `n` bytes can always hold, i.e. `floor(log10(2^(8n - 1) - 1))`.
///
/// Only sizes `1..=32` are supported (up to 256-bit values); any other size
/// yields `None`.
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    // Entry `k` is the precision for a fixed size of `k + 1` bytes.
    const MAX_PRECISION_BY_SIZE: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    if n == 0 || n > MAX_PRECISION_BY_SIZE.len() {
        None
    } else {
        Some(MAX_PRECISION_BY_SIZE[n - 1])
    }
}
847
848fn parse_decimal_attributes(
849 attributes: &Attributes,
850 fallback_size: Option<usize>,
851 precision_required: bool,
852) -> Result<(usize, usize, Option<usize>), ArrowError> {
853 let precision = attributes
854 .additional
855 .get("precision")
856 .and_then(|v| v.as_u64())
857 .or(if precision_required { None } else { Some(10) })
858 .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
859 as usize;
860 let scale = attributes
861 .additional
862 .get("scale")
863 .and_then(|v| v.as_u64())
864 .unwrap_or(0) as usize;
865 let size = attributes
866 .additional
867 .get("size")
868 .and_then(|v| v.as_u64())
869 .map(|s| s as usize)
870 .or(fallback_size);
871 if precision == 0 {
872 return Err(ArrowError::ParseError(
873 "Decimal requires precision > 0".to_string(),
874 ));
875 }
876 if scale > precision {
877 return Err(ArrowError::ParseError(format!(
878 "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
879 )));
880 }
881 if precision > DECIMAL256_MAX_PRECISION as usize {
882 return Err(ArrowError::ParseError(format!(
883 "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
884 DECIMAL256_MAX_PRECISION
885 )));
886 }
887 if let Some(sz) = size {
888 let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
889 ArrowError::ParseError(format!(
890 "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
891 ))
892 })?;
893 if precision > max_p {
894 return Err(ArrowError::ParseError(format!(
895 "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
896 )));
897 }
898 }
899 Ok((precision, scale, size))
900}
901
/// Category of a union branch, used to derive its default Arrow union child
/// field name when the branch is not a named Avro type.
///
/// The name is the `snake_case` form of the variant (via strum's
/// `AsRefStr`), e.g. `TimestampMillisUtc` becomes `"timestamp_millis_utc"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Date,
    TimeMillis,
    TimeMicros,
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    // Used for `Codec::Interval` and the custom duration codecs.
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
930
931impl From<&Codec> for UnionFieldKind {
932 fn from(c: &Codec) -> Self {
933 match c {
934 Codec::Null => Self::Null,
935 Codec::Boolean => Self::Boolean,
936 Codec::Int32 => Self::Int,
937 Codec::Int64 => Self::Long,
938 Codec::Float32 => Self::Float,
939 Codec::Float64 => Self::Double,
940 Codec::Binary => Self::Bytes,
941 Codec::Utf8 | Codec::Utf8View => Self::String,
942 Codec::Date32 => Self::Date,
943 Codec::TimeMillis => Self::TimeMillis,
944 Codec::TimeMicros => Self::TimeMicros,
945 Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
946 Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
947 Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
948 Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
949 Codec::Interval => Self::Duration,
950 Codec::Fixed(_) => Self::Fixed,
951 Codec::Decimal(..) => Self::Decimal,
952 Codec::Enum(_) => Self::Enum,
953 Codec::List(_) => Self::Array,
954 Codec::Struct(_) => Self::Record,
955 Codec::Map(_) => Self::Map,
956 Codec::Uuid => Self::Uuid,
957 Codec::Union(..) => Self::Union,
958 #[cfg(feature = "avro_custom_types")]
959 Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
960 #[cfg(feature = "avro_custom_types")]
961 Codec::DurationNanos
962 | Codec::DurationMicros
963 | Codec::DurationMillis
964 | Codec::DurationSeconds => Self::Duration,
965 }
966 }
967}
968
969fn union_branch_name(dt: &AvroDataType) -> String {
970 if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
971 if name.contains(".") {
972 return name.to_string();
974 }
975 if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
976 return format!("{ns}.{name}");
977 }
978 return name.to_string();
979 }
980 dt.codec.union_field_name()
981}
982
983fn build_union_fields(encodings: &[AvroDataType]) -> UnionFields {
984 let arrow_fields: Vec<Field> = encodings
985 .iter()
986 .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
987 .collect();
988 let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
989 UnionFields::new(type_ids, arrow_fields)
990}
991
992#[derive(Debug, Default)]
996struct Resolver<'a> {
997 map: HashMap<(&'a str, &'a str), AvroDataType>,
998}
999
1000impl<'a> Resolver<'a> {
1001 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1002 self.map.insert((namespace.unwrap_or(""), name), schema);
1003 }
1004
1005 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1006 let (namespace, name) = name
1007 .rsplit_once('.')
1008 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1009 self.map
1010 .get(&(namespace, name))
1011 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1012 .cloned()
1013 }
1014}
1015
1016fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1017 let mut out = HashSet::with_capacity(1 + aliases.len());
1018 let (full, _) = make_full_name(name, ns, None);
1019 out.insert(full);
1020 for a in aliases {
1021 let (fa, _) = make_full_name(a, None, ns);
1022 out.insert(fa);
1023 }
1024 out
1025}
1026
1027fn names_match(
1028 writer_name: &str,
1029 writer_namespace: Option<&str>,
1030 writer_aliases: &[&str],
1031 reader_name: &str,
1032 reader_namespace: Option<&str>,
1033 reader_aliases: &[&str],
1034) -> bool {
1035 let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1036 let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1037 !writer_set.is_disjoint(&reader_set)
1039}
1040
1041fn ensure_names_match(
1042 data_type: &str,
1043 writer_name: &str,
1044 writer_namespace: Option<&str>,
1045 writer_aliases: &[&str],
1046 reader_name: &str,
1047 reader_namespace: Option<&str>,
1048 reader_aliases: &[&str],
1049) -> Result<(), ArrowError> {
1050 if names_match(
1051 writer_name,
1052 writer_namespace,
1053 writer_aliases,
1054 reader_name,
1055 reader_namespace,
1056 reader_aliases,
1057 ) {
1058 Ok(())
1059 } else {
1060 Err(ArrowError::ParseError(format!(
1061 "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1062 )))
1063 }
1064}
1065
1066fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1067 match schema {
1068 Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1069 Schema::Type(Type {
1070 r#type: TypeName::Primitive(primitive),
1071 ..
1072 }) => Some(*primitive),
1073 _ => None,
1074 }
1075}
1076
1077fn nullable_union_variants<'x, 'y>(
1078 variant: &'y [Schema<'x>],
1079) -> Option<(Nullability, &'y Schema<'x>)> {
1080 if variant.len() != 2 {
1081 return None;
1082 }
1083 let is_null = |schema: &Schema<'x>| {
1084 matches!(
1085 schema,
1086 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1087 )
1088 };
1089 match (is_null(&variant[0]), is_null(&variant[1])) {
1090 (true, false) => Some((Nullability::NullFirst, &variant[1])),
1091 (false, true) => Some((Nullability::NullSecond, &variant[0])),
1092 _ => None,
1093 }
1094}
1095
/// Identity of a union branch, used for duplicate-branch detection.
///
/// Named types are identified by full name, primitives by their type, and all
/// arrays (or all maps) share a single key each.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    /// Full name of a named type (record, enum, fixed, or a reference to one).
    Named(String),
    /// A primitive type.
    Primitive(PrimitiveType),
    /// Any array.
    Array,
    /// Any map.
    Map,
}
1103
1104fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1105 let (name, namespace) = match s {
1106 Schema::TypeName(TypeName::Primitive(p))
1107 | Schema::Type(Type {
1108 r#type: TypeName::Primitive(p),
1109 ..
1110 }) => return Some(UnionBranchKey::Primitive(*p)),
1111 Schema::TypeName(TypeName::Ref(name))
1112 | Schema::Type(Type {
1113 r#type: TypeName::Ref(name),
1114 ..
1115 }) => (name, None),
1116 Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1117 Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1118 Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1119 Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1120 Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1121 Schema::Union(_) => return None,
1122 };
1123 let (full, _) = make_full_name(name, namespace, enclosing_ns);
1124 Some(UnionBranchKey::Named(full))
1125}
1126
1127fn union_first_duplicate<'a>(
1128 branches: &'a [Schema<'a>],
1129 enclosing_ns: Option<&'a str>,
1130) -> Option<String> {
1131 let mut seen = HashSet::with_capacity(branches.len());
1132 for schema in branches {
1133 if let Some(key) = branch_key_of(schema, enclosing_ns) {
1134 if !seen.insert(key.clone()) {
1135 let msg = match key {
1136 UnionBranchKey::Named(full) => format!("named type {full}"),
1137 UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1138 UnionBranchKey::Array => "array".to_string(),
1139 UnionBranchKey::Map => "map".to_string(),
1140 };
1141 return Some(msg);
1142 }
1143 }
1144 }
1145 None
1146}
1147
/// Builds [`AvroDataType`]s from Avro schemas. It can optionally resolve a
/// writer schema against a reader schema.
struct Maker<'a> {
    /// Named types registered so far, used to resolve by-name references.
    resolver: Resolver<'a>,
    /// Whether Avro `string` maps to Arrow `Utf8View` instead of `Utf8`.
    use_utf8view: bool,
    /// Whether strict mode is on (e.g. `["T", "null"]` unions are rejected).
    strict_mode: bool,
}
1156
1157impl<'a> Maker<'a> {
1158 fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1159 Self {
1160 resolver: Default::default(),
1161 use_utf8view,
1162 strict_mode,
1163 }
1164 }
1165
1166 #[cfg(feature = "avro_custom_types")]
1167 #[inline]
1168 fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1169 if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1170 let mut inner = (*values).clone();
1171 inner.nullability = Some(nb);
1172 dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1173 }
1174 }
1175
1176 fn make_data_type<'s>(
1177 &mut self,
1178 writer_schema: &'s Schema<'a>,
1179 reader_schema: Option<&'s Schema<'a>>,
1180 namespace: Option<&'a str>,
1181 ) -> Result<AvroDataType, ArrowError> {
1182 match reader_schema {
1183 Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1184 None => self.parse_type(writer_schema, namespace),
1185 }
1186 }
1187
1188 fn parse_type<'s>(
1201 &mut self,
1202 schema: &'s Schema<'a>,
1203 namespace: Option<&'a str>,
1204 ) -> Result<AvroDataType, ArrowError> {
1205 match schema {
1206 Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1207 Codec::from(*p).with_utf8view(self.use_utf8view),
1208 Default::default(),
1209 None,
1210 )),
1211 Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1212 Schema::Union(f) => {
1213 let null = f
1214 .iter()
1215 .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1216 match (f.len() == 2, null) {
1217 (true, Some(0)) => {
1218 let mut field = self.parse_type(&f[1], namespace)?;
1219 field.nullability = Some(Nullability::NullFirst);
1220 #[cfg(feature = "avro_custom_types")]
1221 Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1222 return Ok(field);
1223 }
1224 (true, Some(1)) => {
1225 if self.strict_mode {
1226 return Err(ArrowError::SchemaError(
1227 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1228 .to_string(),
1229 ));
1230 }
1231 let mut field = self.parse_type(&f[0], namespace)?;
1232 field.nullability = Some(Nullability::NullSecond);
1233 #[cfg(feature = "avro_custom_types")]
1234 Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1235 return Ok(field);
1236 }
1237 _ => {}
1238 }
1239 if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1241 return Err(ArrowError::SchemaError(
1242 "Avro unions may not immediately contain other unions".to_string(),
1243 ));
1244 }
1245 if let Some(dup) = union_first_duplicate(f, namespace) {
1247 return Err(ArrowError::SchemaError(format!(
1248 "Avro union contains duplicate branch type: {dup}"
1249 )));
1250 }
1251 let children: Vec<AvroDataType> = f
1253 .iter()
1254 .map(|s| self.parse_type(s, namespace))
1255 .collect::<Result<_, _>>()?;
1256 let union_fields = build_union_fields(&children);
1258 Ok(AvroDataType::new(
1259 Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1260 Default::default(),
1261 None,
1262 ))
1263 }
1264 Schema::Complex(c) => match c {
1265 ComplexType::Record(r) => {
1266 let namespace = r.namespace.or(namespace);
1267 let mut metadata = r.attributes.field_metadata();
1268 let fields = r
1269 .fields
1270 .iter()
1271 .map(|field| {
1272 Ok(AvroField {
1273 name: field.name.to_string(),
1274 data_type: self.parse_type(&field.r#type, namespace)?,
1275 })
1276 })
1277 .collect::<Result<_, ArrowError>>()?;
1278 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1279 if let Some(ns) = namespace {
1280 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1281 }
1282 let field = AvroDataType {
1283 nullability: None,
1284 codec: Codec::Struct(fields),
1285 metadata,
1286 resolution: None,
1287 };
1288 self.resolver.register(r.name, namespace, field.clone());
1289 Ok(field)
1290 }
1291 ComplexType::Array(a) => {
1292 let field = self.parse_type(a.items.as_ref(), namespace)?;
1293 Ok(AvroDataType {
1294 nullability: None,
1295 metadata: a.attributes.field_metadata(),
1296 codec: Codec::List(Arc::new(field)),
1297 resolution: None,
1298 })
1299 }
1300 ComplexType::Fixed(f) => {
1301 let size = f.size.try_into().map_err(|e| {
1302 ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1303 })?;
1304 let namespace = f.namespace.or(namespace);
1305 let mut metadata = f.attributes.field_metadata();
1306 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1307 if let Some(ns) = namespace {
1308 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1309 }
1310 let field = match f.attributes.logical_type {
1311 Some("decimal") => {
1312 let (precision, scale, _) =
1313 parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1314 AvroDataType {
1315 nullability: None,
1316 metadata,
1317 codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1318 resolution: None,
1319 }
1320 }
1321 Some("duration") => {
1322 if size != 12 {
1323 return Err(ArrowError::ParseError(format!(
1324 "Invalid fixed size for Duration: {size}, must be 12"
1325 )));
1326 };
1327 AvroDataType {
1328 nullability: None,
1329 metadata,
1330 codec: Codec::Interval,
1331 resolution: None,
1332 }
1333 }
1334 _ => AvroDataType {
1335 nullability: None,
1336 metadata,
1337 codec: Codec::Fixed(size),
1338 resolution: None,
1339 },
1340 };
1341 self.resolver.register(f.name, namespace, field.clone());
1342 Ok(field)
1343 }
1344 ComplexType::Enum(e) => {
1345 let namespace = e.namespace.or(namespace);
1346 let symbols = e
1347 .symbols
1348 .iter()
1349 .map(|s| s.to_string())
1350 .collect::<Arc<[String]>>();
1351 let mut metadata = e.attributes.field_metadata();
1352 let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1353 ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1354 })?;
1355 metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1356 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1357 if let Some(ns) = namespace {
1358 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1359 }
1360 let field = AvroDataType {
1361 nullability: None,
1362 metadata,
1363 codec: Codec::Enum(symbols),
1364 resolution: None,
1365 };
1366 self.resolver.register(e.name, namespace, field.clone());
1367 Ok(field)
1368 }
1369 ComplexType::Map(m) => {
1370 let val = self.parse_type(&m.values, namespace)?;
1371 Ok(AvroDataType {
1372 nullability: None,
1373 metadata: m.attributes.field_metadata(),
1374 codec: Codec::Map(Arc::new(val)),
1375 resolution: None,
1376 })
1377 }
1378 },
1379 Schema::Type(t) => {
1380 let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1381 match (t.attributes.logical_type, &mut field.codec) {
1383 (Some("decimal"), c @ Codec::Binary) => {
1384 let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1385 *c = Codec::Decimal(prec, Some(sc), None);
1386 }
1387 (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1388 (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1389 (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1390 (Some("timestamp-millis"), c @ Codec::Int64) => {
1391 *c = Codec::TimestampMillis(true)
1392 }
1393 (Some("timestamp-micros"), c @ Codec::Int64) => {
1394 *c = Codec::TimestampMicros(true)
1395 }
1396 (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1397 *c = Codec::TimestampMillis(false)
1398 }
1399 (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1400 *c = Codec::TimestampMicros(false)
1401 }
1402 (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
1403 #[cfg(feature = "avro_custom_types")]
1404 (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1405 #[cfg(feature = "avro_custom_types")]
1406 (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1407 #[cfg(feature = "avro_custom_types")]
1408 (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1409 #[cfg(feature = "avro_custom_types")]
1410 (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1411 *c = Codec::DurationSeconds
1412 }
1413 #[cfg(feature = "avro_custom_types")]
1414 (Some("arrow.run-end-encoded"), _) => {
1415 let bits_u8: u8 = t
1416 .attributes
1417 .additional
1418 .get("arrow.runEndIndexBits")
1419 .and_then(|v| v.as_u64())
1420 .and_then(|n| u8::try_from(n).ok())
1421 .ok_or_else(|| ArrowError::ParseError(
1422 "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1423 .to_string(),
1424 ))?;
1425 if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1426 return Err(ArrowError::ParseError(format!(
1427 "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1428 )));
1429 }
1430 let values_site = field.clone();
1432 field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1433 }
1434 (Some(logical), _) => {
1435 field.metadata.insert("logicalType".into(), logical.into());
1437 }
1438 (None, _) => {}
1439 }
1440 if !t.attributes.additional.is_empty() {
1441 for (k, v) in &t.attributes.additional {
1442 field.metadata.insert(k.to_string(), v.to_string());
1443 }
1444 }
1445 Ok(field)
1446 }
1447 }
1448 }
1449
    /// Resolves `writer_schema` against `reader_schema` and returns the reader-shaped
    /// [`AvroDataType`], annotated with whatever [`ResolutionInfo`] is needed to decode
    /// writer-encoded data.
    ///
    /// Dispatch order:
    /// 1. If `primitive_of` yields a primitive for both sides, delegate to
    ///    `resolve_primitives` (identity or promotion).
    /// 2. Handle union/union, union/non-union and non-union/union combinations.
    /// 3. Delegate matching complex pairs (array, map, fixed, record, enum) to their
    ///    dedicated helpers.
    /// 4. If either side is a named reference, parse the reader schema as-is.
    ///
    /// # Errors
    ///
    /// * `SchemaError` when a non-union writer matches no branch of a reader union.
    /// * `NotYetImplemented` for any other writer/reader shape combination.
    /// * Any error propagated from the helpers above.
    fn resolve_type<'s>(
        &mut self,
        writer_schema: &'s Schema<'a>,
        reader_schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        if let (Some(write_primitive), Some(read_primitive)) =
            (primitive_of(writer_schema), primitive_of(reader_schema))
        {
            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
        }
        match (writer_schema, reader_schema) {
            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
                let writer_variants = writer_variants.as_slice();
                let reader_variants = reader_variants.as_slice();
                match (
                    nullable_union_variants(writer_variants),
                    nullable_union_variants(reader_variants),
                ) {
                    // Both sides are nullable unions: resolve the non-null branches and
                    // keep the *writer's* null position. The reader's null position is
                    // ignored (presumably because the decoder reads writer-encoded branch
                    // indices — confirm against the decoder).
                    (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
                        let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?;
                        dt.nullability = Some(w_nb);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
                        Ok(dt)
                    }
                    // General unions: map every writer branch onto a reader branch.
                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
                }
            }
            (Schema::Union(writer_variants), reader_non_union) => {
                // Each writer branch is tried against the single reader type. A branch
                // that fails to resolve maps to `None` (no error is raised here). A
                // resolvable branch maps to reader index 0 along with its promotion.
                let writer_to_reader: Vec<Option<(usize, Promotion)>> = writer_variants
                    .iter()
                    .map(|writer| {
                        self.resolve_type(writer, reader_non_union, namespace)
                            .ok()
                            .map(|tmp| (0usize, Self::coercion_from(&tmp)))
                    })
                    .collect();
                let mut dt = self.parse_type(reader_non_union, namespace)?;
                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                    writer_to_reader: Arc::from(writer_to_reader),
                    writer_is_union: true,
                    reader_is_union: false,
                }));
                Ok(dt)
            }
            (writer_non_union, Schema::Union(reader_variants)) => {
                // Pick one reader branch for the writer type. `find_best_promotion`
                // prefers a direct match over a promotion.
                let promo = self.find_best_promotion(
                    writer_non_union,
                    reader_variants.as_slice(),
                    namespace,
                );
                let Some((reader_index, promotion)) = promo else {
                    return Err(ArrowError::SchemaError(
                        "Writer schema does not match any reader union branch".to_string(),
                    ));
                };
                let mut dt = self.parse_type(reader_schema, namespace)?;
                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                    writer_to_reader: Arc::from(vec![Some((reader_index, promotion))]),
                    writer_is_union: false,
                    reader_is_union: true,
                }));
                Ok(dt)
            }
            (
                Schema::Complex(ComplexType::Array(writer_array)),
                Schema::Complex(ComplexType::Array(reader_array)),
            ) => self.resolve_array(writer_array, reader_array, namespace),
            (
                Schema::Complex(ComplexType::Map(writer_map)),
                Schema::Complex(ComplexType::Map(reader_map)),
            ) => self.resolve_map(writer_map, reader_map, namespace),
            (
                Schema::Complex(ComplexType::Fixed(writer_fixed)),
                Schema::Complex(ComplexType::Fixed(reader_fixed)),
            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
            (
                Schema::Complex(ComplexType::Record(writer_record)),
                Schema::Complex(ComplexType::Record(reader_record)),
            ) => self.resolve_records(writer_record, reader_record, namespace),
            (
                Schema::Complex(ComplexType::Enum(writer_enum)),
                Schema::Complex(ComplexType::Enum(reader_enum)),
            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
            // Named references on either side: no resolution info is attached; the
            // reader schema is parsed (and looked up) as-is.
            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
            _ => Err(ArrowError::NotYetImplemented(
                "Other resolutions not yet implemented".to_string(),
            )),
        }
    }
1542
1543 #[inline]
1544 fn coercion_from(dt: &AvroDataType) -> Promotion {
1545 match dt.resolution.as_ref() {
1546 Some(ResolutionInfo::Promotion(promotion)) => *promotion,
1547 _ => Promotion::Direct,
1548 }
1549 }
1550
1551 fn find_best_promotion(
1552 &mut self,
1553 writer: &Schema<'a>,
1554 reader_variants: &[Schema<'a>],
1555 namespace: Option<&'a str>,
1556 ) -> Option<(usize, Promotion)> {
1557 let mut first_promotion: Option<(usize, Promotion)> = None;
1558 for (reader_index, reader) in reader_variants.iter().enumerate() {
1559 if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
1560 let promotion = Self::coercion_from(&tmp);
1561 if promotion == Promotion::Direct {
1562 return Some((reader_index, promotion));
1564 } else if first_promotion.is_none() {
1565 first_promotion = Some((reader_index, promotion));
1567 }
1568 }
1569 }
1570 first_promotion
1571 }
1572
1573 fn resolve_unions<'s>(
1574 &mut self,
1575 writer_variants: &'s [Schema<'a>],
1576 reader_variants: &'s [Schema<'a>],
1577 namespace: Option<&'a str>,
1578 ) -> Result<AvroDataType, ArrowError> {
1579 let reader_encodings: Vec<AvroDataType> = reader_variants
1580 .iter()
1581 .map(|reader_schema| self.parse_type(reader_schema, namespace))
1582 .collect::<Result<_, _>>()?;
1583 let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
1584 Vec::with_capacity(writer_variants.len());
1585 for writer in writer_variants {
1586 writer_to_reader.push(self.find_best_promotion(writer, reader_variants, namespace));
1587 }
1588 let union_fields = build_union_fields(&reader_encodings);
1589 let mut dt = AvroDataType::new(
1590 Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1591 Default::default(),
1592 None,
1593 );
1594 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1595 writer_to_reader: Arc::from(writer_to_reader),
1596 writer_is_union: true,
1597 reader_is_union: true,
1598 }));
1599 Ok(dt)
1600 }
1601
1602 fn resolve_array(
1603 &mut self,
1604 writer_array: &Array<'a>,
1605 reader_array: &Array<'a>,
1606 namespace: Option<&'a str>,
1607 ) -> Result<AvroDataType, ArrowError> {
1608 Ok(AvroDataType {
1609 nullability: None,
1610 metadata: reader_array.attributes.field_metadata(),
1611 codec: Codec::List(Arc::new(self.make_data_type(
1612 writer_array.items.as_ref(),
1613 Some(reader_array.items.as_ref()),
1614 namespace,
1615 )?)),
1616 resolution: None,
1617 })
1618 }
1619
1620 fn resolve_map(
1621 &mut self,
1622 writer_map: &Map<'a>,
1623 reader_map: &Map<'a>,
1624 namespace: Option<&'a str>,
1625 ) -> Result<AvroDataType, ArrowError> {
1626 Ok(AvroDataType {
1627 nullability: None,
1628 metadata: reader_map.attributes.field_metadata(),
1629 codec: Codec::Map(Arc::new(self.make_data_type(
1630 &writer_map.values,
1631 Some(&reader_map.values),
1632 namespace,
1633 )?)),
1634 resolution: None,
1635 })
1636 }
1637
1638 fn resolve_fixed<'s>(
1639 &mut self,
1640 writer_fixed: &Fixed<'a>,
1641 reader_fixed: &Fixed<'a>,
1642 reader_schema: &'s Schema<'a>,
1643 namespace: Option<&'a str>,
1644 ) -> Result<AvroDataType, ArrowError> {
1645 ensure_names_match(
1646 "Fixed",
1647 writer_fixed.name,
1648 writer_fixed.namespace,
1649 &writer_fixed.aliases,
1650 reader_fixed.name,
1651 reader_fixed.namespace,
1652 &reader_fixed.aliases,
1653 )?;
1654 if writer_fixed.size != reader_fixed.size {
1655 return Err(ArrowError::SchemaError(format!(
1656 "Fixed size mismatch for {}: writer={}, reader={}",
1657 reader_fixed.name, writer_fixed.size, reader_fixed.size
1658 )));
1659 }
1660 self.parse_type(reader_schema, namespace)
1661 }
1662
1663 fn resolve_primitives(
1664 &mut self,
1665 write_primitive: PrimitiveType,
1666 read_primitive: PrimitiveType,
1667 reader_schema: &Schema<'a>,
1668 ) -> Result<AvroDataType, ArrowError> {
1669 if write_primitive == read_primitive {
1670 return self.parse_type(reader_schema, None);
1671 }
1672 let promotion = match (write_primitive, read_primitive) {
1673 (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
1674 (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
1675 (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
1676 (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
1677 (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
1678 (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
1679 (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
1680 (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
1681 _ => {
1682 return Err(ArrowError::ParseError(format!(
1683 "Illegal promotion {write_primitive:?} to {read_primitive:?}"
1684 )));
1685 }
1686 };
1687 let mut datatype = self.parse_type(reader_schema, None)?;
1688 datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
1689 Ok(datatype)
1690 }
1691
1692 fn resolve_enums(
1748 &mut self,
1749 writer_enum: &Enum<'a>,
1750 reader_enum: &Enum<'a>,
1751 reader_schema: &Schema<'a>,
1752 namespace: Option<&'a str>,
1753 ) -> Result<AvroDataType, ArrowError> {
1754 ensure_names_match(
1755 "Enum",
1756 writer_enum.name,
1757 writer_enum.namespace,
1758 &writer_enum.aliases,
1759 reader_enum.name,
1760 reader_enum.namespace,
1761 &reader_enum.aliases,
1762 )?;
1763 if writer_enum.symbols == reader_enum.symbols {
1764 return self.parse_type(reader_schema, namespace);
1765 }
1766 let reader_index: HashMap<&str, i32> = reader_enum
1767 .symbols
1768 .iter()
1769 .enumerate()
1770 .map(|(index, &symbol)| (symbol, index as i32))
1771 .collect();
1772 let default_index: i32 = match reader_enum.default {
1773 Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
1774 ArrowError::SchemaError(format!(
1775 "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
1776 reader_enum.name,
1777 ))
1778 })?,
1779 None => -1,
1780 };
1781 let mapping: Vec<i32> = writer_enum
1782 .symbols
1783 .iter()
1784 .map(|&write_symbol| {
1785 reader_index
1786 .get(write_symbol)
1787 .copied()
1788 .unwrap_or(default_index)
1789 })
1790 .collect();
1791 if self.strict_mode && mapping.iter().any(|&m| m < 0) {
1792 return Err(ArrowError::SchemaError(format!(
1793 "Reader enum '{}' does not cover all writer symbols and no default is provided",
1794 reader_enum.name
1795 )));
1796 }
1797 let mut dt = self.parse_type(reader_schema, namespace)?;
1798 dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
1799 mapping: Arc::from(mapping),
1800 default_index,
1801 }));
1802 let reader_ns = reader_enum.namespace.or(namespace);
1803 self.resolver
1804 .register(reader_enum.name, reader_ns, dt.clone());
1805 Ok(dt)
1806 }
1807
1808 #[inline]
1809 fn build_writer_lookup(
1810 writer_record: &Record<'a>,
1811 ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
1812 let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
1813 for (idx, wf) in writer_record.fields.iter().enumerate() {
1814 map.insert(wf.name, idx);
1816 }
1817 let mut ambiguous: HashSet<&str> = HashSet::new();
1819 for (idx, wf) in writer_record.fields.iter().enumerate() {
1820 for &alias in &wf.aliases {
1821 match map.entry(alias) {
1822 Entry::Occupied(e) if *e.get() != idx => {
1823 ambiguous.insert(alias);
1824 }
1825 Entry::Vacant(e) => {
1826 e.insert(idx);
1827 }
1828 _ => {}
1829 }
1830 }
1831 }
1832 (map, ambiguous)
1833 }
1834
    /// Resolves a writer record against a reader record, field by field.
    ///
    /// For each reader field, in reader order:
    /// * Look up a writer field by the reader field's name, then by each reader alias
    ///   in turn. The lookup table also contains writer aliases (see
    ///   `build_writer_lookup`).
    /// * If a writer field is found and not yet claimed, resolve the two field types
    ///   with `make_data_type` and record the writer→reader index.
    /// * Otherwise the reader field must be filled from a default: either its
    ///   explicit default, or an implicit `null` when it is a null-first nullable
    ///   union.
    ///
    /// Writer fields that no reader field claims are parsed (with the writer
    /// namespace) into `skip_fields`.
    ///
    /// The result is a struct over the reader fields, annotated with a
    /// [`ResolvedRecord`], and is registered under the reader record's name.
    ///
    /// # Errors
    ///
    /// Returns `SchemaError` if:
    /// * the record names do not match;
    /// * a reader field without a writer counterpart has no usable default; or
    /// * in strict mode, an alias is ambiguous or two reader fields map to the same
    ///   writer field.
    ///
    /// Errors from type resolution and default parsing are also propagated.
    fn resolve_records(
        &mut self,
        writer_record: &Record<'a>,
        reader_record: &Record<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        ensure_names_match(
            "Record",
            writer_record.name,
            writer_record.namespace,
            &writer_record.aliases,
            reader_record.name,
            reader_record.namespace,
            &reader_record.aliases,
        )?;
        let writer_ns = writer_record.namespace.or(namespace);
        let reader_ns = reader_record.namespace.or(namespace);
        // NOTE(review): unlike the record branch of `parse_type`, this metadata does not
        // get AVRO_NAME_METADATA_KEY / AVRO_NAMESPACE_METADATA_KEY inserted, so resolved
        // records lose their Avro name in Arrow metadata — confirm this is intentional.
        let reader_md = reader_record.attributes.field_metadata();
        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
        // writer_to_reader[w] = Some(r) once writer field `w` is claimed by reader field `r`.
        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
        // Reader indices that must be populated from a default value.
        let mut default_fields: Vec<usize> = Vec::new();
        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
            let mut match_idx = writer_lookup.get(r_field.name).copied();
            let mut matched_via_alias: Option<&str> = None;
            if match_idx.is_none() {
                // Fall back to the reader field's aliases; the first hit wins.
                for &alias in &r_field.aliases {
                    if let Some(i) = writer_lookup.get(alias).copied() {
                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
                            return Err(ArrowError::SchemaError(format!(
                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
                                r_field.name
                            )));
                        }
                        match_idx = Some(i);
                        matched_via_alias = Some(alias);
                        break;
                    }
                }
            }
            if let Some(wi) = match_idx {
                if writer_to_reader[wi].is_none() {
                    let w_schema = &writer_record.fields[wi].r#type;
                    // NOTE(review): the writer field type is resolved with the *reader*
                    // namespace; named writer references may need `writer_ns` — verify.
                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
                    writer_to_reader[wi] = Some(reader_idx);
                    reader_fields.push(AvroField {
                        name: r_field.name.to_owned(),
                        data_type: dt,
                    });
                    continue;
                } else if self.strict_mode {
                    let existing_reader = writer_to_reader[wi].unwrap();
                    let via = matched_via_alias
                        .map(|a| format!("alias '{a}'"))
                        .unwrap_or_else(|| "name match".to_string());
                    return Err(ArrowError::SchemaError(format!(
                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
                        writer_record.fields[wi].name
                    )));
                }
                // Non-strict: the writer field is already claimed, so this reader
                // field falls through and is treated as absent from the writer.
            }
            // Reader field has no writer counterpart: it must come from a default.
            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
            if let Some(default_json) = r_field.default.as_ref() {
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(default_json)?,
                ));
                default_fields.push(reader_idx);
            } else if dt.nullability() == Some(Nullability::NullFirst) {
                // Lenient: a null-first nullable field without a declared default is
                // given an implicit `null` default.
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(&Value::Null)?,
                ));
                default_fields.push(reader_idx);
            } else {
                return Err(ArrowError::SchemaError(format!(
                    "Reader field '{}' not present in writer schema must have a default value",
                    r_field.name
                )));
            }
            reader_fields.push(AvroField {
                name: r_field.name.to_owned(),
                data_type: dt,
            });
        }
        // Writer fields that no reader field claimed still need a type so the decoder
        // can skip over their encoded bytes.
        let mut skip_fields: Vec<Option<AvroDataType>> =
            Vec::with_capacity(writer_record.fields.len());
        for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
            if writer_to_reader[writer_index].is_some() {
                skip_fields.push(None);
            } else {
                skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
            }
        }
        let resolved = AvroDataType::new_with_resolution(
            Codec::Struct(Arc::from(reader_fields)),
            reader_md,
            None,
            Some(ResolutionInfo::Record(ResolvedRecord {
                writer_to_reader: Arc::from(writer_to_reader),
                default_fields: Arc::from(default_fields),
                skip_fields: Arc::from(skip_fields),
            })),
        );
        // Register under the reader name so later named references see the resolution.
        self.resolver
            .register(reader_record.name, reader_ns, resolved.clone());
        Ok(resolved)
    }
1950}
1951
1952#[cfg(test)]
1953mod tests {
1954 use super::*;
1955 use crate::schema::{
1956 AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
1957 Fixed, PrimitiveType, Record, Schema, Type, TypeName,
1958 };
1959 use indexmap::IndexMap;
1960 use serde_json::{self, Value};
1961
1962 fn create_schema_with_logical_type(
1963 primitive_type: PrimitiveType,
1964 logical_type: &'static str,
1965 ) -> Schema<'static> {
1966 let attributes = Attributes {
1967 logical_type: Some(logical_type),
1968 additional: Default::default(),
1969 };
1970
1971 Schema::Type(Type {
1972 r#type: TypeName::Primitive(primitive_type),
1973 attributes,
1974 })
1975 }
1976
1977 fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
1978 let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
1979 let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
1980 let mut maker = Maker::new(false, false);
1981 maker
1982 .make_data_type(&writer_schema, Some(&reader_schema), None)
1983 .expect("promotion should resolve")
1984 }
1985
1986 fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
1987 Schema::TypeName(TypeName::Primitive(pt))
1988 }
1989 fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
1990 Schema::Union(branches)
1991 }
1992
1993 #[test]
1994 fn test_date_logical_type() {
1995 let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
1996
1997 let mut maker = Maker::new(false, false);
1998 let result = maker.make_data_type(&schema, None, None).unwrap();
1999
2000 assert!(matches!(result.codec, Codec::Date32));
2001 }
2002
2003 #[test]
2004 fn test_time_millis_logical_type() {
2005 let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2006
2007 let mut maker = Maker::new(false, false);
2008 let result = maker.make_data_type(&schema, None, None).unwrap();
2009
2010 assert!(matches!(result.codec, Codec::TimeMillis));
2011 }
2012
2013 #[test]
2014 fn test_time_micros_logical_type() {
2015 let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2016
2017 let mut maker = Maker::new(false, false);
2018 let result = maker.make_data_type(&schema, None, None).unwrap();
2019
2020 assert!(matches!(result.codec, Codec::TimeMicros));
2021 }
2022
2023 #[test]
2024 fn test_timestamp_millis_logical_type() {
2025 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2026
2027 let mut maker = Maker::new(false, false);
2028 let result = maker.make_data_type(&schema, None, None).unwrap();
2029
2030 assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2031 }
2032
2033 #[test]
2034 fn test_timestamp_micros_logical_type() {
2035 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2036
2037 let mut maker = Maker::new(false, false);
2038 let result = maker.make_data_type(&schema, None, None).unwrap();
2039
2040 assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2041 }
2042
2043 #[test]
2044 fn test_local_timestamp_millis_logical_type() {
2045 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2046
2047 let mut maker = Maker::new(false, false);
2048 let result = maker.make_data_type(&schema, None, None).unwrap();
2049
2050 assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2051 }
2052
2053 #[test]
2054 fn test_local_timestamp_micros_logical_type() {
2055 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2056
2057 let mut maker = Maker::new(false, false);
2058 let result = maker.make_data_type(&schema, None, None).unwrap();
2059
2060 assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2061 }
2062
2063 #[test]
2064 fn test_uuid_type() {
2065 let mut codec = Codec::Fixed(16);
2066 if let c @ Codec::Fixed(16) = &mut codec {
2067 *c = Codec::Uuid;
2068 }
2069 assert!(matches!(codec, Codec::Uuid));
2070 }
2071
2072 #[test]
2073 fn test_duration_logical_type() {
2074 let mut codec = Codec::Fixed(12);
2075
2076 if let c @ Codec::Fixed(12) = &mut codec {
2077 *c = Codec::Interval;
2078 }
2079
2080 assert!(matches!(codec, Codec::Interval));
2081 }
2082
2083 #[test]
2084 fn test_decimal_logical_type_not_implemented() {
2085 let codec = Codec::Fixed(16);
2086
2087 let process_decimal = || -> Result<(), ArrowError> {
2088 if let Codec::Fixed(_) = codec {
2089 return Err(ArrowError::NotYetImplemented(
2090 "Decimals are not currently supported".to_string(),
2091 ));
2092 }
2093 Ok(())
2094 };
2095
2096 let result = process_decimal();
2097
2098 assert!(result.is_err());
2099 if let Err(ArrowError::NotYetImplemented(msg)) = result {
2100 assert!(msg.contains("Decimals are not currently supported"));
2101 } else {
2102 panic!("Expected NotYetImplemented error");
2103 }
2104 }
2105 #[test]
2106 fn test_unknown_logical_type_added_to_metadata() {
2107 let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2108
2109 let mut maker = Maker::new(false, false);
2110 let result = maker.make_data_type(&schema, None, None).unwrap();
2111
2112 assert_eq!(
2113 result.metadata.get("logicalType"),
2114 Some(&"custom-type".to_string())
2115 );
2116 }
2117
2118 #[test]
2119 fn test_string_with_utf8view_enabled() {
2120 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2121
2122 let mut maker = Maker::new(true, false);
2123 let result = maker.make_data_type(&schema, None, None).unwrap();
2124
2125 assert!(matches!(result.codec, Codec::Utf8View));
2126 }
2127
2128 #[test]
2129 fn test_string_without_utf8view_enabled() {
2130 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2131
2132 let mut maker = Maker::new(false, false);
2133 let result = maker.make_data_type(&schema, None, None).unwrap();
2134
2135 assert!(matches!(result.codec, Codec::Utf8));
2136 }
2137
2138 #[test]
2139 fn test_record_with_string_and_utf8view_enabled() {
2140 let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2141
2142 let avro_field = crate::schema::Field {
2143 name: "string_field",
2144 r#type: field_schema,
2145 default: None,
2146 doc: None,
2147 aliases: vec![],
2148 };
2149
2150 let record = Record {
2151 name: "test_record",
2152 namespace: None,
2153 aliases: vec![],
2154 doc: None,
2155 fields: vec![avro_field],
2156 attributes: Attributes::default(),
2157 };
2158
2159 let schema = Schema::Complex(ComplexType::Record(record));
2160
2161 let mut maker = Maker::new(true, false);
2162 let result = maker.make_data_type(&schema, None, None).unwrap();
2163
2164 if let Codec::Struct(fields) = &result.codec {
2165 let first_field_codec = &fields[0].data_type().codec;
2166 assert!(matches!(first_field_codec, Codec::Utf8View));
2167 } else {
2168 panic!("Expected Struct codec");
2169 }
2170 }
2171
2172 #[test]
2173 fn test_union_with_strict_mode() {
2174 let schema = Schema::Union(vec![
2175 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2176 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2177 ]);
2178
2179 let mut maker = Maker::new(false, true);
2180 let result = maker.make_data_type(&schema, None, None);
2181
2182 assert!(result.is_err());
2183 match result {
2184 Err(ArrowError::SchemaError(msg)) => {
2185 assert!(msg.contains(
2186 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2187 ));
2188 }
2189 _ => panic!("Expected SchemaError"),
2190 }
2191 }
2192
2193 #[test]
2194 fn test_resolve_int_to_float_promotion() {
2195 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2196 assert!(matches!(result.codec, Codec::Float32));
2197 assert_eq!(
2198 result.resolution,
2199 Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2200 );
2201 }
2202
2203 #[test]
2204 fn test_resolve_int_to_double_promotion() {
2205 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2206 assert!(matches!(result.codec, Codec::Float64));
2207 assert_eq!(
2208 result.resolution,
2209 Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2210 );
2211 }
2212
2213 #[test]
2214 fn test_resolve_long_to_float_promotion() {
2215 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2216 assert!(matches!(result.codec, Codec::Float32));
2217 assert_eq!(
2218 result.resolution,
2219 Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2220 );
2221 }
2222
2223 #[test]
2224 fn test_resolve_long_to_double_promotion() {
2225 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2226 assert!(matches!(result.codec, Codec::Float64));
2227 assert_eq!(
2228 result.resolution,
2229 Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2230 );
2231 }
2232
2233 #[test]
2234 fn test_resolve_float_to_double_promotion() {
2235 let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2236 assert!(matches!(result.codec, Codec::Float64));
2237 assert_eq!(
2238 result.resolution,
2239 Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2240 );
2241 }
2242
2243 #[test]
2244 fn test_resolve_string_to_bytes_promotion() {
2245 let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2246 assert!(matches!(result.codec, Codec::Binary));
2247 assert_eq!(
2248 result.resolution,
2249 Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2250 );
2251 }
2252
2253 #[test]
2254 fn test_resolve_bytes_to_string_promotion() {
2255 let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2256 assert!(matches!(result.codec, Codec::Utf8));
2257 assert_eq!(
2258 result.resolution,
2259 Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2260 );
2261 }
2262
2263 #[test]
2264 fn test_resolve_illegal_promotion_double_to_float_errors() {
2265 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2266 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2267 let mut maker = Maker::new(false, false);
2268 let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2269 assert!(result.is_err());
2270 match result {
2271 Err(ArrowError::ParseError(msg)) => {
2272 assert!(msg.contains("Illegal promotion"));
2273 }
2274 _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2275 }
2276 }
2277
#[test]
fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
    // Writer `["null", "int"]` vs. reader `["double", "null"]`: the non-null branch
    // is promoted int -> double, and the resulting nullability is NullFirst, matching
    // the writer's branch order rather than the reader's.
    let prim = |p| Schema::TypeName(TypeName::Primitive(p));
    let writer = Schema::Union(vec![prim(PrimitiveType::Null), prim(PrimitiveType::Int)]);
    let reader = Schema::Union(vec![prim(PrimitiveType::Double), prim(PrimitiveType::Null)]);
    let mut maker = Maker::new(false, false);
    let resolved = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    assert!(matches!(resolved.codec, Codec::Float64));
    assert_eq!(
        resolved.resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
    );
    assert_eq!(resolved.nullability, Some(Nullability::NullFirst));
}
2297
#[test]
fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
    // Writer union ["string", "long"] read as plain "bytes". Only the string branch
    // maps to the reader (via StringToBytes); the long branch has no target (None).
    let reader = mk_primitive(PrimitiveType::Bytes);
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Long),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    assert!(matches!(dt.codec(), Codec::Binary));

    let union_info = match dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(union_info.writer_is_union);
    assert!(!union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((0, Promotion::StringToBytes)), None]
    );
}
2318
#[test]
fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
    // A `long` writer against reader union ["long", "double"] must select the exact
    // `long` branch (index 0, Direct) rather than the promotable `double` branch.
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::Double),
    ]);
    let writer = mk_primitive(PrimitiveType::Long);
    let mut maker = Maker::new(false, false);
    let resolution = maker
        .make_data_type(&writer, Some(&reader), None)
        .unwrap()
        .resolution;

    let union_info = match resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(!union_info.writer_is_union);
    assert!(union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((0, Promotion::Direct))]
    );
}
2338
#[test]
fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
    // An `int` writer against reader union ["null", "long", "string"] has no exact
    // match, so it resolves to the `long` branch (index 1) via IntToLong.
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::String),
    ]);
    let writer = mk_primitive(PrimitiveType::Int);
    let mut maker = Maker::new(false, false);
    let resolution = maker
        .make_data_type(&writer, Some(&reader), None)
        .unwrap()
        .resolution;

    let union_info = match resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((1, Promotion::IntToLong))]
    );
}
2358
#[test]
fn test_resolve_both_nullable_unions_direct_match() {
    // Writer ["null", "string"] vs. reader ["string", "null"]: the value types are
    // identical, so no resolution info is attached, and nullability is NullFirst.
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Null),
    ]);
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::String),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    assert!(matches!(dt.codec(), Codec::Utf8));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));
    assert_eq!(dt.resolution, None);
}
2375
#[test]
fn test_resolve_both_nullable_unions_with_promotion() {
    // Writer ["null", "int"] vs. reader ["double", "null"]: the non-null branch is
    // promoted int -> double, and nullability is NullFirst.
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Double),
        mk_primitive(PrimitiveType::Null),
    ]);
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Int),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    let expected_resolution = Some(ResolutionInfo::Promotion(Promotion::IntToDouble));
    assert!(matches!(dt.codec(), Codec::Float64));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));
    assert_eq!(dt.resolution, expected_resolution);
}
2395
#[test]
fn test_resolve_type_promotion() {
    // `int` writer + `long` reader => Int64 codec tagged with the IntToLong promotion.
    let [writer_schema, reader_schema] =
        [PrimitiveType::Int, PrimitiveType::Long].map(|p| Schema::TypeName(TypeName::Primitive(p)));
    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    assert!(matches!(resolved.codec, Codec::Int64));
    assert_eq!(
        resolved.resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    );
}
2410
#[test]
fn test_nested_record_type_reuse_without_namespace() {
    // `Nested` is defined inline once and then referenced by its bare name from a
    // record field, an array item and a map value. Every reference must produce the
    // same Arrow type as the inline definition.
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "fields": [
        {
          "name": "nested",
          "type": {
            "type": "record",
            "name": "Nested",
            "fields": [
              { "name": "nested_int", "type": "int" }
            ]
          }
        },
        { "name": "nestedRecord", "type": "Nested" },
        { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
        { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
      ]
    }
    "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();
    let mut maker = Maker::new(false, false);
    let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = avro_data_type.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0 holds the inline definition of `Nested`.
    assert_eq!(fields[0].name(), "nested");
    let nested_data_type = fields[0].data_type();
    let Codec::Struct(nested_fields) = nested_data_type.codec() else {
        panic!(
            "'nested' field is not a struct but {:?}",
            nested_data_type.codec()
        );
    };
    assert_eq!(nested_fields.len(), 1);
    assert_eq!(nested_fields[0].name(), "nested_int");
    assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
    let nested_arrow = nested_data_type.codec().data_type();

    // Field 1: direct by-name reference.
    assert_eq!(fields[1].name(), "nestedRecord");
    assert_eq!(fields[1].data_type().codec().data_type(), nested_arrow);

    // Field 2: by-name reference as an array item.
    assert_eq!(fields[2].name(), "nestedArray");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'nestedArray' field is not a list");
    };
    assert_eq!(item_type.codec().data_type(), nested_arrow);

    // Field 3: by-name reference as a map value.
    assert_eq!(fields[3].name(), "nestedMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'nestedMap' field is not a map");
    };
    assert_eq!(value_type.codec().data_type(), nested_arrow);
}
2490
#[test]
fn test_nested_enum_type_reuse_with_namespace() {
    // `enum_ns.Status` is defined inline inside a record that lives in a different
    // namespace (`record_ns`). It is then referenced by its fully qualified name from
    // a field, an array item and a map value; all must match the inline definition.
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "namespace": "record_ns",
      "fields": [
        {
          "name": "status",
          "type": {
            "type": "enum",
            "name": "Status",
            "namespace": "enum_ns",
            "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
          }
        },
        { "name": "backupStatus", "type": "enum_ns.Status" },
        { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
        { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
      ]
    }
    "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();
    let mut maker = Maker::new(false, false);
    let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = avro_data_type.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0 holds the inline enum definition.
    assert_eq!(fields[0].name(), "status");
    let status_data_type = fields[0].data_type();
    let Codec::Enum(symbols) = status_data_type.codec() else {
        panic!(
            "'status' field is not an enum but {:?}",
            status_data_type.codec()
        );
    };
    assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
    let status_arrow = status_data_type.codec().data_type();

    // Field 1: direct fully qualified reference.
    assert_eq!(fields[1].name(), "backupStatus");
    assert_eq!(fields[1].data_type().codec().data_type(), status_arrow);

    // Field 2: reference as an array item.
    assert_eq!(fields[2].name(), "statusHistory");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'statusHistory' field is not a list");
    };
    assert_eq!(item_type.codec().data_type(), status_arrow);

    // Field 3: reference as a map value.
    assert_eq!(fields[3].name(), "statusMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'statusMap' field is not a map");
    };
    assert_eq!(value_type.codec().data_type(), status_arrow);
}
2568
#[test]
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
    // A non-record (`string`) schema resolved against itself produces a Utf8 data
    // type, which can be wrapped in a root field named with the default root name.
    let string_schema = || Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let (writer_schema, reader_schema) = (string_schema(), string_schema());
    let mut maker = Maker::new(false, false);
    let data_type = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("resolution should succeed");
    let root = AvroField {
        name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
        data_type,
    };
    assert_eq!(root.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
    assert!(matches!(root.data_type().codec(), Codec::Utf8));
}
2584
2585 fn json_string(s: &str) -> Value {
2586 Value::String(s.to_string())
2587 }
2588
2589 fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
2590 let stored = dt
2591 .metadata
2592 .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
2593 .cloned()
2594 .unwrap_or_default();
2595 let expected = serde_json::to_string(default_json).unwrap();
2596 assert_eq!(stored, expected, "stored default metadata should match");
2597 }
2598
#[test]
fn test_validate_and_store_default_null_and_nullability_rules() {
    // A JSON `null` default is accepted for the `null` codec and for a NullFirst
    // nullable int, and is stored in metadata. It is rejected for a non-nullable int
    // and for a NullSecond nullable int.
    const NULL_ONLY: &str = "JSON null default is only valid for `null` type";

    let mut null_codec = AvroDataType::new(Codec::Null, HashMap::new(), None);
    let literal = null_codec.parse_and_store_default(&Value::Null).unwrap();
    assert_eq!(literal, AvroLiteral::Null);
    assert_default_stored(&null_codec, &Value::Null);

    for rejected in [None, Some(Nullability::NullSecond)] {
        let mut dt = AvroDataType::new(Codec::Int32, HashMap::new(), rejected);
        let err = dt.parse_and_store_default(&Value::Null).unwrap_err();
        assert!(err.to_string().contains(NULL_ONLY), "unexpected error: {err}");
    }

    let mut null_first_int =
        AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
    let literal = null_first_int.parse_and_store_default(&Value::Null).unwrap();
    assert_eq!(literal, AvroLiteral::Null);
    assert_default_stored(&null_first_int, &Value::Null);
}
2626
#[test]
fn test_validate_and_store_default_primitives_and_temporal() {
    // Builds a non-nullable data type with empty metadata for `codec`.
    let plain = |codec| AvroDataType::new(codec, HashMap::new(), None);

    // boolean
    let mut dt_bool = plain(Codec::Boolean);
    assert_eq!(
        dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap(),
        AvroLiteral::Boolean(true)
    );
    assert_default_stored(&dt_bool, &Value::Bool(true));

    // int: an in-range value is stored; i32::MAX + 1 is rejected.
    let mut dt_i32 = plain(Codec::Int32);
    let int_default = serde_json::json!(123);
    assert_eq!(
        dt_i32.parse_and_store_default(&int_default).unwrap(),
        AvroLiteral::Int(123)
    );
    assert_default_stored(&dt_i32, &int_default);
    let overflow = serde_json::json!(i64::from(i32::MAX) + 1);
    let err = dt_i32.parse_and_store_default(&overflow).unwrap_err();
    assert!(err.to_string().contains("out of i32 range"));

    // long
    let mut dt_i64 = plain(Codec::Int64);
    let long_default = serde_json::json!(1234567890);
    assert_eq!(
        dt_i64.parse_and_store_default(&long_default).unwrap(),
        AvroLiteral::Long(1234567890)
    );
    assert_default_stored(&dt_i64, &long_default);

    // float: 1.25 is stored; 1e39 is beyond the f32 range and rejected.
    let mut dt_f32 = plain(Codec::Float32);
    let float_default = serde_json::json!(1.25);
    assert_eq!(
        dt_f32.parse_and_store_default(&float_default).unwrap(),
        AvroLiteral::Float(1.25)
    );
    assert_default_stored(&dt_f32, &float_default);
    let err = dt_f32
        .parse_and_store_default(&serde_json::json!(1e39))
        .unwrap_err();
    assert!(err.to_string().contains("out of f32 range"));

    // double
    let mut dt_f64 = plain(Codec::Float64);
    let double_default = serde_json::json!(std::f64::consts::PI);
    assert_eq!(
        dt_f64.parse_and_store_default(&double_default).unwrap(),
        AvroLiteral::Double(std::f64::consts::PI)
    );
    assert_default_stored(&dt_f64, &double_default);

    // string-backed codecs store the text verbatim.
    for (codec, text) in [(Codec::Utf8, "hello"), (Codec::Utf8View, "view")] {
        let mut dt = plain(codec);
        assert_eq!(
            dt.parse_and_store_default(&json_string(text)).unwrap(),
            AvroLiteral::String(text.into())
        );
        assert_default_stored(&dt, &json_string(text));
    }

    // uuid
    let nil_uuid = "00000000-0000-0000-0000-000000000000";
    let mut dt_uuid = plain(Codec::Uuid);
    assert_eq!(
        dt_uuid.parse_and_store_default(&json_string(nil_uuid)).unwrap(),
        AvroLiteral::String(nil_uuid.into())
    );

    // bytes: ASCII text yields its byte values; '€' is rejected.
    let mut dt_bin = plain(Codec::Binary);
    assert_eq!(
        dt_bin.parse_and_store_default(&json_string("ABC")).unwrap(),
        AvroLiteral::Bytes(vec![65, 66, 67])
    );
    let err = dt_bin
        .parse_and_store_default(&json_string("€"))
        .unwrap_err();
    assert!(err.to_string().contains("Invalid codepoint"));

    // Temporal codecs yield their underlying int/long literal.
    let temporal = [
        (Codec::Date32, serde_json::json!(1), AvroLiteral::Int(1)),
        (
            Codec::TimeMillis,
            serde_json::json!(86_400_000),
            AvroLiteral::Int(86_400_000),
        ),
        (
            Codec::TimeMicros,
            serde_json::json!(1_000_000),
            AvroLiteral::Long(1_000_000),
        ),
        (
            Codec::TimestampMillis(true),
            serde_json::json!(123),
            AvroLiteral::Long(123),
        ),
        (
            Codec::TimestampMicros(false),
            serde_json::json!(456),
            AvroLiteral::Long(456),
        ),
    ];
    for (codec, json, expected) in temporal {
        let mut dt = plain(codec);
        assert_eq!(dt.parse_and_store_default(&json).unwrap(), expected);
    }
}
2719
#[test]
fn test_validate_and_store_default_fixed_decimal_interval() {
    // Builds a non-nullable data type with empty metadata for `codec`.
    let plain = |codec| AvroDataType::new(codec, HashMap::new(), None);
    // The expected Bytes literal for an ASCII default string.
    let as_bytes = |s: &str| AvroLiteral::Bytes(s.bytes().collect());

    // fixed(4): exactly four bytes accepted, longer strings rejected.
    let mut dt_fixed = plain(Codec::Fixed(4));
    assert_eq!(
        dt_fixed.parse_and_store_default(&json_string("WXYZ")).unwrap(),
        AvroLiteral::Bytes(vec![87, 88, 89, 90])
    );
    let err = dt_fixed
        .parse_and_store_default(&json_string("TOO LONG"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));

    // Decimal with a fixed size (presumably the third argument, 3 here): "abc"
    // fits, "toolong" does not.
    let mut dt_dec_fixed = plain(Codec::Decimal(10, Some(2), Some(3)));
    assert_eq!(
        dt_dec_fixed.parse_and_store_default(&json_string("abc")).unwrap(),
        AvroLiteral::Bytes(vec![97, 98, 99])
    );
    let err = dt_dec_fixed
        .parse_and_store_default(&json_string("toolong"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));

    // Decimal without a fixed size accepts this free-form string.
    let mut dt_dec_bytes = plain(Codec::Decimal(10, Some(2), None));
    assert_eq!(
        dt_dec_bytes
            .parse_and_store_default(&json_string("freeform"))
            .unwrap(),
        as_bytes("freeform")
    );

    // interval: the 12-char string is accepted; "short" fails the length check.
    let mut dt_interval = plain(Codec::Interval);
    assert_eq!(
        dt_interval
            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
            .unwrap(),
        as_bytes("ABCDEFGHIJKL")
    );
    let err = dt_interval
        .parse_and_store_default(&json_string("short"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));
}
2763
#[test]
fn test_validate_and_store_default_enum_list_map_struct() {
    // enum: only declared symbols are accepted as defaults.
    let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
        .into_iter()
        .collect();
    let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
    let l = dt_enum
        .parse_and_store_default(&json_string("GREEN"))
        .unwrap();
    assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
    let err = dt_enum
        .parse_and_store_default(&json_string("YELLOW"))
        .unwrap_err();
    assert!(
        err.to_string().contains("Default enum symbol"),
        "unexpected error: {err}"
    );

    // array<long>: each element is parsed with the item codec; non-arrays are rejected.
    let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
    let val = serde_json::json!([1, 2, 3]);
    let l = dt_list.parse_and_store_default(&val).unwrap();
    assert_eq!(
        l,
        AvroLiteral::Array(vec![
            AvroLiteral::Long(1),
            AvroLiteral::Long(2),
            AvroLiteral::Long(3)
        ])
    );
    let err = dt_list
        .parse_and_store_default(&serde_json::json!({"not":"array"}))
        .unwrap_err();
    assert!(err.to_string().contains("JSON array"), "unexpected error: {err}");

    // map<double>: each value is parsed with the value codec; non-objects are rejected.
    let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
    let mv = serde_json::json!({"x": 1.5, "y": 2.5});
    let l = dt_map.parse_and_store_default(&mv).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("x".into(), AvroLiteral::Double(1.5));
    expected.insert("y".into(), AvroLiteral::Double(2.5));
    assert_eq!(l, AvroLiteral::Map(expected));
    let err = dt_map
        .parse_and_store_default(&serde_json::json!(123))
        .unwrap_err();
    assert!(err.to_string().contains("JSON object"), "unexpected error: {err}");

    // struct {a: int, b: nullable long (NullFirst), c: string with a stored default}.
    // The record default supplies only `a`. `b` is expected to fall back to null,
    // and `c` to its own stored default "xyz".
    let mut field_a = AvroField {
        name: "a".into(),
        data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
    };
    let field_b = AvroField {
        name: "b".into(),
        data_type: AvroDataType::new(
            Codec::Int64,
            HashMap::new(),
            Some(Nullability::NullFirst),
        ),
    };
    let mut c_md = HashMap::new();
    c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
    let field_c = AvroField {
        name: "c".into(),
        data_type: AvroDataType::new(Codec::Utf8, c_md, None),
    };
    // Unrelated metadata on `a` must not interfere with default handling.
    field_a.data_type.metadata.insert("doc".into(), "na".into());
    let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
    let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
    let default_obj = serde_json::json!({"a": 7});
    let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("a".into(), AvroLiteral::Int(7));
    expected.insert("b".into(), AvroLiteral::Null);
    expected.insert("c".into(), AvroLiteral::String("xyz".into()));
    assert_eq!(l, AvroLiteral::Map(expected));
    assert_default_stored(&dt_struct, &default_obj);

    // A required (non-nullable, no default) subfield missing from the record
    // default is an error.
    let req_field = AvroField {
        name: "req".into(),
        data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
    };
    let mut dt_bad = AvroDataType::new(
        Codec::Struct(Arc::from(vec![req_field])),
        HashMap::new(),
        None,
    );
    let err = dt_bad
        .parse_and_store_default(&serde_json::json!({}))
        .unwrap_err();
    assert!(
        err.to_string().contains("missing required subfield 'req'"),
        "unexpected error: {err}"
    );

    // A non-object default for a struct must be rejected. The original test computed
    // this `contains` check but discarded the result, so it never verified anything.
    let err = dt_struct
        .parse_and_store_default(&serde_json::json!(10))
        .unwrap_err();
    assert!(
        err.to_string().contains("must be a JSON object"),
        "unexpected error: {err}"
    );
}
2857
#[test]
fn test_resolve_array_promotion_and_reader_metadata() {
    // Builds `array<items>` with one extra attribute `"who": <who>`.
    let array_schema = |items: PrimitiveType, who: &'static str| {
        let mut additional: HashMap<&str, Value> = HashMap::new();
        additional.insert("who", json_string(who));
        Schema::Complex(ComplexType::Array(Array {
            items: Box::new(Schema::TypeName(TypeName::Primitive(items))),
            attributes: Attributes {
                logical_type: None,
                additional,
            },
        }))
    };
    let writer_schema = array_schema(PrimitiveType::Int, "writer");
    let reader_schema = array_schema(PrimitiveType::Long, "reader");

    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();

    // The array-level attribute comes from the reader schema, stored JSON-encoded.
    assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));

    // Items are promoted int -> long.
    let Codec::List(inner) = dt.codec() else {
        panic!("expected list codec");
    };
    assert!(matches!(inner.codec(), Codec::Int64));
    assert_eq!(
        inner.resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    );
}
2893
#[test]
fn test_resolve_fixed_success_name_and_size_match_and_alias() {
    // Writer `MD5` declares alias `Hash16`, which is the reader's name, and both are
    // 16 bytes wide, so they resolve to a Fixed(16) codec.
    let reader_fixed = Fixed {
        name: "Hash16",
        namespace: None,
        aliases: vec![],
        size: 16,
        attributes: Attributes::default(),
    };
    let writer_fixed = Fixed {
        name: "MD5",
        namespace: None,
        aliases: vec!["Hash16"],
        size: 16,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
    let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));

    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::Fixed(16)));
}
2916
#[test]
fn test_resolve_records_mapping_default_fields_and_skip_fields() {
    // Writer: R { a: int, skipme: string, b: long }
    // Reader: R { b: long, a: long, name: string = "anon", opt: ["null", "int"] }
    // Expected: fields follow reader order, `a` is promoted int -> long, writer-only
    // `skipme` is skipped, and `name`/`opt` are filled from defaults (`opt` with null).
    let prim = |p| Schema::TypeName(TypeName::Primitive(p));
    let field = |name, ty, default| crate::schema::Field {
        name,
        doc: None,
        r#type: ty,
        default,
        aliases: vec![],
    };
    let record = |fields| {
        Schema::Complex(ComplexType::Record(Record {
            name: "R",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields,
            attributes: Attributes::default(),
        }))
    };

    let writer = record(vec![
        field("a", prim(PrimitiveType::Int), None),
        field("skipme", prim(PrimitiveType::String), None),
        field("b", prim(PrimitiveType::Long), None),
    ]);
    let reader = record(vec![
        field("b", prim(PrimitiveType::Long), None),
        field("a", prim(PrimitiveType::Long), None),
        field(
            "name",
            prim(PrimitiveType::String),
            Some(json_string("anon")),
        ),
        field(
            "opt",
            Schema::Union(vec![prim(PrimitiveType::Null), prim(PrimitiveType::Int)]),
            None,
        ),
    ]);

    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("record resolution");
    let fields = match dt.codec() {
        Codec::Struct(f) => f,
        other => panic!("expected struct, got {other:?}"),
    };

    // Output fields follow the reader's order.
    let names: Vec<&str> = fields.iter().map(|f| f.name()).collect();
    assert_eq!(names, ["b", "a", "name", "opt"]);
    assert!(matches!(
        fields[1].data_type().resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    ));

    let Some(ResolutionInfo::Record(rec)) = &dt.resolution else {
        panic!("expected record resolution, got {:?}", dt.resolution);
    };
    // writer a -> reader 1, writer skipme -> skipped, writer b -> reader 0.
    assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
    assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
    assert!(rec.skip_fields[0].is_none());
    assert!(rec.skip_fields[2].is_none());
    let skipped = rec.skip_fields[1].as_ref().expect("skip field present");
    assert!(matches!(skipped.codec(), Codec::Utf8));

    // Defaults are stored JSON-encoded in each reader field's metadata.
    let stored_default = |idx: usize| {
        fields[idx]
            .data_type()
            .metadata
            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
            .cloned()
    };
    assert_eq!(stored_default(2), Some("\"anon\"".to_string()));
    assert_eq!(stored_default(3), Some("null".to_string()));
}
3027
#[test]
fn test_named_type_alias_resolution_record_cross_namespace() {
    // Writer `com.example.v2.PersonV2` declares the fully qualified alias
    // `com.example.Person`, which is the reader's full name. Both share the same
    // two fields, so resolution yields a two-field struct.
    let fields = [("name", PrimitiveType::String), ("age", PrimitiveType::Int)]
        .into_iter()
        .map(|(name, p)| AvroFieldSchema {
            name,
            doc: None,
            r#type: Schema::TypeName(TypeName::Primitive(p)),
            default: None,
            aliases: vec![],
        })
        .collect::<Vec<_>>();

    let writer_record = Record {
        name: "PersonV2",
        namespace: Some("com.example.v2"),
        doc: None,
        aliases: vec!["com.example.Person"],
        fields: fields.clone(),
        attributes: Attributes::default(),
    };
    let reader_record = Record {
        name: "Person",
        namespace: Some("com.example"),
        doc: None,
        aliases: vec![],
        fields,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
    let reader_schema = Schema::Complex(ComplexType::Record(reader_record));

    let mut maker = Maker::new(false, false);
    let result = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("record alias resolution should succeed");
    match result.codec {
        Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
        other => panic!("expected struct, got {other:?}"),
    }
}
3072
#[test]
fn test_named_type_alias_resolution_enum_cross_namespace() {
    // Writer `org.example.v2.ColorV2` declares the fully qualified alias
    // `org.example.Color`, which is the reader's full name, so resolution succeeds.
    let symbols = vec!["RED", "GREEN", "BLUE"];
    let reader_enum = Enum {
        name: "Color",
        namespace: Some("org.example"),
        doc: None,
        aliases: vec![],
        symbols: symbols.clone(),
        default: None,
        attributes: Attributes::default(),
    };
    let writer_enum = Enum {
        name: "ColorV2",
        namespace: Some("org.example.v2"),
        doc: None,
        aliases: vec!["org.example.Color"],
        symbols,
        default: None,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
    let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));

    let mut maker = Maker::new(false, false);
    let _resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("enum alias resolution should succeed");
}
3100
#[test]
fn test_named_type_alias_resolution_fixed_cross_namespace() {
    // Writer `ns.v2.Fx10V2` declares the fully qualified alias `ns.Fx10`, which is
    // the reader's full name, and both are 10 bytes wide, so resolution succeeds.
    let reader_fixed = Fixed {
        name: "Fx10",
        namespace: Some("ns"),
        aliases: vec![],
        size: 10,
        attributes: Attributes::default(),
    };
    let writer_fixed = Fixed {
        name: "Fx10V2",
        namespace: Some("ns.v2"),
        aliases: vec!["ns.Fx10"],
        size: 10,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
    let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));

    let mut maker = Maker::new(false, false);
    let _resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("fixed alias resolution should succeed");
}
3124}