1use crate::schema::{
21 AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22 AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23 PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27 IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43 Promotion(Promotion),
45 DefaultValue(AvroLiteral),
47 EnumMapping(EnumMapping),
49 Record(ResolvedRecord),
51 Union(ResolvedUnion),
53}
54
55#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60 Null,
62 Boolean(bool),
64 Int(i32),
66 Long(i64),
68 Float(f32),
70 Double(f64),
72 Bytes(Vec<u8>),
74 String(String),
76 Enum(String),
78 Array(Vec<AvroLiteral>),
80 Map(IndexMap<String, AvroLiteral>),
82}
83
84#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87 pub(crate) writer_to_reader: Arc<[Option<usize>]>,
90 pub(crate) default_fields: Arc<[usize]>,
92 pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
95}
96
/// A promotion applied when a value of one writer type is read as a different but compatible
/// reader type during schema resolution.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No promotion; the writer type is read unchanged.
    Direct,
    /// `int` written, `long` read.
    IntToLong,
    /// `int` written, `float` read.
    IntToFloat,
    /// `int` written, `double` read.
    IntToDouble,
    /// `long` written, `float` read.
    LongToFloat,
    /// `long` written, `double` read.
    LongToDouble,
    /// `float` written, `double` read.
    FloatToDouble,
    /// `string` written, `bytes` read.
    StringToBytes,
    /// `bytes` written, `string` read.
    BytesToString,
}

impl Display for Promotion {
    /// Renders the promotion as `Writer->Reader`, or `Direct` when no promotion applies.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Promotion::Direct => "Direct",
            Promotion::IntToLong => "Int->Long",
            Promotion::IntToFloat => "Int->Float",
            Promotion::IntToDouble => "Int->Double",
            Promotion::LongToFloat => "Long->Float",
            Promotion::LongToDouble => "Long->Double",
            Promotion::FloatToDouble => "Float->Double",
            Promotion::StringToBytes => "String->Bytes",
            Promotion::BytesToString => "Bytes->String",
        };
        formatter.write_str(label)
    }
}
138
139#[derive(Debug, Clone, PartialEq)]
141pub(crate) struct ResolvedUnion {
142 pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
145 pub(crate) writer_is_union: bool,
147 pub(crate) reader_is_union: bool,
149}
150
/// Translation table from writer enum symbols to reader enum symbols.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// NOTE(review): presumably indexed by writer symbol position and holding the reader
    /// symbol index, with a sentinel for unmatched symbols — confirm against the resolver.
    pub(crate) mapping: Arc<[i32]>,
    /// Reader symbol index used for writer symbols without a match (meaning of the value when
    /// the reader enum has no default is not visible here — confirm).
    pub(crate) default_index: i32,
}
162
/// Attaches the Arrow canonical extension type that corresponds to `codec`, if any.
///
/// Only `Codec::Uuid` maps to an extension type; every other codec returns `field` as-is.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if let Codec::Uuid = codec {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
170
171#[derive(Debug, Clone, PartialEq)]
173pub(crate) struct AvroDataType {
174 nullability: Option<Nullability>,
175 metadata: HashMap<String, String>,
176 codec: Codec,
177 pub(crate) resolution: Option<ResolutionInfo>,
178}
179
impl AvroDataType {
    /// Creates a data type from a codec, field metadata and optional nullability, with no
    /// schema-resolution information attached.
    pub(crate) fn new(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
    ) -> Self {
        AvroDataType {
            codec,
            metadata,
            nullability,
            resolution: None,
        }
    }

    /// Like [`AvroDataType::new`], but also attaches optional resolution information.
    #[inline]
    fn new_with_resolution(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
        resolution: Option<ResolutionInfo>,
    ) -> Self {
        Self {
            codec,
            metadata,
            nullability,
            resolution,
        }
    }

    /// Builds the Arrow [`Field`] named `name` for this type.
    ///
    /// The field is nullable when `nullability` is set (e.g. a collapsed two-branch union
    /// with `null`) or when this is a general union with at least one `null` branch. The
    /// type's metadata is copied onto the field. With the `canonical_extension_types`
    /// feature, `with_extension_type` may additionally attach an extension type.
    pub(crate) fn field_with_name(&self, name: &str) -> Field {
        let mut nullable = self.nullability.is_some();
        if !nullable {
            // A general (dense) union is nullable if any branch is `null`.
            if let Codec::Union(children, _, _) = self.codec() {
                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
                    nullable = true;
                }
            }
        }
        let data_type = self.codec.data_type();
        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
        #[cfg(feature = "canonical_extension_types")]
        return with_extension_type(&self.codec, field);
        #[cfg(not(feature = "canonical_extension_types"))]
        field
    }

    /// Returns the codec used to decode this type.
    pub(crate) fn codec(&self) -> &Codec {
        &self.codec
    }

    /// Returns the nullability recorded for this type, if it came from a nullable union.
    pub(crate) fn nullability(&self) -> Option<Nullability> {
        self.nullability
    }

    /// Parses a JSON default value into an [`AvroLiteral`] matching this type's codec.
    ///
    /// Rules, in order:
    /// * JSON `null` is accepted only for the `null` type, for a union whose first branch is
    ///   `null`, or when `nullability` is `NullFirst`; otherwise it is an error.
    /// * Non-null defaults are checked against the codec. Examples:
    ///   - `int`-backed codecs are range-checked into `i32`.
    ///   - `float` must be finite and within `f32` range.
    ///   - Bytes-like codecs (`bytes`, `fixed`, `decimal`, interval) take a JSON string whose
    ///     code points are each ≤ U+00FF and become the raw bytes. For `fixed`, `decimal`
    ///     with a size, and interval (12 bytes), the length must match.
    ///   - Enum symbols must exist in the reader's symbol list.
    ///   - Arrays, maps and records recurse into their element, value or field types.
    ///   - A union default is parsed against its first branch.
    ///
    /// # Errors
    /// Returns `ArrowError::SchemaError` when the JSON value has the wrong shape, is out of
    /// range, or (for records) a required sub-field has no value and no stored default.
    #[inline]
    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        /// Returns the string payload of `default_json`, or an error naming `data_type`.
        fn expect_string<'v>(
            default_json: &'v Value,
            data_type: &str,
        ) -> Result<&'v str, ArrowError> {
            match default_json {
                Value::String(s) => Ok(s.as_str()),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default value must be a JSON string for {data_type}"
                ))),
            }
        }

        /// Decodes a bytes/fixed default: each char's code point (≤ 0xFF) becomes one byte.
        /// When `expected_len` is given, the decoded length must equal it exactly.
        fn parse_bytes_default(
            default_json: &Value,
            expected_len: Option<usize>,
        ) -> Result<Vec<u8>, ArrowError> {
            let s = expect_string(default_json, "bytes/fixed logical types")?;
            let mut out = Vec::with_capacity(s.len());
            for ch in s.chars() {
                let cp = ch as u32;
                if cp > 0xFF {
                    return Err(ArrowError::SchemaError(format!(
                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
                    )));
                }
                out.push(cp as u8);
            }
            if let Some(len) = expected_len {
                if out.len() != len {
                    return Err(ArrowError::SchemaError(format!(
                        "Default length {} does not match expected fixed size {len}",
                        out.len(),
                    )));
                }
            }
            Ok(out)
        }

        /// Reads a JSON number that is representable as `i64`.
        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_i64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON integer"
                ))),
            }
        }

        /// Reads a JSON number as `f64`.
        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_f64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON number"
                ))),
            }
        }

        // JSON `null` is handled up front, independent of the codec-specific rules below.
        if default_json.is_null() {
            return match self.codec() {
                Codec::Null => Ok(AvroLiteral::Null),
                Codec::Union(encodings, _, _) if !encodings.is_empty()
                    && matches!(encodings[0].codec(), Codec::Null) =>
                {
                    Ok(AvroLiteral::Null)
                }
                // A collapsed `[null, T]` union: `null` is its first branch.
                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
                _ => Err(ArrowError::SchemaError(
                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
                        .to_string(),
                )),
            };
        }
        let lit = match self.codec() {
            Codec::Null => {
                return Err(ArrowError::SchemaError(
                    "Default for `null` type must be JSON null".to_string(),
                ));
            }
            Codec::Boolean => match default_json {
                Value::Bool(b) => AvroLiteral::Boolean(*b),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Boolean default must be a JSON boolean".to_string(),
                    ));
                }
            },
            // Avro `int` and the logical types stored as `int`.
            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            // Avro `long` and the logical types stored as `long`.
            Codec::Int64
            | Codec::TimeMicros
            | Codec::TimestampMillis(_)
            | Codec::TimestampMicros(_)
            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            Codec::Float32 => {
                let f = parse_json_f64(default_json, "float")?;
                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default float {f} out of f32 range or not finite"
                    )));
                }
                AvroLiteral::Float(f as f32)
            }
            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
            }
            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
            Codec::Fixed(sz) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
            }
            // Decimal defaults are raw bytes; a fixed-backed decimal also checks the size.
            Codec::Decimal(_, _, fixed_size) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
            }
            Codec::Enum(symbols) => {
                let s = expect_string(default_json, "enum")?;
                if symbols.iter().any(|sym| sym == s) {
                    AvroLiteral::Enum(s.to_string())
                } else {
                    return Err(ArrowError::SchemaError(format!(
                        "Default enum symbol {s:?} not found in reader enum symbols"
                    )));
                }
            }
            // The interval codec's default must decode to exactly 12 bytes.
            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
            Codec::List(item_dt) => match default_json {
                Value::Array(items) => AvroLiteral::Array(
                    items
                        .iter()
                        .map(|v| item_dt.parse_default_literal(v))
                        .collect::<Result<_, _>>()?,
                ),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON array for Avro array type".to_string(),
                    ));
                }
            },
            Codec::Map(val_dt) => match default_json {
                Value::Object(map) => {
                    let mut out = IndexMap::with_capacity(map.len());
                    for (k, v) in map {
                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON object for Avro map type".to_string(),
                    ));
                }
            },
            // Record defaults become a `Map` keyed by field name, in the record's field order.
            Codec::Struct(fields) => match default_json {
                Value::Object(obj) => {
                    let mut out: IndexMap<String, AvroLiteral> =
                        IndexMap::with_capacity(fields.len());
                    for f in fields.as_ref() {
                        let name = f.name().to_string();
                        if let Some(sub) = obj.get(&name) {
                            out.insert(name, f.data_type().parse_default_literal(sub)?);
                        } else {
                            // The sub-field is missing from the record default. Fall back to
                            // a null literal or to the sub-field's own stored default JSON.
                            let stored_default =
                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
                            // NOTE(review): compares against `Nullability::default()`;
                            // presumably that is `NullFirst` — confirm in the schema module.
                            if stored_default.is_none()
                                && f.data_type().nullability() == Some(Nullability::default())
                            {
                                out.insert(name, AvroLiteral::Null);
                            } else if let Some(default_json) = stored_default {
                                let v: Value =
                                    serde_json::from_str(default_json).map_err(|e| {
                                        ArrowError::SchemaError(format!(
                                            "Failed to parse stored subfield default JSON for '{}': {e}",
                                            f.name(),
                                        ))
                                    })?;
                                out.insert(name, f.data_type().parse_default_literal(&v)?);
                            } else {
                                return Err(ArrowError::SchemaError(format!(
                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
                                    f.name(),
                                    f.data_type().codec()
                                )));
                            }
                        }
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value for record/struct must be a JSON object".to_string(),
                    ));
                }
            },
            // A non-null union default is interpreted against the first branch.
            Codec::Union(encodings, _, _) => {
                let Some(default_encoding) = encodings.first() else {
                    return Err(ArrowError::SchemaError(
                        "Union with no branches cannot have a default".to_string(),
                    ));
                };
                default_encoding.parse_default_literal(default_json)?
            }
            // Run-end encoding is physical only; the default is parsed against the values type.
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
        };
        Ok(lit)
    }

    /// Serializes `default_json` and records it in this type's metadata under
    /// `AVRO_FIELD_DEFAULT_METADATA_KEY`, replacing any previous value.
    ///
    /// # Errors
    /// Returns `ArrowError::ParseError` if the value cannot be serialized to JSON text.
    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
        let json_text = serde_json::to_string(default_json).map_err(|e| {
            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
        })?;
        self.metadata
            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
        Ok(())
    }

    /// Validates `default_json` against this type and, only if it parses, stores it in the
    /// metadata. Returns the parsed literal.
    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        let lit = self.parse_default_literal(default_json)?;
        self.store_default(default_json)?;
        Ok(lit)
    }
}
488
489#[derive(Debug, Clone, PartialEq)]
491pub(crate) struct AvroField {
492 name: String,
493 data_type: AvroDataType,
494}
495
496impl AvroField {
497 pub(crate) fn field(&self) -> Field {
499 self.data_type.field_with_name(&self.name)
500 }
501
502 pub(crate) fn data_type(&self) -> &AvroDataType {
504 &self.data_type
505 }
506
507 pub(crate) fn with_utf8view(&self) -> Self {
516 let mut field = self.clone();
517 if let Codec::Utf8 = field.data_type.codec {
518 field.data_type.codec = Codec::Utf8View;
519 }
520 field
521 }
522
523 pub(crate) fn name(&self) -> &str {
528 &self.name
529 }
530}
531
532impl<'a> TryFrom<&Schema<'a>> for AvroField {
533 type Error = ArrowError;
534
535 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
536 match schema {
537 Schema::Complex(ComplexType::Record(r)) => {
538 let mut resolver = Maker::new(false, false);
539 let data_type = resolver.make_data_type(schema, None, None)?;
540 Ok(AvroField {
541 data_type,
542 name: r.name.to_string(),
543 })
544 }
545 _ => Err(ArrowError::ParseError(format!(
546 "Expected record got {schema:?}"
547 ))),
548 }
549 }
550}
551
552#[derive(Debug)]
554pub(crate) struct AvroFieldBuilder<'a> {
555 writer_schema: &'a Schema<'a>,
556 reader_schema: Option<&'a Schema<'a>>,
557 use_utf8view: bool,
558 strict_mode: bool,
559}
560
561impl<'a> AvroFieldBuilder<'a> {
562 pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
564 Self {
565 writer_schema,
566 reader_schema: None,
567 use_utf8view: false,
568 strict_mode: false,
569 }
570 }
571
572 #[inline]
577 pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
578 self.reader_schema = Some(reader_schema);
579 self
580 }
581
582 pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
584 self.use_utf8view = use_utf8view;
585 self
586 }
587
588 pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
590 self.strict_mode = strict_mode;
591 self
592 }
593
594 pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
596 match self.writer_schema {
597 Schema::Complex(ComplexType::Record(r)) => {
598 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
599 let data_type =
600 resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
601 Ok(AvroField {
602 name: r.name.to_string(),
603 data_type,
604 })
605 }
606 _ => Err(ArrowError::ParseError(format!(
607 "Expected a Record schema to build an AvroField, but got {:?}",
608 self.writer_schema
609 ))),
610 }
611 }
612}
613
614#[derive(Debug, Clone, PartialEq)]
618pub(crate) enum Codec {
619 Null,
621 Boolean,
623 Int32,
625 Int64,
627 Float32,
629 Float64,
631 Binary,
633 Utf8,
635 Utf8View,
640 Date32,
642 TimeMillis,
644 TimeMicros,
646 TimestampMillis(bool),
651 TimestampMicros(bool),
656 TimestampNanos(bool),
661 Fixed(i32),
664 Decimal(usize, Option<usize>, Option<usize>),
671 Uuid,
673 Enum(Arc<[String]>),
677 List(Arc<AvroDataType>),
679 Struct(Arc<[AvroField]>),
681 Map(Arc<AvroDataType>),
683 Interval,
685 Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
687 #[cfg(feature = "avro_custom_types")]
689 DurationNanos,
690 #[cfg(feature = "avro_custom_types")]
692 DurationMicros,
693 #[cfg(feature = "avro_custom_types")]
695 DurationMillis,
696 #[cfg(feature = "avro_custom_types")]
698 DurationSeconds,
699 #[cfg(feature = "avro_custom_types")]
700 RunEndEncoded(Arc<AvroDataType>, u8),
701}
702
703impl Codec {
704 fn data_type(&self) -> DataType {
705 match self {
706 Self::Null => DataType::Null,
707 Self::Boolean => DataType::Boolean,
708 Self::Int32 => DataType::Int32,
709 Self::Int64 => DataType::Int64,
710 Self::Float32 => DataType::Float32,
711 Self::Float64 => DataType::Float64,
712 Self::Binary => DataType::Binary,
713 Self::Utf8 => DataType::Utf8,
714 Self::Utf8View => DataType::Utf8View,
715 Self::Date32 => DataType::Date32,
716 Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
717 Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
718 Self::TimestampMillis(is_utc) => {
719 DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
720 }
721 Self::TimestampMicros(is_utc) => {
722 DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
723 }
724 Self::TimestampNanos(is_utc) => {
725 DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
726 }
727 Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
728 Self::Fixed(size) => DataType::FixedSizeBinary(*size),
729 Self::Decimal(precision, scale, _size) => {
730 let p = *precision as u8;
731 let s = scale.unwrap_or(0) as i8;
732 #[cfg(feature = "small_decimals")]
733 {
734 if *precision <= DECIMAL32_MAX_PRECISION as usize {
735 DataType::Decimal32(p, s)
736 } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
737 DataType::Decimal64(p, s)
738 } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
739 DataType::Decimal128(p, s)
740 } else {
741 DataType::Decimal256(p, s)
742 }
743 }
744 #[cfg(not(feature = "small_decimals"))]
745 {
746 if *precision <= DECIMAL128_MAX_PRECISION as usize {
747 DataType::Decimal128(p, s)
748 } else {
749 DataType::Decimal256(p, s)
750 }
751 }
752 }
753 Self::Uuid => DataType::FixedSizeBinary(16),
754 Self::Enum(_) => {
755 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
756 }
757 Self::List(f) => {
758 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
759 }
760 Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
761 Self::Map(value_type) => {
762 let val_field = value_type.field_with_name("value");
763 DataType::Map(
764 Arc::new(Field::new(
765 "entries",
766 DataType::Struct(Fields::from(vec![
767 Field::new("key", DataType::Utf8, false),
768 val_field,
769 ])),
770 false,
771 )),
772 false,
773 )
774 }
775 Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
776 #[cfg(feature = "avro_custom_types")]
777 Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
778 #[cfg(feature = "avro_custom_types")]
779 Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
780 #[cfg(feature = "avro_custom_types")]
781 Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
782 #[cfg(feature = "avro_custom_types")]
783 Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
784 #[cfg(feature = "avro_custom_types")]
785 Self::RunEndEncoded(values, bits) => {
786 let run_ends_dt = match *bits {
787 16 => DataType::Int16,
788 32 => DataType::Int32,
789 64 => DataType::Int64,
790 _ => unreachable!(),
791 };
792 DataType::RunEndEncoded(
793 Arc::new(Field::new("run_ends", run_ends_dt, false)),
794 Arc::new(Field::new("values", values.codec().data_type(), true)),
795 )
796 }
797 }
798 }
799
800 pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
806 if use_utf8view && matches!(self, Self::Utf8) {
807 Self::Utf8View
808 } else {
809 self
810 }
811 }
812
813 #[inline]
814 fn union_field_name(&self) -> String {
815 UnionFieldKind::from(self).as_ref().to_owned()
816 }
817}
818
819impl From<PrimitiveType> for Codec {
820 fn from(value: PrimitiveType) -> Self {
821 match value {
822 PrimitiveType::Null => Self::Null,
823 PrimitiveType::Boolean => Self::Boolean,
824 PrimitiveType::Int => Self::Int32,
825 PrimitiveType::Long => Self::Int64,
826 PrimitiveType::Float => Self::Float32,
827 PrimitiveType::Double => Self::Float64,
828 PrimitiveType::Bytes => Self::Binary,
829 PrimitiveType::String => Self::Utf8,
830 }
831 }
832}
833
/// Returns the maximum decimal precision a `fixed` of `n` bytes can hold, or `None` when `n`
/// is outside `1..=32`.
///
/// Entry `k` of the table equals `floor(log10(2^(8 * (k + 1) - 1) - 1))`, i.e. the number of
/// decimal digits that always fit in a signed integer of `k + 1` bytes.
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    // `n == 0` yields `None` from `checked_sub`; sizes past the table are rejected below.
    match n.checked_sub(1) {
        Some(idx) if idx < MAX_P.len() => Some(MAX_P[idx]),
        _ => None,
    }
}
856
857fn parse_decimal_attributes(
858 attributes: &Attributes,
859 fallback_size: Option<usize>,
860 precision_required: bool,
861) -> Result<(usize, usize, Option<usize>), ArrowError> {
862 let precision = attributes
863 .additional
864 .get("precision")
865 .and_then(|v| v.as_u64())
866 .or(if precision_required { None } else { Some(10) })
867 .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
868 as usize;
869 let scale = attributes
870 .additional
871 .get("scale")
872 .and_then(|v| v.as_u64())
873 .unwrap_or(0) as usize;
874 let size = attributes
875 .additional
876 .get("size")
877 .and_then(|v| v.as_u64())
878 .map(|s| s as usize)
879 .or(fallback_size);
880 if precision == 0 {
881 return Err(ArrowError::ParseError(
882 "Decimal requires precision > 0".to_string(),
883 ));
884 }
885 if scale > precision {
886 return Err(ArrowError::ParseError(format!(
887 "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
888 )));
889 }
890 if precision > DECIMAL256_MAX_PRECISION as usize {
891 return Err(ArrowError::ParseError(format!(
892 "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
893 DECIMAL256_MAX_PRECISION
894 )));
895 }
896 if let Some(sz) = size {
897 let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
898 ArrowError::ParseError(format!(
899 "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
900 ))
901 })?;
902 if precision > max_p {
903 return Err(ArrowError::ParseError(format!(
904 "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
905 )));
906 }
907 }
908 Ok((precision, scale, size))
909}
910
911#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
912#[strum(serialize_all = "snake_case")]
913enum UnionFieldKind {
914 Null,
915 Boolean,
916 Int,
917 Long,
918 Float,
919 Double,
920 Bytes,
921 String,
922 Date,
923 TimeMillis,
924 TimeMicros,
925 TimestampMillisUtc,
926 TimestampMillisLocal,
927 TimestampMicrosUtc,
928 TimestampMicrosLocal,
929 TimestampNanosUtc,
930 TimestampNanosLocal,
931 Duration,
932 Fixed,
933 Decimal,
934 Enum,
935 Array,
936 Record,
937 Map,
938 Uuid,
939 Union,
940}
941
942impl From<&Codec> for UnionFieldKind {
943 fn from(c: &Codec) -> Self {
944 match c {
945 Codec::Null => Self::Null,
946 Codec::Boolean => Self::Boolean,
947 Codec::Int32 => Self::Int,
948 Codec::Int64 => Self::Long,
949 Codec::Float32 => Self::Float,
950 Codec::Float64 => Self::Double,
951 Codec::Binary => Self::Bytes,
952 Codec::Utf8 | Codec::Utf8View => Self::String,
953 Codec::Date32 => Self::Date,
954 Codec::TimeMillis => Self::TimeMillis,
955 Codec::TimeMicros => Self::TimeMicros,
956 Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
957 Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
958 Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
959 Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
960 Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
961 Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
962 Codec::Interval => Self::Duration,
963 Codec::Fixed(_) => Self::Fixed,
964 Codec::Decimal(..) => Self::Decimal,
965 Codec::Enum(_) => Self::Enum,
966 Codec::List(_) => Self::Array,
967 Codec::Struct(_) => Self::Record,
968 Codec::Map(_) => Self::Map,
969 Codec::Uuid => Self::Uuid,
970 Codec::Union(..) => Self::Union,
971 #[cfg(feature = "avro_custom_types")]
972 Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
973 #[cfg(feature = "avro_custom_types")]
974 Codec::DurationNanos
975 | Codec::DurationMicros
976 | Codec::DurationMillis
977 | Codec::DurationSeconds => Self::Duration,
978 }
979 }
980}
981
982fn union_branch_name(dt: &AvroDataType) -> String {
983 if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
984 if name.contains(".") {
985 return name.to_string();
987 }
988 if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
989 return format!("{ns}.{name}");
990 }
991 return name.to_string();
992 }
993 dt.codec.union_field_name()
994}
995
996fn build_union_fields(encodings: &[AvroDataType]) -> UnionFields {
997 let arrow_fields: Vec<Field> = encodings
998 .iter()
999 .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1000 .collect();
1001 let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1002 UnionFields::new(type_ids, arrow_fields)
1003}
1004
1005#[derive(Debug, Default)]
1009struct Resolver<'a> {
1010 map: HashMap<(&'a str, &'a str), AvroDataType>,
1011}
1012
1013impl<'a> Resolver<'a> {
1014 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1015 self.map.insert((namespace.unwrap_or(""), name), schema);
1016 }
1017
1018 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1019 let (namespace, name) = name
1020 .rsplit_once('.')
1021 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1022 self.map
1023 .get(&(namespace, name))
1024 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1025 .cloned()
1026 }
1027}
1028
1029fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1030 let mut out = HashSet::with_capacity(1 + aliases.len());
1031 let (full, _) = make_full_name(name, ns, None);
1032 out.insert(full);
1033 for a in aliases {
1034 let (fa, _) = make_full_name(a, None, ns);
1035 out.insert(fa);
1036 }
1037 out
1038}
1039
1040fn names_match(
1041 writer_name: &str,
1042 writer_namespace: Option<&str>,
1043 writer_aliases: &[&str],
1044 reader_name: &str,
1045 reader_namespace: Option<&str>,
1046 reader_aliases: &[&str],
1047) -> bool {
1048 let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1049 let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1050 !writer_set.is_disjoint(&reader_set)
1052}
1053
1054fn ensure_names_match(
1055 data_type: &str,
1056 writer_name: &str,
1057 writer_namespace: Option<&str>,
1058 writer_aliases: &[&str],
1059 reader_name: &str,
1060 reader_namespace: Option<&str>,
1061 reader_aliases: &[&str],
1062) -> Result<(), ArrowError> {
1063 if names_match(
1064 writer_name,
1065 writer_namespace,
1066 writer_aliases,
1067 reader_name,
1068 reader_namespace,
1069 reader_aliases,
1070 ) {
1071 Ok(())
1072 } else {
1073 Err(ArrowError::ParseError(format!(
1074 "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1075 )))
1076 }
1077}
1078
1079fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1080 match schema {
1081 Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1082 Schema::Type(Type {
1083 r#type: TypeName::Primitive(primitive),
1084 ..
1085 }) => Some(*primitive),
1086 _ => None,
1087 }
1088}
1089
1090fn nullable_union_variants<'x, 'y>(
1091 variant: &'y [Schema<'x>],
1092) -> Option<(Nullability, &'y Schema<'x>)> {
1093 if variant.len() != 2 {
1094 return None;
1095 }
1096 let is_null = |schema: &Schema<'x>| {
1097 matches!(
1098 schema,
1099 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1100 )
1101 };
1102 match (is_null(&variant[0]), is_null(&variant[1])) {
1103 (true, false) => Some((Nullability::NullFirst, &variant[1])),
1104 (false, true) => Some((Nullability::NullSecond, &variant[0])),
1105 _ => None,
1106 }
1107}
1108
1109#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1110enum UnionBranchKey {
1111 Named(String),
1112 Primitive(PrimitiveType),
1113 Array,
1114 Map,
1115}
1116
1117fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1118 let (name, namespace) = match s {
1119 Schema::TypeName(TypeName::Primitive(p))
1120 | Schema::Type(Type {
1121 r#type: TypeName::Primitive(p),
1122 ..
1123 }) => return Some(UnionBranchKey::Primitive(*p)),
1124 Schema::TypeName(TypeName::Ref(name))
1125 | Schema::Type(Type {
1126 r#type: TypeName::Ref(name),
1127 ..
1128 }) => (name, None),
1129 Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1130 Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1131 Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1132 Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1133 Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1134 Schema::Union(_) => return None,
1135 };
1136 let (full, _) = make_full_name(name, namespace, enclosing_ns);
1137 Some(UnionBranchKey::Named(full))
1138}
1139
1140fn union_first_duplicate<'a>(
1141 branches: &'a [Schema<'a>],
1142 enclosing_ns: Option<&'a str>,
1143) -> Option<String> {
1144 let mut seen = HashSet::with_capacity(branches.len());
1145 for schema in branches {
1146 if let Some(key) = branch_key_of(schema, enclosing_ns) {
1147 if !seen.insert(key.clone()) {
1148 let msg = match key {
1149 UnionBranchKey::Named(full) => format!("named type {full}"),
1150 UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1151 UnionBranchKey::Array => "array".to_string(),
1152 UnionBranchKey::Map => "map".to_string(),
1153 };
1154 return Some(msg);
1155 }
1156 }
1157 }
1158 None
1159}
1160
1161struct Maker<'a> {
1165 resolver: Resolver<'a>,
1166 use_utf8view: bool,
1167 strict_mode: bool,
1168}
1169
1170impl<'a> Maker<'a> {
1171 fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1172 Self {
1173 resolver: Default::default(),
1174 use_utf8view,
1175 strict_mode,
1176 }
1177 }
1178
1179 #[cfg(feature = "avro_custom_types")]
1180 #[inline]
1181 fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1182 if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1183 let mut inner = (*values).clone();
1184 inner.nullability = Some(nb);
1185 dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1186 }
1187 }
1188
1189 fn make_data_type<'s>(
1190 &mut self,
1191 writer_schema: &'s Schema<'a>,
1192 reader_schema: Option<&'s Schema<'a>>,
1193 namespace: Option<&'a str>,
1194 ) -> Result<AvroDataType, ArrowError> {
1195 match reader_schema {
1196 Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1197 None => self.parse_type(writer_schema, namespace),
1198 }
1199 }
1200
1201 fn parse_type<'s>(
1214 &mut self,
1215 schema: &'s Schema<'a>,
1216 namespace: Option<&'a str>,
1217 ) -> Result<AvroDataType, ArrowError> {
1218 match schema {
1219 Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1220 Codec::from(*p).with_utf8view(self.use_utf8view),
1221 Default::default(),
1222 None,
1223 )),
1224 Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1225 Schema::Union(f) => {
1226 let null = f
1227 .iter()
1228 .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1229 match (f.len() == 2, null) {
1230 (true, Some(0)) => {
1231 let mut field = self.parse_type(&f[1], namespace)?;
1232 field.nullability = Some(Nullability::NullFirst);
1233 #[cfg(feature = "avro_custom_types")]
1234 Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1235 return Ok(field);
1236 }
1237 (true, Some(1)) => {
1238 if self.strict_mode {
1239 return Err(ArrowError::SchemaError(
1240 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1241 .to_string(),
1242 ));
1243 }
1244 let mut field = self.parse_type(&f[0], namespace)?;
1245 field.nullability = Some(Nullability::NullSecond);
1246 #[cfg(feature = "avro_custom_types")]
1247 Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1248 return Ok(field);
1249 }
1250 _ => {}
1251 }
1252 if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1254 return Err(ArrowError::SchemaError(
1255 "Avro unions may not immediately contain other unions".to_string(),
1256 ));
1257 }
1258 if let Some(dup) = union_first_duplicate(f, namespace) {
1260 return Err(ArrowError::SchemaError(format!(
1261 "Avro union contains duplicate branch type: {dup}"
1262 )));
1263 }
1264 let children: Vec<AvroDataType> = f
1266 .iter()
1267 .map(|s| self.parse_type(s, namespace))
1268 .collect::<Result<_, _>>()?;
1269 let union_fields = build_union_fields(&children);
1271 Ok(AvroDataType::new(
1272 Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1273 Default::default(),
1274 None,
1275 ))
1276 }
1277 Schema::Complex(c) => match c {
1278 ComplexType::Record(r) => {
1279 let namespace = r.namespace.or(namespace);
1280 let mut metadata = r.attributes.field_metadata();
1281 let fields = r
1282 .fields
1283 .iter()
1284 .map(|field| {
1285 Ok(AvroField {
1286 name: field.name.to_string(),
1287 data_type: self.parse_type(&field.r#type, namespace)?,
1288 })
1289 })
1290 .collect::<Result<_, ArrowError>>()?;
1291 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1292 if let Some(ns) = namespace {
1293 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1294 }
1295 let field = AvroDataType {
1296 nullability: None,
1297 codec: Codec::Struct(fields),
1298 metadata,
1299 resolution: None,
1300 };
1301 self.resolver.register(r.name, namespace, field.clone());
1302 Ok(field)
1303 }
1304 ComplexType::Array(a) => {
1305 let field = self.parse_type(a.items.as_ref(), namespace)?;
1306 Ok(AvroDataType {
1307 nullability: None,
1308 metadata: a.attributes.field_metadata(),
1309 codec: Codec::List(Arc::new(field)),
1310 resolution: None,
1311 })
1312 }
1313 ComplexType::Fixed(f) => {
1314 let size = f.size.try_into().map_err(|e| {
1315 ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1316 })?;
1317 let namespace = f.namespace.or(namespace);
1318 let mut metadata = f.attributes.field_metadata();
1319 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1320 if let Some(ns) = namespace {
1321 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1322 }
1323 let field = match f.attributes.logical_type {
1324 Some("decimal") => {
1325 let (precision, scale, _) =
1326 parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1327 AvroDataType {
1328 nullability: None,
1329 metadata,
1330 codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1331 resolution: None,
1332 }
1333 }
1334 Some("duration") => {
1335 if size != 12 {
1336 return Err(ArrowError::ParseError(format!(
1337 "Invalid fixed size for Duration: {size}, must be 12"
1338 )));
1339 };
1340 AvroDataType {
1341 nullability: None,
1342 metadata,
1343 codec: Codec::Interval,
1344 resolution: None,
1345 }
1346 }
1347 _ => AvroDataType {
1348 nullability: None,
1349 metadata,
1350 codec: Codec::Fixed(size),
1351 resolution: None,
1352 },
1353 };
1354 self.resolver.register(f.name, namespace, field.clone());
1355 Ok(field)
1356 }
1357 ComplexType::Enum(e) => {
1358 let namespace = e.namespace.or(namespace);
1359 let symbols = e
1360 .symbols
1361 .iter()
1362 .map(|s| s.to_string())
1363 .collect::<Arc<[String]>>();
1364 let mut metadata = e.attributes.field_metadata();
1365 let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1366 ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1367 })?;
1368 metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1369 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1370 if let Some(ns) = namespace {
1371 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1372 }
1373 let field = AvroDataType {
1374 nullability: None,
1375 metadata,
1376 codec: Codec::Enum(symbols),
1377 resolution: None,
1378 };
1379 self.resolver.register(e.name, namespace, field.clone());
1380 Ok(field)
1381 }
1382 ComplexType::Map(m) => {
1383 let val = self.parse_type(&m.values, namespace)?;
1384 Ok(AvroDataType {
1385 nullability: None,
1386 metadata: m.attributes.field_metadata(),
1387 codec: Codec::Map(Arc::new(val)),
1388 resolution: None,
1389 })
1390 }
1391 },
1392 Schema::Type(t) => {
1393 let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1394 match (t.attributes.logical_type, &mut field.codec) {
1396 (Some("decimal"), c @ Codec::Binary) => {
1397 let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1398 *c = Codec::Decimal(prec, Some(sc), None);
1399 }
1400 (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1401 (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1402 (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1403 (Some("timestamp-millis"), c @ Codec::Int64) => {
1404 *c = Codec::TimestampMillis(true)
1405 }
1406 (Some("timestamp-micros"), c @ Codec::Int64) => {
1407 *c = Codec::TimestampMicros(true)
1408 }
1409 (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1410 *c = Codec::TimestampMillis(false)
1411 }
1412 (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1413 *c = Codec::TimestampMicros(false)
1414 }
1415 (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
1416 (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1417 *c = Codec::TimestampNanos(false)
1418 }
1419 (Some("uuid"), c @ Codec::Utf8) => {
1420 *c = Codec::Uuid;
1424 field.metadata.insert("logicalType".into(), "uuid".into());
1425 }
1426 #[cfg(feature = "avro_custom_types")]
1427 (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1428 #[cfg(feature = "avro_custom_types")]
1429 (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1430 #[cfg(feature = "avro_custom_types")]
1431 (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1432 #[cfg(feature = "avro_custom_types")]
1433 (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1434 *c = Codec::DurationSeconds
1435 }
1436 #[cfg(feature = "avro_custom_types")]
1437 (Some("arrow.run-end-encoded"), _) => {
1438 let bits_u8: u8 = t
1439 .attributes
1440 .additional
1441 .get("arrow.runEndIndexBits")
1442 .and_then(|v| v.as_u64())
1443 .and_then(|n| u8::try_from(n).ok())
1444 .ok_or_else(|| ArrowError::ParseError(
1445 "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1446 .to_string(),
1447 ))?;
1448 if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1449 return Err(ArrowError::ParseError(format!(
1450 "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1451 )));
1452 }
1453 let values_site = field.clone();
1455 field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1456 }
1457 (Some(logical), _) => {
1458 field.metadata.insert("logicalType".into(), logical.into());
1460 }
1461 (None, _) => {}
1462 }
1463 if matches!(field.codec, Codec::Int64) {
1464 if let Some(unit) = t
1465 .attributes
1466 .additional
1467 .get("arrowTimeUnit")
1468 .and_then(|v| v.as_str())
1469 {
1470 if unit == "nanosecond" {
1471 field.codec = Codec::TimestampNanos(false);
1472 }
1473 }
1474 }
1475 if !t.attributes.additional.is_empty() {
1476 for (k, v) in &t.attributes.additional {
1477 field.metadata.insert(k.to_string(), v.to_string());
1478 }
1479 }
1480 Ok(field)
1481 }
1482 }
1483 }
1484
1485 fn resolve_type<'s>(
1486 &mut self,
1487 writer_schema: &'s Schema<'a>,
1488 reader_schema: &'s Schema<'a>,
1489 namespace: Option<&'a str>,
1490 ) -> Result<AvroDataType, ArrowError> {
1491 if let (Some(write_primitive), Some(read_primitive)) =
1492 (primitive_of(writer_schema), primitive_of(reader_schema))
1493 {
1494 return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
1495 }
1496 match (writer_schema, reader_schema) {
1497 (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
1498 let writer_variants = writer_variants.as_slice();
1499 let reader_variants = reader_variants.as_slice();
1500 match (
1501 nullable_union_variants(writer_variants),
1502 nullable_union_variants(reader_variants),
1503 ) {
1504 (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
1505 let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?;
1506 dt.nullability = Some(w_nb);
1507 #[cfg(feature = "avro_custom_types")]
1508 Self::propagate_nullability_into_ree(&mut dt, w_nb);
1509 Ok(dt)
1510 }
1511 _ => self.resolve_unions(writer_variants, reader_variants, namespace),
1512 }
1513 }
1514 (Schema::Union(writer_variants), reader_non_union) => {
1515 let writer_to_reader: Vec<Option<(usize, Promotion)>> = writer_variants
1516 .iter()
1517 .map(|writer| {
1518 self.resolve_type(writer, reader_non_union, namespace)
1519 .ok()
1520 .map(|tmp| (0usize, Self::coercion_from(&tmp)))
1521 })
1522 .collect();
1523 let mut dt = self.parse_type(reader_non_union, namespace)?;
1524 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1525 writer_to_reader: Arc::from(writer_to_reader),
1526 writer_is_union: true,
1527 reader_is_union: false,
1528 }));
1529 Ok(dt)
1530 }
1531 (writer_non_union, Schema::Union(reader_variants)) => {
1532 let promo = self.find_best_promotion(
1533 writer_non_union,
1534 reader_variants.as_slice(),
1535 namespace,
1536 );
1537 let Some((reader_index, promotion)) = promo else {
1538 return Err(ArrowError::SchemaError(
1539 "Writer schema does not match any reader union branch".to_string(),
1540 ));
1541 };
1542 let mut dt = self.parse_type(reader_schema, namespace)?;
1543 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1544 writer_to_reader: Arc::from(vec![Some((reader_index, promotion))]),
1545 writer_is_union: false,
1546 reader_is_union: true,
1547 }));
1548 Ok(dt)
1549 }
1550 (
1551 Schema::Complex(ComplexType::Array(writer_array)),
1552 Schema::Complex(ComplexType::Array(reader_array)),
1553 ) => self.resolve_array(writer_array, reader_array, namespace),
1554 (
1555 Schema::Complex(ComplexType::Map(writer_map)),
1556 Schema::Complex(ComplexType::Map(reader_map)),
1557 ) => self.resolve_map(writer_map, reader_map, namespace),
1558 (
1559 Schema::Complex(ComplexType::Fixed(writer_fixed)),
1560 Schema::Complex(ComplexType::Fixed(reader_fixed)),
1561 ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
1562 (
1563 Schema::Complex(ComplexType::Record(writer_record)),
1564 Schema::Complex(ComplexType::Record(reader_record)),
1565 ) => self.resolve_records(writer_record, reader_record, namespace),
1566 (
1567 Schema::Complex(ComplexType::Enum(writer_enum)),
1568 Schema::Complex(ComplexType::Enum(reader_enum)),
1569 ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
1570 (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
1571 (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
1572 _ => Err(ArrowError::NotYetImplemented(
1573 "Other resolutions not yet implemented".to_string(),
1574 )),
1575 }
1576 }
1577
1578 #[inline]
1579 fn coercion_from(dt: &AvroDataType) -> Promotion {
1580 match dt.resolution.as_ref() {
1581 Some(ResolutionInfo::Promotion(promotion)) => *promotion,
1582 _ => Promotion::Direct,
1583 }
1584 }
1585
1586 fn find_best_promotion(
1587 &mut self,
1588 writer: &Schema<'a>,
1589 reader_variants: &[Schema<'a>],
1590 namespace: Option<&'a str>,
1591 ) -> Option<(usize, Promotion)> {
1592 let mut first_promotion: Option<(usize, Promotion)> = None;
1593 for (reader_index, reader) in reader_variants.iter().enumerate() {
1594 if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
1595 let promotion = Self::coercion_from(&tmp);
1596 if promotion == Promotion::Direct {
1597 return Some((reader_index, promotion));
1599 } else if first_promotion.is_none() {
1600 first_promotion = Some((reader_index, promotion));
1602 }
1603 }
1604 }
1605 first_promotion
1606 }
1607
1608 fn resolve_unions<'s>(
1609 &mut self,
1610 writer_variants: &'s [Schema<'a>],
1611 reader_variants: &'s [Schema<'a>],
1612 namespace: Option<&'a str>,
1613 ) -> Result<AvroDataType, ArrowError> {
1614 let reader_encodings: Vec<AvroDataType> = reader_variants
1615 .iter()
1616 .map(|reader_schema| self.parse_type(reader_schema, namespace))
1617 .collect::<Result<_, _>>()?;
1618 let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
1619 Vec::with_capacity(writer_variants.len());
1620 for writer in writer_variants {
1621 writer_to_reader.push(self.find_best_promotion(writer, reader_variants, namespace));
1622 }
1623 let union_fields = build_union_fields(&reader_encodings);
1624 let mut dt = AvroDataType::new(
1625 Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1626 Default::default(),
1627 None,
1628 );
1629 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1630 writer_to_reader: Arc::from(writer_to_reader),
1631 writer_is_union: true,
1632 reader_is_union: true,
1633 }));
1634 Ok(dt)
1635 }
1636
1637 fn resolve_array(
1638 &mut self,
1639 writer_array: &Array<'a>,
1640 reader_array: &Array<'a>,
1641 namespace: Option<&'a str>,
1642 ) -> Result<AvroDataType, ArrowError> {
1643 Ok(AvroDataType {
1644 nullability: None,
1645 metadata: reader_array.attributes.field_metadata(),
1646 codec: Codec::List(Arc::new(self.make_data_type(
1647 writer_array.items.as_ref(),
1648 Some(reader_array.items.as_ref()),
1649 namespace,
1650 )?)),
1651 resolution: None,
1652 })
1653 }
1654
1655 fn resolve_map(
1656 &mut self,
1657 writer_map: &Map<'a>,
1658 reader_map: &Map<'a>,
1659 namespace: Option<&'a str>,
1660 ) -> Result<AvroDataType, ArrowError> {
1661 Ok(AvroDataType {
1662 nullability: None,
1663 metadata: reader_map.attributes.field_metadata(),
1664 codec: Codec::Map(Arc::new(self.make_data_type(
1665 &writer_map.values,
1666 Some(&reader_map.values),
1667 namespace,
1668 )?)),
1669 resolution: None,
1670 })
1671 }
1672
1673 fn resolve_fixed<'s>(
1674 &mut self,
1675 writer_fixed: &Fixed<'a>,
1676 reader_fixed: &Fixed<'a>,
1677 reader_schema: &'s Schema<'a>,
1678 namespace: Option<&'a str>,
1679 ) -> Result<AvroDataType, ArrowError> {
1680 ensure_names_match(
1681 "Fixed",
1682 writer_fixed.name,
1683 writer_fixed.namespace,
1684 &writer_fixed.aliases,
1685 reader_fixed.name,
1686 reader_fixed.namespace,
1687 &reader_fixed.aliases,
1688 )?;
1689 if writer_fixed.size != reader_fixed.size {
1690 return Err(ArrowError::SchemaError(format!(
1691 "Fixed size mismatch for {}: writer={}, reader={}",
1692 reader_fixed.name, writer_fixed.size, reader_fixed.size
1693 )));
1694 }
1695 self.parse_type(reader_schema, namespace)
1696 }
1697
1698 fn resolve_primitives(
1699 &mut self,
1700 write_primitive: PrimitiveType,
1701 read_primitive: PrimitiveType,
1702 reader_schema: &Schema<'a>,
1703 ) -> Result<AvroDataType, ArrowError> {
1704 if write_primitive == read_primitive {
1705 return self.parse_type(reader_schema, None);
1706 }
1707 let promotion = match (write_primitive, read_primitive) {
1708 (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
1709 (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
1710 (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
1711 (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
1712 (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
1713 (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
1714 (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
1715 (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
1716 _ => {
1717 return Err(ArrowError::ParseError(format!(
1718 "Illegal promotion {write_primitive:?} to {read_primitive:?}"
1719 )));
1720 }
1721 };
1722 let mut datatype = self.parse_type(reader_schema, None)?;
1723 datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
1724 Ok(datatype)
1725 }
1726
1727 fn resolve_enums(
1783 &mut self,
1784 writer_enum: &Enum<'a>,
1785 reader_enum: &Enum<'a>,
1786 reader_schema: &Schema<'a>,
1787 namespace: Option<&'a str>,
1788 ) -> Result<AvroDataType, ArrowError> {
1789 ensure_names_match(
1790 "Enum",
1791 writer_enum.name,
1792 writer_enum.namespace,
1793 &writer_enum.aliases,
1794 reader_enum.name,
1795 reader_enum.namespace,
1796 &reader_enum.aliases,
1797 )?;
1798 if writer_enum.symbols == reader_enum.symbols {
1799 return self.parse_type(reader_schema, namespace);
1800 }
1801 let reader_index: HashMap<&str, i32> = reader_enum
1802 .symbols
1803 .iter()
1804 .enumerate()
1805 .map(|(index, &symbol)| (symbol, index as i32))
1806 .collect();
1807 let default_index: i32 = match reader_enum.default {
1808 Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
1809 ArrowError::SchemaError(format!(
1810 "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
1811 reader_enum.name,
1812 ))
1813 })?,
1814 None => -1,
1815 };
1816 let mapping: Vec<i32> = writer_enum
1817 .symbols
1818 .iter()
1819 .map(|&write_symbol| {
1820 reader_index
1821 .get(write_symbol)
1822 .copied()
1823 .unwrap_or(default_index)
1824 })
1825 .collect();
1826 if self.strict_mode && mapping.iter().any(|&m| m < 0) {
1827 return Err(ArrowError::SchemaError(format!(
1828 "Reader enum '{}' does not cover all writer symbols and no default is provided",
1829 reader_enum.name
1830 )));
1831 }
1832 let mut dt = self.parse_type(reader_schema, namespace)?;
1833 dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
1834 mapping: Arc::from(mapping),
1835 default_index,
1836 }));
1837 let reader_ns = reader_enum.namespace.or(namespace);
1838 self.resolver
1839 .register(reader_enum.name, reader_ns, dt.clone());
1840 Ok(dt)
1841 }
1842
1843 #[inline]
1844 fn build_writer_lookup(
1845 writer_record: &Record<'a>,
1846 ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
1847 let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
1848 for (idx, wf) in writer_record.fields.iter().enumerate() {
1849 map.insert(wf.name, idx);
1851 }
1852 let mut ambiguous: HashSet<&str> = HashSet::new();
1854 for (idx, wf) in writer_record.fields.iter().enumerate() {
1855 for &alias in &wf.aliases {
1856 match map.entry(alias) {
1857 Entry::Occupied(e) if *e.get() != idx => {
1858 ambiguous.insert(alias);
1859 }
1860 Entry::Vacant(e) => {
1861 e.insert(idx);
1862 }
1863 _ => {}
1864 }
1865 }
1866 }
1867 (map, ambiguous)
1868 }
1869
1870 fn resolve_records(
1871 &mut self,
1872 writer_record: &Record<'a>,
1873 reader_record: &Record<'a>,
1874 namespace: Option<&'a str>,
1875 ) -> Result<AvroDataType, ArrowError> {
1876 ensure_names_match(
1877 "Record",
1878 writer_record.name,
1879 writer_record.namespace,
1880 &writer_record.aliases,
1881 reader_record.name,
1882 reader_record.namespace,
1883 &reader_record.aliases,
1884 )?;
1885 let writer_ns = writer_record.namespace.or(namespace);
1886 let reader_ns = reader_record.namespace.or(namespace);
1887 let reader_md = reader_record.attributes.field_metadata();
1888 let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
1890 let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
1891 let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
1892 let mut default_fields: Vec<usize> = Vec::new();
1894 for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
1895 let mut match_idx = writer_lookup.get(r_field.name).copied();
1897 let mut matched_via_alias: Option<&str> = None;
1898 if match_idx.is_none() {
1899 for &alias in &r_field.aliases {
1900 if let Some(i) = writer_lookup.get(alias).copied() {
1901 if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
1902 return Err(ArrowError::SchemaError(format!(
1903 "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
1904 r_field.name
1905 )));
1906 }
1907 match_idx = Some(i);
1908 matched_via_alias = Some(alias);
1909 break;
1910 }
1911 }
1912 }
1913 if let Some(wi) = match_idx {
1914 if writer_to_reader[wi].is_none() {
1915 let w_schema = &writer_record.fields[wi].r#type;
1916 let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
1917 writer_to_reader[wi] = Some(reader_idx);
1918 reader_fields.push(AvroField {
1919 name: r_field.name.to_owned(),
1920 data_type: dt,
1921 });
1922 continue;
1923 } else if self.strict_mode {
1924 let existing_reader = writer_to_reader[wi].unwrap();
1926 let via = matched_via_alias
1927 .map(|a| format!("alias '{a}'"))
1928 .unwrap_or_else(|| "name match".to_string());
1929 return Err(ArrowError::SchemaError(format!(
1930 "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
1931 writer_record.fields[wi].name
1932 )));
1933 }
1934 }
1936 let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
1938 if let Some(default_json) = r_field.default.as_ref() {
1939 dt.resolution = Some(ResolutionInfo::DefaultValue(
1940 dt.parse_and_store_default(default_json)?,
1941 ));
1942 default_fields.push(reader_idx);
1943 } else if dt.nullability() == Some(Nullability::NullFirst) {
1944 dt.resolution = Some(ResolutionInfo::DefaultValue(
1946 dt.parse_and_store_default(&Value::Null)?,
1947 ));
1948 default_fields.push(reader_idx);
1949 } else {
1950 return Err(ArrowError::SchemaError(format!(
1951 "Reader field '{}' not present in writer schema must have a default value",
1952 r_field.name
1953 )));
1954 }
1955 reader_fields.push(AvroField {
1956 name: r_field.name.to_owned(),
1957 data_type: dt,
1958 });
1959 }
1960 let mut skip_fields: Vec<Option<AvroDataType>> =
1962 Vec::with_capacity(writer_record.fields.len());
1963 for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
1964 if writer_to_reader[writer_index].is_some() {
1965 skip_fields.push(None);
1966 } else {
1967 skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
1968 }
1969 }
1970 let resolved = AvroDataType::new_with_resolution(
1971 Codec::Struct(Arc::from(reader_fields)),
1972 reader_md,
1973 None,
1974 Some(ResolutionInfo::Record(ResolvedRecord {
1975 writer_to_reader: Arc::from(writer_to_reader),
1976 default_fields: Arc::from(default_fields),
1977 skip_fields: Arc::from(skip_fields),
1978 })),
1979 );
1980 self.resolver
1982 .register(reader_record.name, reader_ns, resolved.clone());
1983 Ok(resolved)
1984 }
1985}
1986
1987#[cfg(test)]
1988mod tests {
1989 use super::*;
1990 use crate::schema::{
1991 AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
1992 Fixed, PrimitiveType, Record, Schema, Type, TypeName,
1993 };
1994 use indexmap::IndexMap;
1995 use serde_json::{self, Value};
1996
1997 fn create_schema_with_logical_type(
1998 primitive_type: PrimitiveType,
1999 logical_type: &'static str,
2000 ) -> Schema<'static> {
2001 let attributes = Attributes {
2002 logical_type: Some(logical_type),
2003 additional: Default::default(),
2004 };
2005
2006 Schema::Type(Type {
2007 r#type: TypeName::Primitive(primitive_type),
2008 attributes,
2009 })
2010 }
2011
2012 fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2013 let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2014 let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2015 let mut maker = Maker::new(false, false);
2016 maker
2017 .make_data_type(&writer_schema, Some(&reader_schema), None)
2018 .expect("promotion should resolve")
2019 }
2020
2021 fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2022 Schema::TypeName(TypeName::Primitive(pt))
2023 }
2024 fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2025 Schema::Union(branches)
2026 }
2027
2028 #[test]
2029 fn test_date_logical_type() {
2030 let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2031
2032 let mut maker = Maker::new(false, false);
2033 let result = maker.make_data_type(&schema, None, None).unwrap();
2034
2035 assert!(matches!(result.codec, Codec::Date32));
2036 }
2037
2038 #[test]
2039 fn test_time_millis_logical_type() {
2040 let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2041
2042 let mut maker = Maker::new(false, false);
2043 let result = maker.make_data_type(&schema, None, None).unwrap();
2044
2045 assert!(matches!(result.codec, Codec::TimeMillis));
2046 }
2047
2048 #[test]
2049 fn test_time_micros_logical_type() {
2050 let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2051
2052 let mut maker = Maker::new(false, false);
2053 let result = maker.make_data_type(&schema, None, None).unwrap();
2054
2055 assert!(matches!(result.codec, Codec::TimeMicros));
2056 }
2057
2058 #[test]
2059 fn test_timestamp_millis_logical_type() {
2060 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2061
2062 let mut maker = Maker::new(false, false);
2063 let result = maker.make_data_type(&schema, None, None).unwrap();
2064
2065 assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2066 }
2067
2068 #[test]
2069 fn test_timestamp_micros_logical_type() {
2070 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2071
2072 let mut maker = Maker::new(false, false);
2073 let result = maker.make_data_type(&schema, None, None).unwrap();
2074
2075 assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2076 }
2077
2078 #[test]
2079 fn test_local_timestamp_millis_logical_type() {
2080 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2081
2082 let mut maker = Maker::new(false, false);
2083 let result = maker.make_data_type(&schema, None, None).unwrap();
2084
2085 assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2086 }
2087
2088 #[test]
2089 fn test_local_timestamp_micros_logical_type() {
2090 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2091
2092 let mut maker = Maker::new(false, false);
2093 let result = maker.make_data_type(&schema, None, None).unwrap();
2094
2095 assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2096 }
2097
2098 #[test]
2099 fn test_uuid_type() {
2100 let mut codec = Codec::Fixed(16);
2101 if let c @ Codec::Fixed(16) = &mut codec {
2102 *c = Codec::Uuid;
2103 }
2104 assert!(matches!(codec, Codec::Uuid));
2105 }
2106
2107 #[test]
2108 fn test_duration_logical_type() {
2109 let mut codec = Codec::Fixed(12);
2110
2111 if let c @ Codec::Fixed(12) = &mut codec {
2112 *c = Codec::Interval;
2113 }
2114
2115 assert!(matches!(codec, Codec::Interval));
2116 }
2117
2118 #[test]
2119 fn test_decimal_logical_type_not_implemented() {
2120 let codec = Codec::Fixed(16);
2121
2122 let process_decimal = || -> Result<(), ArrowError> {
2123 if let Codec::Fixed(_) = codec {
2124 return Err(ArrowError::NotYetImplemented(
2125 "Decimals are not currently supported".to_string(),
2126 ));
2127 }
2128 Ok(())
2129 };
2130
2131 let result = process_decimal();
2132
2133 assert!(result.is_err());
2134 if let Err(ArrowError::NotYetImplemented(msg)) = result {
2135 assert!(msg.contains("Decimals are not currently supported"));
2136 } else {
2137 panic!("Expected NotYetImplemented error");
2138 }
2139 }
2140 #[test]
2141 fn test_unknown_logical_type_added_to_metadata() {
2142 let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2143
2144 let mut maker = Maker::new(false, false);
2145 let result = maker.make_data_type(&schema, None, None).unwrap();
2146
2147 assert_eq!(
2148 result.metadata.get("logicalType"),
2149 Some(&"custom-type".to_string())
2150 );
2151 }
2152
2153 #[test]
2154 fn test_string_with_utf8view_enabled() {
2155 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2156
2157 let mut maker = Maker::new(true, false);
2158 let result = maker.make_data_type(&schema, None, None).unwrap();
2159
2160 assert!(matches!(result.codec, Codec::Utf8View));
2161 }
2162
2163 #[test]
2164 fn test_string_without_utf8view_enabled() {
2165 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2166
2167 let mut maker = Maker::new(false, false);
2168 let result = maker.make_data_type(&schema, None, None).unwrap();
2169
2170 assert!(matches!(result.codec, Codec::Utf8));
2171 }
2172
2173 #[test]
2174 fn test_record_with_string_and_utf8view_enabled() {
2175 let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2176
2177 let avro_field = crate::schema::Field {
2178 name: "string_field",
2179 r#type: field_schema,
2180 default: None,
2181 doc: None,
2182 aliases: vec![],
2183 };
2184
2185 let record = Record {
2186 name: "test_record",
2187 namespace: None,
2188 aliases: vec![],
2189 doc: None,
2190 fields: vec![avro_field],
2191 attributes: Attributes::default(),
2192 };
2193
2194 let schema = Schema::Complex(ComplexType::Record(record));
2195
2196 let mut maker = Maker::new(true, false);
2197 let result = maker.make_data_type(&schema, None, None).unwrap();
2198
2199 if let Codec::Struct(fields) = &result.codec {
2200 let first_field_codec = &fields[0].data_type().codec;
2201 assert!(matches!(first_field_codec, Codec::Utf8View));
2202 } else {
2203 panic!("Expected Struct codec");
2204 }
2205 }
2206
2207 #[test]
2208 fn test_union_with_strict_mode() {
2209 let schema = Schema::Union(vec![
2210 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2211 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2212 ]);
2213
2214 let mut maker = Maker::new(false, true);
2215 let result = maker.make_data_type(&schema, None, None);
2216
2217 assert!(result.is_err());
2218 match result {
2219 Err(ArrowError::SchemaError(msg)) => {
2220 assert!(msg.contains(
2221 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2222 ));
2223 }
2224 _ => panic!("Expected SchemaError"),
2225 }
2226 }
2227
2228 #[test]
2229 fn test_resolve_int_to_float_promotion() {
2230 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2231 assert!(matches!(result.codec, Codec::Float32));
2232 assert_eq!(
2233 result.resolution,
2234 Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2235 );
2236 }
2237
2238 #[test]
2239 fn test_resolve_int_to_double_promotion() {
2240 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2241 assert!(matches!(result.codec, Codec::Float64));
2242 assert_eq!(
2243 result.resolution,
2244 Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2245 );
2246 }
2247
2248 #[test]
2249 fn test_resolve_long_to_float_promotion() {
2250 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2251 assert!(matches!(result.codec, Codec::Float32));
2252 assert_eq!(
2253 result.resolution,
2254 Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2255 );
2256 }
2257
2258 #[test]
2259 fn test_resolve_long_to_double_promotion() {
2260 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2261 assert!(matches!(result.codec, Codec::Float64));
2262 assert_eq!(
2263 result.resolution,
2264 Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2265 );
2266 }
2267
2268 #[test]
2269 fn test_resolve_float_to_double_promotion() {
2270 let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2271 assert!(matches!(result.codec, Codec::Float64));
2272 assert_eq!(
2273 result.resolution,
2274 Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2275 );
2276 }
2277
2278 #[test]
2279 fn test_resolve_string_to_bytes_promotion() {
2280 let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2281 assert!(matches!(result.codec, Codec::Binary));
2282 assert_eq!(
2283 result.resolution,
2284 Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2285 );
2286 }
2287
#[test]
fn test_resolve_bytes_to_string_promotion() {
    // A writer `bytes` read through a reader `string` resolves to a Utf8 codec.
    let resolved = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
    let expected = Some(ResolutionInfo::Promotion(Promotion::BytesToString));
    assert!(matches!(resolved.codec, Codec::Utf8));
    assert_eq!(resolved.resolution, expected);
}
2297
#[test]
fn test_resolve_illegal_promotion_double_to_float_errors() {
    // Narrowing `double` -> `float` is not an allowed promotion: resolution must fail
    // with a `ParseError` whose message mentions the illegal promotion.
    let writer = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
    let reader = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
    let mut maker = Maker::new(false, false);
    let outcome = maker.make_data_type(&writer, Some(&reader), None);
    assert!(outcome.is_err());
    let Err(ArrowError::ParseError(message)) = outcome else {
        panic!("Expected ParseError for illegal promotion Double -> Float");
    };
    assert!(message.contains("Illegal promotion"));
}
2312
#[test]
fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
    // Writer ["null", "int"] vs reader ["double", "null"]: the int -> double
    // promotion is applied, and nullability follows the writer's null-first order.
    let prim = |kind| Schema::TypeName(TypeName::Primitive(kind));
    let writer = Schema::Union(vec![prim(PrimitiveType::Null), prim(PrimitiveType::Int)]);
    let reader = Schema::Union(vec![prim(PrimitiveType::Double), prim(PrimitiveType::Null)]);

    let mut maker = Maker::new(false, false);
    let resolved = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToDouble));

    assert!(matches!(resolved.codec, Codec::Float64));
    assert_eq!(resolved.resolution, expected);
    assert_eq!(resolved.nullability, Some(Nullability::NullFirst));
}
2332
#[test]
fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
    // Writer union ["string", "long"] against a plain `bytes` reader. Only the
    // string branch maps onto the reader (string -> bytes); the long branch has no target.
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Long),
    ]);
    let reader = mk_primitive(PrimitiveType::Bytes);

    let mut maker = Maker::new(false, false);
    let resolved_dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    assert!(matches!(resolved_dt.codec(), Codec::Binary));

    let union_info = match resolved_dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(union_info.writer_is_union && !union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((0, Promotion::StringToBytes)), None]
    );
}
2353
#[test]
fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
    // A writer `long` could target reader branch 0 (`long`, direct) or branch 1
    // (`double`, via promotion). The direct branch must be selected.
    let writer = mk_primitive(PrimitiveType::Long);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::Double),
    ]);

    let mut maker = Maker::new(false, false);
    let resolved_dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    let union_info = match resolved_dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };

    assert!(!union_info.writer_is_union && union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((0, Promotion::Direct))]
    );
}
2373
#[test]
fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
    // A writer `int` has no identical branch in ["null", "long", "string"], so it is
    // routed to branch 1 (`long`) through the int -> long promotion.
    let writer = mk_primitive(PrimitiveType::Int);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::String),
    ]);

    let mut maker = Maker::new(false, false);
    let resolved_dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    let union_info = match resolved_dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };

    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((1, Promotion::IntToLong))]
    );
}
2393
#[test]
fn test_resolve_both_nullable_unions_direct_match() {
    // Writer ["null", "string"] vs reader ["string", "null"]: the value branch is
    // identical, so no resolution step is recorded. Nullability follows the writer
    // (null first) even though the reader lists null second.
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::String),
    ]);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Null),
    ]);

    let mut maker = Maker::new(false, false);
    let resolved_dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    assert!(matches!(resolved_dt.codec(), Codec::Utf8));
    assert_eq!(resolved_dt.nullability, Some(Nullability::NullFirst));
    assert!(resolved_dt.resolution.is_none());
}
2410
#[test]
fn test_resolve_both_nullable_unions_with_promotion() {
    // Writer ["null", "int"] vs reader ["double", "null"]: the nullable value branch
    // is promoted int -> double and keeps the writer's null-first ordering.
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Int),
    ]);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Double),
        mk_primitive(PrimitiveType::Null),
    ]);

    let mut maker = Maker::new(false, false);
    let resolved_dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToDouble));

    assert!(matches!(resolved_dt.codec(), Codec::Float64));
    assert_eq!(resolved_dt.nullability, Some(Nullability::NullFirst));
    assert_eq!(resolved_dt.resolution, expected);
}
2430
#[test]
fn test_resolve_type_promotion() {
    // End-to-end check through `Maker`: a writer `int` read as a reader `long`
    // yields an Int64 codec tagged with the int -> long promotion.
    let writer = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let reader = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
    let mut maker = Maker::new(false, false);
    let resolved = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToLong));
    assert!(matches!(resolved.codec, Codec::Int64));
    assert_eq!(resolved.resolution, expected);
}
2445
#[test]
fn test_nested_record_type_reuse_without_namespace() {
    // The record `Nested` is defined inline once, then referenced by name from a
    // plain field, an array's items and a map's values. Every reference must map to
    // the same Arrow data type as the inline definition.
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "fields": [
        {
          "name": "nested",
          "type": {
            "type": "record",
            "name": "Nested",
            "fields": [
              { "name": "nested_int", "type": "int" }
            ]
          }
        },
        { "name": "nestedRecord", "type": "Nested" },
        { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
        { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
      ]
    }
    "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();

    let mut maker = Maker::new(false, false);
    let root = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = root.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0: the inline definition of `Nested`.
    assert_eq!(fields[0].name(), "nested");
    let nested_dt = fields[0].data_type();
    let Codec::Struct(nested_fields) = nested_dt.codec() else {
        panic!(
            "'nested' field is not a struct but {:?}",
            nested_dt.codec()
        );
    };
    assert_eq!(nested_fields.len(), 1);
    assert_eq!(nested_fields[0].name(), "nested_int");
    assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
    let nested_arrow_type = nested_dt.codec().data_type();

    // Field 1: a direct by-name reference.
    assert_eq!(fields[1].name(), "nestedRecord");
    assert_eq!(fields[1].data_type().codec().data_type(), nested_arrow_type);

    // Field 2: a by-name reference used as array items.
    assert_eq!(fields[2].name(), "nestedArray");
    let Codec::List(item_dt) = fields[2].data_type().codec() else {
        panic!("'nestedArray' field is not a list");
    };
    assert_eq!(item_dt.codec().data_type(), nested_arrow_type);

    // Field 3: a by-name reference used as map values.
    assert_eq!(fields[3].name(), "nestedMap");
    let Codec::Map(value_dt) = fields[3].data_type().codec() else {
        panic!("'nestedMap' field is not a map");
    };
    assert_eq!(value_dt.codec().data_type(), nested_arrow_type);
}
2525
#[test]
fn test_nested_enum_type_reuse_with_namespace() {
    // The enum `enum_ns.Status` lives in a namespace different from the enclosing
    // record's. It is defined once and referenced by its full name three more times;
    // each reference must resolve to the same Arrow data type as the definition.
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "namespace": "record_ns",
      "fields": [
        {
          "name": "status",
          "type": {
            "type": "enum",
            "name": "Status",
            "namespace": "enum_ns",
            "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
          }
        },
        { "name": "backupStatus", "type": "enum_ns.Status" },
        { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
        { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
      ]
    }
    "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();

    let mut maker = Maker::new(false, false);
    let root = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = root.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0: the inline enum definition.
    assert_eq!(fields[0].name(), "status");
    let status_dt = fields[0].data_type();
    let Codec::Enum(symbols) = status_dt.codec() else {
        panic!(
            "'status' field is not an enum but {:?}",
            status_dt.codec()
        );
    };
    assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
    let status_arrow_type = status_dt.codec().data_type();

    // Field 1: a direct by-name reference.
    assert_eq!(fields[1].name(), "backupStatus");
    assert_eq!(fields[1].data_type().codec().data_type(), status_arrow_type);

    // Field 2: a by-name reference used as array items.
    assert_eq!(fields[2].name(), "statusHistory");
    let Codec::List(item_dt) = fields[2].data_type().codec() else {
        panic!("'statusHistory' field is not a list");
    };
    assert_eq!(item_dt.codec().data_type(), status_arrow_type);

    // Field 3: a by-name reference used as map values.
    assert_eq!(fields[3].name(), "statusMap");
    let Codec::Map(value_dt) = fields[3].data_type().codec() else {
        panic!("'statusMap' field is not a map");
    };
    assert_eq!(value_dt.codec().data_type(), status_arrow_type);
}
2603
#[test]
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
    // Resolving a non-record (`string`) schema against itself yields a plain Utf8
    // codec. The test wraps it in a field named with the default root record name.
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let reader_schema = writer_schema.clone();
    let mut maker = Maker::new(false, false);
    let root_field = AvroField {
        name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
        data_type: maker
            .make_data_type(&writer_schema, Some(&reader_schema), None)
            .expect("resolution should succeed"),
    };
    assert_eq!(root_field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
    assert!(matches!(root_field.data_type().codec(), Codec::Utf8));
}
2619
2620 fn json_string(s: &str) -> Value {
2621 Value::String(s.to_string())
2622 }
2623
2624 fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
2625 let stored = dt
2626 .metadata
2627 .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
2628 .cloned()
2629 .unwrap_or_default();
2630 let expected = serde_json::to_string(default_json).unwrap();
2631 assert_eq!(stored, expected, "stored default metadata should match");
2632 }
2633
#[test]
fn test_validate_and_store_default_null_and_nullability_rules() {
    const NULL_ONLY_MSG: &str = "JSON null default is only valid for `null` type";

    // The `null` type itself accepts a JSON null default, and the default is stored.
    let mut null_dt = AvroDataType::new(Codec::Null, HashMap::new(), None);
    assert_eq!(
        null_dt.parse_and_store_default(&Value::Null).unwrap(),
        AvroLiteral::Null
    );
    assert_default_stored(&null_dt, &Value::Null);

    // A non-nullable int rejects a JSON null default.
    let mut plain_int_dt = AvroDataType::new(Codec::Int32, HashMap::new(), None);
    let plain_err = plain_int_dt
        .parse_and_store_default(&Value::Null)
        .unwrap_err();
    assert!(
        plain_err.to_string().contains(NULL_ONLY_MSG),
        "unexpected error: {plain_err}"
    );

    // With null as the first union branch, a null default is accepted and stored.
    let mut null_first_dt =
        AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
    assert_eq!(
        null_first_dt.parse_and_store_default(&Value::Null).unwrap(),
        AvroLiteral::Null
    );
    assert_default_stored(&null_first_dt, &Value::Null);

    // With null as the second union branch, a null default is rejected.
    let mut null_second_dt =
        AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
    let null_second_err = null_second_dt
        .parse_and_store_default(&Value::Null)
        .unwrap_err();
    assert!(
        null_second_err.to_string().contains(NULL_ONLY_MSG),
        "unexpected error: {null_second_err}"
    );
}
2661
#[test]
fn test_validate_and_store_default_primitives_and_temporal() {
    // boolean
    let mut bool_dt = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
    let parsed = bool_dt.parse_and_store_default(&Value::Bool(true)).unwrap();
    assert_eq!(parsed, AvroLiteral::Boolean(true));
    assert_default_stored(&bool_dt, &Value::Bool(true));

    // int: an in-range value is accepted and stored; i32::MAX + 1 is rejected.
    let mut int_dt = AvroDataType::new(Codec::Int32, HashMap::new(), None);
    let int_default = serde_json::json!(123);
    let parsed = int_dt.parse_and_store_default(&int_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Int(123));
    assert_default_stored(&int_dt, &int_default);
    let overflowing = serde_json::json!(i64::from(i32::MAX) + 1);
    let err = int_dt.parse_and_store_default(&overflowing).unwrap_err();
    assert!(format!("{err}").contains("out of i32 range"));

    // long
    let mut long_dt = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let long_default = serde_json::json!(1234567890);
    let parsed = long_dt.parse_and_store_default(&long_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Long(1234567890));
    assert_default_stored(&long_dt, &long_default);

    // float: 1e39 lies beyond f32::MAX and is rejected.
    let mut float_dt = AvroDataType::new(Codec::Float32, HashMap::new(), None);
    let float_default = serde_json::json!(1.25);
    let parsed = float_dt.parse_and_store_default(&float_default).unwrap();
    assert_eq!(parsed, AvroLiteral::Float(1.25));
    assert_default_stored(&float_dt, &float_default);
    let err = float_dt
        .parse_and_store_default(&serde_json::json!(1e39))
        .unwrap_err();
    assert!(format!("{err}").contains("out of f32 range"));

    // double
    let pi = std::f64::consts::PI;
    let mut double_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let parsed = double_dt
        .parse_and_store_default(&serde_json::json!(pi))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Double(pi));
    assert_default_stored(&double_dt, &serde_json::json!(pi));

    // string-like codecs keep the JSON string as-is
    let mut utf8_dt = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
    let parsed = utf8_dt
        .parse_and_store_default(&json_string("hello"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::String("hello".into()));
    assert_default_stored(&utf8_dt, &json_string("hello"));

    let mut utf8_view_dt = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
    let parsed = utf8_view_dt
        .parse_and_store_default(&json_string("view"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::String("view".into()));
    assert_default_stored(&utf8_view_dt, &json_string("view"));

    let nil_uuid = "00000000-0000-0000-0000-000000000000";
    let mut uuid_dt = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
    let parsed = uuid_dt
        .parse_and_store_default(&json_string(nil_uuid))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::String(nil_uuid.into()));

    // bytes: each character becomes one byte. '€' (U+20AC) is expected to be
    // rejected because it is above U+00FF.
    let mut bytes_dt = AvroDataType::new(Codec::Binary, HashMap::new(), None);
    let parsed = bytes_dt
        .parse_and_store_default(&json_string("ABC"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"ABC".to_vec()));
    let err = bytes_dt
        .parse_and_store_default(&json_string("€"))
        .unwrap_err();
    assert!(format!("{err}").contains("Invalid codepoint"));

    // temporal: date and time-millis parse as ints; time-micros and timestamps as longs
    let mut date_dt = AvroDataType::new(Codec::Date32, HashMap::new(), None);
    let parsed = date_dt
        .parse_and_store_default(&serde_json::json!(1))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Int(1));

    let mut time_millis_dt = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
    let parsed = time_millis_dt
        .parse_and_store_default(&serde_json::json!(86_400_000))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Int(86_400_000));

    let mut time_micros_dt = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
    let parsed = time_micros_dt
        .parse_and_store_default(&serde_json::json!(1_000_000))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(1_000_000));

    let mut ts_millis_dt =
        AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
    let parsed = ts_millis_dt
        .parse_and_store_default(&serde_json::json!(123))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(123));

    let mut ts_micros_dt =
        AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
    let parsed = ts_micros_dt
        .parse_and_store_default(&serde_json::json!(456))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(456));
}
2754
#[test]
fn test_validate_and_store_default_fixed_decimal_interval() {
    // fixed(4): a 4-character default becomes 4 bytes; a longer one is rejected.
    let mut fixed_dt = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
    let parsed = fixed_dt
        .parse_and_store_default(&json_string("WXYZ"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"WXYZ".to_vec()));
    let err = fixed_dt
        .parse_and_store_default(&json_string("TOO LONG"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));

    // Decimal with `Some(3)` as its size argument (presumably fixed-backed): 3 bytes
    // fit, 7 do not.
    let mut fixed_decimal_dt =
        AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
    let parsed = fixed_decimal_dt
        .parse_and_store_default(&json_string("abc"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"abc".to_vec()));
    let err = fixed_decimal_dt
        .parse_and_store_default(&json_string("toolong"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));

    // Decimal without a size accepts a default of any length.
    let mut bytes_decimal_dt =
        AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
    let parsed = bytes_decimal_dt
        .parse_and_store_default(&json_string("freeform"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"freeform".to_vec()));

    // Interval: a 12-character default is accepted; a 5-character one is rejected.
    let mut interval_dt = AvroDataType::new(Codec::Interval, HashMap::new(), None);
    let parsed = interval_dt
        .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(b"ABCDEFGHIJKL".to_vec()));
    let err = interval_dt
        .parse_and_store_default(&json_string("short"))
        .unwrap_err();
    assert!(err.to_string().contains("Default length"));
}
2798
#[test]
fn test_validate_and_store_default_enum_list_map_struct() {
    // --- enum: a declared symbol is accepted, an unknown symbol is rejected ---
    let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
        .into_iter()
        .collect();
    let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
    let l = dt_enum
        .parse_and_store_default(&json_string("GREEN"))
        .unwrap();
    assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
    let err = dt_enum
        .parse_and_store_default(&json_string("YELLOW"))
        .unwrap_err();
    assert!(err.to_string().contains("Default enum symbol"));

    // --- array: each element is parsed with the item codec; non-arrays are rejected ---
    let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
    let val = serde_json::json!([1, 2, 3]);
    let l = dt_list.parse_and_store_default(&val).unwrap();
    assert_eq!(
        l,
        AvroLiteral::Array(vec![
            AvroLiteral::Long(1),
            AvroLiteral::Long(2),
            AvroLiteral::Long(3)
        ])
    );
    let err = dt_list
        .parse_and_store_default(&serde_json::json!({"not":"array"}))
        .unwrap_err();
    assert!(err.to_string().contains("JSON array"));

    // --- map: each value is parsed with the value codec; non-objects are rejected ---
    let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
    let mv = serde_json::json!({"x": 1.5, "y": 2.5});
    let l = dt_map.parse_and_store_default(&mv).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("x".into(), AvroLiteral::Double(1.5));
    expected.insert("y".into(), AvroLiteral::Double(2.5));
    assert_eq!(l, AvroLiteral::Map(expected));
    let err = dt_map
        .parse_and_store_default(&serde_json::json!(123))
        .unwrap_err();
    assert!(err.to_string().contains("JSON object"));

    // --- struct ---
    // `a` is given explicitly. `b` is nullable (null first) and falls back to null.
    // `c` falls back to the default recorded in its own metadata.
    let mut field_a = AvroField {
        name: "a".into(),
        data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
    };
    let field_b = AvroField {
        name: "b".into(),
        data_type: AvroDataType::new(
            Codec::Int64,
            HashMap::new(),
            Some(Nullability::NullFirst),
        ),
    };
    let mut c_md = HashMap::new();
    c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
    let field_c = AvroField {
        name: "c".into(),
        data_type: AvroDataType::new(Codec::Utf8, c_md, None),
    };
    field_a.data_type.metadata.insert("doc".into(), "na".into());
    let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
    let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
    let default_obj = serde_json::json!({"a": 7});
    let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("a".into(), AvroLiteral::Int(7));
    expected.insert("b".into(), AvroLiteral::Null);
    expected.insert("c".into(), AvroLiteral::String("xyz".into()));
    assert_eq!(l, AvroLiteral::Map(expected));
    assert_default_stored(&dt_struct, &default_obj);

    // A required subfield with no default and no value in the object is an error.
    let req_field = AvroField {
        name: "req".into(),
        data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
    };
    let mut dt_bad = AvroDataType::new(
        Codec::Struct(Arc::from(vec![req_field])),
        HashMap::new(),
        None,
    );
    let err = dt_bad
        .parse_and_store_default(&serde_json::json!({}))
        .unwrap_err();
    assert!(
        err.to_string().contains("missing required subfield 'req'"),
        "unexpected error: {err}"
    );

    // A non-object default for a struct must be rejected. The message check must be
    // asserted: a bare `contains(...)` discards its result and checks nothing.
    let err = dt_struct
        .parse_and_store_default(&serde_json::json!(10))
        .unwrap_err();
    assert!(
        err.to_string().contains("JSON object"),
        "unexpected error: {err}"
    );
}
2892
#[test]
fn test_resolve_array_promotion_and_reader_metadata() {
    // Writer array<int> read as reader array<long>. Items are promoted int -> long,
    // and the array's custom "who" attribute is taken from the reader schema.
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("who", json_string("writer"))]),
        },
    }));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("who", json_string("reader"))]),
        },
    }));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();

    // The attribute value is stored as JSON text, hence the embedded quotes.
    assert_eq!(resolved.metadata.get("who"), Some(&"\"reader\"".to_string()));
    let Codec::List(items) = resolved.codec() else {
        panic!("expected list codec");
    };
    assert!(matches!(items.codec(), Codec::Int64));
    assert_eq!(
        items.resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    );
}
2928
#[test]
fn test_resolve_fixed_success_name_and_size_match_and_alias() {
    // The writer fixed `MD5` lists `Hash16`, the reader's name, as an alias. Both are
    // 16 bytes, so resolution succeeds with a 16-byte fixed codec.
    let md5 = Fixed {
        name: "MD5",
        namespace: None,
        aliases: vec!["Hash16"],
        size: 16,
        attributes: Attributes::default(),
    };
    let hash16 = Fixed {
        name: "Hash16",
        namespace: None,
        aliases: vec![],
        size: 16,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Fixed(md5));
    let reader_schema = Schema::Complex(ComplexType::Fixed(hash16));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    assert!(matches!(resolved.codec(), Codec::Fixed(16)));
}
2951
#[test]
fn test_resolve_records_mapping_default_fields_and_skip_fields() {
    // Builds a record field with no doc and no aliases.
    let field = |name, r#type, default| crate::schema::Field {
        name,
        doc: None,
        r#type,
        default,
        aliases: vec![],
    };
    let prim = |kind| Schema::TypeName(TypeName::Primitive(kind));

    // Writer: a: int, skipme: string, b: long
    let writer = Schema::Complex(ComplexType::Record(Record {
        name: "R",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![
            field("a", prim(PrimitiveType::Int), None),
            field("skipme", prim(PrimitiveType::String), None),
            field("b", prim(PrimitiveType::Long), None),
        ],
        attributes: Attributes::default(),
    }));
    // Reader: b and a reordered (a promoted to long), plus two reader-only fields.
    let reader = Schema::Complex(ComplexType::Record(Record {
        name: "R",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![
            field("b", prim(PrimitiveType::Long), None),
            field("a", prim(PrimitiveType::Long), None),
            field("name", prim(PrimitiveType::String), Some(json_string("anon"))),
            field(
                "opt",
                Schema::Union(vec![prim(PrimitiveType::Null), prim(PrimitiveType::Int)]),
                None,
            ),
        ],
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("record resolution");

    // The output struct follows the reader's field order.
    let fields = match resolved.codec() {
        Codec::Struct(f) => f,
        other => panic!("expected struct, got {other:?}"),
    };
    let names: Vec<_> = fields.iter().map(|f| f.name()).collect();
    assert_eq!(names, ["b", "a", "name", "opt"]);
    assert!(matches!(
        fields[1].data_type().resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    ));

    let record_info = match &resolved.resolution {
        Some(ResolutionInfo::Record(r)) => r,
        other => panic!("expected record resolution, got {other:?}"),
    };
    // Writer a -> reader 1, writer skipme -> none, writer b -> reader 0.
    assert_eq!(
        record_info.writer_to_reader.as_ref(),
        &[Some(1), None, Some(0)]
    );
    // Reader fields 2 ("name") and 3 ("opt") are filled from defaults.
    assert_eq!(record_info.default_fields.as_ref(), &[2usize, 3usize]);
    // Only the writer's `skipme` needs a skip decoder.
    assert!(record_info.skip_fields[0].is_none());
    assert!(record_info.skip_fields[2].is_none());
    let skipped = record_info.skip_fields[1]
        .as_ref()
        .expect("skip field present");
    assert!(matches!(skipped.codec(), Codec::Utf8));

    // Defaults are recorded as JSON text; "opt" ends up with `null`.
    assert_eq!(
        fields[2]
            .data_type()
            .metadata
            .get(AVRO_FIELD_DEFAULT_METADATA_KEY),
        Some(&"\"anon\"".to_string())
    );
    assert_eq!(
        fields[3]
            .data_type()
            .metadata
            .get(AVRO_FIELD_DEFAULT_METADATA_KEY),
        Some(&"null".to_string())
    );
}
3062
#[test]
fn test_named_type_alias_resolution_record_cross_namespace() {
    // The writer record `com.example.v2.PersonV2` carries the fully qualified alias
    // `com.example.Person`, which is the reader's full name in a different namespace.
    let field = |name, r#type| AvroFieldSchema {
        name,
        doc: None,
        r#type,
        default: None,
        aliases: vec![],
    };
    let shared_fields = vec![
        field(
            "name",
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
        ),
        field(
            "age",
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
        ),
    ];
    let writer_schema = Schema::Complex(ComplexType::Record(Record {
        name: "PersonV2",
        namespace: Some("com.example.v2"),
        doc: None,
        aliases: vec!["com.example.Person"],
        fields: shared_fields.clone(),
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Record(Record {
        name: "Person",
        namespace: Some("com.example"),
        doc: None,
        aliases: vec![],
        fields: shared_fields,
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("record alias resolution should succeed");
    let field_count = match &resolved.codec {
        Codec::Struct(fields) => fields.len(),
        other => panic!("expected struct, got {other:?}"),
    };
    assert_eq!(field_count, 2);
}
3107
#[test]
fn test_named_type_alias_resolution_enum_cross_namespace() {
    // The writer enum `org.example.v2.ColorV2` aliases the reader's full name
    // `org.example.Color`. Both have the same symbols, so resolution must succeed.
    let symbols = vec!["RED", "GREEN", "BLUE"];
    let writer_schema = Schema::Complex(ComplexType::Enum(Enum {
        name: "ColorV2",
        namespace: Some("org.example.v2"),
        doc: None,
        aliases: vec!["org.example.Color"],
        symbols: symbols.clone(),
        default: None,
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Enum(Enum {
        name: "Color",
        namespace: Some("org.example"),
        doc: None,
        aliases: vec![],
        symbols,
        default: None,
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("enum alias resolution should succeed");
}
3135
#[test]
fn test_named_type_alias_resolution_fixed_cross_namespace() {
    // The writer fixed `ns.v2.Fx10V2` aliases the reader's full name `ns.Fx10`.
    // Both are 10 bytes, so resolution must succeed.
    let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
        name: "Fx10V2",
        namespace: Some("ns.v2"),
        aliases: vec!["ns.Fx10"],
        size: 10,
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
        name: "Fx10",
        namespace: Some("ns"),
        aliases: vec![],
        size: 10,
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("fixed alias resolution should succeed");
}
3159}