1use crate::schema::{
21 AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22 AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23 PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27 IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
/// Resolution details that can be attached to an [`AvroDataType`] through its
/// `resolution` field.
///
/// NOTE(review): presumably produced while reconciling a writer schema with a
/// reader schema; the code that constructs these values is not visible here.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolutionInfo {
    /// Carries a [`Promotion`] describing a primitive type conversion.
    Promotion(Promotion),
    /// Carries an [`AvroLiteral`] default value.
    DefaultValue(AvroLiteral),
    /// Carries an [`EnumMapping`] (an index mapping plus a default index).
    EnumMapping(EnumMapping),
    /// Carries a [`ResolvedRecord`] with per-field mapping data.
    Record(ResolvedRecord),
    /// Carries a [`ResolvedUnion`] with per-branch mapping data.
    Union(ResolvedUnion),
}
54
/// A parsed default value, as produced by `AvroDataType::parse_default_literal`
/// from a JSON default.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum AvroLiteral {
    /// Produced for a JSON `null` default.
    Null,
    /// Produced for `Codec::Boolean` defaults.
    Boolean(bool),
    /// Produced for 32-bit integer codecs such as `Codec::Int32`, `Codec::Date32`
    /// and `Codec::TimeMillis`.
    Int(i32),
    /// Produced for 64-bit integer codecs such as `Codec::Int64`, `Codec::TimeMicros`
    /// and the timestamp codecs.
    Long(i64),
    /// Produced for `Codec::Float32` defaults.
    Float(f32),
    /// Produced for `Codec::Float64` defaults.
    Double(f64),
    /// Produced for byte-oriented codecs such as `Codec::Binary`, `Codec::Fixed`,
    /// `Codec::Decimal` and `Codec::Interval`.
    Bytes(Vec<u8>),
    /// Produced for `Codec::Utf8`, `Codec::Utf8View` and `Codec::Uuid` defaults.
    String(String),
    /// Produced for `Codec::Enum` defaults; holds the symbol text.
    Enum(String),
    /// Produced for `Codec::List` defaults; one literal per JSON array item.
    Array(Vec<AvroLiteral>),
    /// Produced for `Codec::Map` defaults and for `Codec::Struct` defaults
    /// (keyed by field name in the struct case).
    Map(IndexMap<String, AvroLiteral>),
}
83
/// Per-field mapping data for a record, carried by `ResolutionInfo::Record`.
///
/// NOTE(review): the code that fills and consumes these slices is not visible
/// here; the field notes below are inferred from names and types — confirm.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedRecord {
    /// Presumably indexed by writer field position; `Some(i)` would name the
    /// matching reader field index and `None` a writer field with no reader match.
    pub(crate) writer_to_reader: Arc<[Option<usize>]>,
    /// Presumably the reader field indices that must be filled from defaults.
    pub(crate) default_fields: Arc<[usize]>,
    /// Presumably, per writer field, the data type to decode-and-discard when
    /// the field is skipped (`None` when it is not skipped).
    pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
}
96
/// A primitive type conversion, named `<From>To<To>`; `Direct` means no conversion.
///
/// The [`Display`] implementation renders each variant as a short `From->To` label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No conversion.
    Direct,
    /// `int` to `long`.
    IntToLong,
    /// `int` to `float`.
    IntToFloat,
    /// `int` to `double`.
    IntToDouble,
    /// `long` to `float`.
    LongToFloat,
    /// `long` to `double`.
    LongToDouble,
    /// `float` to `double`.
    FloatToDouble,
    /// `string` to `bytes`.
    StringToBytes,
    /// `bytes` to `string`.
    BytesToString,
}

impl Display for Promotion {
    /// Writes the fixed label for this promotion to `formatter`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the label first, then emit it through a single write call.
        let label = match self {
            Self::Direct => "Direct",
            Self::IntToLong => "Int->Long",
            Self::IntToFloat => "Int->Float",
            Self::IntToDouble => "Int->Double",
            Self::LongToFloat => "Long->Float",
            Self::LongToDouble => "Long->Double",
            Self::FloatToDouble => "Float->Double",
            Self::StringToBytes => "String->Bytes",
            Self::BytesToString => "Bytes->String",
        };
        formatter.write_str(label)
    }
}
138
/// Per-branch mapping data for a union, carried by `ResolutionInfo::Union`.
///
/// NOTE(review): producers and consumers are not visible here; the field
/// notes below are inferred from names and types — confirm.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedUnion {
    /// Presumably indexed by writer branch; `Some((reader_index, info))` would
    /// name the reader branch and how to resolve into it, `None` no match.
    pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
    /// Presumably whether the writer side is itself a union.
    pub(crate) writer_is_union: bool,
    /// Presumably whether the reader side is itself a union.
    pub(crate) reader_is_union: bool,
}
150
/// Index mapping data for an enum, carried by `ResolutionInfo::EnumMapping`.
///
/// NOTE(review): producers and consumers are not visible here; the field
/// notes below are inferred from names and types — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Presumably maps each writer symbol index to a reader symbol index.
    pub(crate) mapping: Arc<[i32]>,
    /// Presumably the reader index to fall back to when a symbol has no mapping.
    pub(crate) default_index: i32,
}
162
/// Attaches the Arrow `Uuid` extension type to `field` when `codec` is
/// [`Codec::Uuid`]; every other codec returns `field` unchanged.
///
/// Only compiled with the `canonical_extension_types` feature.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
170
/// An Avro data type: a [`Codec`] plus nullability, string metadata and
/// optional resolution details.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroDataType {
    /// When `Some`, `field_with_name` produces a nullable Arrow field.
    nullability: Option<Nullability>,
    /// Copied onto the Arrow field by `field_with_name`; `store_default` also
    /// records the JSON default here under `AVRO_FIELD_DEFAULT_METADATA_KEY`.
    metadata: HashMap<String, String>,
    /// Determines the Arrow `DataType` and how JSON defaults are parsed.
    codec: Codec,
    /// Optional [`ResolutionInfo`]; `new` initialises it to `None`.
    pub(crate) resolution: Option<ResolutionInfo>,
}
179
180impl AvroDataType {
181 pub(crate) fn new(
183 codec: Codec,
184 metadata: HashMap<String, String>,
185 nullability: Option<Nullability>,
186 ) -> Self {
187 AvroDataType {
188 codec,
189 metadata,
190 nullability,
191 resolution: None,
192 }
193 }
194
195 #[inline]
196 fn new_with_resolution(
197 codec: Codec,
198 metadata: HashMap<String, String>,
199 nullability: Option<Nullability>,
200 resolution: Option<ResolutionInfo>,
201 ) -> Self {
202 Self {
203 codec,
204 metadata,
205 nullability,
206 resolution,
207 }
208 }
209
210 pub(crate) fn field_with_name(&self, name: &str) -> Field {
212 let mut nullable = self.nullability.is_some();
213 if !nullable {
214 if let Codec::Union(children, _, _) = self.codec() {
215 if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
217 nullable = true;
218 }
219 }
220 }
221 let data_type = self.codec.data_type();
222 let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
223 #[cfg(feature = "canonical_extension_types")]
224 return with_extension_type(&self.codec, field);
225 #[cfg(not(feature = "canonical_extension_types"))]
226 field
227 }
228
/// Returns a reference to the [`Codec`] stored in this data type.
pub(crate) fn codec(&self) -> &Codec {
    &self.codec
}
236
/// Returns a copy of the stored nullability; `None` when none was set.
pub(crate) fn nullability(&self) -> Option<Nullability> {
    self.nullability
}
247
/// Converts a JSON default value into an [`AvroLiteral`] according to this
/// type's codec.
///
/// A JSON `null` is accepted only for `Codec::Null`, for a union whose first
/// branch is `Codec::Null`, or when `nullability()` is
/// `Some(Nullability::NullFirst)`. Any other JSON value is validated against
/// the codec (JSON kind, numeric range, byte length, enum membership) and
/// container codecs recurse into their element types.
///
/// # Errors
/// Returns `ArrowError::SchemaError` when the JSON value does not fit the codec.
#[inline]
fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
    // Returns the inner `&str` of a JSON string; `data_type` is only used in
    // the error message.
    fn expect_string<'v>(
        default_json: &'v Value,
        data_type: &str,
    ) -> Result<&'v str, ArrowError> {
        match default_json {
            Value::String(s) => Ok(s.as_str()),
            _ => Err(ArrowError::SchemaError(format!(
                "Default value must be a JSON string for {data_type}"
            ))),
        }
    }

    // Decodes a JSON string into bytes, one byte per character: every code
    // point must be <= 0xFF. When `expected_len` is given, the decoded length
    // must match it exactly.
    fn parse_bytes_default(
        default_json: &Value,
        expected_len: Option<usize>,
    ) -> Result<Vec<u8>, ArrowError> {
        let s = expect_string(default_json, "bytes/fixed logical types")?;
        let mut out = Vec::with_capacity(s.len());
        for ch in s.chars() {
            let cp = ch as u32;
            if cp > 0xFF {
                return Err(ArrowError::SchemaError(format!(
                    "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
                )));
            }
            out.push(cp as u8);
        }
        if let Some(len) = expected_len {
            if out.len() != len {
                return Err(ArrowError::SchemaError(format!(
                    "Default length {} does not match expected fixed size {len}",
                    out.len(),
                )));
            }
        }
        Ok(out)
    }

    // Reads a JSON number as `i64`; numbers that `as_i64` rejects and
    // non-number values are errors.
    fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
        match default_json {
            Value::Number(n) => n.as_i64().ok_or_else(|| {
                ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
            }),
            _ => Err(ArrowError::SchemaError(format!(
                "Default {data_type} must be a JSON integer"
            ))),
        }
    }

    // Reads a JSON number as `f64`; non-number values are errors.
    fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
        match default_json {
            Value::Number(n) => n.as_f64().ok_or_else(|| {
                ArrowError::SchemaError(format!("Default {data_type} must be a number"))
            }),
            _ => Err(ArrowError::SchemaError(format!(
                "Default {data_type} must be a JSON number"
            ))),
        }
    }

    // JSON null is handled up front, so the codec match below never sees it.
    if default_json.is_null() {
        return match self.codec() {
            Codec::Null => Ok(AvroLiteral::Null),
            Codec::Union(encodings, _, _) if !encodings.is_empty()
                && matches!(encodings[0].codec(), Codec::Null) =>
            {
                Ok(AvroLiteral::Null)
            }
            _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
            _ => Err(ArrowError::SchemaError(
                "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
                    .to_string(),
            )),
        };
    }
    let lit = match self.codec() {
        // A non-null JSON value can never be the default of a `null` type.
        Codec::Null => {
            return Err(ArrowError::SchemaError(
                "Default for `null` type must be JSON null".to_string(),
            ));
        }
        Codec::Boolean => match default_json {
            Value::Bool(b) => AvroLiteral::Boolean(*b),
            _ => {
                return Err(ArrowError::SchemaError(
                    "Boolean default must be a JSON boolean".to_string(),
                ));
            }
        },
        // 32-bit codecs: parse as i64, then range-check before narrowing.
        Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
            let i = parse_json_i64(default_json, "int")?;
            if i < i32::MIN as i64 || i > i32::MAX as i64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default int {i} out of i32 range"
                )));
            }
            AvroLiteral::Int(i as i32)
        }
        Codec::Int64
        | Codec::TimeMicros
        | Codec::TimestampMillis(_)
        | Codec::TimestampMicros(_)
        | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
        #[cfg(feature = "avro_custom_types")]
        Codec::DurationNanos
        | Codec::DurationMicros
        | Codec::DurationMillis
        | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
        // Narrow custom integer codecs are range-checked against their own
        // width but stored as `Int`.
        #[cfg(feature = "avro_custom_types")]
        Codec::Int8 => {
            let i = parse_json_i64(default_json, "int")?;
            if i < i8::MIN as i64 || i > i8::MAX as i64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default int8 {i} out of i8 range"
                )));
            }
            AvroLiteral::Int(i as i32)
        }
        #[cfg(feature = "avro_custom_types")]
        Codec::Int16 => {
            let i = parse_json_i64(default_json, "int")?;
            if i < i16::MIN as i64 || i > i16::MAX as i64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default int16 {i} out of i16 range"
                )));
            }
            AvroLiteral::Int(i as i32)
        }
        #[cfg(feature = "avro_custom_types")]
        Codec::UInt8 => {
            let i = parse_json_i64(default_json, "int")?;
            if i < 0 || i > u8::MAX as i64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default uint8 {i} out of u8 range"
                )));
            }
            AvroLiteral::Int(i as i32)
        }
        #[cfg(feature = "avro_custom_types")]
        Codec::UInt16 => {
            let i = parse_json_i64(default_json, "int")?;
            if i < 0 || i > u16::MAX as i64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default uint16 {i} out of u16 range"
                )));
            }
            AvroLiteral::Int(i as i32)
        }
        // u32 does not fit in i32, so the literal is a `Long`.
        #[cfg(feature = "avro_custom_types")]
        Codec::UInt32 => {
            let i = parse_json_i64(default_json, "long")?;
            if i < 0 || i > u32::MAX as i64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default uint32 {i} out of u32 range"
                )));
            }
            AvroLiteral::Long(i)
        }
        #[cfg(feature = "avro_custom_types")]
        Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
            AvroLiteral::Long(parse_json_i64(default_json, "long")?)
        }
        // The following custom codecs take their default as a byte string of
        // a fixed length.
        #[cfg(feature = "avro_custom_types")]
        Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
        #[cfg(feature = "avro_custom_types")]
        Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
        #[cfg(feature = "avro_custom_types")]
        Codec::Time32Secs => {
            let i = parse_json_i64(default_json, "int")?;
            if i < i32::MIN as i64 || i > i32::MAX as i64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default time32-secs {i} out of i32 range"
                )));
            }
            AvroLiteral::Int(i as i32)
        }
        #[cfg(feature = "avro_custom_types")]
        Codec::IntervalYearMonth => {
            AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
        }
        #[cfg(feature = "avro_custom_types")]
        Codec::IntervalMonthDayNano => {
            AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
        }
        #[cfg(feature = "avro_custom_types")]
        Codec::IntervalDayTime => {
            AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
        }
        // Floats are parsed as f64 and must be finite and within f32 range
        // before narrowing.
        Codec::Float32 => {
            let f = parse_json_f64(default_json, "float")?;
            if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
                return Err(ArrowError::SchemaError(format!(
                    "Default float {f} out of f32 range or not finite"
                )));
            }
            AvroLiteral::Float(f as f32)
        }
        Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
        Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
            AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
        }
        Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
        Codec::Fixed(sz) => {
            AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
        }
        // The length is only enforced when the decimal carries a fixed size.
        Codec::Decimal(_, _, fixed_size) => {
            AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
        }
        // The default symbol must be one of this enum's symbols.
        Codec::Enum(symbols) => {
            let s = expect_string(default_json, "enum")?;
            if symbols.iter().any(|sym| sym == s) {
                AvroLiteral::Enum(s.to_string())
            } else {
                return Err(ArrowError::SchemaError(format!(
                    "Default enum symbol {s:?} not found in reader enum symbols"
                )));
            }
        }
        Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
        // Arrays: parse every item with the item type; the first error aborts.
        Codec::List(item_dt) => match default_json {
            Value::Array(items) => AvroLiteral::Array(
                items
                    .iter()
                    .map(|v| item_dt.parse_default_literal(v))
                    .collect::<Result<_, _>>()?,
            ),
            _ => {
                return Err(ArrowError::SchemaError(
                    "Default value must be a JSON array for Avro array type".to_string(),
                ));
            }
        },
        // Maps: parse every value with the value type, keeping the JSON keys.
        Codec::Map(val_dt) => match default_json {
            Value::Object(map) => {
                let mut out = IndexMap::with_capacity(map.len());
                for (k, v) in map {
                    out.insert(k.clone(), val_dt.parse_default_literal(v)?);
                }
                AvroLiteral::Map(out)
            }
            _ => {
                return Err(ArrowError::SchemaError(
                    "Default value must be a JSON object for Avro map type".to_string(),
                ));
            }
        },
        // Records: walk the declared fields in order. A field missing from the
        // JSON object falls back to (1) `Null` when it has no stored default
        // and its nullability equals `Nullability::default()`, (2) its stored
        // default from metadata, or (3) an error.
        Codec::Struct(fields) => match default_json {
            Value::Object(obj) => {
                let mut out: IndexMap<String, AvroLiteral> =
                    IndexMap::with_capacity(fields.len());
                for f in fields.as_ref() {
                    let name = f.name().to_string();
                    if let Some(sub) = obj.get(&name) {
                        out.insert(name, f.data_type().parse_default_literal(sub)?);
                    } else {
                        let stored_default =
                            f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
                        if stored_default.is_none()
                            && f.data_type().nullability() == Some(Nullability::default())
                        {
                            out.insert(name, AvroLiteral::Null);
                        } else if let Some(default_json) = stored_default {
                            // The stored default is JSON text written by `store_default`.
                            let v: Value =
                                serde_json::from_str(default_json).map_err(|e| {
                                    ArrowError::SchemaError(format!(
                                        "Failed to parse stored subfield default JSON for '{}': {e}",
                                        f.name(),
                                    ))
                                })?;
                            out.insert(name, f.data_type().parse_default_literal(&v)?);
                        } else {
                            return Err(ArrowError::SchemaError(format!(
                                "Record default missing required subfield '{}' with non-nullable type {:?}",
                                f.name(),
                                f.data_type().codec()
                            )));
                        }
                    }
                }
                AvroLiteral::Map(out)
            }
            _ => {
                return Err(ArrowError::SchemaError(
                    "Default value for record/struct must be a JSON object".to_string(),
                ));
            }
        },
        // Unions: the default is parsed against the first branch only.
        Codec::Union(encodings, _, _) => {
            let Some(default_encoding) = encodings.first() else {
                return Err(ArrowError::SchemaError(
                    "Union with no branches cannot have a default".to_string(),
                ));
            };
            default_encoding.parse_default_literal(default_json)?
        }
        // Run-end encoded: the default is parsed against the values type.
        #[cfg(feature = "avro_custom_types")]
        Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
    };
    Ok(lit)
}
552
553 fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
554 let json_text = serde_json::to_string(default_json).map_err(|e| {
555 ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
556 })?;
557 self.metadata
558 .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
559 Ok(())
560 }
561
562 fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
563 let lit = self.parse_default_literal(default_json)?;
564 self.store_default(default_json)?;
565 Ok(lit)
566 }
567}
568
/// A named [`AvroDataType`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroField {
    /// The field name; used as the Arrow field name by `field()`.
    name: String,
    /// The data type of this field.
    data_type: AvroDataType,
}
575
576impl AvroField {
577 pub(crate) fn field(&self) -> Field {
579 self.data_type.field_with_name(&self.name)
580 }
581
582 pub(crate) fn data_type(&self) -> &AvroDataType {
584 &self.data_type
585 }
586
587 pub(crate) fn with_utf8view(&self) -> Self {
596 let mut field = self.clone();
597 if let Codec::Utf8 = field.data_type.codec {
598 field.data_type.codec = Codec::Utf8View;
599 }
600 field
601 }
602
603 pub(crate) fn name(&self) -> &str {
608 &self.name
609 }
610}
611
612impl<'a> TryFrom<&Schema<'a>> for AvroField {
613 type Error = ArrowError;
614
615 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
616 match schema {
617 Schema::Complex(ComplexType::Record(r)) => {
618 let mut resolver = Maker::new(false, false);
619 let data_type = resolver.make_data_type(schema, None, None)?;
620 Ok(AvroField {
621 data_type,
622 name: r.name.to_string(),
623 })
624 }
625 _ => Err(ArrowError::ParseError(format!(
626 "Expected record got {schema:?}"
627 ))),
628 }
629 }
630}
631
/// Builder that produces an [`AvroField`] from a writer schema, with an
/// optional reader schema and two option flags.
#[derive(Debug)]
pub(crate) struct AvroFieldBuilder<'a> {
    /// The writer schema; `build` requires it to be a record.
    writer_schema: &'a Schema<'a>,
    /// Optional reader schema, forwarded to `Maker::make_data_type` by `build`.
    reader_schema: Option<&'a Schema<'a>>,
    /// Forwarded to `Maker::new` as its first argument.
    use_utf8view: bool,
    /// Forwarded to `Maker::new` as its second argument.
    strict_mode: bool,
}
640
641impl<'a> AvroFieldBuilder<'a> {
642 pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
644 Self {
645 writer_schema,
646 reader_schema: None,
647 use_utf8view: false,
648 strict_mode: false,
649 }
650 }
651
652 #[inline]
657 pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
658 self.reader_schema = Some(reader_schema);
659 self
660 }
661
662 pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
664 self.use_utf8view = use_utf8view;
665 self
666 }
667
668 pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
670 self.strict_mode = strict_mode;
671 self
672 }
673
674 pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
676 match self.writer_schema {
677 Schema::Complex(ComplexType::Record(r)) => {
678 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
679 let data_type =
680 resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
681 Ok(AvroField {
682 name: r.name.to_string(),
683 data_type,
684 })
685 }
686 _ => Err(ArrowError::ParseError(format!(
687 "Expected a Record schema to build an AvroField, but got {:?}",
688 self.writer_schema
689 ))),
690 }
691 }
692}
693
/// The encoding of an [`AvroDataType`]; each variant maps to one Arrow
/// `DataType` through `Codec::data_type`.
///
/// Variants behind `avro_custom_types` only exist when that feature is enabled.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Codec {
    /// Maps to `DataType::Null`.
    Null,
    /// Maps to `DataType::Boolean`.
    Boolean,
    /// Maps to `DataType::Int32`.
    Int32,
    /// Maps to `DataType::Int64`.
    Int64,
    /// Maps to `DataType::Float32`.
    Float32,
    /// Maps to `DataType::Float64`.
    Float64,
    /// Maps to `DataType::Binary`.
    Binary,
    /// Maps to `DataType::Utf8`.
    Utf8,
    /// Maps to `DataType::Utf8View`.
    Utf8View,
    /// Maps to `DataType::Date32`.
    Date32,
    /// Maps to `DataType::Time32(TimeUnit::Millisecond)`.
    TimeMillis,
    /// Maps to `DataType::Time64(TimeUnit::Microsecond)`.
    TimeMicros,
    /// Millisecond timestamp; `true` attaches the `"+00:00"` timezone, `false` none.
    TimestampMillis(bool),
    /// Microsecond timestamp; `true` attaches the `"+00:00"` timezone, `false` none.
    TimestampMicros(bool),
    /// Nanosecond timestamp; `true` attaches the `"+00:00"` timezone, `false` none.
    TimestampNanos(bool),
    /// Maps to `DataType::FixedSizeBinary` with the carried size.
    Fixed(i32),
    /// Decimal as (precision, optional scale, optional fixed byte size); maps
    /// to an Arrow decimal type chosen by precision.
    Decimal(usize, Option<usize>, Option<usize>),
    /// Maps to `DataType::FixedSizeBinary(16)`.
    Uuid,
    /// Enum carrying its symbols; maps to an `Int32`-keyed `Utf8` dictionary.
    Enum(Arc<[String]>),
    /// List carrying the item type.
    List(Arc<AvroDataType>),
    /// Struct carrying its fields.
    Struct(Arc<[AvroField]>),
    /// Map carrying the value type; keys are non-nullable `Utf8`.
    Map(Arc<AvroDataType>),
    /// Maps to `DataType::Interval(IntervalUnit::MonthDayNano)`.
    Interval,
    /// Union carrying the branch types, the Arrow union fields and the union mode.
    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
    /// Maps to `DataType::Duration(TimeUnit::Nanosecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationNanos,
    /// Maps to `DataType::Duration(TimeUnit::Microsecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMicros,
    /// Maps to `DataType::Duration(TimeUnit::Millisecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMillis,
    /// Maps to `DataType::Duration(TimeUnit::Second)`.
    #[cfg(feature = "avro_custom_types")]
    DurationSeconds,
    /// Run-end encoded values type plus the run-end width in bits
    /// (`data_type` accepts 16, 32 or 64).
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Arc<AvroDataType>, u8),
    /// Maps to `DataType::Int8`.
    #[cfg(feature = "avro_custom_types")]
    Int8,
    /// Maps to `DataType::Int16`.
    #[cfg(feature = "avro_custom_types")]
    Int16,
    /// Maps to `DataType::UInt8`.
    #[cfg(feature = "avro_custom_types")]
    UInt8,
    /// Maps to `DataType::UInt16`.
    #[cfg(feature = "avro_custom_types")]
    UInt16,
    /// Maps to `DataType::UInt32`.
    #[cfg(feature = "avro_custom_types")]
    UInt32,
    /// Maps to `DataType::UInt64`.
    #[cfg(feature = "avro_custom_types")]
    UInt64,
    /// Maps to `DataType::Float16`.
    #[cfg(feature = "avro_custom_types")]
    Float16,
    /// Maps to `DataType::Date64`.
    #[cfg(feature = "avro_custom_types")]
    Date64,
    /// Maps to `DataType::Time64(TimeUnit::Nanosecond)`.
    #[cfg(feature = "avro_custom_types")]
    TimeNanos,
    /// Maps to `DataType::Time32(TimeUnit::Second)`.
    #[cfg(feature = "avro_custom_types")]
    Time32Secs,
    /// Second-resolution timestamp; `true` attaches the `"+00:00"` timezone, `false` none.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool),
    /// Maps to `DataType::Interval(IntervalUnit::YearMonth)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth,
    /// Maps to `DataType::Interval(IntervalUnit::MonthDayNano)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano,
    /// Maps to `DataType::Interval(IntervalUnit::DayTime)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime,
}
825
826impl Codec {
/// Returns the Arrow [`DataType`] that corresponds to this codec.
///
/// # Panics
/// With `avro_custom_types`, a `RunEndEncoded` codec whose bit width is not
/// 16, 32 or 64 hits `unreachable!()`.
fn data_type(&self) -> DataType {
    match self {
        Self::Null => DataType::Null,
        Self::Boolean => DataType::Boolean,
        Self::Int32 => DataType::Int32,
        Self::Int64 => DataType::Int64,
        Self::Float32 => DataType::Float32,
        Self::Float64 => DataType::Float64,
        Self::Binary => DataType::Binary,
        Self::Utf8 => DataType::Utf8,
        Self::Utf8View => DataType::Utf8View,
        Self::Date32 => DataType::Date32,
        Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
        Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
        // The flag selects a "+00:00" timezone (`true`) or no timezone (`false`).
        Self::TimestampMillis(is_utc) => {
            DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
        }
        Self::TimestampMicros(is_utc) => {
            DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
        }
        Self::TimestampNanos(is_utc) => {
            DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
        }
        Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
        Self::Fixed(size) => DataType::FixedSizeBinary(*size),
        Self::Decimal(precision, scale, _size) => {
            // NOTE(review): these `as` casts truncate silently; presumably the
            // precision/scale were range-checked when the codec was built
            // (see `parse_decimal_attributes`) — confirm for other constructors.
            let p = *precision as u8;
            let s = scale.unwrap_or(0) as i8;
            // Pick the narrowest decimal type whose maximum precision fits.
            #[cfg(feature = "small_decimals")]
            {
                if *precision <= DECIMAL32_MAX_PRECISION as usize {
                    DataType::Decimal32(p, s)
                } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
                    DataType::Decimal64(p, s)
                } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
                    DataType::Decimal128(p, s)
                } else {
                    DataType::Decimal256(p, s)
                }
            }
            #[cfg(not(feature = "small_decimals"))]
            {
                if *precision <= DECIMAL128_MAX_PRECISION as usize {
                    DataType::Decimal128(p, s)
                } else {
                    DataType::Decimal256(p, s)
                }
            }
        }
        Self::Uuid => DataType::FixedSizeBinary(16),
        // Enums become a dictionary with Int32 keys and Utf8 values.
        Self::Enum(_) => {
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        }
        Self::List(f) => {
            DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
        }
        Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
        // Maps become a non-nullable "entries" struct of a non-nullable Utf8
        // "key" and the "value" field; the trailing `false` is the
        // keys-sorted flag.
        Self::Map(value_type) => {
            let val_field = value_type.field_with_name("value");
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        val_field,
                    ])),
                    false,
                )),
                false,
            )
        }
        Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
        #[cfg(feature = "avro_custom_types")]
        Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
        #[cfg(feature = "avro_custom_types")]
        Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
        #[cfg(feature = "avro_custom_types")]
        Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
        #[cfg(feature = "avro_custom_types")]
        Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
        #[cfg(feature = "avro_custom_types")]
        Self::RunEndEncoded(values, bits) => {
            // Only 16/32/64-bit run ends are expected; anything else panics.
            let run_ends_dt = match *bits {
                16 => DataType::Int16,
                32 => DataType::Int32,
                64 => DataType::Int64,
                _ => unreachable!(),
            };
            // The values field is always nullable and uses only the codec's
            // data type (the values type's metadata is not carried over).
            DataType::RunEndEncoded(
                Arc::new(Field::new("run_ends", run_ends_dt, false)),
                Arc::new(Field::new("values", values.codec().data_type(), true)),
            )
        }
        #[cfg(feature = "avro_custom_types")]
        Self::Int8 => DataType::Int8,
        #[cfg(feature = "avro_custom_types")]
        Self::Int16 => DataType::Int16,
        #[cfg(feature = "avro_custom_types")]
        Self::UInt8 => DataType::UInt8,
        #[cfg(feature = "avro_custom_types")]
        Self::UInt16 => DataType::UInt16,
        #[cfg(feature = "avro_custom_types")]
        Self::UInt32 => DataType::UInt32,
        #[cfg(feature = "avro_custom_types")]
        Self::UInt64 => DataType::UInt64,
        #[cfg(feature = "avro_custom_types")]
        Self::Float16 => DataType::Float16,
        #[cfg(feature = "avro_custom_types")]
        Self::Date64 => DataType::Date64,
        #[cfg(feature = "avro_custom_types")]
        Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
        #[cfg(feature = "avro_custom_types")]
        Self::Time32Secs => DataType::Time32(TimeUnit::Second),
        #[cfg(feature = "avro_custom_types")]
        Self::TimestampSecs(is_utc) => {
            DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
        }
        #[cfg(feature = "avro_custom_types")]
        Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
        #[cfg(feature = "avro_custom_types")]
        Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
        #[cfg(feature = "avro_custom_types")]
        Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
    }
}
952
953 pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
959 if use_utf8view && matches!(self, Self::Utf8) {
960 Self::Utf8View
961 } else {
962 self
963 }
964 }
965
966 #[inline]
967 fn union_field_name(&self) -> String {
968 UnionFieldKind::from(self).as_ref().to_owned()
969 }
970}
971
972impl From<PrimitiveType> for Codec {
973 fn from(value: PrimitiveType) -> Self {
974 match value {
975 PrimitiveType::Null => Self::Null,
976 PrimitiveType::Boolean => Self::Boolean,
977 PrimitiveType::Int => Self::Int32,
978 PrimitiveType::Long => Self::Int64,
979 PrimitiveType::Float => Self::Float32,
980 PrimitiveType::Double => Self::Float64,
981 PrimitiveType::Bytes => Self::Binary,
982 PrimitiveType::String => Self::Utf8,
983 }
984 }
985}
986
/// Looks up the maximum decimal precision for a fixed-size value of `n` bytes.
///
/// Returns `Some(precision)` for `n` in `1..=32` and `None` for any other
/// width (including 0).
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    // Entry `i` holds the precision for a width of `i + 1` bytes.
    const PRECISION_BY_WIDTH: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    if n < 1 || n > 32 {
        return None;
    }
    Some(PRECISION_BY_WIDTH[n - 1])
}
1009
1010fn parse_decimal_attributes(
1011 attributes: &Attributes,
1012 fallback_size: Option<usize>,
1013 precision_required: bool,
1014) -> Result<(usize, usize, Option<usize>), ArrowError> {
1015 let precision = attributes
1016 .additional
1017 .get("precision")
1018 .and_then(|v| v.as_u64())
1019 .or(if precision_required { None } else { Some(10) })
1020 .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1021 as usize;
1022 let scale = attributes
1023 .additional
1024 .get("scale")
1025 .and_then(|v| v.as_u64())
1026 .unwrap_or(0) as usize;
1027 let size = attributes
1028 .additional
1029 .get("size")
1030 .and_then(|v| v.as_u64())
1031 .map(|s| s as usize)
1032 .or(fallback_size);
1033 if precision == 0 {
1034 return Err(ArrowError::ParseError(
1035 "Decimal requires precision > 0".to_string(),
1036 ));
1037 }
1038 if scale > precision {
1039 return Err(ArrowError::ParseError(format!(
1040 "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1041 )));
1042 }
1043 if precision > DECIMAL256_MAX_PRECISION as usize {
1044 return Err(ArrowError::ParseError(format!(
1045 "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1046 DECIMAL256_MAX_PRECISION
1047 )));
1048 }
1049 if let Some(sz) = size {
1050 let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1051 ArrowError::ParseError(format!(
1052 "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1053 ))
1054 })?;
1055 if precision > max_p {
1056 return Err(ArrowError::ParseError(format!(
1057 "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1058 )));
1059 }
1060 }
1061 Ok((precision, scale, size))
1062}
1063
/// The kind of a union branch, used by `Codec::union_field_name` to name
/// branches.
///
/// `AsRefStr` with `serialize_all = "snake_case"` makes `as_ref()` yield the
/// snake_case form of the variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Date,
    TimeMillis,
    TimeMicros,
    // Timestamp kinds are split by unit and by the codec's boolean flag
    // (`true` => Utc, `false` => Local).
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    TimestampNanosUtc,
    TimestampNanosLocal,
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
1094
impl From<&Codec> for UnionFieldKind {
    /// Classifies a codec into the union field kind used to name its branch.
    ///
    /// Several codecs share a kind (for example both string codecs map to
    /// `String`), so the mapping is not one to one.
    fn from(c: &Codec) -> Self {
        match c {
            Codec::Null => Self::Null,
            Codec::Boolean => Self::Boolean,
            Codec::Int32 => Self::Int,
            Codec::Int64 => Self::Long,
            Codec::Float32 => Self::Float,
            Codec::Float64 => Self::Double,
            Codec::Binary => Self::Bytes,
            Codec::Utf8 | Codec::Utf8View => Self::String,
            Codec::Date32 => Self::Date,
            Codec::TimeMillis => Self::TimeMillis,
            Codec::TimeMicros => Self::TimeMicros,
            Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
            Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
            Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
            Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
            Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
            Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
            Codec::Interval => Self::Duration,
            Codec::Fixed(_) => Self::Fixed,
            Codec::Decimal(..) => Self::Decimal,
            Codec::Enum(_) => Self::Enum,
            Codec::List(_) => Self::Array,
            Codec::Struct(_) => Self::Record,
            Codec::Map(_) => Self::Map,
            Codec::Uuid => Self::Uuid,
            Codec::Union(..) => Self::Union,
            // Run-end encoding takes the kind of its values codec.
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => Self::Duration,
            // The custom codecs below reuse the kinds of the built-in codecs.
            #[cfg(feature = "avro_custom_types")]
            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
                Self::Long
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::Time32Secs => Self::TimeMillis,
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt64
            | Codec::Float16
            | Codec::IntervalYearMonth
            | Codec::IntervalMonthDayNano
            | Codec::IntervalDayTime => Self::Fixed,
        }
    }
}
1148
1149fn union_branch_name(dt: &AvroDataType) -> String {
1150 if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1151 if name.contains(".") {
1152 return name.to_string();
1154 }
1155 if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1156 return format!("{ns}.{name}");
1157 }
1158 return name.to_string();
1159 }
1160 dt.codec.union_field_name()
1161}
1162
1163fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1164 let arrow_fields: Vec<Field> = encodings
1165 .iter()
1166 .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1167 .collect();
1168 let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1169 UnionFields::try_new(type_ids, arrow_fields)
1170}
1171
/// Registry of named data types, keyed by `(namespace, name)`.
///
/// A missing namespace is stored and looked up as the empty string.
#[derive(Debug, Default)]
struct Resolver<'a> {
    /// Registered types; the key is `(namespace, name)`.
    map: HashMap<(&'a str, &'a str), AvroDataType>,
}
1179
1180impl<'a> Resolver<'a> {
1181 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1182 self.map.insert((namespace.unwrap_or(""), name), schema);
1183 }
1184
1185 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1186 let (namespace, name) = name
1187 .rsplit_once('.')
1188 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1189 self.map
1190 .get(&(namespace, name))
1191 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1192 .cloned()
1193 }
1194}
1195
1196fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1197 let mut out = HashSet::with_capacity(1 + aliases.len());
1198 let (full, _) = make_full_name(name, ns, None);
1199 out.insert(full);
1200 for a in aliases {
1201 let (fa, _) = make_full_name(a, None, ns);
1202 out.insert(fa);
1203 }
1204 out
1205}
1206
1207fn names_match(
1208 writer_name: &str,
1209 writer_namespace: Option<&str>,
1210 writer_aliases: &[&str],
1211 reader_name: &str,
1212 reader_namespace: Option<&str>,
1213 reader_aliases: &[&str],
1214) -> bool {
1215 let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1216 let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1217 !writer_set.is_disjoint(&reader_set)
1219}
1220
1221fn ensure_names_match(
1222 data_type: &str,
1223 writer_name: &str,
1224 writer_namespace: Option<&str>,
1225 writer_aliases: &[&str],
1226 reader_name: &str,
1227 reader_namespace: Option<&str>,
1228 reader_aliases: &[&str],
1229) -> Result<(), ArrowError> {
1230 if names_match(
1231 writer_name,
1232 writer_namespace,
1233 writer_aliases,
1234 reader_name,
1235 reader_namespace,
1236 reader_aliases,
1237 ) {
1238 Ok(())
1239 } else {
1240 Err(ArrowError::ParseError(format!(
1241 "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1242 )))
1243 }
1244}
1245
1246fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1247 match schema {
1248 Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1249 Schema::Type(Type {
1250 r#type: TypeName::Primitive(primitive),
1251 ..
1252 }) => Some(*primitive),
1253 _ => None,
1254 }
1255}
1256
1257fn nullable_union_variants<'x, 'y>(
1258 variant: &'y [Schema<'x>],
1259) -> Option<(Nullability, &'y Schema<'x>)> {
1260 if variant.len() != 2 {
1261 return None;
1262 }
1263 let is_null = |schema: &Schema<'x>| {
1264 matches!(
1265 schema,
1266 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1267 )
1268 };
1269 match (is_null(&variant[0]), is_null(&variant[1])) {
1270 (true, false) => Some((Nullability::NullFirst, &variant[1])),
1271 (false, true) => Some((Nullability::NullSecond, &variant[0])),
1272 _ => None,
1273 }
1274}
1275
/// Identity of a union branch, used to detect duplicate branches within one union.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    /// A named type (record, enum, fixed, or a reference), identified by its full name.
    Named(String),
    /// A primitive type.
    Primitive(PrimitiveType),
    /// Any array; all arrays share this single key.
    Array,
    /// Any map; all maps share this single key.
    Map,
}
1283
1284fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1285 let (name, namespace) = match s {
1286 Schema::TypeName(TypeName::Primitive(p))
1287 | Schema::Type(Type {
1288 r#type: TypeName::Primitive(p),
1289 ..
1290 }) => return Some(UnionBranchKey::Primitive(*p)),
1291 Schema::TypeName(TypeName::Ref(name))
1292 | Schema::Type(Type {
1293 r#type: TypeName::Ref(name),
1294 ..
1295 }) => (name, None),
1296 Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1297 Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1298 Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1299 Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1300 Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1301 Schema::Union(_) => return None,
1302 };
1303 let (full, _) = make_full_name(name, namespace, enclosing_ns);
1304 Some(UnionBranchKey::Named(full))
1305}
1306
1307fn union_first_duplicate<'a>(
1308 branches: &'a [Schema<'a>],
1309 enclosing_ns: Option<&'a str>,
1310) -> Option<String> {
1311 let mut seen = HashSet::with_capacity(branches.len());
1312 for schema in branches {
1313 if let Some(key) = branch_key_of(schema, enclosing_ns) {
1314 if !seen.insert(key.clone()) {
1315 let msg = match key {
1316 UnionBranchKey::Named(full) => format!("named type {full}"),
1317 UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1318 UnionBranchKey::Array => "array".to_string(),
1319 UnionBranchKey::Map => "map".to_string(),
1320 };
1321 return Some(msg);
1322 }
1323 }
1324 }
1325 None
1326}
1327
/// Builds `AvroDataType` values from Avro schemas, optionally resolving a writer schema
/// against a reader schema.
struct Maker<'a> {
    /// Named types registered while parsing, consulted when a schema refers to a type by name.
    resolver: Resolver<'a>,
    /// Forwarded to `Codec::with_utf8view` when building primitive codecs.
    use_utf8view: bool,
    /// When set, several lenient cases become errors: `['T','null']` unions, reader enums
    /// that do not cover all writer symbols, and ambiguous or duplicate field matches.
    strict_mode: bool,
}
1336
1337impl<'a> Maker<'a> {
1338 fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1339 Self {
1340 resolver: Default::default(),
1341 use_utf8view,
1342 strict_mode,
1343 }
1344 }
1345
1346 #[cfg(feature = "avro_custom_types")]
1347 #[inline]
1348 fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1349 if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1350 let mut inner = (*values).clone();
1351 inner.nullability = Some(nb);
1352 dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1353 }
1354 }
1355
1356 fn make_data_type<'s>(
1357 &mut self,
1358 writer_schema: &'s Schema<'a>,
1359 reader_schema: Option<&'s Schema<'a>>,
1360 namespace: Option<&'a str>,
1361 ) -> Result<AvroDataType, ArrowError> {
1362 match reader_schema {
1363 Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1364 None => self.parse_type(writer_schema, namespace),
1365 }
1366 }
1367
    /// Converts a single Avro schema node into an `AvroDataType`, without a reader schema.
    ///
    /// `namespace` is the enclosing namespace; records, fixed types and enums replace it
    /// with their own namespace when they declare one. Each record, fixed or enum is
    /// registered in `self.resolver` once built, and `TypeName::Ref` schemas are looked
    /// up there.
    ///
    /// # Errors
    /// - `ArrowError::SchemaError` for a `['T','null']` union in strict mode, a union that
    ///   directly contains another union, or a union with duplicate branch types.
    /// - `ArrowError::ParseError` for an unresolvable reference, a fixed size that does not
    ///   convert, a `duration` fixed whose size is not 12, enum symbols that fail to
    ///   serialize, or invalid `arrow.run-end-encoded` attributes.
    /// - Any error returned by `parse_decimal_attributes` or `build_union_fields`.
    fn parse_type<'s>(
        &mut self,
        schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        match schema {
            // Bare primitive: codec derived directly from the primitive type.
            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
                Codec::from(*p).with_utf8view(self.use_utf8view),
                Default::default(),
                None,
            )),
            // Reference to a previously registered named type.
            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
            Schema::Union(f) => {
                // Position of the first bare `null` branch, if any.
                let null = f
                    .iter()
                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
                // A two-branch union containing null collapses to the other branch with
                // nullability set, instead of becoming a union codec.
                match (f.len() == 2, null) {
                    (true, Some(0)) => {
                        let mut field = self.parse_type(&f[1], namespace)?;
                        field.nullability = Some(Nullability::NullFirst);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
                        return Ok(field);
                    }
                    (true, Some(1)) => {
                        if self.strict_mode {
                            return Err(ArrowError::SchemaError(
                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
                                    .to_string(),
                            ));
                        }
                        let mut field = self.parse_type(&f[0], namespace)?;
                        field.nullability = Some(Nullability::NullSecond);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
                        return Ok(field);
                    }
                    _ => {}
                }
                // General union: validate the branches before parsing them.
                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
                    return Err(ArrowError::SchemaError(
                        "Avro unions may not immediately contain other unions".to_string(),
                    ));
                }
                if let Some(dup) = union_first_duplicate(f, namespace) {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro union contains duplicate branch type: {dup}"
                    )));
                }
                let children: Vec<AvroDataType> = f
                    .iter()
                    .map(|s| self.parse_type(s, namespace))
                    .collect::<Result<_, _>>()?;
                let union_fields = build_union_fields(&children)?;
                Ok(AvroDataType::new(
                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
                    Default::default(),
                    None,
                ))
            }
            Schema::Complex(c) => match c {
                ComplexType::Record(r) => {
                    // The record's own namespace, when present, applies to its fields.
                    let namespace = r.namespace.or(namespace);
                    let mut metadata = r.attributes.field_metadata();
                    let fields = r
                        .fields
                        .iter()
                        .map(|field| {
                            Ok(AvroField {
                                name: field.name.to_string(),
                                data_type: self.parse_type(&field.r#type, namespace)?,
                            })
                        })
                        .collect::<Result<_, ArrowError>>()?;
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        codec: Codec::Struct(fields),
                        metadata,
                        resolution: None,
                    };
                    // Registered only after the fields are parsed, so the record is not
                    // yet resolvable while its own fields are being processed.
                    self.resolver.register(r.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Array(a) => {
                    let field = self.parse_type(a.items.as_ref(), namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: a.attributes.field_metadata(),
                        codec: Codec::List(Arc::new(field)),
                        resolution: None,
                    })
                }
                ComplexType::Fixed(f) => {
                    let size = f.size.try_into().map_err(|e| {
                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
                    })?;
                    let namespace = f.namespace.or(namespace);
                    let mut metadata = f.attributes.field_metadata();
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    // The logical type (plus size, for the custom Arrow types) selects the
                    // codec; anything unrecognized stays a plain fixed-size codec.
                    let field = match f.attributes.logical_type {
                        Some("decimal") => {
                            let (precision, scale, _) =
                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
                                resolution: None,
                            }
                        }
                        Some("duration") => {
                            if size != 12 {
                                return Err(ArrowError::ParseError(format!(
                                    "Invalid fixed size for Duration: {size}, must be 12"
                                )));
                            };
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Interval,
                                resolution: None,
                            }
                        }
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.uint64") if size == 8 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::UInt64,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.float16") if size == 2 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::Float16,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-year-month") if size == 4 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalYearMonth,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalMonthDayNano,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-day-time") if size == 8 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalDayTime,
                            resolution: None,
                        },
                        _ => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::Fixed(size),
                            resolution: None,
                        },
                    };
                    self.resolver.register(f.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Enum(e) => {
                    let namespace = e.namespace.or(namespace);
                    let symbols = e
                        .symbols
                        .iter()
                        .map(|s| s.to_string())
                        .collect::<Arc<[String]>>();
                    let mut metadata = e.attributes.field_metadata();
                    // The symbol list is also stored in metadata, serialized as JSON.
                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
                    })?;
                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        metadata,
                        codec: Codec::Enum(symbols),
                        resolution: None,
                    };
                    self.resolver.register(e.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Map(m) => {
                    let val = self.parse_type(&m.values, namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: m.attributes.field_metadata(),
                        codec: Codec::Map(Arc::new(val)),
                        resolution: None,
                    })
                }
            },
            Schema::Type(t) => {
                // Parse the underlying type name first, then refine the codec from the
                // logical type and the additional attributes.
                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
                // Each arm applies only when the logical type is paired with the expected
                // underlying codec; any other pairing falls through to the generic arm.
                match (t.attributes.logical_type, &mut field.codec) {
                    (Some("decimal"), c @ Codec::Binary) => {
                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
                        *c = Codec::Decimal(prec, Some(sc), None);
                    }
                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
                    (Some("timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(true)
                    }
                    (Some("timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(true)
                    }
                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(false)
                    }
                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(false)
                    }
                    (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
                        *c = Codec::TimestampNanos(false)
                    }
                    (Some("uuid"), c @ Codec::Utf8) => {
                        *c = Codec::Uuid;
                        field.metadata.insert("logicalType".into(), "uuid".into());
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
                        *c = Codec::DurationSeconds
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.run-end-encoded"), _) => {
                        // The run-end index width must be present and one of 16, 32 or 64.
                        let bits_u8: u8 = t
                            .attributes
                            .additional
                            .get("arrow.runEndIndexBits")
                            .and_then(|v| v.as_u64())
                            .and_then(|n| u8::try_from(n).ok())
                            .ok_or_else(|| ArrowError::ParseError(
                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
                                    .to_string(),
                            ))?;
                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
                            return Err(ArrowError::ParseError(format!(
                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
                            )));
                        }
                        // The type parsed so far becomes the values type of the
                        // run-end-encoded codec.
                        let values_site = field.clone();
                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
                        *c = Codec::TimestampSecs(true)
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
                        *c = Codec::TimestampSecs(false)
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
                        *c = Codec::IntervalYearMonth
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
                        *c = Codec::IntervalMonthDayNano
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
                        *c = Codec::IntervalDayTime
                    }
                    // Unrecognized logical type (or unexpected underlying codec): keep
                    // the codec and record the logical type name in metadata.
                    (Some(logical), _) => {
                        field.metadata.insert("logicalType".into(), logical.into());
                    }
                    (None, _) => {}
                }
                // A codec that is still plain Int64 with `arrowTimeUnit` set to
                // "nanosecond" becomes a nanosecond timestamp with the flag set to false.
                if matches!(field.codec, Codec::Int64) {
                    if let Some(unit) = t
                        .attributes
                        .additional
                        .get("arrowTimeUnit")
                        .and_then(|v| v.as_str())
                    {
                        if unit == "nanosecond" {
                            field.codec = Codec::TimestampNanos(false);
                        }
                    }
                }
                // Copy every additional attribute into metadata, stringified with `to_string`.
                if !t.attributes.additional.is_empty() {
                    for (k, v) in &t.attributes.additional {
                        field.metadata.insert(k.to_string(), v.to_string());
                    }
                }
                Ok(field)
            }
        }
    }
1731
    /// Resolves a writer schema against a reader schema and returns the reader-side data
    /// type, annotated with the resolution needed to decode writer data.
    ///
    /// Dispatch order: primitive pairs first, then unions (on either side), then matching
    /// complex kinds (array, map, fixed, record, enum), then named references.
    ///
    /// # Errors
    /// - `ArrowError::SchemaError` when a non-union writer matches no branch of a
    ///   non-nullable reader union.
    /// - `ArrowError::NotYetImplemented` for any writer/reader pairing not handled here.
    /// - Any error propagated from the per-kind resolvers or from `parse_type`.
    fn resolve_type<'s>(
        &mut self,
        writer_schema: &'s Schema<'a>,
        reader_schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        // Both sides primitive: handled entirely by the promotion table.
        if let (Some(write_primitive), Some(read_primitive)) =
            (primitive_of(writer_schema), primitive_of(reader_schema))
        {
            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
        }
        match (writer_schema, reader_schema) {
            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
                let writer_variants = writer_variants.as_slice();
                let reader_variants = reader_variants.as_slice();
                match (
                    nullable_union_variants(writer_variants),
                    nullable_union_variants(reader_variants),
                ) {
                    // Both sides are two-branch nullable unions: resolve the non-null
                    // branches against each other.
                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
                        // Only the writer's non-null branch gets a mapping; the null
                        // branch entry stays `None`.
                        let mut writer_to_reader = vec![None, None];
                        writer_to_reader[w_nb.non_null_index()] = Some((
                            r_nb.non_null_index(),
                            dt.resolution
                                .take()
                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
                        ));
                        // Nullability is taken from the writer's null position.
                        dt.nullability = Some(w_nb);
                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                            writer_to_reader: Arc::from(writer_to_reader),
                            writer_is_union: true,
                            reader_is_union: true,
                        }));
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
                        Ok(dt)
                    }
                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
                }
            }
            (Schema::Union(writer_variants), reader_non_union) => {
                // Try each writer branch against the single reader type. A branch that
                // fails to resolve is recorded as `None` rather than failing the whole
                // resolution; the reader index is always 0.
                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
                    .iter()
                    .map(|writer| {
                        self.resolve_type(writer, reader_non_union, namespace)
                            .ok()
                            .map(|tmp| {
                                let resolution = tmp
                                    .resolution
                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                                (0usize, resolution)
                            })
                    })
                    .collect();
                let mut dt = self.parse_type(reader_non_union, namespace)?;
                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                    writer_to_reader: Arc::from(writer_to_reader),
                    writer_is_union: true,
                    reader_is_union: false,
                }));
                Ok(dt)
            }
            (writer_non_union, Schema::Union(reader_variants)) => {
                if let Some((nullability, non_null_branch)) =
                    nullable_union_variants(reader_variants)
                {
                    // Reader is a nullable two-branch union: resolve against its
                    // non-null branch and mark the result nullable.
                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
                    #[cfg(feature = "avro_custom_types")]
                    Self::propagate_nullability_into_ree(&mut dt, nullability);
                    dt.nullability = Some(nullability);
                    // Always leave a resolution in place, defaulting to a direct read.
                    if dt.resolution.is_none() {
                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
                    }
                    Ok(dt)
                } else {
                    let Some((match_idx, mut match_dt)) =
                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
                    else {
                        return Err(ArrowError::SchemaError(
                            "Writer schema does not match any reader union branch".to_string(),
                        ));
                    };
                    // Move the branch-level resolution out of the matched child; it is
                    // carried by the union-level mapping instead.
                    let resolution = match_dt
                        .resolution
                        .take()
                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                    // Wrapped in `Option` so the closure below can move it out once.
                    let mut match_dt = Some(match_dt);
                    // The matched branch reuses the resolved type; every other reader
                    // branch is parsed on its own.
                    let children = reader_variants
                        .iter()
                        .enumerate()
                        .map(|(idx, variant)| {
                            if idx == match_idx {
                                Ok(match_dt.take().unwrap())
                            } else {
                                self.parse_type(variant, namespace)
                            }
                        })
                        .collect::<Result<Vec<_>, _>>()?;
                    let union_fields = build_union_fields(&children)?;
                    let mut dt = AvroDataType::new(
                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
                        Default::default(),
                        None,
                    );
                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
                        writer_is_union: false,
                        reader_is_union: true,
                    }));
                    Ok(dt)
                }
            }
            (
                Schema::Complex(ComplexType::Array(writer_array)),
                Schema::Complex(ComplexType::Array(reader_array)),
            ) => self.resolve_array(writer_array, reader_array, namespace),
            (
                Schema::Complex(ComplexType::Map(writer_map)),
                Schema::Complex(ComplexType::Map(reader_map)),
            ) => self.resolve_map(writer_map, reader_map, namespace),
            (
                Schema::Complex(ComplexType::Fixed(writer_fixed)),
                Schema::Complex(ComplexType::Fixed(reader_fixed)),
            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
            (
                Schema::Complex(ComplexType::Record(writer_record)),
                Schema::Complex(ComplexType::Record(reader_record)),
            ) => self.resolve_records(writer_record, reader_record, namespace),
            (
                Schema::Complex(ComplexType::Enum(writer_enum)),
                Schema::Complex(ComplexType::Enum(reader_enum)),
            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
            // When either side is a bare named reference, the reader schema is parsed
            // as-is and the writer side is not inspected.
            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
            _ => Err(ArrowError::NotYetImplemented(
                "Other resolutions not yet implemented".to_string(),
            )),
        }
    }
1879
1880 fn find_best_union_match(
1881 &mut self,
1882 writer: &Schema<'a>,
1883 reader_variants: &[Schema<'a>],
1884 namespace: Option<&'a str>,
1885 ) -> Option<(usize, AvroDataType)> {
1886 let mut first_resolution = None;
1887 for (reader_index, reader) in reader_variants.iter().enumerate() {
1888 if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1889 match &dt.resolution {
1890 None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1891 return Some((reader_index, dt));
1893 }
1894 Some(_) => {
1895 if first_resolution.is_none() {
1896 first_resolution = Some((reader_index, dt));
1898 }
1899 }
1900 };
1901 }
1902 }
1903 first_resolution
1904 }
1905
1906 fn resolve_unions<'s>(
1907 &mut self,
1908 writer_variants: &'s [Schema<'a>],
1909 reader_variants: &'s [Schema<'a>],
1910 namespace: Option<&'a str>,
1911 ) -> Result<AvroDataType, ArrowError> {
1912 let mut resolved_reader_encodings = HashMap::new();
1913 let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1914 .iter()
1915 .map(|writer| {
1916 self.find_best_union_match(writer, reader_variants, namespace)
1917 .map(|(match_idx, mut match_dt)| {
1918 let resolution = match_dt
1919 .resolution
1920 .take()
1921 .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1922 resolved_reader_encodings.insert(match_idx, match_dt);
1925 (match_idx, resolution)
1926 })
1927 })
1928 .collect();
1929 let reader_encodings: Vec<AvroDataType> = reader_variants
1930 .iter()
1931 .enumerate()
1932 .map(|(reader_idx, reader_schema)| {
1933 if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1934 Ok(resolved)
1935 } else {
1936 self.parse_type(reader_schema, namespace)
1937 }
1938 })
1939 .collect::<Result<_, _>>()?;
1940 let union_fields = build_union_fields(&reader_encodings)?;
1941 let mut dt = AvroDataType::new(
1942 Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1943 Default::default(),
1944 None,
1945 );
1946 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1947 writer_to_reader: Arc::from(writer_to_reader),
1948 writer_is_union: true,
1949 reader_is_union: true,
1950 }));
1951 Ok(dt)
1952 }
1953
1954 fn resolve_array(
1955 &mut self,
1956 writer_array: &Array<'a>,
1957 reader_array: &Array<'a>,
1958 namespace: Option<&'a str>,
1959 ) -> Result<AvroDataType, ArrowError> {
1960 Ok(AvroDataType {
1961 nullability: None,
1962 metadata: reader_array.attributes.field_metadata(),
1963 codec: Codec::List(Arc::new(self.make_data_type(
1964 writer_array.items.as_ref(),
1965 Some(reader_array.items.as_ref()),
1966 namespace,
1967 )?)),
1968 resolution: None,
1969 })
1970 }
1971
1972 fn resolve_map(
1973 &mut self,
1974 writer_map: &Map<'a>,
1975 reader_map: &Map<'a>,
1976 namespace: Option<&'a str>,
1977 ) -> Result<AvroDataType, ArrowError> {
1978 Ok(AvroDataType {
1979 nullability: None,
1980 metadata: reader_map.attributes.field_metadata(),
1981 codec: Codec::Map(Arc::new(self.make_data_type(
1982 &writer_map.values,
1983 Some(&reader_map.values),
1984 namespace,
1985 )?)),
1986 resolution: None,
1987 })
1988 }
1989
1990 fn resolve_fixed<'s>(
1991 &mut self,
1992 writer_fixed: &Fixed<'a>,
1993 reader_fixed: &Fixed<'a>,
1994 reader_schema: &'s Schema<'a>,
1995 namespace: Option<&'a str>,
1996 ) -> Result<AvroDataType, ArrowError> {
1997 ensure_names_match(
1998 "Fixed",
1999 writer_fixed.name,
2000 writer_fixed.namespace,
2001 &writer_fixed.aliases,
2002 reader_fixed.name,
2003 reader_fixed.namespace,
2004 &reader_fixed.aliases,
2005 )?;
2006 if writer_fixed.size != reader_fixed.size {
2007 return Err(ArrowError::SchemaError(format!(
2008 "Fixed size mismatch for {}: writer={}, reader={}",
2009 reader_fixed.name, writer_fixed.size, reader_fixed.size
2010 )));
2011 }
2012 self.parse_type(reader_schema, namespace)
2013 }
2014
2015 fn resolve_primitives(
2016 &mut self,
2017 write_primitive: PrimitiveType,
2018 read_primitive: PrimitiveType,
2019 reader_schema: &Schema<'a>,
2020 ) -> Result<AvroDataType, ArrowError> {
2021 if write_primitive == read_primitive {
2022 return self.parse_type(reader_schema, None);
2023 }
2024 let promotion = match (write_primitive, read_primitive) {
2025 (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2026 (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2027 (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2028 (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2029 (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2030 (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2031 (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2032 (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2033 _ => {
2034 return Err(ArrowError::ParseError(format!(
2035 "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2036 )));
2037 }
2038 };
2039 let mut datatype = self.parse_type(reader_schema, None)?;
2040 datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2041 Ok(datatype)
2042 }
2043
2044 fn resolve_enums(
2100 &mut self,
2101 writer_enum: &Enum<'a>,
2102 reader_enum: &Enum<'a>,
2103 reader_schema: &Schema<'a>,
2104 namespace: Option<&'a str>,
2105 ) -> Result<AvroDataType, ArrowError> {
2106 ensure_names_match(
2107 "Enum",
2108 writer_enum.name,
2109 writer_enum.namespace,
2110 &writer_enum.aliases,
2111 reader_enum.name,
2112 reader_enum.namespace,
2113 &reader_enum.aliases,
2114 )?;
2115 if writer_enum.symbols == reader_enum.symbols {
2116 return self.parse_type(reader_schema, namespace);
2117 }
2118 let reader_index: HashMap<&str, i32> = reader_enum
2119 .symbols
2120 .iter()
2121 .enumerate()
2122 .map(|(index, &symbol)| (symbol, index as i32))
2123 .collect();
2124 let default_index: i32 = match reader_enum.default {
2125 Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2126 ArrowError::SchemaError(format!(
2127 "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2128 reader_enum.name,
2129 ))
2130 })?,
2131 None => -1,
2132 };
2133 let mapping: Vec<i32> = writer_enum
2134 .symbols
2135 .iter()
2136 .map(|&write_symbol| {
2137 reader_index
2138 .get(write_symbol)
2139 .copied()
2140 .unwrap_or(default_index)
2141 })
2142 .collect();
2143 if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2144 return Err(ArrowError::SchemaError(format!(
2145 "Reader enum '{}' does not cover all writer symbols and no default is provided",
2146 reader_enum.name
2147 )));
2148 }
2149 let mut dt = self.parse_type(reader_schema, namespace)?;
2150 dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2151 mapping: Arc::from(mapping),
2152 default_index,
2153 }));
2154 let reader_ns = reader_enum.namespace.or(namespace);
2155 self.resolver
2156 .register(reader_enum.name, reader_ns, dt.clone());
2157 Ok(dt)
2158 }
2159
2160 #[inline]
2161 fn build_writer_lookup(
2162 writer_record: &Record<'a>,
2163 ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2164 let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2165 for (idx, wf) in writer_record.fields.iter().enumerate() {
2166 map.insert(wf.name, idx);
2168 }
2169 let mut ambiguous: HashSet<&str> = HashSet::new();
2171 for (idx, wf) in writer_record.fields.iter().enumerate() {
2172 for &alias in &wf.aliases {
2173 match map.entry(alias) {
2174 Entry::Occupied(e) if *e.get() != idx => {
2175 ambiguous.insert(alias);
2176 }
2177 Entry::Vacant(e) => {
2178 e.insert(idx);
2179 }
2180 _ => {}
2181 }
2182 }
2183 }
2184 (map, ambiguous)
2185 }
2186
    /// Resolves a writer record against a reader record.
    ///
    /// The result is a struct with the reader's fields, in reader order, carrying a
    /// `ResolvedRecord` that holds: the writer-index -> reader-index mapping, the reader
    /// indices filled from defaults, and for each unmatched writer field the data type
    /// needed to skip it. The resolved type is registered under the reader's name.
    ///
    /// # Errors
    /// - `ArrowError::ParseError` when the record names do not match.
    /// - `ArrowError::SchemaError` when a reader field has no writer counterpart and no
    ///   usable default, or (strict mode only) when an alias is ambiguous or two reader
    ///   fields map to the same writer field.
    /// - Any error propagated from resolving or parsing field types.
    fn resolve_records(
        &mut self,
        writer_record: &Record<'a>,
        reader_record: &Record<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        ensure_names_match(
            "Record",
            writer_record.name,
            writer_record.namespace,
            &writer_record.aliases,
            reader_record.name,
            reader_record.namespace,
            &reader_record.aliases,
        )?;
        // Each side's own namespace wins over the enclosing one.
        let writer_ns = writer_record.namespace.or(namespace);
        let reader_ns = reader_record.namespace.or(namespace);
        let mut reader_md = reader_record.attributes.field_metadata();
        reader_md.insert(
            AVRO_NAME_METADATA_KEY.to_string(),
            reader_record.name.to_string(),
        );
        if let Some(ns) = reader_ns {
            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
        }
        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
        let mut default_fields: Vec<usize> = Vec::new();
        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
            // Match by the reader field's name first, then by its aliases in order.
            let mut match_idx = writer_lookup.get(r_field.name).copied();
            let mut matched_via_alias: Option<&str> = None;
            if match_idx.is_none() {
                for &alias in &r_field.aliases {
                    if let Some(i) = writer_lookup.get(alias).copied() {
                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
                            return Err(ArrowError::SchemaError(format!(
                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
                                r_field.name
                            )));
                        }
                        match_idx = Some(i);
                        matched_via_alias = Some(alias);
                        break;
                    }
                }
            }
            if let Some(wi) = match_idx {
                if writer_to_reader[wi].is_none() {
                    // First reader field to claim this writer field: resolve the types.
                    let w_schema = &writer_record.fields[wi].r#type;
                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
                    writer_to_reader[wi] = Some(reader_idx);
                    reader_fields.push(AvroField {
                        name: r_field.name.to_owned(),
                        data_type: dt,
                    });
                    continue;
                } else if self.strict_mode {
                    let existing_reader = writer_to_reader[wi].unwrap();
                    let via = matched_via_alias
                        .map(|a| format!("alias '{a}'"))
                        .unwrap_or_else(|| "name match".to_string());
                    return Err(ArrowError::SchemaError(format!(
                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
                        writer_record.fields[wi].name
                    )));
                }
                // Outside strict mode, a writer field that is already claimed falls
                // through and this reader field is treated as unmatched.
            }
            // Reader field without a writer counterpart: it needs an explicit default,
            // or a null-first nullable type, which defaults to null.
            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
            if let Some(default_json) = r_field.default.as_ref() {
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(default_json)?,
                ));
                default_fields.push(reader_idx);
            } else if dt.nullability() == Some(Nullability::NullFirst) {
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(&Value::Null)?,
                ));
                default_fields.push(reader_idx);
            } else {
                return Err(ArrowError::SchemaError(format!(
                    "Reader field '{}' not present in writer schema must have a default value",
                    r_field.name
                )));
            }
            reader_fields.push(AvroField {
                name: r_field.name.to_owned(),
                data_type: dt,
            });
        }
        // One entry per writer field: `None` when a reader field consumes it, otherwise
        // the writer field's own type, parsed in the writer's namespace.
        let mut skip_fields: Vec<Option<AvroDataType>> =
            Vec::with_capacity(writer_record.fields.len());
        for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
            if writer_to_reader[writer_index].is_some() {
                skip_fields.push(None);
            } else {
                skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
            }
        }
        let resolved = AvroDataType::new_with_resolution(
            Codec::Struct(Arc::from(reader_fields)),
            reader_md,
            None,
            Some(ResolutionInfo::Record(ResolvedRecord {
                writer_to_reader: Arc::from(writer_to_reader),
                default_fields: Arc::from(default_fields),
                skip_fields: Arc::from(skip_fields),
            })),
        );
        self.resolver
            .register(reader_record.name, reader_ns, resolved.clone());
        Ok(resolved)
    }
2309}
2310
2311#[cfg(test)]
2312mod tests {
2313 use super::*;
2314 use crate::schema::{
2315 AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2316 Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2317 };
2318 use indexmap::IndexMap;
2319 use serde_json::{self, Value};
2320
2321 fn create_schema_with_logical_type(
2322 primitive_type: PrimitiveType,
2323 logical_type: &'static str,
2324 ) -> Schema<'static> {
2325 let attributes = Attributes {
2326 logical_type: Some(logical_type),
2327 additional: Default::default(),
2328 };
2329
2330 Schema::Type(Type {
2331 r#type: TypeName::Primitive(primitive_type),
2332 attributes,
2333 })
2334 }
2335
2336 fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2337 let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2338 let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2339 let mut maker = Maker::new(false, false);
2340 maker
2341 .make_data_type(&writer_schema, Some(&reader_schema), None)
2342 .expect("promotion should resolve")
2343 }
2344
2345 fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2346 Schema::TypeName(TypeName::Primitive(pt))
2347 }
    /// Wraps the given branch schemas, in the order supplied, in a `Schema::Union`.
    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
        Schema::Union(branches)
    }
2351
2352 #[test]
2353 fn test_date_logical_type() {
2354 let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2355
2356 let mut maker = Maker::new(false, false);
2357 let result = maker.make_data_type(&schema, None, None).unwrap();
2358
2359 assert!(matches!(result.codec, Codec::Date32));
2360 }
2361
2362 #[test]
2363 fn test_time_millis_logical_type() {
2364 let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2365
2366 let mut maker = Maker::new(false, false);
2367 let result = maker.make_data_type(&schema, None, None).unwrap();
2368
2369 assert!(matches!(result.codec, Codec::TimeMillis));
2370 }
2371
2372 #[test]
2373 fn test_time_micros_logical_type() {
2374 let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2375
2376 let mut maker = Maker::new(false, false);
2377 let result = maker.make_data_type(&schema, None, None).unwrap();
2378
2379 assert!(matches!(result.codec, Codec::TimeMicros));
2380 }
2381
2382 #[test]
2383 fn test_timestamp_millis_logical_type() {
2384 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2385
2386 let mut maker = Maker::new(false, false);
2387 let result = maker.make_data_type(&schema, None, None).unwrap();
2388
2389 assert!(matches!(result.codec, Codec::TimestampMillis(true)));
2390 }
2391
2392 #[test]
2393 fn test_timestamp_micros_logical_type() {
2394 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2395
2396 let mut maker = Maker::new(false, false);
2397 let result = maker.make_data_type(&schema, None, None).unwrap();
2398
2399 assert!(matches!(result.codec, Codec::TimestampMicros(true)));
2400 }
2401
2402 #[test]
2403 fn test_local_timestamp_millis_logical_type() {
2404 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2405
2406 let mut maker = Maker::new(false, false);
2407 let result = maker.make_data_type(&schema, None, None).unwrap();
2408
2409 assert!(matches!(result.codec, Codec::TimestampMillis(false)));
2410 }
2411
2412 #[test]
2413 fn test_local_timestamp_micros_logical_type() {
2414 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2415
2416 let mut maker = Maker::new(false, false);
2417 let result = maker.make_data_type(&schema, None, None).unwrap();
2418
2419 assert!(matches!(result.codec, Codec::TimestampMicros(false)));
2420 }
2421
2422 #[test]
2423 fn test_uuid_type() {
2424 let mut codec = Codec::Fixed(16);
2425 if let c @ Codec::Fixed(16) = &mut codec {
2426 *c = Codec::Uuid;
2427 }
2428 assert!(matches!(codec, Codec::Uuid));
2429 }
2430
2431 #[test]
2432 fn test_duration_logical_type() {
2433 let mut codec = Codec::Fixed(12);
2434
2435 if let c @ Codec::Fixed(12) = &mut codec {
2436 *c = Codec::Interval;
2437 }
2438
2439 assert!(matches!(codec, Codec::Interval));
2440 }
2441
2442 #[test]
2443 fn test_decimal_logical_type_not_implemented() {
2444 let codec = Codec::Fixed(16);
2445
2446 let process_decimal = || -> Result<(), ArrowError> {
2447 if let Codec::Fixed(_) = codec {
2448 return Err(ArrowError::NotYetImplemented(
2449 "Decimals are not currently supported".to_string(),
2450 ));
2451 }
2452 Ok(())
2453 };
2454
2455 let result = process_decimal();
2456
2457 assert!(result.is_err());
2458 if let Err(ArrowError::NotYetImplemented(msg)) = result {
2459 assert!(msg.contains("Decimals are not currently supported"));
2460 } else {
2461 panic!("Expected NotYetImplemented error");
2462 }
2463 }
2464 #[test]
2465 fn test_unknown_logical_type_added_to_metadata() {
2466 let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2467
2468 let mut maker = Maker::new(false, false);
2469 let result = maker.make_data_type(&schema, None, None).unwrap();
2470
2471 assert_eq!(
2472 result.metadata.get("logicalType"),
2473 Some(&"custom-type".to_string())
2474 );
2475 }
2476
2477 #[test]
2478 fn test_string_with_utf8view_enabled() {
2479 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2480
2481 let mut maker = Maker::new(true, false);
2482 let result = maker.make_data_type(&schema, None, None).unwrap();
2483
2484 assert!(matches!(result.codec, Codec::Utf8View));
2485 }
2486
2487 #[test]
2488 fn test_string_without_utf8view_enabled() {
2489 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2490
2491 let mut maker = Maker::new(false, false);
2492 let result = maker.make_data_type(&schema, None, None).unwrap();
2493
2494 assert!(matches!(result.codec, Codec::Utf8));
2495 }
2496
2497 #[test]
2498 fn test_record_with_string_and_utf8view_enabled() {
2499 let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2500
2501 let avro_field = crate::schema::Field {
2502 name: "string_field",
2503 r#type: field_schema,
2504 default: None,
2505 doc: None,
2506 aliases: vec![],
2507 };
2508
2509 let record = Record {
2510 name: "test_record",
2511 namespace: None,
2512 aliases: vec![],
2513 doc: None,
2514 fields: vec![avro_field],
2515 attributes: Attributes::default(),
2516 };
2517
2518 let schema = Schema::Complex(ComplexType::Record(record));
2519
2520 let mut maker = Maker::new(true, false);
2521 let result = maker.make_data_type(&schema, None, None).unwrap();
2522
2523 if let Codec::Struct(fields) = &result.codec {
2524 let first_field_codec = &fields[0].data_type().codec;
2525 assert!(matches!(first_field_codec, Codec::Utf8View));
2526 } else {
2527 panic!("Expected Struct codec");
2528 }
2529 }
2530
2531 #[test]
2532 fn test_union_with_strict_mode() {
2533 let schema = Schema::Union(vec![
2534 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2535 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2536 ]);
2537
2538 let mut maker = Maker::new(false, true);
2539 let result = maker.make_data_type(&schema, None, None);
2540
2541 assert!(result.is_err());
2542 match result {
2543 Err(ArrowError::SchemaError(msg)) => {
2544 assert!(msg.contains(
2545 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2546 ));
2547 }
2548 _ => panic!("Expected SchemaError"),
2549 }
2550 }
2551
2552 #[test]
2553 fn test_resolve_int_to_float_promotion() {
2554 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2555 assert!(matches!(result.codec, Codec::Float32));
2556 assert_eq!(
2557 result.resolution,
2558 Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2559 );
2560 }
2561
2562 #[test]
2563 fn test_resolve_int_to_double_promotion() {
2564 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2565 assert!(matches!(result.codec, Codec::Float64));
2566 assert_eq!(
2567 result.resolution,
2568 Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2569 );
2570 }
2571
2572 #[test]
2573 fn test_resolve_long_to_float_promotion() {
2574 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2575 assert!(matches!(result.codec, Codec::Float32));
2576 assert_eq!(
2577 result.resolution,
2578 Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2579 );
2580 }
2581
2582 #[test]
2583 fn test_resolve_long_to_double_promotion() {
2584 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2585 assert!(matches!(result.codec, Codec::Float64));
2586 assert_eq!(
2587 result.resolution,
2588 Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2589 );
2590 }
2591
2592 #[test]
2593 fn test_resolve_float_to_double_promotion() {
2594 let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2595 assert!(matches!(result.codec, Codec::Float64));
2596 assert_eq!(
2597 result.resolution,
2598 Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2599 );
2600 }
2601
2602 #[test]
2603 fn test_resolve_string_to_bytes_promotion() {
2604 let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2605 assert!(matches!(result.codec, Codec::Binary));
2606 assert_eq!(
2607 result.resolution,
2608 Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2609 );
2610 }
2611
2612 #[test]
2613 fn test_resolve_bytes_to_string_promotion() {
2614 let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2615 assert!(matches!(result.codec, Codec::Utf8));
2616 assert_eq!(
2617 result.resolution,
2618 Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2619 );
2620 }
2621
2622 #[test]
2623 fn test_resolve_illegal_promotion_double_to_float_errors() {
2624 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2625 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2626 let mut maker = Maker::new(false, false);
2627 let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2628 assert!(result.is_err());
2629 match result {
2630 Err(ArrowError::ParseError(msg)) => {
2631 assert!(msg.contains("Illegal promotion"));
2632 }
2633 _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2634 }
2635 }
2636
2637 #[test]
2638 fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2639 let writer = Schema::Union(vec![
2640 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2641 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2642 ]);
2643 let reader = Schema::Union(vec![
2644 Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2645 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2646 ]);
2647 let mut maker = Maker::new(false, false);
2648 let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2649 assert!(matches!(result.codec, Codec::Float64));
2650 assert_eq!(
2651 result.resolution,
2652 Some(ResolutionInfo::Union(ResolvedUnion {
2653 writer_to_reader: [
2654 None,
2655 Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2656 ]
2657 .into(),
2658 writer_is_union: true,
2659 reader_is_union: true,
2660 }))
2661 );
2662 assert_eq!(result.nullability, Some(Nullability::NullFirst));
2663 }
2664
2665 #[test]
2666 fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2667 let writer = mk_union(vec![
2668 mk_primitive(PrimitiveType::String),
2669 mk_primitive(PrimitiveType::Long),
2670 ]);
2671 let reader = mk_primitive(PrimitiveType::Bytes);
2672 let mut maker = Maker::new(false, false);
2673 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2674 assert!(matches!(dt.codec(), Codec::Binary));
2675 let resolved = match dt.resolution {
2676 Some(ResolutionInfo::Union(u)) => u,
2677 other => panic!("expected union resolution info, got {other:?}"),
2678 };
2679 assert!(resolved.writer_is_union && !resolved.reader_is_union);
2680 assert_eq!(
2681 resolved.writer_to_reader.as_ref(),
2682 &[
2683 Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
2684 None
2685 ]
2686 );
2687 }
2688
2689 #[test]
2690 fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2691 let writer = mk_primitive(PrimitiveType::Long);
2692 let reader = mk_union(vec![
2693 mk_primitive(PrimitiveType::Long),
2694 mk_primitive(PrimitiveType::Double),
2695 ]);
2696 let mut maker = Maker::new(false, false);
2697 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2698 let resolved = match dt.resolution {
2699 Some(ResolutionInfo::Union(u)) => u,
2700 other => panic!("expected union resolution info, got {other:?}"),
2701 };
2702 assert!(!resolved.writer_is_union && resolved.reader_is_union);
2703 assert_eq!(
2704 resolved.writer_to_reader.as_ref(),
2705 &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
2706 );
2707 }
2708
2709 #[test]
2710 fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2711 let writer = mk_primitive(PrimitiveType::Int);
2712 let reader = mk_union(vec![
2713 mk_primitive(PrimitiveType::Null),
2714 mk_primitive(PrimitiveType::Long),
2715 mk_primitive(PrimitiveType::String),
2716 ]);
2717 let mut maker = Maker::new(false, false);
2718 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2719 let resolved = match dt.resolution {
2720 Some(ResolutionInfo::Union(u)) => u,
2721 other => panic!("expected union resolution info, got {other:?}"),
2722 };
2723 assert_eq!(
2724 resolved.writer_to_reader.as_ref(),
2725 &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
2726 );
2727 }
2728
    // Resolves a bare writer record against a reader union whose first branch is the same
    // record extended with a defaulted field `b`, and checks that both the record-level
    // resolution and the per-field default survive inside the union.
    #[test]
    fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
        // Writer: record `Inner` with a single `int` field `a`.
        let writer = Schema::Complex(ComplexType::Record(Record {
            name: "Inner",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields: vec![AvroFieldSchema {
                name: "a",
                doc: None,
                r#type: mk_primitive(PrimitiveType::Int),
                default: None,
                aliases: vec![],
            }],
            attributes: Attributes::default(),
        }));
        // Reader: union of `Inner { a: int, b: int = 42 }` and `string`.
        let reader = mk_union(vec![
            Schema::Complex(ComplexType::Record(Record {
                name: "Inner",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    AvroFieldSchema {
                        name: "a",
                        doc: None,
                        r#type: mk_primitive(PrimitiveType::Int),
                        default: None,
                        aliases: vec![],
                    },
                    AvroFieldSchema {
                        name: "b",
                        doc: None,
                        r#type: mk_primitive(PrimitiveType::Int),
                        default: Some(Value::Number(serde_json::Number::from(42))),
                        aliases: vec![],
                    },
                ],
                attributes: Attributes::default(),
            })),
            mk_primitive(PrimitiveType::String),
        ]);
        let mut maker = Maker::new(false, false);
        let dt = maker
            .make_data_type(&writer, Some(&reader), None)
            .expect("resolution should succeed");
        // The top-level resolution must describe a non-union writer and a union reader.
        let resolved = match dt.resolution.as_ref() {
            Some(ResolutionInfo::Union(u)) => u,
            other => panic!("expected union resolution info, got {other:?}"),
        };
        assert!(!resolved.writer_is_union && resolved.reader_is_union);
        assert_eq!(
            resolved.writer_to_reader.len(),
            1,
            "expected the non-union record to resolve to a union variant"
        );
        // The single writer entry must point at reader branch 0 (the record).
        let resolution = match resolved.writer_to_reader.first().unwrap() {
            Some((0, resolution)) => resolution,
            other => panic!("unexpected writer-to-reader table value {other:?}"),
        };
        // Record-level mapping: writer field `a` -> reader index 0, reader index 1 (`b`)
        // filled from its default, and no writer field skipped.
        match resolution {
            ResolutionInfo::Record(ResolvedRecord {
                writer_to_reader,
                default_fields,
                skip_fields,
            }) => {
                assert_eq!(writer_to_reader.len(), 1);
                assert_eq!(writer_to_reader[0], Some(0));
                assert_eq!(default_fields.len(), 1);
                assert_eq!(default_fields[0], 1);
                assert_eq!(skip_fields.len(), 1);
                assert_eq!(skip_fields[0], None);
            }
            other => panic!("unexpected resolution {other:?}"),
        }
        // The union's first child codec must be the resolved struct, with `b` carrying
        // its parsed default.
        let children = match dt.codec() {
            Codec::Union(children, _, _) => children,
            other => panic!("expected union codec, got {other:?}"),
        };
        let inner_fields = match children[0].codec() {
            Codec::Struct(f) => f,
            other => panic!("expected struct codec for Inner, got {other:?}"),
        };
        assert_eq!(inner_fields.len(), 2);
        assert_eq!(inner_fields[1].name(), "b");
        assert_eq!(
            inner_fields[1].data_type().resolution,
            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
            "field b should have DefaultValue(Int(42)) from schema resolution"
        );
    }
2825
    // Resolves a writer union against a reader union where the shared record branch gains
    // a defaulted field `b` on the reader side, and checks that the record-level
    // resolution and the per-field default survive inside the union.
    #[test]
    fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
        // Writer: union of `string` and `Inner { a: int }` (record is branch 1).
        let writer = mk_union(vec![
            mk_primitive(PrimitiveType::String),
            Schema::Complex(ComplexType::Record(Record {
                name: "Inner",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![AvroFieldSchema {
                    name: "a",
                    doc: None,
                    r#type: mk_primitive(PrimitiveType::Int),
                    default: None,
                    aliases: vec![],
                }],
                attributes: Attributes::default(),
            })),
        ]);
        // Reader: union of `Inner { a: int, b: int = 42 }` and `string` (record is branch 0).
        let reader = mk_union(vec![
            Schema::Complex(ComplexType::Record(Record {
                name: "Inner",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    AvroFieldSchema {
                        name: "a",
                        doc: None,
                        r#type: mk_primitive(PrimitiveType::Int),
                        default: None,
                        aliases: vec![],
                    },
                    AvroFieldSchema {
                        name: "b",
                        doc: None,
                        r#type: mk_primitive(PrimitiveType::Int),
                        default: Some(Value::Number(serde_json::Number::from(42))),
                        aliases: vec![],
                    },
                ],
                attributes: Attributes::default(),
            })),
            mk_primitive(PrimitiveType::String),
        ]);
        let mut maker = Maker::new(false, false);
        let dt = maker
            .make_data_type(&writer, Some(&reader), None)
            .expect("resolution should succeed");
        // Both sides are unions, so the table has one entry per writer branch.
        let resolved = match dt.resolution.as_ref() {
            Some(ResolutionInfo::Union(u)) => u,
            other => panic!("expected union resolution info, got {other:?}"),
        };
        assert!(resolved.writer_is_union && resolved.reader_is_union);
        assert_eq!(resolved.writer_to_reader.len(), 2);
        // Writer branch 1 (the record) must map to reader branch 0.
        let resolution = match resolved.writer_to_reader[1].as_ref() {
            Some((0, resolution)) => resolution,
            other => panic!("unexpected writer-to-reader table value {other:?}"),
        };
        // Record-level mapping: writer field `a` -> reader index 0, reader index 1 (`b`)
        // filled from its default, and no writer field skipped.
        match resolution {
            ResolutionInfo::Record(ResolvedRecord {
                writer_to_reader,
                default_fields,
                skip_fields,
            }) => {
                assert_eq!(writer_to_reader.len(), 1);
                assert_eq!(writer_to_reader[0], Some(0));
                assert_eq!(default_fields.len(), 1);
                assert_eq!(default_fields[0], 1);
                assert_eq!(skip_fields.len(), 1);
                assert_eq!(skip_fields[0], None);
            }
            other => panic!("unexpected resolution {other:?}"),
        }
        // The union's first child codec must be the resolved struct, with `b` carrying
        // its parsed default.
        let children = match dt.codec() {
            Codec::Union(children, _, _) => children,
            other => panic!("expected union codec, got {other:?}"),
        };
        let inner_fields = match children[0].codec() {
            Codec::Struct(f) => f,
            other => panic!("expected struct codec for Inner, got {other:?}"),
        };
        assert_eq!(inner_fields.len(), 2);
        assert_eq!(inner_fields[1].name(), "b");
        assert_eq!(
            inner_fields[1].data_type().resolution,
            Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
            "field b should have DefaultValue(Int(42)) from schema resolution"
        );
    }
2921
2922 #[test]
2923 fn test_resolve_both_nullable_unions_direct_match() {
2924 let writer = mk_union(vec![
2925 mk_primitive(PrimitiveType::Null),
2926 mk_primitive(PrimitiveType::String),
2927 ]);
2928 let reader = mk_union(vec![
2929 mk_primitive(PrimitiveType::String),
2930 mk_primitive(PrimitiveType::Null),
2931 ]);
2932 let mut maker = Maker::new(false, false);
2933 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2934 assert!(matches!(dt.codec(), Codec::Utf8));
2935 assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2936 assert_eq!(
2937 dt.resolution,
2938 Some(ResolutionInfo::Union(ResolvedUnion {
2939 writer_to_reader: [
2940 None,
2941 Some((0, ResolutionInfo::Promotion(Promotion::Direct)))
2942 ]
2943 .into(),
2944 writer_is_union: true,
2945 reader_is_union: true
2946 }))
2947 );
2948 }
2949
2950 #[test]
2951 fn test_resolve_both_nullable_unions_with_promotion() {
2952 let writer = mk_union(vec![
2953 mk_primitive(PrimitiveType::Null),
2954 mk_primitive(PrimitiveType::Int),
2955 ]);
2956 let reader = mk_union(vec![
2957 mk_primitive(PrimitiveType::Double),
2958 mk_primitive(PrimitiveType::Null),
2959 ]);
2960 let mut maker = Maker::new(false, false);
2961 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2962 assert!(matches!(dt.codec(), Codec::Float64));
2963 assert_eq!(dt.nullability, Some(Nullability::NullFirst));
2964 assert_eq!(
2965 dt.resolution,
2966 Some(ResolutionInfo::Union(ResolvedUnion {
2967 writer_to_reader: [
2968 None,
2969 Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2970 ]
2971 .into(),
2972 writer_is_union: true,
2973 reader_is_union: true
2974 }))
2975 );
2976 }
2977
2978 #[test]
2979 fn test_resolve_type_promotion() {
2980 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2981 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2982 let mut maker = Maker::new(false, false);
2983 let result = maker
2984 .make_data_type(&writer_schema, Some(&reader_schema), None)
2985 .unwrap();
2986 assert!(matches!(result.codec, Codec::Int64));
2987 assert_eq!(
2988 result.resolution,
2989 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
2990 );
2991 }
2992
    // A record named `Nested` is defined inline once and then referenced by name as a
    // field type, an array item type and a map value type. Every reference must produce
    // the same data type as the inline definition.
    #[test]
    fn test_nested_record_type_reuse_without_namespace() {
        let schema_str = r#"
        {
          "type": "record",
          "name": "Record",
          "fields": [
            {
              "name": "nested",
              "type": {
                "type": "record",
                "name": "Nested",
                "fields": [
                  { "name": "nested_int", "type": "int" }
                ]
              }
            },
            { "name": "nestedRecord", "type": "Nested" },
            { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
            { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
          ]
        }
        "#;

        let schema: Schema = serde_json::from_str(schema_str).unwrap();

        let mut maker = Maker::new(false, false);
        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

        if let Codec::Struct(fields) = avro_data_type.codec() {
            assert_eq!(fields.len(), 4);

            // Field 0: the inline definition, a struct with one Int32 field.
            assert_eq!(fields[0].name(), "nested");
            let nested_data_type = fields[0].data_type();
            if let Codec::Struct(nested_fields) = nested_data_type.codec() {
                assert_eq!(nested_fields.len(), 1);
                assert_eq!(nested_fields[0].name(), "nested_int");
                assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
            } else {
                panic!(
                    "'nested' field is not a struct but {:?}",
                    nested_data_type.codec()
                );
            }

            // Field 1: a by-name reference must match the inline definition's data type.
            assert_eq!(fields[1].name(), "nestedRecord");
            let nested_record_data_type = fields[1].data_type();
            assert_eq!(
                nested_record_data_type.codec().data_type(),
                nested_data_type.codec().data_type()
            );

            // Field 2: array items referencing `Nested` by name.
            assert_eq!(fields[2].name(), "nestedArray");
            if let Codec::List(item_type) = fields[2].data_type().codec() {
                assert_eq!(
                    item_type.codec().data_type(),
                    nested_data_type.codec().data_type()
                );
            } else {
                panic!("'nestedArray' field is not a list");
            }

            // Field 3: map values referencing `Nested` by name.
            assert_eq!(fields[3].name(), "nestedMap");
            if let Codec::Map(value_type) = fields[3].data_type().codec() {
                assert_eq!(
                    value_type.codec().data_type(),
                    nested_data_type.codec().data_type()
                );
            } else {
                panic!("'nestedMap' field is not a map");
            }
        } else {
            panic!("Top-level schema is not a struct");
        }
    }
3072
    // An enum `Status` is defined inline in namespace `enum_ns` (different from the
    // enclosing record's `record_ns`) and then referenced by its full name
    // `enum_ns.Status` as a field type, an array item type and a map value type. Every
    // reference must produce the same data type as the inline definition.
    #[test]
    fn test_nested_enum_type_reuse_with_namespace() {
        let schema_str = r#"
        {
          "type": "record",
          "name": "Record",
          "namespace": "record_ns",
          "fields": [
            {
              "name": "status",
              "type": {
                "type": "enum",
                "name": "Status",
                "namespace": "enum_ns",
                "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
              }
            },
            { "name": "backupStatus", "type": "enum_ns.Status" },
            { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
            { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
          ]
        }
        "#;

        let schema: Schema = serde_json::from_str(schema_str).unwrap();

        let mut maker = Maker::new(false, false);
        let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

        if let Codec::Struct(fields) = avro_data_type.codec() {
            assert_eq!(fields.len(), 4);

            // Field 0: the inline definition, an enum with the three declared symbols.
            assert_eq!(fields[0].name(), "status");
            let status_data_type = fields[0].data_type();
            if let Codec::Enum(symbols) = status_data_type.codec() {
                assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
            } else {
                panic!(
                    "'status' field is not an enum but {:?}",
                    status_data_type.codec()
                );
            }

            // Field 1: a full-name reference must match the inline definition's data type.
            assert_eq!(fields[1].name(), "backupStatus");
            let backup_status_data_type = fields[1].data_type();
            assert_eq!(
                backup_status_data_type.codec().data_type(),
                status_data_type.codec().data_type()
            );

            // Field 2: array items referencing the enum by full name.
            assert_eq!(fields[2].name(), "statusHistory");
            if let Codec::List(item_type) = fields[2].data_type().codec() {
                assert_eq!(
                    item_type.codec().data_type(),
                    status_data_type.codec().data_type()
                );
            } else {
                panic!("'statusHistory' field is not a list");
            }

            // Field 3: map values referencing the enum by full name.
            assert_eq!(fields[3].name(), "statusMap");
            if let Codec::Map(value_type) = fields[3].data_type().codec() {
                assert_eq!(
                    value_type.codec().data_type(),
                    status_data_type.codec().data_type()
                );
            } else {
                panic!("'statusMap' field is not a map");
            }
        } else {
            panic!("Top-level schema is not a struct");
        }
    }
3150
3151 #[test]
3152 fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
3153 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3154 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3155 let mut maker = Maker::new(false, false);
3156 let data_type = maker
3157 .make_data_type(&writer_schema, Some(&reader_schema), None)
3158 .expect("resolution should succeed");
3159 let field = AvroField {
3160 name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
3161 data_type,
3162 };
3163 assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
3164 assert!(matches!(field.data_type().codec(), Codec::Utf8));
3165 }
3166
3167 fn json_string(s: &str) -> Value {
3168 Value::String(s.to_string())
3169 }
3170
3171 fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3172 let stored = dt
3173 .metadata
3174 .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3175 .cloned()
3176 .unwrap_or_default();
3177 let expected = serde_json::to_string(default_json).unwrap();
3178 assert_eq!(stored, expected, "stored default metadata should match");
3179 }
3180
3181 #[test]
3182 fn test_validate_and_store_default_null_and_nullability_rules() {
3183 let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
3184 let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
3185 assert_eq!(lit, AvroLiteral::Null);
3186 assert_default_stored(&dt_null, &Value::Null);
3187 let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3188 let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
3189 assert!(
3190 err.to_string()
3191 .contains("JSON null default is only valid for `null` type"),
3192 "unexpected error: {err}"
3193 );
3194 let mut dt_int_nf =
3195 AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
3196 let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
3197 assert_eq!(lit2, AvroLiteral::Null);
3198 assert_default_stored(&dt_int_nf, &Value::Null);
3199 let mut dt_int_ns =
3200 AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
3201 let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
3202 assert!(
3203 err2.to_string()
3204 .contains("JSON null default is only valid for `null` type"),
3205 "unexpected error: {err2}"
3206 );
3207 }
3208
    // Walks `parse_and_store_default` through boolean, integer, floating-point, string,
    // uuid, bytes and temporal codecs, checking the parsed literal for each and, for
    // several of them, the stored metadata and the out-of-range error.
    #[test]
    fn test_validate_and_store_default_primitives_and_temporal() {
        // Boolean.
        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
        assert_eq!(lit, AvroLiteral::Boolean(true));
        assert_default_stored(&dt_bool, &Value::Bool(true));
        // Int32: in-range value, then one past `i32::MAX`.
        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
        let lit = dt_i32
            .parse_and_store_default(&serde_json::json!(123))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Int(123));
        assert_default_stored(&dt_i32, &serde_json::json!(123));
        let err = dt_i32
            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
            .unwrap_err();
        assert!(format!("{err}").contains("out of i32 range"));
        // Int64.
        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
        let lit = dt_i64
            .parse_and_store_default(&serde_json::json!(1234567890))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Long(1234567890));
        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
        // Float32: representable value, then a magnitude beyond the f32 range.
        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
        let lit = dt_f32
            .parse_and_store_default(&serde_json::json!(1.25))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Float(1.25));
        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
        let err = dt_f32
            .parse_and_store_default(&serde_json::json!(1e39))
            .unwrap_err();
        assert!(format!("{err}").contains("out of f32 range"));
        // Float64.
        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
        let lit = dt_f64
            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
            .unwrap();
        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
        // Utf8 and Utf8View both parse to a string literal.
        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
        let l = dt_str
            .parse_and_store_default(&json_string("hello"))
            .unwrap();
        assert_eq!(l, AvroLiteral::String("hello".into()));
        assert_default_stored(&dt_str, &json_string("hello"));
        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
        let l = dt_strv
            .parse_and_store_default(&json_string("view"))
            .unwrap();
        assert_eq!(l, AvroLiteral::String("view".into()));
        assert_default_stored(&dt_strv, &json_string("view"));
        // Uuid defaults are kept as their string form.
        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
        let l = dt_uuid
            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
            .unwrap();
        assert_eq!(
            l,
            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
        );
        // Binary: "ABC" becomes the bytes 65, 66, 67, while "€" is rejected with an
        // "Invalid codepoint" error.
        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
        let err = dt_bin
            .parse_and_store_default(&json_string("€"))
            .unwrap_err();
        assert!(format!("{err}").contains("Invalid codepoint"));
        // Date32 and TimeMillis parse to `Int` literals.
        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
        let ld = dt_date
            .parse_and_store_default(&serde_json::json!(1))
            .unwrap();
        assert_eq!(ld, AvroLiteral::Int(1));
        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
        let lt = dt_tmill
            .parse_and_store_default(&serde_json::json!(86_400_000))
            .unwrap();
        assert_eq!(lt, AvroLiteral::Int(86_400_000));
        // TimeMicros and both timestamp codecs parse to `Long` literals.
        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
        let ltm = dt_tmicros
            .parse_and_store_default(&serde_json::json!(1_000_000))
            .unwrap();
        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
        let l1 = dt_ts_milli
            .parse_and_store_default(&serde_json::json!(123))
            .unwrap();
        assert_eq!(l1, AvroLiteral::Long(123));
        let mut dt_ts_micro =
            AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
        let l2 = dt_ts_micro
            .parse_and_store_default(&serde_json::json!(456))
            .unwrap();
        assert_eq!(l2, AvroLiteral::Long(456));
    }
3301
/// Verifies range validation of defaults for the custom small and unsigned integer codecs.
///
/// For `Int8`, `Int16`, `UInt8`, `UInt16` and `UInt32`, a boundary value inside the
/// range must parse to the expected literal, and values one step outside the range
/// must fail with an error mentioning "out of <type> range".
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_validate_and_store_default_custom_integer_ranges() {
    // Int8: i8::MAX is accepted; MAX + 1 and MIN - 1 are rejected.
    {
        let mut int8_type = AvroDataType::new(Codec::Int8, HashMap::new(), None);
        let accepted = serde_json::json!(i8::MAX);
        let parsed = int8_type.parse_and_store_default(&accepted).unwrap();
        assert_eq!(parsed, AvroLiteral::Int(i32::from(i8::MAX)));
        let above = serde_json::json!(i64::from(i8::MAX) + 1);
        let above_err = int8_type.parse_and_store_default(&above).unwrap_err();
        assert!(above_err.to_string().contains("out of i8 range"));
        let below = serde_json::json!(i64::from(i8::MIN) - 1);
        let below_err = int8_type.parse_and_store_default(&below).unwrap_err();
        assert!(below_err.to_string().contains("out of i8 range"));
    }

    // Int16: i16::MIN is accepted; MAX + 1 and MIN - 1 are rejected.
    {
        let mut int16_type = AvroDataType::new(Codec::Int16, HashMap::new(), None);
        let accepted = serde_json::json!(i16::MIN);
        let parsed = int16_type.parse_and_store_default(&accepted).unwrap();
        assert_eq!(parsed, AvroLiteral::Int(i32::from(i16::MIN)));
        let above = serde_json::json!(i64::from(i16::MAX) + 1);
        let above_err = int16_type.parse_and_store_default(&above).unwrap_err();
        assert!(above_err.to_string().contains("out of i16 range"));
        let below = serde_json::json!(i64::from(i16::MIN) - 1);
        let below_err = int16_type.parse_and_store_default(&below).unwrap_err();
        assert!(below_err.to_string().contains("out of i16 range"));
    }

    // UInt8: u8::MAX is accepted; -1 and MAX + 1 are rejected.
    {
        let mut uint8_type = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
        let accepted = serde_json::json!(u8::MAX);
        let parsed = uint8_type.parse_and_store_default(&accepted).unwrap();
        assert_eq!(parsed, AvroLiteral::Int(i32::from(u8::MAX)));
        let negative = serde_json::json!(-1);
        let negative_err = uint8_type.parse_and_store_default(&negative).unwrap_err();
        assert!(negative_err.to_string().contains("out of u8 range"));
        let above = serde_json::json!(i64::from(u8::MAX) + 1);
        let above_err = uint8_type.parse_and_store_default(&above).unwrap_err();
        assert!(above_err.to_string().contains("out of u8 range"));
    }

    // UInt16: u16::MAX is accepted; -1 and MAX + 1 are rejected.
    {
        let mut uint16_type = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
        let accepted = serde_json::json!(u16::MAX);
        let parsed = uint16_type.parse_and_store_default(&accepted).unwrap();
        assert_eq!(parsed, AvroLiteral::Int(i32::from(u16::MAX)));
        let negative = serde_json::json!(-1);
        let negative_err = uint16_type.parse_and_store_default(&negative).unwrap_err();
        assert!(negative_err.to_string().contains("out of u16 range"));
        let above = serde_json::json!(i64::from(u16::MAX) + 1);
        let above_err = uint16_type.parse_and_store_default(&above).unwrap_err();
        assert!(above_err.to_string().contains("out of u16 range"));
    }

    // UInt32: u32::MAX is accepted and surfaces as a Long literal; -1 and MAX + 1 are rejected.
    {
        let mut uint32_type = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
        let accepted = serde_json::json!(i64::from(u32::MAX));
        let parsed = uint32_type.parse_and_store_default(&accepted).unwrap();
        assert_eq!(parsed, AvroLiteral::Long(i64::from(u32::MAX)));
        let negative = serde_json::json!(-1);
        let negative_err = uint32_type.parse_and_store_default(&negative).unwrap_err();
        assert!(negative_err.to_string().contains("out of u32 range"));
        let above = serde_json::json!(i64::from(u32::MAX) + 1);
        let above_err = uint32_type.parse_and_store_default(&above).unwrap_err();
        assert!(above_err.to_string().contains("out of u32 range"));
    }
}
3375
/// Verifies default parsing for the byte-backed codecs: `Fixed`, `Decimal` and `Interval`.
///
/// String defaults are expected to come back as `AvroLiteral::Bytes`, and the
/// sized codecs must reject a default whose length does not match, with an
/// error mentioning "Default length".
#[test]
fn test_validate_and_store_default_fixed_decimal_interval() {
    // Fixed(4): four characters are accepted, eight are rejected.
    let mut fixed_type = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
    let fixed_lit = fixed_type
        .parse_and_store_default(&json_string("WXYZ"))
        .unwrap();
    assert_eq!(fixed_lit, AvroLiteral::Bytes(b"WXYZ".to_vec()));
    let fixed_err = fixed_type
        .parse_and_store_default(&json_string("TOO LONG"))
        .unwrap_err();
    assert!(fixed_err.to_string().contains("Default length"));

    // Decimal with a size of 3: three characters are accepted, seven are rejected.
    let sized_decimal = Codec::Decimal(10, Some(2), Some(3));
    let mut sized_decimal_type = AvroDataType::new(sized_decimal, HashMap::new(), None);
    let sized_lit = sized_decimal_type
        .parse_and_store_default(&json_string("abc"))
        .unwrap();
    assert_eq!(sized_lit, AvroLiteral::Bytes(b"abc".to_vec()));
    let sized_err = sized_decimal_type
        .parse_and_store_default(&json_string("toolong"))
        .unwrap_err();
    assert!(sized_err.to_string().contains("Default length"));

    // Decimal without a size: an eight-character default is accepted as-is.
    let unsized_decimal = Codec::Decimal(10, Some(2), None);
    let mut unsized_decimal_type = AvroDataType::new(unsized_decimal, HashMap::new(), None);
    let unsized_lit = unsized_decimal_type
        .parse_and_store_default(&json_string("freeform"))
        .unwrap();
    assert_eq!(unsized_lit, AvroLiteral::Bytes(b"freeform".to_vec()));

    // Interval: a 12-character default is accepted, a 5-character one is rejected.
    let mut interval_type = AvroDataType::new(Codec::Interval, HashMap::new(), None);
    let interval_lit = interval_type
        .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
        .unwrap();
    assert_eq!(interval_lit, AvroLiteral::Bytes(b"ABCDEFGHIJKL".to_vec()));
    let interval_err = interval_type
        .parse_and_store_default(&json_string("short"))
        .unwrap_err();
    assert!(interval_err.to_string().contains("Default length"));
}
3419
/// Exercises default parsing for the enum, list, map and struct codecs.
///
/// Each codec is checked with one valid default and at least one default of
/// the wrong shape, asserting on the returned literal or on a substring of
/// the error message.
#[test]
fn test_validate_and_store_default_enum_list_map_struct() {
    // Enum: a declared symbol is accepted, an undeclared one is rejected.
    let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
        .into_iter()
        .collect();
    let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
    let l = dt_enum
        .parse_and_store_default(&json_string("GREEN"))
        .unwrap();
    assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
    let err = dt_enum
        .parse_and_store_default(&json_string("YELLOW"))
        .unwrap_err();
    assert!(err.to_string().contains("Default enum symbol"));

    // List: a JSON array is parsed element-wise with the item codec; an object is rejected.
    let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
    let val = serde_json::json!([1, 2, 3]);
    let l = dt_list.parse_and_store_default(&val).unwrap();
    assert_eq!(
        l,
        AvroLiteral::Array(vec![
            AvroLiteral::Long(1),
            AvroLiteral::Long(2),
            AvroLiteral::Long(3)
        ])
    );
    let err = dt_list
        .parse_and_store_default(&serde_json::json!({"not":"array"}))
        .unwrap_err();
    assert!(err.to_string().contains("JSON array"));

    // Map: a JSON object is parsed value-wise with the value codec; a number is rejected.
    let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
    let mv = serde_json::json!({"x": 1.5, "y": 2.5});
    let l = dt_map.parse_and_store_default(&mv).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("x".into(), AvroLiteral::Double(1.5));
    expected.insert("y".into(), AvroLiteral::Double(2.5));
    assert_eq!(l, AvroLiteral::Map(expected));
    let err = dt_map
        .parse_and_store_default(&serde_json::json!(123))
        .unwrap_err();
    assert!(err.to_string().contains("JSON object"));

    // Struct: the default object only supplies "a". The expected literal below
    // shows "b" (nullable, null-first, no stored default) resolving to Null and
    // "c" resolving to the default already stored in its metadata.
    let mut field_a = AvroField {
        name: "a".into(),
        data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
    };
    let field_b = AvroField {
        name: "b".into(),
        data_type: AvroDataType::new(
            Codec::Int64,
            HashMap::new(),
            Some(Nullability::NullFirst),
        ),
    };
    let mut c_md = HashMap::new();
    c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
    let field_c = AvroField {
        name: "c".into(),
        data_type: AvroDataType::new(Codec::Utf8, c_md, None),
    };
    field_a.data_type.metadata.insert("doc".into(), "na".into());
    let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
    let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
    let default_obj = serde_json::json!({"a": 7});
    let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("a".into(), AvroLiteral::Int(7));
    expected.insert("b".into(), AvroLiteral::Null);
    expected.insert("c".into(), AvroLiteral::String("xyz".into()));
    assert_eq!(l, AvroLiteral::Map(expected));
    assert_default_stored(&dt_struct, &default_obj);

    // Struct with a non-nullable field that has no default: an empty object is rejected.
    let req_field = AvroField {
        name: "req".into(),
        data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
    };
    let mut dt_bad = AvroDataType::new(
        Codec::Struct(Arc::from(vec![req_field])),
        HashMap::new(),
        None,
    );
    let err = dt_bad
        .parse_and_store_default(&serde_json::json!({}))
        .unwrap_err();
    assert!(
        err.to_string().contains("missing required subfield 'req'"),
        "unexpected error: {err}"
    );

    // Struct default that is not a JSON object. The result of `contains` must be
    // asserted; as a bare expression statement it would be computed and discarded.
    let err = dt_struct
        .parse_and_store_default(&serde_json::json!(10))
        .unwrap_err();
    assert!(
        err.to_string().contains("must be a JSON object"),
        "unexpected error: {err}"
    );
}
3513
/// Resolves a writer `array<int>` against a reader `array<long>`.
///
/// Both schemas carry a custom "who" attribute. The resolved type must keep the
/// reader's value for it, and the list item must be an `Int64` codec annotated
/// with an `IntToLong` promotion.
#[test]
fn test_resolve_array_promotion_and_reader_metadata() {
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("who", json_string("writer"))]),
        },
    }));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
        attributes: Attributes {
            logical_type: None,
            additional: HashMap::from([("who", json_string("reader"))]),
        },
    }));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();

    // The metadata value is the JSON-encoded attribute, hence the embedded quotes.
    let expected_who = "\"reader\"".to_string();
    assert_eq!(resolved.metadata.get("who"), Some(&expected_who));

    match resolved.codec() {
        Codec::List(item) => {
            assert!(matches!(item.codec(), Codec::Int64));
            let expected_resolution = Some(ResolutionInfo::Promotion(Promotion::IntToLong));
            assert_eq!(item.resolution, expected_resolution);
        }
        _ => panic!("expected list codec"),
    }
}
3549
/// Resolves a writer `array<int>` against a reader `array<["null", "int"]>`.
///
/// The resolved list item must be a null-first nullable `Int32` whose
/// resolution is a direct promotion.
#[test]
fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(mk_union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
        ])),
        attributes: Attributes::default(),
    }));
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    if let Codec::List(inner) = dt.codec() {
        assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
        assert!(matches!(inner.codec(), Codec::Int32));
        match inner.resolution.as_ref() {
            Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
            // The failure message names the resolution this test actually
            // requires, so a failing run points at the right expectation.
            other => panic!("expected Promotion::Direct resolution, got {other:?}"),
        }
    } else {
        panic!("expected List codec");
    }
}
3578
/// Resolves two 16-byte fixed types whose names differ.
///
/// The writer is named "MD5" and lists "Hash16" as an alias; the reader is
/// named "Hash16". Resolution must succeed and yield a `Fixed(16)` codec.
#[test]
fn test_resolve_fixed_success_name_and_size_match_and_alias() {
    let writer_fixed = Fixed {
        name: "MD5",
        namespace: None,
        aliases: vec!["Hash16"],
        size: 16,
        attributes: Attributes::default(),
    };
    let reader_fixed = Fixed {
        name: "Hash16",
        namespace: None,
        aliases: vec![],
        size: 16,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
    let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    assert!(matches!(resolved.codec(), Codec::Fixed(16)));
}
3601
/// Checks the custom `arrow.interval-month-day-nano` logical type.
///
/// A 16-byte fixed carrying that logical type must produce the
/// `IntervalMonthDayNano` codec, whose Arrow data type is
/// `Interval(MonthDayNano)`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_logical_type_fixed16() {
    let interval_fixed = Fixed {
        name: "ArrowIntervalMDN",
        namespace: None,
        aliases: vec![],
        size: 16,
        attributes: Attributes {
            logical_type: Some("arrow.interval-month-day-nano"),
            ..Attributes::default()
        },
    };
    let schema = Schema::Complex(ComplexType::Fixed(interval_fixed));

    let mut maker = Maker::new(false, false);
    let resolved = maker.make_data_type(&schema, None, None).unwrap();

    assert!(matches!(resolved.codec(), Codec::IntervalMonthDayNano));
    let arrow_type = resolved.codec.data_type();
    assert_eq!(arrow_type, DataType::Interval(IntervalUnit::MonthDayNano));
}
3623
/// Resolves a writer record against a reader record with reordered, dropped and added fields.
///
/// Writer fields: `a: int`, `skipme: string`, `b: long`.
/// Reader fields: `b: long`, `a: long`, `name: string` (default "anon"),
/// `opt: ["null", "int"]` (no explicit default).
///
/// The test asserts the reader field order, the `IntToLong` promotion on `a`,
/// the writer-to-reader index mapping, the reader indices filled from defaults,
/// the skip entry for the writer-only field, and the default metadata stored
/// on the two reader-only fields.
#[test]
fn test_resolve_records_mapping_default_fields_and_skip_fields() {
    // Builds a field of a primitive type with no doc, default or aliases.
    let primitive_field = |name: &'static str, primitive: PrimitiveType| crate::schema::Field {
        name,
        doc: None,
        r#type: Schema::TypeName(TypeName::Primitive(primitive)),
        default: None,
        aliases: vec![],
    };

    let writer = Schema::Complex(ComplexType::Record(Record {
        name: "R",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![
            primitive_field("a", PrimitiveType::Int),
            primitive_field("skipme", PrimitiveType::String),
            primitive_field("b", PrimitiveType::Long),
        ],
        attributes: Attributes::default(),
    }));
    let reader = Schema::Complex(ComplexType::Record(Record {
        name: "R",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![
            primitive_field("b", PrimitiveType::Long),
            primitive_field("a", PrimitiveType::Long),
            crate::schema::Field {
                default: Some(json_string("anon")),
                ..primitive_field("name", PrimitiveType::String)
            },
            crate::schema::Field {
                name: "opt",
                doc: None,
                r#type: Schema::Union(vec![
                    Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                ]),
                default: None,
                aliases: vec![],
            },
        ],
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("record resolution");

    // The resolved struct follows the reader's field order.
    let fields = match resolved.codec() {
        Codec::Struct(struct_fields) => struct_fields,
        other => panic!("expected struct, got {other:?}"),
    };
    assert_eq!(fields.len(), 4);
    for (index, expected_name) in ["b", "a", "name", "opt"].into_iter().enumerate() {
        assert_eq!(fields[index].name(), expected_name);
    }
    assert!(matches!(
        fields[1].data_type().resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    ));

    let record_info = match resolved.resolution.as_ref() {
        Some(ResolutionInfo::Record(info)) => info.clone(),
        other => panic!("expected record resolution, got {other:?}"),
    };
    // Writer "a" -> reader index 1, "skipme" -> no reader field, "b" -> reader index 0.
    assert_eq!(
        record_info.writer_to_reader.as_ref(),
        &[Some(1), None, Some(0)]
    );
    // Reader fields "name" and "opt" have no writer counterpart.
    assert_eq!(record_info.default_fields.as_ref(), &[2usize, 3usize]);
    // Only the writer-only field carries a skip type.
    assert!(record_info.skip_fields[0].is_none());
    assert!(record_info.skip_fields[2].is_none());
    let skipped = record_info.skip_fields[1]
        .as_ref()
        .expect("skip field present");
    assert!(matches!(skipped.codec(), Codec::Utf8));

    // Defaults are stored as JSON text in the field metadata.
    let name_default = fields[2]
        .data_type()
        .metadata
        .get(AVRO_FIELD_DEFAULT_METADATA_KEY);
    assert_eq!(name_default, Some(&"\"anon\"".to_string()));
    let opt_default = fields[3]
        .data_type()
        .metadata
        .get(AVRO_FIELD_DEFAULT_METADATA_KEY);
    assert_eq!(opt_default, Some(&"null".to_string()));
}
3734
/// Resolves records whose full names differ across namespaces.
///
/// The writer `com.example.v2.PersonV2` lists `com.example.Person` as an
/// alias; the reader is `com.example.Person` with the same two fields.
/// Resolution must succeed and produce a two-field struct.
#[test]
fn test_named_type_alias_resolution_record_cross_namespace() {
    let person_fields = vec![
        AvroFieldSchema {
            name: "name",
            doc: None,
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
            default: None,
            aliases: vec![],
        },
        AvroFieldSchema {
            name: "age",
            doc: None,
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            aliases: vec![],
        },
    ];
    let writer_schema = Schema::Complex(ComplexType::Record(Record {
        name: "PersonV2",
        namespace: Some("com.example.v2"),
        doc: None,
        aliases: vec!["com.example.Person"],
        fields: person_fields.clone(),
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Record(Record {
        name: "Person",
        namespace: Some("com.example"),
        doc: None,
        aliases: vec![],
        fields: person_fields,
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("record alias resolution should succeed");
    match resolved.codec() {
        Codec::Struct(fields) => assert_eq!(fields.len(), 2),
        other => panic!("expected struct, got {other:?}"),
    }
}
3779
/// Resolves enums whose full names differ across namespaces.
///
/// The writer `org.example.v2.ColorV2` lists `org.example.Color` as an alias;
/// the reader is `org.example.Color` with the same symbols. The test only
/// requires that resolution succeeds.
#[test]
fn test_named_type_alias_resolution_enum_cross_namespace() {
    let writer_schema = Schema::Complex(ComplexType::Enum(Enum {
        name: "ColorV2",
        namespace: Some("org.example.v2"),
        doc: None,
        aliases: vec!["org.example.Color"],
        symbols: vec!["RED", "GREEN", "BLUE"],
        default: None,
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Enum(Enum {
        name: "Color",
        namespace: Some("org.example"),
        doc: None,
        aliases: vec![],
        symbols: vec!["RED", "GREEN", "BLUE"],
        default: None,
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    let outcome = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
    outcome.expect("enum alias resolution should succeed");
}
3807
/// Resolves fixed types whose full names differ across namespaces.
///
/// The writer `ns.v2.Fx10V2` lists `ns.Fx10` as an alias; the reader is
/// `ns.Fx10` with the same size. The test only requires that resolution
/// succeeds.
#[test]
fn test_named_type_alias_resolution_fixed_cross_namespace() {
    let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
        name: "Fx10V2",
        namespace: Some("ns.v2"),
        aliases: vec!["ns.Fx10"],
        size: 10,
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
        name: "Fx10",
        namespace: Some("ns"),
        aliases: vec![],
        size: 10,
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false);
    let outcome = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
    outcome.expect("fixed alias resolution should succeed");
}
3831}