1use crate::schema::{
21 AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22 AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23 PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27 IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
40#[derive(Debug, Clone, PartialEq)]
42pub(crate) enum ResolutionInfo {
43 Promotion(Promotion),
45 DefaultValue(AvroLiteral),
47 EnumMapping(EnumMapping),
49 Record(ResolvedRecord),
51 Union(ResolvedUnion),
53}
54
55#[derive(Debug, Clone, PartialEq)]
59pub(crate) enum AvroLiteral {
60 Null,
62 Boolean(bool),
64 Int(i32),
66 Long(i64),
68 Float(f32),
70 Double(f64),
72 Bytes(Vec<u8>),
74 String(String),
76 Enum(String),
78 Array(Vec<AvroLiteral>),
80 Map(IndexMap<String, AvroLiteral>),
82}
83
84#[derive(Debug, Clone, PartialEq)]
86pub(crate) struct ResolvedRecord {
87 pub(crate) writer_fields: Arc<[ResolvedField]>,
89 pub(crate) default_fields: Arc<[usize]>,
91}
92
93#[derive(Debug, Clone, PartialEq)]
95pub(crate) enum ResolvedField {
96 ToReader(usize, AvroDataType),
100 Skip(AvroDataType),
103}
104
/// A primitive type promotion that Avro schema resolution allows between a
/// writer type and a reader type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No conversion is needed.
    Direct,
    /// Writer `int`, reader `long`.
    IntToLong,
    /// Writer `int`, reader `float`.
    IntToFloat,
    /// Writer `int`, reader `double`.
    IntToDouble,
    /// Writer `long`, reader `float`.
    LongToFloat,
    /// Writer `long`, reader `double`.
    LongToDouble,
    /// Writer `float`, reader `double`.
    FloatToDouble,
    /// Writer `string`, reader `bytes`.
    StringToBytes,
    /// Writer `bytes`, reader `string`.
    BytesToString,
}

impl Display for Promotion {
    /// Writes a short `Writer->Reader` label, or `Direct` when there is no promotion.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Promotion::Direct => "Direct",
            Promotion::IntToLong => "Int->Long",
            Promotion::IntToFloat => "Int->Float",
            Promotion::IntToDouble => "Int->Double",
            Promotion::LongToFloat => "Long->Float",
            Promotion::LongToDouble => "Long->Double",
            Promotion::FloatToDouble => "Float->Double",
            Promotion::StringToBytes => "String->Bytes",
            Promotion::BytesToString => "Bytes->String",
        };
        formatter.write_str(label)
    }
}
146
147#[derive(Debug, Clone, PartialEq)]
149pub(crate) struct ResolvedUnion {
150 pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
153 pub(crate) writer_is_union: bool,
155 pub(crate) reader_is_union: bool,
157}
158
/// A translation table from writer enum symbols to reader enum symbols.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Presumably indexed by writer symbol position, yielding a reader symbol
    /// index. Verify against the builder.
    pub(crate) mapping: Arc<[i32]>,
    /// Reader symbol index used as a fallback (assumed to be the reader enum's
    /// `default`). TODO confirm.
    pub(crate) default_index: i32,
}
170
/// Attaches the matching Arrow canonical extension type to `field`.
/// Only UUID is handled; every other codec returns `field` unchanged.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
178
179#[derive(Debug, Clone, PartialEq)]
181pub(crate) struct AvroDataType {
182 nullability: Option<Nullability>,
183 metadata: HashMap<String, String>,
184 codec: Codec,
185 pub(crate) resolution: Option<ResolutionInfo>,
186}
187
188impl AvroDataType {
189 pub(crate) fn new(
191 codec: Codec,
192 metadata: HashMap<String, String>,
193 nullability: Option<Nullability>,
194 ) -> Self {
195 AvroDataType {
196 codec,
197 metadata,
198 nullability,
199 resolution: None,
200 }
201 }
202
203 #[inline]
204 fn new_with_resolution(
205 codec: Codec,
206 metadata: HashMap<String, String>,
207 nullability: Option<Nullability>,
208 resolution: Option<ResolutionInfo>,
209 ) -> Self {
210 Self {
211 codec,
212 metadata,
213 nullability,
214 resolution,
215 }
216 }
217
218 pub(crate) fn field_with_name(&self, name: &str) -> Field {
220 let mut nullable = self.nullability.is_some();
221 if !nullable {
222 if let Codec::Union(children, _, _) = self.codec() {
223 if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
225 nullable = true;
226 }
227 }
228 }
229 let data_type = self.codec.data_type();
230 let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
231 #[cfg(feature = "canonical_extension_types")]
232 return with_extension_type(&self.codec, field);
233 #[cfg(not(feature = "canonical_extension_types"))]
234 field
235 }
236
237 pub(crate) fn codec(&self) -> &Codec {
242 &self.codec
243 }
244
245 pub(crate) fn nullability(&self) -> Option<Nullability> {
253 self.nullability
254 }
255
256 #[inline]
257 fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
258 fn expect_string<'v>(
259 default_json: &'v Value,
260 data_type: &str,
261 ) -> Result<&'v str, ArrowError> {
262 match default_json {
263 Value::String(s) => Ok(s.as_str()),
264 _ => Err(ArrowError::SchemaError(format!(
265 "Default value must be a JSON string for {data_type}"
266 ))),
267 }
268 }
269
270 fn parse_bytes_default(
271 default_json: &Value,
272 expected_len: Option<usize>,
273 ) -> Result<Vec<u8>, ArrowError> {
274 let s = expect_string(default_json, "bytes/fixed logical types")?;
275 let mut out = Vec::with_capacity(s.len());
276 for ch in s.chars() {
277 let cp = ch as u32;
278 if cp > 0xFF {
279 return Err(ArrowError::SchemaError(format!(
280 "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
281 )));
282 }
283 out.push(cp as u8);
284 }
285 if let Some(len) = expected_len {
286 if out.len() != len {
287 return Err(ArrowError::SchemaError(format!(
288 "Default length {} does not match expected fixed size {len}",
289 out.len(),
290 )));
291 }
292 }
293 Ok(out)
294 }
295
296 fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
297 match default_json {
298 Value::Number(n) => n.as_i64().ok_or_else(|| {
299 ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
300 }),
301 _ => Err(ArrowError::SchemaError(format!(
302 "Default {data_type} must be a JSON integer"
303 ))),
304 }
305 }
306
307 fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
308 match default_json {
309 Value::Number(n) => n.as_f64().ok_or_else(|| {
310 ArrowError::SchemaError(format!("Default {data_type} must be a number"))
311 }),
312 _ => Err(ArrowError::SchemaError(format!(
313 "Default {data_type} must be a JSON number"
314 ))),
315 }
316 }
317
318 if default_json.is_null() {
320 return match self.codec() {
321 Codec::Null => Ok(AvroLiteral::Null),
322 Codec::Union(encodings, _, _) if !encodings.is_empty()
323 && matches!(encodings[0].codec(), Codec::Null) =>
324 {
325 Ok(AvroLiteral::Null)
326 }
327 _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
328 _ => Err(ArrowError::SchemaError(
329 "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
330 .to_string(),
331 )),
332 };
333 }
334 let lit = match self.codec() {
335 Codec::Null => {
336 return Err(ArrowError::SchemaError(
337 "Default for `null` type must be JSON null".to_string(),
338 ));
339 }
340 Codec::Boolean => match default_json {
341 Value::Bool(b) => AvroLiteral::Boolean(*b),
342 _ => {
343 return Err(ArrowError::SchemaError(
344 "Boolean default must be a JSON boolean".to_string(),
345 ));
346 }
347 },
348 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
349 let i = parse_json_i64(default_json, "int")?;
350 if i < i32::MIN as i64 || i > i32::MAX as i64 {
351 return Err(ArrowError::SchemaError(format!(
352 "Default int {i} out of i32 range"
353 )));
354 }
355 AvroLiteral::Int(i as i32)
356 }
357 Codec::Int64
358 | Codec::TimeMicros
359 | Codec::TimestampMillis(_)
360 | Codec::TimestampMicros(_)
361 | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
362 #[cfg(feature = "avro_custom_types")]
363 Codec::DurationNanos
364 | Codec::DurationMicros
365 | Codec::DurationMillis
366 | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
367 #[cfg(feature = "avro_custom_types")]
368 Codec::Int8 => {
369 let i = parse_json_i64(default_json, "int")?;
370 if i < i8::MIN as i64 || i > i8::MAX as i64 {
371 return Err(ArrowError::SchemaError(format!(
372 "Default int8 {i} out of i8 range"
373 )));
374 }
375 AvroLiteral::Int(i as i32)
376 }
377 #[cfg(feature = "avro_custom_types")]
378 Codec::Int16 => {
379 let i = parse_json_i64(default_json, "int")?;
380 if i < i16::MIN as i64 || i > i16::MAX as i64 {
381 return Err(ArrowError::SchemaError(format!(
382 "Default int16 {i} out of i16 range"
383 )));
384 }
385 AvroLiteral::Int(i as i32)
386 }
387 #[cfg(feature = "avro_custom_types")]
388 Codec::UInt8 => {
389 let i = parse_json_i64(default_json, "int")?;
390 if i < 0 || i > u8::MAX as i64 {
391 return Err(ArrowError::SchemaError(format!(
392 "Default uint8 {i} out of u8 range"
393 )));
394 }
395 AvroLiteral::Int(i as i32)
396 }
397 #[cfg(feature = "avro_custom_types")]
398 Codec::UInt16 => {
399 let i = parse_json_i64(default_json, "int")?;
400 if i < 0 || i > u16::MAX as i64 {
401 return Err(ArrowError::SchemaError(format!(
402 "Default uint16 {i} out of u16 range"
403 )));
404 }
405 AvroLiteral::Int(i as i32)
406 }
407 #[cfg(feature = "avro_custom_types")]
408 Codec::UInt32 => {
409 let i = parse_json_i64(default_json, "long")?;
410 if i < 0 || i > u32::MAX as i64 {
411 return Err(ArrowError::SchemaError(format!(
412 "Default uint32 {i} out of u32 range"
413 )));
414 }
415 AvroLiteral::Long(i)
416 }
417 #[cfg(feature = "avro_custom_types")]
418 Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
419 AvroLiteral::Long(parse_json_i64(default_json, "long")?)
420 }
421 #[cfg(feature = "avro_custom_types")]
422 Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
423 #[cfg(feature = "avro_custom_types")]
424 Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
425 #[cfg(feature = "avro_custom_types")]
426 Codec::Time32Secs => {
427 let i = parse_json_i64(default_json, "int")?;
428 if i < i32::MIN as i64 || i > i32::MAX as i64 {
429 return Err(ArrowError::SchemaError(format!(
430 "Default time32-secs {i} out of i32 range"
431 )));
432 }
433 AvroLiteral::Int(i as i32)
434 }
435 #[cfg(feature = "avro_custom_types")]
436 Codec::IntervalYearMonth => {
437 AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
438 }
439 #[cfg(feature = "avro_custom_types")]
440 Codec::IntervalMonthDayNano => {
441 AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
442 }
443 #[cfg(feature = "avro_custom_types")]
444 Codec::IntervalDayTime => {
445 AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
446 }
447 Codec::Float32 => {
448 let f = parse_json_f64(default_json, "float")?;
449 if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
450 return Err(ArrowError::SchemaError(format!(
451 "Default float {f} out of f32 range or not finite"
452 )));
453 }
454 AvroLiteral::Float(f as f32)
455 }
456 Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
457 Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
458 AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
459 }
460 Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
461 Codec::Fixed(sz) => {
462 AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
463 }
464 Codec::Decimal(_, _, fixed_size) => {
465 AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
466 }
467 Codec::Enum(symbols) => {
468 let s = expect_string(default_json, "enum")?;
469 if symbols.iter().any(|sym| sym == s) {
470 AvroLiteral::Enum(s.to_string())
471 } else {
472 return Err(ArrowError::SchemaError(format!(
473 "Default enum symbol {s:?} not found in reader enum symbols"
474 )));
475 }
476 }
477 Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
478 Codec::List(item_dt) => match default_json {
479 Value::Array(items) => AvroLiteral::Array(
480 items
481 .iter()
482 .map(|v| item_dt.parse_default_literal(v))
483 .collect::<Result<_, _>>()?,
484 ),
485 _ => {
486 return Err(ArrowError::SchemaError(
487 "Default value must be a JSON array for Avro array type".to_string(),
488 ));
489 }
490 },
491 Codec::Map(val_dt) => match default_json {
492 Value::Object(map) => {
493 let mut out = IndexMap::with_capacity(map.len());
494 for (k, v) in map {
495 out.insert(k.clone(), val_dt.parse_default_literal(v)?);
496 }
497 AvroLiteral::Map(out)
498 }
499 _ => {
500 return Err(ArrowError::SchemaError(
501 "Default value must be a JSON object for Avro map type".to_string(),
502 ));
503 }
504 },
505 Codec::Struct(fields) => match default_json {
506 Value::Object(obj) => {
507 let mut out: IndexMap<String, AvroLiteral> =
508 IndexMap::with_capacity(fields.len());
509 for f in fields.as_ref() {
510 let name = f.name().to_string();
511 if let Some(sub) = obj.get(&name) {
512 out.insert(name, f.data_type().parse_default_literal(sub)?);
513 } else {
514 let stored_default =
516 f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
517 if stored_default.is_none()
518 && f.data_type().nullability() == Some(Nullability::default())
519 {
520 out.insert(name, AvroLiteral::Null);
521 } else if let Some(default_json) = stored_default {
522 let v: Value =
523 serde_json::from_str(default_json).map_err(|e| {
524 ArrowError::SchemaError(format!(
525 "Failed to parse stored subfield default JSON for '{}': {e}",
526 f.name(),
527 ))
528 })?;
529 out.insert(name, f.data_type().parse_default_literal(&v)?);
530 } else {
531 return Err(ArrowError::SchemaError(format!(
532 "Record default missing required subfield '{}' with non-nullable type {:?}",
533 f.name(),
534 f.data_type().codec()
535 )));
536 }
537 }
538 }
539 AvroLiteral::Map(out)
540 }
541 _ => {
542 return Err(ArrowError::SchemaError(
543 "Default value for record/struct must be a JSON object".to_string(),
544 ));
545 }
546 },
547 Codec::Union(encodings, _, _) => {
548 let Some(default_encoding) = encodings.first() else {
549 return Err(ArrowError::SchemaError(
550 "Union with no branches cannot have a default".to_string(),
551 ));
552 };
553 default_encoding.parse_default_literal(default_json)?
554 }
555 #[cfg(feature = "avro_custom_types")]
556 Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
557 };
558 Ok(lit)
559 }
560
561 fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
562 let json_text = serde_json::to_string(default_json).map_err(|e| {
563 ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
564 })?;
565 self.metadata
566 .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
567 Ok(())
568 }
569
570 fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
571 let lit = self.parse_default_literal(default_json)?;
572 self.store_default(default_json)?;
573 Ok(lit)
574 }
575}
576
577#[derive(Debug, Clone, PartialEq)]
579pub(crate) struct AvroField {
580 name: String,
581 data_type: AvroDataType,
582}
583
584impl AvroField {
585 pub(crate) fn field(&self) -> Field {
587 self.data_type.field_with_name(&self.name)
588 }
589
590 pub(crate) fn data_type(&self) -> &AvroDataType {
592 &self.data_type
593 }
594
595 pub(crate) fn with_utf8view(&self) -> Self {
604 let mut field = self.clone();
605 if let Codec::Utf8 = field.data_type.codec {
606 field.data_type.codec = Codec::Utf8View;
607 }
608 field
609 }
610
611 pub(crate) fn name(&self) -> &str {
616 &self.name
617 }
618}
619
620impl<'a> TryFrom<&Schema<'a>> for AvroField {
621 type Error = ArrowError;
622
623 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
624 match schema {
625 Schema::Complex(ComplexType::Record(r)) => {
626 let mut resolver = Maker::new(false, false, Tz::default());
627 let data_type = resolver.make_data_type(schema, None, None)?;
628 Ok(AvroField {
629 data_type,
630 name: r.name.to_string(),
631 })
632 }
633 _ => Err(ArrowError::ParseError(format!(
634 "Expected record got {schema:?}"
635 ))),
636 }
637 }
638}
639
640#[derive(Debug)]
642pub(crate) struct AvroFieldBuilder<'a> {
643 writer_schema: &'a Schema<'a>,
644 reader_schema: Option<&'a Schema<'a>>,
645 use_utf8view: bool,
646 strict_mode: bool,
647 tz: Tz,
648}
649
650impl<'a> AvroFieldBuilder<'a> {
651 pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
653 Self {
654 writer_schema,
655 reader_schema: None,
656 use_utf8view: false,
657 strict_mode: false,
658 tz: Tz::default(),
659 }
660 }
661
662 #[inline]
667 pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
668 self.reader_schema = Some(reader_schema);
669 self
670 }
671
672 pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
674 self.use_utf8view = use_utf8view;
675 self
676 }
677
678 pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
680 self.strict_mode = strict_mode;
681 self
682 }
683
684 pub(crate) fn with_tz(mut self, tz: Tz) -> Self {
686 self.tz = tz;
687 self
688 }
689
690 pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
692 match self.writer_schema {
693 Schema::Complex(ComplexType::Record(r)) => {
694 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode, self.tz);
695 let data_type =
696 resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
697 Ok(AvroField {
698 name: r.name.to_string(),
699 data_type,
700 })
701 }
702 _ => Err(ArrowError::ParseError(format!(
703 "Expected a Record schema to build an AvroField, but got {:?}",
704 self.writer_schema
705 ))),
706 }
707 }
708}
709
/// The time zone written into Arrow timestamp types for UTC-adjusted Avro timestamps.
///
/// Both variants denote UTC. They differ only in the string used (see [`Tz::as_str`]).
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub enum Tz {
    /// Rendered as `"+00:00"`. This is the default.
    #[default]
    OffsetZero,
    /// Rendered as `"UTC"`.
    Utc,
}

impl Tz {
    /// Returns the time zone string used in the Arrow timestamp type.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Tz::Utc => "UTC",
            Tz::OffsetZero => "+00:00",
        }
    }
}

impl Display for Tz {
    /// Writes the same string as [`Tz::as_str`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = self.as_str();
        f.write_str(text)
    }
}
739
740#[derive(Debug, Clone, PartialEq)]
744pub(crate) enum Codec {
745 Null,
747 Boolean,
749 Int32,
751 Int64,
753 Float32,
755 Float64,
757 Binary,
759 Utf8,
761 Utf8View,
766 Date32,
768 TimeMillis,
770 TimeMicros,
772 TimestampMillis(Option<Tz>),
777 TimestampMicros(Option<Tz>),
782 TimestampNanos(Option<Tz>),
787 Fixed(i32),
790 Decimal(usize, Option<usize>, Option<usize>),
797 Uuid,
799 Enum(Arc<[String]>),
803 List(Arc<AvroDataType>),
805 Struct(Arc<[AvroField]>),
807 Map(Arc<AvroDataType>),
809 Interval,
811 Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
813 #[cfg(feature = "avro_custom_types")]
815 DurationNanos,
816 #[cfg(feature = "avro_custom_types")]
818 DurationMicros,
819 #[cfg(feature = "avro_custom_types")]
821 DurationMillis,
822 #[cfg(feature = "avro_custom_types")]
824 DurationSeconds,
825 #[cfg(feature = "avro_custom_types")]
826 RunEndEncoded(Arc<AvroDataType>, u8),
827 #[cfg(feature = "avro_custom_types")]
829 Int8,
830 #[cfg(feature = "avro_custom_types")]
832 Int16,
833 #[cfg(feature = "avro_custom_types")]
835 UInt8,
836 #[cfg(feature = "avro_custom_types")]
838 UInt16,
839 #[cfg(feature = "avro_custom_types")]
841 UInt32,
842 #[cfg(feature = "avro_custom_types")]
844 UInt64,
845 #[cfg(feature = "avro_custom_types")]
847 Float16,
848 #[cfg(feature = "avro_custom_types")]
850 Date64,
851 #[cfg(feature = "avro_custom_types")]
853 TimeNanos,
854 #[cfg(feature = "avro_custom_types")]
856 Time32Secs,
857 #[cfg(feature = "avro_custom_types")]
860 TimestampSecs(bool),
861 #[cfg(feature = "avro_custom_types")]
863 IntervalYearMonth,
864 #[cfg(feature = "avro_custom_types")]
866 IntervalMonthDayNano,
867 #[cfg(feature = "avro_custom_types")]
869 IntervalDayTime,
870}
871
872impl Codec {
873 fn data_type(&self) -> DataType {
874 match self {
875 Self::Null => DataType::Null,
876 Self::Boolean => DataType::Boolean,
877 Self::Int32 => DataType::Int32,
878 Self::Int64 => DataType::Int64,
879 Self::Float32 => DataType::Float32,
880 Self::Float64 => DataType::Float64,
881 Self::Binary => DataType::Binary,
882 Self::Utf8 => DataType::Utf8,
883 Self::Utf8View => DataType::Utf8View,
884 Self::Date32 => DataType::Date32,
885 Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
886 Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
887 Self::TimestampMillis(tz) => DataType::Timestamp(
888 TimeUnit::Millisecond,
889 tz.as_ref().map(|tz| tz.as_str().into()),
890 ),
891 Self::TimestampMicros(tz) => DataType::Timestamp(
892 TimeUnit::Microsecond,
893 tz.as_ref().map(|tz| tz.as_str().into()),
894 ),
895 Self::TimestampNanos(tz) => DataType::Timestamp(
896 TimeUnit::Nanosecond,
897 tz.as_ref().map(|tz| tz.as_str().into()),
898 ),
899 Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
900 Self::Fixed(size) => DataType::FixedSizeBinary(*size),
901 Self::Decimal(precision, scale, _size) => {
902 let p = *precision as u8;
903 let s = scale.unwrap_or(0) as i8;
904 #[cfg(feature = "small_decimals")]
905 {
906 if *precision <= DECIMAL32_MAX_PRECISION as usize {
907 DataType::Decimal32(p, s)
908 } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
909 DataType::Decimal64(p, s)
910 } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
911 DataType::Decimal128(p, s)
912 } else {
913 DataType::Decimal256(p, s)
914 }
915 }
916 #[cfg(not(feature = "small_decimals"))]
917 {
918 if *precision <= DECIMAL128_MAX_PRECISION as usize {
919 DataType::Decimal128(p, s)
920 } else {
921 DataType::Decimal256(p, s)
922 }
923 }
924 }
925 Self::Uuid => DataType::FixedSizeBinary(16),
926 Self::Enum(_) => {
927 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
928 }
929 Self::List(f) => {
930 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
931 }
932 Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
933 Self::Map(value_type) => {
934 let val_field = value_type.field_with_name("value");
935 DataType::Map(
936 Arc::new(Field::new(
937 "entries",
938 DataType::Struct(Fields::from(vec![
939 Field::new("key", DataType::Utf8, false),
940 val_field,
941 ])),
942 false,
943 )),
944 false,
945 )
946 }
947 Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
948 #[cfg(feature = "avro_custom_types")]
949 Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
950 #[cfg(feature = "avro_custom_types")]
951 Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
952 #[cfg(feature = "avro_custom_types")]
953 Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
954 #[cfg(feature = "avro_custom_types")]
955 Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
956 #[cfg(feature = "avro_custom_types")]
957 Self::RunEndEncoded(values, bits) => {
958 let run_ends_dt = match *bits {
959 16 => DataType::Int16,
960 32 => DataType::Int32,
961 64 => DataType::Int64,
962 _ => unreachable!(),
963 };
964 DataType::RunEndEncoded(
965 Arc::new(Field::new("run_ends", run_ends_dt, false)),
966 Arc::new(Field::new("values", values.codec().data_type(), true)),
967 )
968 }
969 #[cfg(feature = "avro_custom_types")]
970 Self::Int8 => DataType::Int8,
971 #[cfg(feature = "avro_custom_types")]
972 Self::Int16 => DataType::Int16,
973 #[cfg(feature = "avro_custom_types")]
974 Self::UInt8 => DataType::UInt8,
975 #[cfg(feature = "avro_custom_types")]
976 Self::UInt16 => DataType::UInt16,
977 #[cfg(feature = "avro_custom_types")]
978 Self::UInt32 => DataType::UInt32,
979 #[cfg(feature = "avro_custom_types")]
980 Self::UInt64 => DataType::UInt64,
981 #[cfg(feature = "avro_custom_types")]
982 Self::Float16 => DataType::Float16,
983 #[cfg(feature = "avro_custom_types")]
984 Self::Date64 => DataType::Date64,
985 #[cfg(feature = "avro_custom_types")]
986 Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
987 #[cfg(feature = "avro_custom_types")]
988 Self::Time32Secs => DataType::Time32(TimeUnit::Second),
989 #[cfg(feature = "avro_custom_types")]
990 Self::TimestampSecs(is_utc) => {
991 DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
992 }
993 #[cfg(feature = "avro_custom_types")]
994 Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
995 #[cfg(feature = "avro_custom_types")]
996 Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
997 #[cfg(feature = "avro_custom_types")]
998 Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
999 }
1000 }
1001
1002 pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
1008 if use_utf8view && matches!(self, Self::Utf8) {
1009 Self::Utf8View
1010 } else {
1011 self
1012 }
1013 }
1014
1015 #[inline]
1016 fn union_field_name(&self) -> String {
1017 UnionFieldKind::from(self).as_ref().to_owned()
1018 }
1019}
1020
1021impl From<PrimitiveType> for Codec {
1022 fn from(value: PrimitiveType) -> Self {
1023 match value {
1024 PrimitiveType::Null => Self::Null,
1025 PrimitiveType::Boolean => Self::Boolean,
1026 PrimitiveType::Int => Self::Int32,
1027 PrimitiveType::Long => Self::Int64,
1028 PrimitiveType::Float => Self::Float32,
1029 PrimitiveType::Double => Self::Float64,
1030 PrimitiveType::Bytes => Self::Binary,
1031 PrimitiveType::String => Self::Utf8,
1032 }
1033 }
1034}
1035
/// Returns the largest decimal precision whose unscaled values always fit in an
/// `n`-byte two's-complement integer, i.e. `floor(log10(2^(8n-1) - 1))`.
///
/// Returns `None` unless `1 <= n <= 32`.
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    // TABLE[k] holds the answer for k + 1 bytes.
    const TABLE: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    if n == 0 || n > TABLE.len() {
        None
    } else {
        Some(TABLE[n - 1])
    }
}
1058
1059fn parse_decimal_attributes(
1060 attributes: &Attributes,
1061 fallback_size: Option<usize>,
1062 precision_required: bool,
1063) -> Result<(usize, usize, Option<usize>), ArrowError> {
1064 let precision = attributes
1065 .additional
1066 .get("precision")
1067 .and_then(|v| v.as_u64())
1068 .or(if precision_required { None } else { Some(10) })
1069 .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1070 as usize;
1071 let scale = attributes
1072 .additional
1073 .get("scale")
1074 .and_then(|v| v.as_u64())
1075 .unwrap_or(0) as usize;
1076 let size = attributes
1077 .additional
1078 .get("size")
1079 .and_then(|v| v.as_u64())
1080 .map(|s| s as usize)
1081 .or(fallback_size);
1082 if precision == 0 {
1083 return Err(ArrowError::ParseError(
1084 "Decimal requires precision > 0".to_string(),
1085 ));
1086 }
1087 if scale > precision {
1088 return Err(ArrowError::ParseError(format!(
1089 "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1090 )));
1091 }
1092 if precision > DECIMAL256_MAX_PRECISION as usize {
1093 return Err(ArrowError::ParseError(format!(
1094 "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1095 DECIMAL256_MAX_PRECISION
1096 )));
1097 }
1098 if let Some(sz) = size {
1099 let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1100 ArrowError::ParseError(format!(
1101 "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1102 ))
1103 })?;
1104 if precision > max_p {
1105 return Err(ArrowError::ParseError(format!(
1106 "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1107 )));
1108 }
1109 }
1110 Ok((precision, scale, size))
1111}
1112
1113#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
1114#[strum(serialize_all = "snake_case")]
1115enum UnionFieldKind {
1116 Null,
1117 Boolean,
1118 Int,
1119 Long,
1120 Float,
1121 Double,
1122 Bytes,
1123 String,
1124 Date,
1125 TimeMillis,
1126 TimeMicros,
1127 TimestampMillisUtc,
1128 TimestampMillisLocal,
1129 TimestampMicrosUtc,
1130 TimestampMicrosLocal,
1131 TimestampNanosUtc,
1132 TimestampNanosLocal,
1133 Duration,
1134 Fixed,
1135 Decimal,
1136 Enum,
1137 Array,
1138 Record,
1139 Map,
1140 Uuid,
1141 Union,
1142}
1143
1144impl From<&Codec> for UnionFieldKind {
1145 fn from(c: &Codec) -> Self {
1146 match c {
1147 Codec::Null => Self::Null,
1148 Codec::Boolean => Self::Boolean,
1149 Codec::Int32 => Self::Int,
1150 Codec::Int64 => Self::Long,
1151 Codec::Float32 => Self::Float,
1152 Codec::Float64 => Self::Double,
1153 Codec::Binary => Self::Bytes,
1154 Codec::Utf8 | Codec::Utf8View => Self::String,
1155 Codec::Date32 => Self::Date,
1156 Codec::TimeMillis => Self::TimeMillis,
1157 Codec::TimeMicros => Self::TimeMicros,
1158 Codec::TimestampMillis(Some(Tz::OffsetZero)) => Self::TimestampMillisUtc,
1159 Codec::TimestampMillis(Some(Tz::Utc)) => Self::TimestampMillisUtc,
1160 Codec::TimestampMillis(None) => Self::TimestampMillisLocal,
1161 Codec::TimestampMicros(Some(Tz::OffsetZero)) => Self::TimestampMicrosUtc,
1162 Codec::TimestampMicros(Some(Tz::Utc)) => Self::TimestampMicrosUtc,
1163 Codec::TimestampMicros(None) => Self::TimestampMicrosLocal,
1164 Codec::TimestampNanos(Some(Tz::OffsetZero)) => Self::TimestampNanosUtc,
1165 Codec::TimestampNanos(Some(Tz::Utc)) => Self::TimestampNanosUtc,
1166 Codec::TimestampNanos(None) => Self::TimestampNanosLocal,
1167 Codec::Interval => Self::Duration,
1168 Codec::Fixed(_) => Self::Fixed,
1169 Codec::Decimal(..) => Self::Decimal,
1170 Codec::Enum(_) => Self::Enum,
1171 Codec::List(_) => Self::Array,
1172 Codec::Struct(_) => Self::Record,
1173 Codec::Map(_) => Self::Map,
1174 Codec::Uuid => Self::Uuid,
1175 Codec::Union(..) => Self::Union,
1176 #[cfg(feature = "avro_custom_types")]
1177 Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
1178 #[cfg(feature = "avro_custom_types")]
1179 Codec::DurationNanos
1180 | Codec::DurationMicros
1181 | Codec::DurationMillis
1182 | Codec::DurationSeconds => Self::Duration,
1183 #[cfg(feature = "avro_custom_types")]
1184 Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
1185 #[cfg(feature = "avro_custom_types")]
1186 Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
1187 Self::Long
1188 }
1189 #[cfg(feature = "avro_custom_types")]
1190 Codec::Time32Secs => Self::TimeMillis, #[cfg(feature = "avro_custom_types")]
1192 Codec::UInt64
1193 | Codec::Float16
1194 | Codec::IntervalYearMonth
1195 | Codec::IntervalMonthDayNano
1196 | Codec::IntervalDayTime => Self::Fixed,
1197 }
1198 }
1199}
1200
1201fn union_branch_name(dt: &AvroDataType) -> String {
1202 if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1203 if name.contains(".") {
1204 return name.to_string();
1206 }
1207 if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1208 return format!("{ns}.{name}");
1209 }
1210 return name.to_string();
1211 }
1212 dt.codec.union_field_name()
1213}
1214
1215fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1216 let arrow_fields: Vec<Field> = encodings
1217 .iter()
1218 .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1219 .collect();
1220 let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1221 UnionFields::try_new(type_ids, arrow_fields)
1222}
1223
1224#[derive(Debug, Default)]
1228struct Resolver<'a> {
1229 map: HashMap<(&'a str, &'a str), AvroDataType>,
1230}
1231
1232impl<'a> Resolver<'a> {
1233 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1234 self.map.insert((namespace.unwrap_or(""), name), schema);
1235 }
1236
1237 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1238 let (namespace, name) = name
1239 .rsplit_once('.')
1240 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1241 self.map
1242 .get(&(namespace, name))
1243 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1244 .cloned()
1245 }
1246}
1247
1248fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1249 let mut out = HashSet::with_capacity(1 + aliases.len());
1250 let (full, _) = make_full_name(name, ns, None);
1251 out.insert(full);
1252 for a in aliases {
1253 let (fa, _) = make_full_name(a, None, ns);
1254 out.insert(fa);
1255 }
1256 out
1257}
1258
1259fn names_match(
1260 writer_name: &str,
1261 writer_namespace: Option<&str>,
1262 writer_aliases: &[&str],
1263 reader_name: &str,
1264 reader_namespace: Option<&str>,
1265 reader_aliases: &[&str],
1266) -> bool {
1267 let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1268 let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1269 !writer_set.is_disjoint(&reader_set)
1271}
1272
1273fn ensure_names_match(
1274 data_type: &str,
1275 writer_name: &str,
1276 writer_namespace: Option<&str>,
1277 writer_aliases: &[&str],
1278 reader_name: &str,
1279 reader_namespace: Option<&str>,
1280 reader_aliases: &[&str],
1281) -> Result<(), ArrowError> {
1282 if names_match(
1283 writer_name,
1284 writer_namespace,
1285 writer_aliases,
1286 reader_name,
1287 reader_namespace,
1288 reader_aliases,
1289 ) {
1290 Ok(())
1291 } else {
1292 Err(ArrowError::ParseError(format!(
1293 "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1294 )))
1295 }
1296}
1297
1298fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1299 match schema {
1300 Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1301 Schema::Type(Type {
1302 r#type: TypeName::Primitive(primitive),
1303 ..
1304 }) => Some(*primitive),
1305 _ => None,
1306 }
1307}
1308
1309fn nullable_union_variants<'x, 'y>(
1310 variant: &'y [Schema<'x>],
1311) -> Option<(Nullability, &'y Schema<'x>)> {
1312 if variant.len() != 2 {
1313 return None;
1314 }
1315 let is_null = |schema: &Schema<'x>| {
1316 matches!(
1317 schema,
1318 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1319 )
1320 };
1321 match (is_null(&variant[0]), is_null(&variant[1])) {
1322 (true, false) => Some((Nullability::NullFirst, &variant[1])),
1323 (false, true) => Some((Nullability::NullSecond, &variant[0])),
1324 _ => None,
1325 }
1326}
1327
1328#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1329enum UnionBranchKey {
1330 Named(String),
1331 Primitive(PrimitiveType),
1332 Array,
1333 Map,
1334}
1335
1336fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1337 let (name, namespace) = match s {
1338 Schema::TypeName(TypeName::Primitive(p))
1339 | Schema::Type(Type {
1340 r#type: TypeName::Primitive(p),
1341 ..
1342 }) => return Some(UnionBranchKey::Primitive(*p)),
1343 Schema::TypeName(TypeName::Ref(name))
1344 | Schema::Type(Type {
1345 r#type: TypeName::Ref(name),
1346 ..
1347 }) => (name, None),
1348 Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1349 Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1350 Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1351 Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1352 Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1353 Schema::Union(_) => return None,
1354 };
1355 let (full, _) = make_full_name(name, namespace, enclosing_ns);
1356 Some(UnionBranchKey::Named(full))
1357}
1358
1359fn union_first_duplicate<'a>(
1360 branches: &'a [Schema<'a>],
1361 enclosing_ns: Option<&'a str>,
1362) -> Option<String> {
1363 let mut seen = HashSet::with_capacity(branches.len());
1364 for schema in branches {
1365 if let Some(key) = branch_key_of(schema, enclosing_ns) {
1366 if !seen.insert(key.clone()) {
1367 let msg = match key {
1368 UnionBranchKey::Named(full) => format!("named type {full}"),
1369 UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1370 UnionBranchKey::Array => "array".to_string(),
1371 UnionBranchKey::Map => "map".to_string(),
1372 };
1373 return Some(msg);
1374 }
1375 }
1376 }
1377 None
1378}
1379
1380struct Maker<'a> {
1384 resolver: Resolver<'a>,
1385 use_utf8view: bool,
1386 strict_mode: bool,
1387 tz: Tz,
1388}
1389
1390impl<'a> Maker<'a> {
1391 fn new(use_utf8view: bool, strict_mode: bool, tz: Tz) -> Self {
1392 Self {
1393 resolver: Default::default(),
1394 use_utf8view,
1395 strict_mode,
1396 tz,
1397 }
1398 }
1399
1400 #[cfg(feature = "avro_custom_types")]
1401 #[inline]
1402 fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1403 if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1404 let mut inner = (*values).clone();
1405 inner.nullability = Some(nb);
1406 dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1407 }
1408 }
1409
1410 fn make_data_type<'s>(
1411 &mut self,
1412 writer_schema: &'s Schema<'a>,
1413 reader_schema: Option<&'s Schema<'a>>,
1414 namespace: Option<&'a str>,
1415 ) -> Result<AvroDataType, ArrowError> {
1416 match reader_schema {
1417 Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1418 None => self.parse_type(writer_schema, namespace),
1419 }
1420 }
1421
    /// Converts a single Avro `schema` (no reader/writer resolution) into an `AvroDataType`.
    ///
    /// `namespace` is the enclosing namespace. Named types (record/enum/fixed) replace it
    /// with their own `namespace` attribute when present, for themselves and their children.
    /// Records, enums and fixed types are registered in `self.resolver` after they are
    /// built. A by-name reference is therefore only resolvable once its definition has been
    /// fully parsed. In particular, a record's own fields are parsed before the record
    /// itself is registered.
    ///
    /// # Errors
    ///
    /// Fails on unresolvable references, nested or duplicate-branch unions,
    /// `['T','null']` unions in strict mode, invalid fixed sizes or decimal attributes, and
    /// invalid run-end-encoding attributes (with the `avro_custom_types` feature).
    fn parse_type<'s>(
        &mut self,
        schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        match schema {
            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
                Codec::from(*p).with_utf8view(self.use_utf8view),
                Default::default(),
                None,
            )),
            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
            Schema::Union(f) => {
                // Two-branch unions with a bare "null" become a nullable non-union type.
                // NOTE(review): for ["null","null"], `position` returns 0, so the second
                // "null" is parsed as the value type and the duplicate check below is
                // never reached. Confirm whether that should be rejected.
                let null = f
                    .iter()
                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
                match (f.len() == 2, null) {
                    (true, Some(0)) => {
                        let mut field = self.parse_type(&f[1], namespace)?;
                        field.nullability = Some(Nullability::NullFirst);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
                        return Ok(field);
                    }
                    (true, Some(1)) => {
                        if self.strict_mode {
                            return Err(ArrowError::SchemaError(
                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
                                    .to_string(),
                            ));
                        }
                        let mut field = self.parse_type(&f[0], namespace)?;
                        field.nullability = Some(Nullability::NullSecond);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
                        return Ok(field);
                    }
                    _ => {}
                }
                // General union: validate branches, then build a dense Arrow union.
                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
                    return Err(ArrowError::SchemaError(
                        "Avro unions may not immediately contain other unions".to_string(),
                    ));
                }
                if let Some(dup) = union_first_duplicate(f, namespace) {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro union contains duplicate branch type: {dup}"
                    )));
                }
                let children: Vec<AvroDataType> = f
                    .iter()
                    .map(|s| self.parse_type(s, namespace))
                    .collect::<Result<_, _>>()?;
                let union_fields = build_union_fields(&children)?;
                Ok(AvroDataType::new(
                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
                    Default::default(),
                    None,
                ))
            }
            Schema::Complex(c) => match c {
                ComplexType::Record(r) => {
                    // The record's own namespace (if any) becomes the enclosing one for its fields.
                    let namespace = r.namespace.or(namespace);
                    let mut metadata = r.attributes.field_metadata();
                    let fields = r
                        .fields
                        .iter()
                        .map(|field| {
                            Ok(AvroField {
                                name: field.name.to_string(),
                                data_type: self.parse_type(&field.r#type, namespace)?,
                            })
                        })
                        .collect::<Result<_, ArrowError>>()?;
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        codec: Codec::Struct(fields),
                        metadata,
                        resolution: None,
                    };
                    // Registered only after the fields are parsed (see the doc comment above).
                    self.resolver.register(r.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Array(a) => {
                    let field = self.parse_type(a.items.as_ref(), namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: a.attributes.field_metadata(),
                        codec: Codec::List(Arc::new(field)),
                        resolution: None,
                    })
                }
                ComplexType::Fixed(f) => {
                    let size = f.size.try_into().map_err(|e| {
                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
                    })?;
                    let namespace = f.namespace.or(namespace);
                    let mut metadata = f.attributes.field_metadata();
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    // The logical type picks a specialised codec. The custom arrow.* types
                    // only apply when the fixed size matches and otherwise fall back to a
                    // plain `Codec::Fixed`.
                    let field = match f.attributes.logical_type {
                        Some("decimal") => {
                            let (precision, scale, _) =
                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
                                resolution: None,
                            }
                        }
                        Some("duration") => {
                            if size != 12 {
                                return Err(ArrowError::ParseError(format!(
                                    "Invalid fixed size for Duration: {size}, must be 12"
                                )));
                            };
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Interval,
                                resolution: None,
                            }
                        }
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.uint64") if size == 8 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::UInt64,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.float16") if size == 2 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::Float16,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-year-month") if size == 4 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalYearMonth,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalMonthDayNano,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-day-time") if size == 8 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalDayTime,
                            resolution: None,
                        },
                        _ => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::Fixed(size),
                            resolution: None,
                        },
                    };
                    self.resolver.register(f.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Enum(e) => {
                    let namespace = e.namespace.or(namespace);
                    let symbols = e
                        .symbols
                        .iter()
                        .map(|s| s.to_string())
                        .collect::<Arc<[String]>>();
                    let mut metadata = e.attributes.field_metadata();
                    // The symbol list is also kept in field metadata as a JSON array.
                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
                    })?;
                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        metadata,
                        codec: Codec::Enum(symbols),
                        resolution: None,
                    };
                    self.resolver.register(e.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Map(m) => {
                    let val = self.parse_type(&m.values, namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: m.attributes.field_metadata(),
                        codec: Codec::Map(Arc::new(val)),
                        resolution: None,
                    })
                }
            },
            Schema::Type(t) => {
                // Parse the underlying type name first, then let the logical type (if the
                // base codec matches) replace the codec in place.
                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
                match (t.attributes.logical_type, &mut field.codec) {
                    (Some("decimal"), c @ Codec::Binary) => {
                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
                        *c = Codec::Decimal(prec, Some(sc), None);
                    }
                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
                    // Zoned timestamps carry `self.tz`; the local-* variants carry `None`.
                    (Some("timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(Some(self.tz))
                    }
                    (Some("timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(Some(self.tz))
                    }
                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(None)
                    }
                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(None)
                    }
                    (Some("timestamp-nanos"), c @ Codec::Int64) => {
                        *c = Codec::TimestampNanos(Some(self.tz))
                    }
                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
                        *c = Codec::TimestampNanos(None)
                    }
                    // NOTE(review): with `use_utf8view` set, the string codec comes from
                    // `with_utf8view(true)` and may not be `Codec::Utf8`. If so, this arm
                    // does not match and "uuid" falls through to the generic arm below.
                    // Confirm intended.
                    (Some("uuid"), c @ Codec::Utf8) => {
                        *c = Codec::Uuid;
                        field.metadata.insert("logicalType".into(), "uuid".into());
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
                        *c = Codec::DurationSeconds
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.run-end-encoded"), _) => {
                        // Run-end index width must be given explicitly and be 16, 32 or 64.
                        let bits_u8: u8 = t
                            .attributes
                            .additional
                            .get("arrow.runEndIndexBits")
                            .and_then(|v| v.as_u64())
                            .and_then(|n| u8::try_from(n).ok())
                            .ok_or_else(|| ArrowError::ParseError(
                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
                                    .to_string(),
                            ))?;
                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
                            return Err(ArrowError::ParseError(format!(
                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
                            )));
                        }
                        // The already-parsed type becomes the REE values type.
                        let values_site = field.clone();
                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
                        *c = Codec::TimestampSecs(true)
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
                        *c = Codec::TimestampSecs(false)
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
                        *c = Codec::IntervalYearMonth
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
                        *c = Codec::IntervalMonthDayNano
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
                        *c = Codec::IntervalDayTime
                    }
                    // Unrecognised logical types, or ones whose base codec did not match,
                    // leave the codec unchanged and are only recorded as metadata.
                    (Some(logical), _) => {
                        field.metadata.insert("logicalType".into(), logical.into());
                    }
                    (None, _) => {}
                }
                // A plain long annotated with arrowTimeUnit="nanosecond" is read as a
                // zoned nanosecond timestamp.
                if matches!(field.codec, Codec::Int64) {
                    if let Some(unit) = t
                        .attributes
                        .additional
                        .get("arrowTimeUnit")
                        .and_then(|v| v.as_str())
                    {
                        if unit == "nanosecond" {
                            field.codec = Codec::TimestampNanos(Some(self.tz));
                        }
                    }
                }
                // Copy all additional attributes into field metadata (values via `to_string`).
                if !t.attributes.additional.is_empty() {
                    for (k, v) in &t.attributes.additional {
                        field.metadata.insert(k.to_string(), v.to_string());
                    }
                }
                Ok(field)
            }
        }
    }
1787
    /// Resolves `writer_schema` against `reader_schema` into an `AvroDataType` shaped like
    /// the reader schema. The `resolution` field records how writer data maps onto it.
    ///
    /// Primitive pairs are handled by `resolve_primitives`. Unions are handled in three
    /// shapes: union/union, union/non-union, and non-union/union. Arrays, maps, fixed,
    /// records and enums are resolved pairwise.
    ///
    /// # Errors
    ///
    /// Fails when the schemas are incompatible. Returns `NotYetImplemented` for pairings
    /// not handled below.
    fn resolve_type<'s>(
        &mut self,
        writer_schema: &'s Schema<'a>,
        reader_schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        if let (Some(write_primitive), Some(read_primitive)) =
            (primitive_of(writer_schema), primitive_of(reader_schema))
        {
            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
        }
        match (writer_schema, reader_schema) {
            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
                let writer_variants = writer_variants.as_slice();
                let reader_variants = reader_variants.as_slice();
                match (
                    nullable_union_variants(writer_variants),
                    nullable_union_variants(reader_variants),
                ) {
                    // Both sides are ["null", T] / [T, "null"]: resolve the non-null
                    // branches and build a two-entry writer->reader table. The null branch
                    // keeps a `None` entry.
                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
                        let mut writer_to_reader = vec![None, None];
                        writer_to_reader[w_nb.non_null_index()] = Some((
                            r_nb.non_null_index(),
                            dt.resolution
                                .take()
                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
                        ));
                        // The resulting type uses the writer's null position.
                        dt.nullability = Some(w_nb);
                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                            writer_to_reader: Arc::from(writer_to_reader),
                            writer_is_union: true,
                            reader_is_union: true,
                        }));
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
                        Ok(dt)
                    }
                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
                }
            }
            (Schema::Union(writer_variants), reader_non_union) => {
                // Each writer branch either resolves to the single reader type (index 0)
                // or gets `None`. Resolution errors are deliberately discarded per branch.
                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
                    .iter()
                    .map(|writer| {
                        self.resolve_type(writer, reader_non_union, namespace)
                            .ok()
                            .map(|tmp| {
                                let resolution = tmp
                                    .resolution
                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                                (0usize, resolution)
                            })
                    })
                    .collect();
                let mut dt = self.parse_type(reader_non_union, namespace)?;
                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                    writer_to_reader: Arc::from(writer_to_reader),
                    writer_is_union: true,
                    reader_is_union: false,
                }));
                Ok(dt)
            }
            (writer_non_union, Schema::Union(reader_variants)) => {
                if let Some((nullability, non_null_branch)) =
                    nullable_union_variants(reader_variants)
                {
                    // Non-union writer read as a nullable reader: resolve against the
                    // non-null branch and mark the result nullable.
                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
                    #[cfg(feature = "avro_custom_types")]
                    Self::propagate_nullability_into_ree(&mut dt, nullability);
                    dt.nullability = Some(nullability);
                    if dt.resolution.is_none() {
                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
                    }
                    Ok(dt)
                } else {
                    // General reader union: pick one matching branch (see
                    // `find_best_union_match`) and parse all the other branches plainly.
                    let Some((match_idx, mut match_dt)) =
                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
                    else {
                        return Err(ArrowError::SchemaError(
                            "Writer schema does not match any reader union branch".to_string(),
                        ));
                    };
                    let resolution = match_dt
                        .resolution
                        .take()
                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                    // Wrapped in `Option` so the closure below can move it out exactly once.
                    let mut match_dt = Some(match_dt);
                    let children = reader_variants
                        .iter()
                        .enumerate()
                        .map(|(idx, variant)| {
                            if idx == match_idx {
                                Ok(match_dt.take().unwrap())
                            } else {
                                self.parse_type(variant, namespace)
                            }
                        })
                        .collect::<Result<Vec<_>, _>>()?;
                    let union_fields = build_union_fields(&children)?;
                    let mut dt = AvroDataType::new(
                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
                        Default::default(),
                        None,
                    );
                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
                        writer_is_union: false,
                        reader_is_union: true,
                    }));
                    Ok(dt)
                }
            }
            (
                Schema::Complex(ComplexType::Array(writer_array)),
                Schema::Complex(ComplexType::Array(reader_array)),
            ) => self.resolve_array(writer_array, reader_array, namespace),
            (
                Schema::Complex(ComplexType::Map(writer_map)),
                Schema::Complex(ComplexType::Map(reader_map)),
            ) => self.resolve_map(writer_map, reader_map, namespace),
            (
                Schema::Complex(ComplexType::Fixed(writer_fixed)),
                Schema::Complex(ComplexType::Fixed(reader_fixed)),
            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
            (
                Schema::Complex(ComplexType::Record(writer_record)),
                Schema::Complex(ComplexType::Record(reader_record)),
            ) => self.resolve_records(writer_record, reader_record, namespace),
            (
                Schema::Complex(ComplexType::Enum(writer_enum)),
                Schema::Complex(ComplexType::Enum(reader_enum)),
            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
            // If either side is a by-name reference, only the reader schema is parsed and no
            // resolution info is attached here.
            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
            _ => Err(ArrowError::NotYetImplemented(
                "Other resolutions not yet implemented".to_string(),
            )),
        }
    }
1935
1936 fn find_best_union_match(
1937 &mut self,
1938 writer: &Schema<'a>,
1939 reader_variants: &[Schema<'a>],
1940 namespace: Option<&'a str>,
1941 ) -> Option<(usize, AvroDataType)> {
1942 let mut first_resolution = None;
1943 for (reader_index, reader) in reader_variants.iter().enumerate() {
1944 if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1945 match &dt.resolution {
1946 None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1947 return Some((reader_index, dt));
1949 }
1950 Some(_) => {
1951 if first_resolution.is_none() {
1952 first_resolution = Some((reader_index, dt));
1954 }
1955 }
1956 };
1957 }
1958 }
1959 first_resolution
1960 }
1961
1962 fn resolve_unions<'s>(
1963 &mut self,
1964 writer_variants: &'s [Schema<'a>],
1965 reader_variants: &'s [Schema<'a>],
1966 namespace: Option<&'a str>,
1967 ) -> Result<AvroDataType, ArrowError> {
1968 let mut resolved_reader_encodings = HashMap::new();
1969 let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1970 .iter()
1971 .map(|writer| {
1972 self.find_best_union_match(writer, reader_variants, namespace)
1973 .map(|(match_idx, mut match_dt)| {
1974 let resolution = match_dt
1975 .resolution
1976 .take()
1977 .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1978 resolved_reader_encodings.insert(match_idx, match_dt);
1981 (match_idx, resolution)
1982 })
1983 })
1984 .collect();
1985 let reader_encodings: Vec<AvroDataType> = reader_variants
1986 .iter()
1987 .enumerate()
1988 .map(|(reader_idx, reader_schema)| {
1989 if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1990 Ok(resolved)
1991 } else {
1992 self.parse_type(reader_schema, namespace)
1993 }
1994 })
1995 .collect::<Result<_, _>>()?;
1996 let union_fields = build_union_fields(&reader_encodings)?;
1997 let mut dt = AvroDataType::new(
1998 Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1999 Default::default(),
2000 None,
2001 );
2002 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
2003 writer_to_reader: Arc::from(writer_to_reader),
2004 writer_is_union: true,
2005 reader_is_union: true,
2006 }));
2007 Ok(dt)
2008 }
2009
2010 fn resolve_array(
2011 &mut self,
2012 writer_array: &Array<'a>,
2013 reader_array: &Array<'a>,
2014 namespace: Option<&'a str>,
2015 ) -> Result<AvroDataType, ArrowError> {
2016 Ok(AvroDataType {
2017 nullability: None,
2018 metadata: reader_array.attributes.field_metadata(),
2019 codec: Codec::List(Arc::new(self.make_data_type(
2020 writer_array.items.as_ref(),
2021 Some(reader_array.items.as_ref()),
2022 namespace,
2023 )?)),
2024 resolution: None,
2025 })
2026 }
2027
2028 fn resolve_map(
2029 &mut self,
2030 writer_map: &Map<'a>,
2031 reader_map: &Map<'a>,
2032 namespace: Option<&'a str>,
2033 ) -> Result<AvroDataType, ArrowError> {
2034 Ok(AvroDataType {
2035 nullability: None,
2036 metadata: reader_map.attributes.field_metadata(),
2037 codec: Codec::Map(Arc::new(self.make_data_type(
2038 &writer_map.values,
2039 Some(&reader_map.values),
2040 namespace,
2041 )?)),
2042 resolution: None,
2043 })
2044 }
2045
2046 fn resolve_fixed<'s>(
2047 &mut self,
2048 writer_fixed: &Fixed<'a>,
2049 reader_fixed: &Fixed<'a>,
2050 reader_schema: &'s Schema<'a>,
2051 namespace: Option<&'a str>,
2052 ) -> Result<AvroDataType, ArrowError> {
2053 ensure_names_match(
2054 "Fixed",
2055 writer_fixed.name,
2056 writer_fixed.namespace,
2057 &writer_fixed.aliases,
2058 reader_fixed.name,
2059 reader_fixed.namespace,
2060 &reader_fixed.aliases,
2061 )?;
2062 if writer_fixed.size != reader_fixed.size {
2063 return Err(ArrowError::SchemaError(format!(
2064 "Fixed size mismatch for {}: writer={}, reader={}",
2065 reader_fixed.name, writer_fixed.size, reader_fixed.size
2066 )));
2067 }
2068 self.parse_type(reader_schema, namespace)
2069 }
2070
2071 fn resolve_primitives(
2072 &mut self,
2073 write_primitive: PrimitiveType,
2074 read_primitive: PrimitiveType,
2075 reader_schema: &Schema<'a>,
2076 ) -> Result<AvroDataType, ArrowError> {
2077 if write_primitive == read_primitive {
2078 return self.parse_type(reader_schema, None);
2079 }
2080 let promotion = match (write_primitive, read_primitive) {
2081 (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2082 (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2083 (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2084 (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2085 (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2086 (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2087 (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2088 (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2089 _ => {
2090 return Err(ArrowError::ParseError(format!(
2091 "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2092 )));
2093 }
2094 };
2095 let mut datatype = self.parse_type(reader_schema, None)?;
2096 datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2097 Ok(datatype)
2098 }
2099
2100 fn resolve_enums(
2156 &mut self,
2157 writer_enum: &Enum<'a>,
2158 reader_enum: &Enum<'a>,
2159 reader_schema: &Schema<'a>,
2160 namespace: Option<&'a str>,
2161 ) -> Result<AvroDataType, ArrowError> {
2162 ensure_names_match(
2163 "Enum",
2164 writer_enum.name,
2165 writer_enum.namespace,
2166 &writer_enum.aliases,
2167 reader_enum.name,
2168 reader_enum.namespace,
2169 &reader_enum.aliases,
2170 )?;
2171 if writer_enum.symbols == reader_enum.symbols {
2172 return self.parse_type(reader_schema, namespace);
2173 }
2174 let reader_index: HashMap<&str, i32> = reader_enum
2175 .symbols
2176 .iter()
2177 .enumerate()
2178 .map(|(index, &symbol)| (symbol, index as i32))
2179 .collect();
2180 let default_index: i32 = match reader_enum.default {
2181 Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2182 ArrowError::SchemaError(format!(
2183 "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2184 reader_enum.name,
2185 ))
2186 })?,
2187 None => -1,
2188 };
2189 let mapping: Vec<i32> = writer_enum
2190 .symbols
2191 .iter()
2192 .map(|&write_symbol| {
2193 reader_index
2194 .get(write_symbol)
2195 .copied()
2196 .unwrap_or(default_index)
2197 })
2198 .collect();
2199 if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2200 return Err(ArrowError::SchemaError(format!(
2201 "Reader enum '{}' does not cover all writer symbols and no default is provided",
2202 reader_enum.name
2203 )));
2204 }
2205 let mut dt = self.parse_type(reader_schema, namespace)?;
2206 dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2207 mapping: Arc::from(mapping),
2208 default_index,
2209 }));
2210 let reader_ns = reader_enum.namespace.or(namespace);
2211 self.resolver
2212 .register(reader_enum.name, reader_ns, dt.clone());
2213 Ok(dt)
2214 }
2215
2216 #[inline]
2217 fn build_writer_lookup(
2218 writer_record: &Record<'a>,
2219 ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2220 let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2221 for (idx, wf) in writer_record.fields.iter().enumerate() {
2222 map.insert(wf.name, idx);
2224 }
2225 let mut ambiguous: HashSet<&str> = HashSet::new();
2227 for (idx, wf) in writer_record.fields.iter().enumerate() {
2228 for &alias in &wf.aliases {
2229 match map.entry(alias) {
2230 Entry::Occupied(e) if *e.get() != idx => {
2231 ambiguous.insert(alias);
2232 }
2233 Entry::Vacant(e) => {
2234 e.insert(idx);
2235 }
2236 _ => {}
2237 }
2238 }
2239 }
2240 (map, ambiguous)
2241 }
2242
    /// Resolves a writer record against a reader record.
    ///
    /// Each reader field is matched to a writer field, by name first and then by the
    /// reader field's aliases. Matched fields are resolved recursively. Unmatched reader
    /// fields take their declared default, or `null` when their type is a null-first
    /// nullable union. They are listed in `default_fields`. Every writer field is recorded
    /// as either `ToReader(reader_index, _)` or `Skip(_)`. The resolved struct is registered
    /// under the reader's name.
    ///
    /// # Errors
    ///
    /// Fails on record name mismatch, when an unmatched reader field has no usable default,
    /// or in strict mode on ambiguous aliases and on several reader fields claiming the
    /// same writer field.
    fn resolve_records(
        &mut self,
        writer_record: &Record<'a>,
        reader_record: &Record<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        ensure_names_match(
            "Record",
            writer_record.name,
            writer_record.namespace,
            &writer_record.aliases,
            reader_record.name,
            reader_record.namespace,
            &reader_record.aliases,
        )?;
        let writer_ns = writer_record.namespace.or(namespace);
        let reader_ns = reader_record.namespace.or(namespace);
        let mut reader_md = reader_record.attributes.field_metadata();
        reader_md.insert(
            AVRO_NAME_METADATA_KEY.to_string(),
            reader_record.name.to_string(),
        );
        if let Some(ns) = reader_ns {
            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
        }
        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
        // writer field index -> reader field index that claimed it (if any).
        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
        let mut default_fields: Vec<usize> = Vec::new();
        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
            let mut match_idx = writer_lookup.get(r_field.name).copied();
            let mut matched_via_alias: Option<&str> = None;
            // Fall back to the reader field's aliases; the first alias found wins.
            if match_idx.is_none() {
                for &alias in &r_field.aliases {
                    if let Some(i) = writer_lookup.get(alias).copied() {
                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
                            return Err(ArrowError::SchemaError(format!(
                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
                                r_field.name
                            )));
                        }
                        match_idx = Some(i);
                        matched_via_alias = Some(alias);
                        break;
                    }
                }
            }
            if let Some(wi) = match_idx {
                if writer_to_reader[wi].is_none() {
                    // First reader field to claim this writer field: resolve and keep it.
                    // NOTE(review): the writer field's type is resolved with `reader_ns`
                    // as the enclosing namespace. Confirm this is intended for writer-side
                    // names.
                    let w_schema = &writer_record.fields[wi].r#type;
                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
                    writer_to_reader[wi] = Some(reader_idx);
                    reader_fields.push(AvroField {
                        name: r_field.name.to_owned(),
                        data_type: dt,
                    });
                    continue;
                } else if self.strict_mode {
                    let existing_reader = writer_to_reader[wi].unwrap();
                    let via = matched_via_alias
                        .map(|a| format!("alias '{a}'"))
                        .unwrap_or_else(|| "name match".to_string());
                    return Err(ArrowError::SchemaError(format!(
                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
                        writer_record.fields[wi].name
                    )));
                }
                // Non-strict: a writer field already claimed is treated as unmatched, so
                // this reader field falls through to the default handling below.
            }
            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
            if let Some(default_json) = r_field.default.as_ref() {
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(default_json)?,
                ));
                default_fields.push(reader_idx);
            } else if dt.nullability() == Some(Nullability::NullFirst) {
                // No declared default, but a ["null", T] field can default to null.
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(&Value::Null)?,
                ));
                default_fields.push(reader_idx);
            } else {
                return Err(ArrowError::SchemaError(format!(
                    "Reader field '{}' not present in writer schema must have a default value",
                    r_field.name
                )));
            }
            reader_fields.push(AvroField {
                name: r_field.name.to_owned(),
                data_type: dt,
            });
        }
        // Every writer field's own type is parsed so the decoder can read (or skip) it.
        // NOTE(review): `parse_type` registers named writer types in `self.resolver`, which
        // may replace reader-side registrations under the same key for later references.
        // Confirm this ordering is intended.
        let writer_fields = writer_record
            .fields
            .iter()
            .enumerate()
            .map(|(writer_index, writer_field)| {
                let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
                if let Some(reader_index) = writer_to_reader[writer_index] {
                    Ok(ResolvedField::ToReader(reader_index, dt))
                } else {
                    Ok(ResolvedField::Skip(dt))
                }
            })
            .collect::<Result<_, ArrowError>>()?;
        let resolved = AvroDataType::new_with_resolution(
            Codec::Struct(Arc::from(reader_fields)),
            reader_md,
            None,
            Some(ResolutionInfo::Record(ResolvedRecord {
                writer_fields,
                default_fields: Arc::from(default_fields),
            })),
        );
        self.resolver
            .register(reader_record.name, reader_ns, resolved.clone());
        Ok(resolved)
    }
2368}
2369
2370#[cfg(test)]
2371mod tests {
2372 use super::*;
2373 use crate::schema::{
2374 AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2375 Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2376 };
2377 use indexmap::IndexMap;
2378 use serde_json::{self, Value};
2379
2380 fn create_schema_with_logical_type(
2381 primitive_type: PrimitiveType,
2382 logical_type: &'static str,
2383 ) -> Schema<'static> {
2384 let attributes = Attributes {
2385 logical_type: Some(logical_type),
2386 additional: Default::default(),
2387 };
2388
2389 Schema::Type(Type {
2390 r#type: TypeName::Primitive(primitive_type),
2391 attributes,
2392 })
2393 }
2394
2395 fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2396 let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2397 let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2398 let mut maker = Maker::new(false, false, Tz::default());
2399 maker
2400 .make_data_type(&writer_schema, Some(&reader_schema), None)
2401 .expect("promotion should resolve")
2402 }
2403
2404 fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2405 Schema::TypeName(TypeName::Primitive(pt))
2406 }
2407 fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2408 Schema::Union(branches)
2409 }
2410
2411 #[test]
2412 fn test_date_logical_type() {
2413 let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
2414
2415 let mut maker = Maker::new(false, false, Tz::default());
2416 let result = maker.make_data_type(&schema, None, None).unwrap();
2417
2418 assert!(matches!(result.codec, Codec::Date32));
2419 }
2420
2421 #[test]
2422 fn test_time_millis_logical_type() {
2423 let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
2424
2425 let mut maker = Maker::new(false, false, Tz::default());
2426 let result = maker.make_data_type(&schema, None, None).unwrap();
2427
2428 assert!(matches!(result.codec, Codec::TimeMillis));
2429 }
2430
2431 #[test]
2432 fn test_time_micros_logical_type() {
2433 let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
2434
2435 let mut maker = Maker::new(false, false, Tz::default());
2436 let result = maker.make_data_type(&schema, None, None).unwrap();
2437
2438 assert!(matches!(result.codec, Codec::TimeMicros));
2439 }
2440
2441 #[test]
2442 fn test_timestamp_millis_logical_type() {
2443 for tz in [Tz::OffsetZero, Tz::Utc] {
2444 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
2445
2446 let mut maker = Maker::new(false, false, tz);
2447 let result = maker.make_data_type(&schema, None, None).unwrap();
2448
2449 let Codec::TimestampMillis(Some(actual_tz)) = result.codec else {
2450 panic!("Expected TimestampMillis codec");
2451 };
2452 assert_eq!(actual_tz, tz);
2453 }
2454 }
2455
2456 #[test]
2457 fn test_timestamp_micros_logical_type() {
2458 for tz in [Tz::OffsetZero, Tz::Utc] {
2459 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
2460
2461 let mut maker = Maker::new(false, false, tz);
2462 let result = maker.make_data_type(&schema, None, None).unwrap();
2463
2464 let Codec::TimestampMicros(Some(actual_tz)) = result.codec else {
2465 panic!("Expected TimestampMicros codec");
2466 };
2467 assert_eq!(actual_tz, tz);
2468 }
2469 }
2470
2471 #[test]
2472 fn test_timestamp_nanos_logical_type() {
2473 for tz in [Tz::OffsetZero, Tz::Utc] {
2474 let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-nanos");
2475
2476 let mut maker = Maker::new(false, false, tz);
2477 let result = maker.make_data_type(&schema, None, None).unwrap();
2478
2479 let Codec::TimestampNanos(Some(actual_tz)) = result.codec else {
2480 panic!("Expected TimestampNanos codec");
2481 };
2482 assert_eq!(actual_tz, tz);
2483 }
2484 }
2485
2486 #[test]
2487 fn test_local_timestamp_millis_logical_type() {
2488 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
2489
2490 let mut maker = Maker::new(false, false, Tz::default());
2491 let result = maker.make_data_type(&schema, None, None).unwrap();
2492
2493 assert!(matches!(result.codec, Codec::TimestampMillis(None)));
2494 }
2495
2496 #[test]
2497 fn test_local_timestamp_micros_logical_type() {
2498 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
2499
2500 let mut maker = Maker::new(false, false, Tz::default());
2501 let result = maker.make_data_type(&schema, None, None).unwrap();
2502
2503 assert!(matches!(result.codec, Codec::TimestampMicros(None)));
2504 }
2505
2506 #[test]
2507 fn test_local_timestamp_nanos_logical_type() {
2508 let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-nanos");
2509
2510 let mut maker = Maker::new(false, false, Tz::default());
2511 let result = maker.make_data_type(&schema, None, None).unwrap();
2512
2513 assert!(matches!(result.codec, Codec::TimestampNanos(None)));
2514 }
2515
2516 #[test]
2517 fn test_uuid_type() {
2518 let mut codec = Codec::Fixed(16);
2519 if let c @ Codec::Fixed(16) = &mut codec {
2520 *c = Codec::Uuid;
2521 }
2522 assert!(matches!(codec, Codec::Uuid));
2523 }
2524
2525 #[test]
2526 fn test_duration_logical_type() {
2527 let mut codec = Codec::Fixed(12);
2528
2529 if let c @ Codec::Fixed(12) = &mut codec {
2530 *c = Codec::Interval;
2531 }
2532
2533 assert!(matches!(codec, Codec::Interval));
2534 }
2535
2536 #[test]
2537 fn test_decimal_logical_type_not_implemented() {
2538 let codec = Codec::Fixed(16);
2539
2540 let process_decimal = || -> Result<(), ArrowError> {
2541 if let Codec::Fixed(_) = codec {
2542 return Err(ArrowError::NotYetImplemented(
2543 "Decimals are not currently supported".to_string(),
2544 ));
2545 }
2546 Ok(())
2547 };
2548
2549 let result = process_decimal();
2550
2551 assert!(result.is_err());
2552 if let Err(ArrowError::NotYetImplemented(msg)) = result {
2553 assert!(msg.contains("Decimals are not currently supported"));
2554 } else {
2555 panic!("Expected NotYetImplemented error");
2556 }
2557 }
2558 #[test]
2559 fn test_unknown_logical_type_added_to_metadata() {
2560 let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
2561
2562 let mut maker = Maker::new(false, false, Tz::default());
2563 let result = maker.make_data_type(&schema, None, None).unwrap();
2564
2565 assert_eq!(
2566 result.metadata.get("logicalType"),
2567 Some(&"custom-type".to_string())
2568 );
2569 }
2570
2571 #[test]
2572 fn test_string_with_utf8view_enabled() {
2573 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2574
2575 let mut maker = Maker::new(true, false, Tz::default());
2576 let result = maker.make_data_type(&schema, None, None).unwrap();
2577
2578 assert!(matches!(result.codec, Codec::Utf8View));
2579 }
2580
2581 #[test]
2582 fn test_string_without_utf8view_enabled() {
2583 let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2584
2585 let mut maker = Maker::new(false, false, Tz::default());
2586 let result = maker.make_data_type(&schema, None, None).unwrap();
2587
2588 assert!(matches!(result.codec, Codec::Utf8));
2589 }
2590
2591 #[test]
2592 fn test_record_with_string_and_utf8view_enabled() {
2593 let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2594
2595 let avro_field = crate::schema::Field {
2596 name: "string_field",
2597 r#type: field_schema,
2598 default: None,
2599 doc: None,
2600 aliases: vec![],
2601 };
2602
2603 let record = Record {
2604 name: "test_record",
2605 namespace: None,
2606 aliases: vec![],
2607 doc: None,
2608 fields: vec![avro_field],
2609 attributes: Attributes::default(),
2610 };
2611
2612 let schema = Schema::Complex(ComplexType::Record(record));
2613
2614 let mut maker = Maker::new(true, false, Tz::default());
2615 let result = maker.make_data_type(&schema, None, None).unwrap();
2616
2617 if let Codec::Struct(fields) = &result.codec {
2618 let first_field_codec = &fields[0].data_type().codec;
2619 assert!(matches!(first_field_codec, Codec::Utf8View));
2620 } else {
2621 panic!("Expected Struct codec");
2622 }
2623 }
2624
2625 #[test]
2626 fn test_union_with_strict_mode() {
2627 let schema = Schema::Union(vec![
2628 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2629 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2630 ]);
2631
2632 let mut maker = Maker::new(false, true, Tz::default());
2633 let result = maker.make_data_type(&schema, None, None);
2634
2635 assert!(result.is_err());
2636 match result {
2637 Err(ArrowError::SchemaError(msg)) => {
2638 assert!(msg.contains(
2639 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
2640 ));
2641 }
2642 _ => panic!("Expected SchemaError"),
2643 }
2644 }
2645
2646 #[test]
2647 fn test_resolve_int_to_float_promotion() {
2648 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
2649 assert!(matches!(result.codec, Codec::Float32));
2650 assert_eq!(
2651 result.resolution,
2652 Some(ResolutionInfo::Promotion(Promotion::IntToFloat))
2653 );
2654 }
2655
2656 #[test]
2657 fn test_resolve_int_to_double_promotion() {
2658 let result = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
2659 assert!(matches!(result.codec, Codec::Float64));
2660 assert_eq!(
2661 result.resolution,
2662 Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
2663 );
2664 }
2665
2666 #[test]
2667 fn test_resolve_long_to_float_promotion() {
2668 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
2669 assert!(matches!(result.codec, Codec::Float32));
2670 assert_eq!(
2671 result.resolution,
2672 Some(ResolutionInfo::Promotion(Promotion::LongToFloat))
2673 );
2674 }
2675
2676 #[test]
2677 fn test_resolve_long_to_double_promotion() {
2678 let result = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
2679 assert!(matches!(result.codec, Codec::Float64));
2680 assert_eq!(
2681 result.resolution,
2682 Some(ResolutionInfo::Promotion(Promotion::LongToDouble))
2683 );
2684 }
2685
2686 #[test]
2687 fn test_resolve_float_to_double_promotion() {
2688 let result = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
2689 assert!(matches!(result.codec, Codec::Float64));
2690 assert_eq!(
2691 result.resolution,
2692 Some(ResolutionInfo::Promotion(Promotion::FloatToDouble))
2693 );
2694 }
2695
2696 #[test]
2697 fn test_resolve_string_to_bytes_promotion() {
2698 let result = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
2699 assert!(matches!(result.codec, Codec::Binary));
2700 assert_eq!(
2701 result.resolution,
2702 Some(ResolutionInfo::Promotion(Promotion::StringToBytes))
2703 );
2704 }
2705
2706 #[test]
2707 fn test_resolve_bytes_to_string_promotion() {
2708 let result = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
2709 assert!(matches!(result.codec, Codec::Utf8));
2710 assert_eq!(
2711 result.resolution,
2712 Some(ResolutionInfo::Promotion(Promotion::BytesToString))
2713 );
2714 }
2715
2716 #[test]
2717 fn test_resolve_illegal_promotion_double_to_float_errors() {
2718 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
2719 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
2720 let mut maker = Maker::new(false, false, Tz::default());
2721 let result = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
2722 assert!(result.is_err());
2723 match result {
2724 Err(ArrowError::ParseError(msg)) => {
2725 assert!(msg.contains("Illegal promotion"));
2726 }
2727 _ => panic!("Expected ParseError for illegal promotion Double -> Float"),
2728 }
2729 }
2730
2731 #[test]
2732 fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
2733 let writer = Schema::Union(vec![
2734 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2735 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2736 ]);
2737 let reader = Schema::Union(vec![
2738 Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
2739 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2740 ]);
2741 let mut maker = Maker::new(false, false, Tz::default());
2742 let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2743 assert!(matches!(result.codec, Codec::Float64));
2744 assert_eq!(
2745 result.resolution,
2746 Some(ResolutionInfo::Union(ResolvedUnion {
2747 writer_to_reader: [
2748 None,
2749 Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
2750 ]
2751 .into(),
2752 writer_is_union: true,
2753 reader_is_union: true,
2754 }))
2755 );
2756 assert_eq!(result.nullability, Some(Nullability::NullFirst));
2757 }
2758
2759 #[test]
2760 fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
2761 let writer = mk_union(vec![
2762 mk_primitive(PrimitiveType::String),
2763 mk_primitive(PrimitiveType::Long),
2764 ]);
2765 let reader = mk_primitive(PrimitiveType::Bytes);
2766 let mut maker = Maker::new(false, false, Tz::default());
2767 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2768 assert!(matches!(dt.codec(), Codec::Binary));
2769 let resolved = match dt.resolution {
2770 Some(ResolutionInfo::Union(u)) => u,
2771 other => panic!("expected union resolution info, got {other:?}"),
2772 };
2773 assert!(resolved.writer_is_union && !resolved.reader_is_union);
2774 assert_eq!(
2775 resolved.writer_to_reader.as_ref(),
2776 &[
2777 Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
2778 None
2779 ]
2780 );
2781 }
2782
2783 #[test]
2784 fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
2785 let writer = mk_primitive(PrimitiveType::Long);
2786 let reader = mk_union(vec![
2787 mk_primitive(PrimitiveType::Long),
2788 mk_primitive(PrimitiveType::Double),
2789 ]);
2790 let mut maker = Maker::new(false, false, Tz::default());
2791 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2792 let resolved = match dt.resolution {
2793 Some(ResolutionInfo::Union(u)) => u,
2794 other => panic!("expected union resolution info, got {other:?}"),
2795 };
2796 assert!(!resolved.writer_is_union && resolved.reader_is_union);
2797 assert_eq!(
2798 resolved.writer_to_reader.as_ref(),
2799 &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
2800 );
2801 }
2802
2803 #[test]
2804 fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
2805 let writer = mk_primitive(PrimitiveType::Int);
2806 let reader = mk_union(vec![
2807 mk_primitive(PrimitiveType::Null),
2808 mk_primitive(PrimitiveType::Long),
2809 mk_primitive(PrimitiveType::String),
2810 ]);
2811 let mut maker = Maker::new(false, false, Tz::default());
2812 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
2813 let resolved = match dt.resolution {
2814 Some(ResolutionInfo::Union(u)) => u,
2815 other => panic!("expected union resolution info, got {other:?}"),
2816 };
2817 assert_eq!(
2818 resolved.writer_to_reader.as_ref(),
2819 &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
2820 );
2821 }
2822
2823 #[test]
2824 fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
2825 let writer = Schema::Complex(ComplexType::Record(Record {
2829 name: "Inner",
2830 namespace: None,
2831 doc: None,
2832 aliases: vec![],
2833 fields: vec![AvroFieldSchema {
2834 name: "a",
2835 doc: None,
2836 r#type: mk_primitive(PrimitiveType::Int),
2837 default: None,
2838 aliases: vec![],
2839 }],
2840 attributes: Attributes::default(),
2841 }));
2842 let reader = mk_union(vec![
2843 Schema::Complex(ComplexType::Record(Record {
2844 name: "Inner",
2845 namespace: None,
2846 doc: None,
2847 aliases: vec![],
2848 fields: vec![
2849 AvroFieldSchema {
2850 name: "a",
2851 doc: None,
2852 r#type: mk_primitive(PrimitiveType::Int),
2853 default: None,
2854 aliases: vec![],
2855 },
2856 AvroFieldSchema {
2857 name: "b",
2858 doc: None,
2859 r#type: mk_primitive(PrimitiveType::Int),
2860 default: Some(Value::Number(serde_json::Number::from(42))),
2861 aliases: vec![],
2862 },
2863 ],
2864 attributes: Attributes::default(),
2865 })),
2866 mk_primitive(PrimitiveType::String),
2867 ]);
2868 let mut maker = Maker::new(false, false, Default::default());
2869 let dt = maker
2870 .make_data_type(&writer, Some(&reader), None)
2871 .expect("resolution should succeed");
2872 let resolved = match dt.resolution.as_ref() {
2874 Some(ResolutionInfo::Union(u)) => u,
2875 other => panic!("expected union resolution info, got {other:?}"),
2876 };
2877 assert!(!resolved.writer_is_union && resolved.reader_is_union);
2878 assert_eq!(
2879 resolved.writer_to_reader.len(),
2880 1,
2881 "expected the non-union record to resolve to a union variant"
2882 );
2883 let resolution = match resolved.writer_to_reader.first().unwrap() {
2884 Some((0, resolution)) => resolution,
2885 other => panic!("unexpected writer-to-reader table value {other:?}"),
2886 };
2887 match resolution {
2888 ResolutionInfo::Record(ResolvedRecord {
2889 writer_fields,
2890 default_fields,
2891 }) => {
2892 assert_eq!(writer_fields.len(), 1);
2893 assert!(matches!(writer_fields[0], ResolvedField::ToReader(0, _)));
2894 assert_eq!(default_fields.len(), 1);
2895 assert_eq!(default_fields[0], 1);
2896 }
2897 other => panic!("unexpected resolution {other:?}"),
2898 }
2899 let children = match dt.codec() {
2901 Codec::Union(children, _, _) => children,
2902 other => panic!("expected union codec, got {other:?}"),
2903 };
2904 let inner_fields = match children[0].codec() {
2905 Codec::Struct(f) => f,
2906 other => panic!("expected struct codec for Inner, got {other:?}"),
2907 };
2908 assert_eq!(inner_fields.len(), 2);
2909 assert_eq!(inner_fields[1].name(), "b");
2910 assert_eq!(
2911 inner_fields[1].data_type().resolution,
2912 Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
2913 "field b should have DefaultValue(Int(42)) from schema resolution"
2914 );
2915 }
2916
2917 #[test]
2918 fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
2919 let writer = mk_union(vec![
2923 mk_primitive(PrimitiveType::String),
2924 Schema::Complex(ComplexType::Record(Record {
2925 name: "Inner",
2926 namespace: None,
2927 doc: None,
2928 aliases: vec![],
2929 fields: vec![AvroFieldSchema {
2930 name: "a",
2931 doc: None,
2932 r#type: mk_primitive(PrimitiveType::Int),
2933 default: None,
2934 aliases: vec![],
2935 }],
2936 attributes: Attributes::default(),
2937 })),
2938 ]);
2939 let reader = mk_union(vec![
2940 Schema::Complex(ComplexType::Record(Record {
2941 name: "Inner",
2942 namespace: None,
2943 doc: None,
2944 aliases: vec![],
2945 fields: vec![
2946 AvroFieldSchema {
2947 name: "a",
2948 doc: None,
2949 r#type: mk_primitive(PrimitiveType::Int),
2950 default: None,
2951 aliases: vec![],
2952 },
2953 AvroFieldSchema {
2954 name: "b",
2955 doc: None,
2956 r#type: mk_primitive(PrimitiveType::Int),
2957 default: Some(Value::Number(serde_json::Number::from(42))),
2958 aliases: vec![],
2959 },
2960 ],
2961 attributes: Attributes::default(),
2962 })),
2963 mk_primitive(PrimitiveType::String),
2964 ]);
2965 let mut maker = Maker::new(false, false, Default::default());
2966 let dt = maker
2967 .make_data_type(&writer, Some(&reader), None)
2968 .expect("resolution should succeed");
2969 let resolved = match dt.resolution.as_ref() {
2971 Some(ResolutionInfo::Union(u)) => u,
2972 other => panic!("expected union resolution info, got {other:?}"),
2973 };
2974 assert!(resolved.writer_is_union && resolved.reader_is_union);
2975 assert_eq!(resolved.writer_to_reader.len(), 2);
2976 let resolution = match resolved.writer_to_reader[1].as_ref() {
2977 Some((0, resolution)) => resolution,
2978 other => panic!("unexpected writer-to-reader table value {other:?}"),
2979 };
2980 match resolution {
2981 ResolutionInfo::Record(ResolvedRecord {
2982 writer_fields,
2983 default_fields,
2984 }) => {
2985 assert_eq!(writer_fields.len(), 1);
2986 assert!(matches!(writer_fields[0], ResolvedField::ToReader(0, _)));
2987 assert_eq!(default_fields.len(), 1);
2988 assert_eq!(default_fields[0], 1);
2989 }
2990 other => panic!("unexpected resolution {other:?}"),
2991 }
2992 let children = match dt.codec() {
2994 Codec::Union(children, _, _) => children,
2995 other => panic!("expected union codec, got {other:?}"),
2996 };
2997 let inner_fields = match children[0].codec() {
2998 Codec::Struct(f) => f,
2999 other => panic!("expected struct codec for Inner, got {other:?}"),
3000 };
3001 assert_eq!(inner_fields.len(), 2);
3002 assert_eq!(inner_fields[1].name(), "b");
3003 assert_eq!(
3004 inner_fields[1].data_type().resolution,
3005 Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
3006 "field b should have DefaultValue(Int(42)) from schema resolution"
3007 );
3008 }
3009
3010 #[test]
3011 fn test_resolve_both_nullable_unions_direct_match() {
3012 let writer = mk_union(vec![
3013 mk_primitive(PrimitiveType::Null),
3014 mk_primitive(PrimitiveType::String),
3015 ]);
3016 let reader = mk_union(vec![
3017 mk_primitive(PrimitiveType::String),
3018 mk_primitive(PrimitiveType::Null),
3019 ]);
3020 let mut maker = Maker::new(false, false, Tz::default());
3021 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
3022 assert!(matches!(dt.codec(), Codec::Utf8));
3023 assert_eq!(dt.nullability, Some(Nullability::NullFirst));
3024 assert_eq!(
3025 dt.resolution,
3026 Some(ResolutionInfo::Union(ResolvedUnion {
3027 writer_to_reader: [
3028 None,
3029 Some((0, ResolutionInfo::Promotion(Promotion::Direct)))
3030 ]
3031 .into(),
3032 writer_is_union: true,
3033 reader_is_union: true
3034 }))
3035 );
3036 }
3037
3038 #[test]
3039 fn test_resolve_both_nullable_unions_with_promotion() {
3040 let writer = mk_union(vec![
3041 mk_primitive(PrimitiveType::Null),
3042 mk_primitive(PrimitiveType::Int),
3043 ]);
3044 let reader = mk_union(vec![
3045 mk_primitive(PrimitiveType::Double),
3046 mk_primitive(PrimitiveType::Null),
3047 ]);
3048 let mut maker = Maker::new(false, false, Tz::default());
3049 let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
3050 assert!(matches!(dt.codec(), Codec::Float64));
3051 assert_eq!(dt.nullability, Some(Nullability::NullFirst));
3052 assert_eq!(
3053 dt.resolution,
3054 Some(ResolutionInfo::Union(ResolvedUnion {
3055 writer_to_reader: [
3056 None,
3057 Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble)))
3058 ]
3059 .into(),
3060 writer_is_union: true,
3061 reader_is_union: true
3062 }))
3063 );
3064 }
3065
3066 #[test]
3067 fn test_resolve_type_promotion() {
3068 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
3069 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
3070 let mut maker = Maker::new(false, false, Tz::default());
3071 let result = maker
3072 .make_data_type(&writer_schema, Some(&reader_schema), None)
3073 .unwrap();
3074 assert!(matches!(result.codec, Codec::Int64));
3075 assert_eq!(
3076 result.resolution,
3077 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3078 );
3079 }
3080
3081 #[test]
3082 fn test_nested_record_type_reuse_without_namespace() {
3083 let schema_str = r#"
3084 {
3085 "type": "record",
3086 "name": "Record",
3087 "fields": [
3088 {
3089 "name": "nested",
3090 "type": {
3091 "type": "record",
3092 "name": "Nested",
3093 "fields": [
3094 { "name": "nested_int", "type": "int" }
3095 ]
3096 }
3097 },
3098 { "name": "nestedRecord", "type": "Nested" },
3099 { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
3100 { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
3101 ]
3102 }
3103 "#;
3104
3105 let schema: Schema = serde_json::from_str(schema_str).unwrap();
3106
3107 let mut maker = Maker::new(false, false, Tz::default());
3108 let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3109
3110 if let Codec::Struct(fields) = avro_data_type.codec() {
3111 assert_eq!(fields.len(), 4);
3112
3113 assert_eq!(fields[0].name(), "nested");
3115 let nested_data_type = fields[0].data_type();
3116 if let Codec::Struct(nested_fields) = nested_data_type.codec() {
3117 assert_eq!(nested_fields.len(), 1);
3118 assert_eq!(nested_fields[0].name(), "nested_int");
3119 assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
3120 } else {
3121 panic!(
3122 "'nested' field is not a struct but {:?}",
3123 nested_data_type.codec()
3124 );
3125 }
3126
3127 assert_eq!(fields[1].name(), "nestedRecord");
3129 let nested_record_data_type = fields[1].data_type();
3130 assert_eq!(
3131 nested_record_data_type.codec().data_type(),
3132 nested_data_type.codec().data_type()
3133 );
3134
3135 assert_eq!(fields[2].name(), "nestedArray");
3137 if let Codec::List(item_type) = fields[2].data_type().codec() {
3138 assert_eq!(
3139 item_type.codec().data_type(),
3140 nested_data_type.codec().data_type()
3141 );
3142 } else {
3143 panic!("'nestedArray' field is not a list");
3144 }
3145
3146 assert_eq!(fields[3].name(), "nestedMap");
3148 if let Codec::Map(value_type) = fields[3].data_type().codec() {
3149 assert_eq!(
3150 value_type.codec().data_type(),
3151 nested_data_type.codec().data_type()
3152 );
3153 } else {
3154 panic!("'nestedMap' field is not a map");
3155 }
3156 } else {
3157 panic!("Top-level schema is not a struct");
3158 }
3159 }
3160
3161 #[test]
3162 fn test_nested_enum_type_reuse_with_namespace() {
3163 let schema_str = r#"
3164 {
3165 "type": "record",
3166 "name": "Record",
3167 "namespace": "record_ns",
3168 "fields": [
3169 {
3170 "name": "status",
3171 "type": {
3172 "type": "enum",
3173 "name": "Status",
3174 "namespace": "enum_ns",
3175 "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
3176 }
3177 },
3178 { "name": "backupStatus", "type": "enum_ns.Status" },
3179 { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
3180 { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
3181 ]
3182 }
3183 "#;
3184
3185 let schema: Schema = serde_json::from_str(schema_str).unwrap();
3186
3187 let mut maker = Maker::new(false, false, Tz::default());
3188 let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();
3189
3190 if let Codec::Struct(fields) = avro_data_type.codec() {
3191 assert_eq!(fields.len(), 4);
3192
3193 assert_eq!(fields[0].name(), "status");
3195 let status_data_type = fields[0].data_type();
3196 if let Codec::Enum(symbols) = status_data_type.codec() {
3197 assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
3198 } else {
3199 panic!(
3200 "'status' field is not an enum but {:?}",
3201 status_data_type.codec()
3202 );
3203 }
3204
3205 assert_eq!(fields[1].name(), "backupStatus");
3207 let backup_status_data_type = fields[1].data_type();
3208 assert_eq!(
3209 backup_status_data_type.codec().data_type(),
3210 status_data_type.codec().data_type()
3211 );
3212
3213 assert_eq!(fields[2].name(), "statusHistory");
3215 if let Codec::List(item_type) = fields[2].data_type().codec() {
3216 assert_eq!(
3217 item_type.codec().data_type(),
3218 status_data_type.codec().data_type()
3219 );
3220 } else {
3221 panic!("'statusHistory' field is not a list");
3222 }
3223
3224 assert_eq!(fields[3].name(), "statusMap");
3226 if let Codec::Map(value_type) = fields[3].data_type().codec() {
3227 assert_eq!(
3228 value_type.codec().data_type(),
3229 status_data_type.codec().data_type()
3230 );
3231 } else {
3232 panic!("'statusMap' field is not a map");
3233 }
3234 } else {
3235 panic!("Top-level schema is not a struct");
3236 }
3237 }
3238
3239 #[test]
3240 fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
3241 let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3242 let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
3243 let mut maker = Maker::new(false, false, Tz::default());
3244 let data_type = maker
3245 .make_data_type(&writer_schema, Some(&reader_schema), None)
3246 .expect("resolution should succeed");
3247 let field = AvroField {
3248 name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
3249 data_type,
3250 };
3251 assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
3252 assert!(matches!(field.data_type().codec(), Codec::Utf8));
3253 }
3254
3255 fn json_string(s: &str) -> Value {
3256 Value::String(s.to_string())
3257 }
3258
3259 fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3260 let stored = dt
3261 .metadata
3262 .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3263 .cloned()
3264 .unwrap_or_default();
3265 let expected = serde_json::to_string(default_json).unwrap();
3266 assert_eq!(stored, expected, "stored default metadata should match");
3267 }
3268
3269 #[test]
3270 fn test_validate_and_store_default_null_and_nullability_rules() {
3271 let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
3272 let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
3273 assert_eq!(lit, AvroLiteral::Null);
3274 assert_default_stored(&dt_null, &Value::Null);
3275 let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3276 let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
3277 assert!(
3278 err.to_string()
3279 .contains("JSON null default is only valid for `null` type"),
3280 "unexpected error: {err}"
3281 );
3282 let mut dt_int_nf =
3283 AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
3284 let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
3285 assert_eq!(lit2, AvroLiteral::Null);
3286 assert_default_stored(&dt_int_nf, &Value::Null);
3287 let mut dt_int_ns =
3288 AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
3289 let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
3290 assert!(
3291 err2.to_string()
3292 .contains("JSON null default is only valid for `null` type"),
3293 "unexpected error: {err2}"
3294 );
3295 }
3296
3297 #[test]
3298 fn test_validate_and_store_default_primitives_and_temporal() {
3299 let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
3300 let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
3301 assert_eq!(lit, AvroLiteral::Boolean(true));
3302 assert_default_stored(&dt_bool, &Value::Bool(true));
3303 let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
3304 let lit = dt_i32
3305 .parse_and_store_default(&serde_json::json!(123))
3306 .unwrap();
3307 assert_eq!(lit, AvroLiteral::Int(123));
3308 assert_default_stored(&dt_i32, &serde_json::json!(123));
3309 let err = dt_i32
3310 .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
3311 .unwrap_err();
3312 assert!(format!("{err}").contains("out of i32 range"));
3313 let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3314 let lit = dt_i64
3315 .parse_and_store_default(&serde_json::json!(1234567890))
3316 .unwrap();
3317 assert_eq!(lit, AvroLiteral::Long(1234567890));
3318 assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
3319 let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
3320 let lit = dt_f32
3321 .parse_and_store_default(&serde_json::json!(1.25))
3322 .unwrap();
3323 assert_eq!(lit, AvroLiteral::Float(1.25));
3324 assert_default_stored(&dt_f32, &serde_json::json!(1.25));
3325 let err = dt_f32
3326 .parse_and_store_default(&serde_json::json!(1e39))
3327 .unwrap_err();
3328 assert!(format!("{err}").contains("out of f32 range"));
3329 let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3330 let lit = dt_f64
3331 .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
3332 .unwrap();
3333 assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
3334 assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
3335 let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
3336 let l = dt_str
3337 .parse_and_store_default(&json_string("hello"))
3338 .unwrap();
3339 assert_eq!(l, AvroLiteral::String("hello".into()));
3340 assert_default_stored(&dt_str, &json_string("hello"));
3341 let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
3342 let l = dt_strv
3343 .parse_and_store_default(&json_string("view"))
3344 .unwrap();
3345 assert_eq!(l, AvroLiteral::String("view".into()));
3346 assert_default_stored(&dt_strv, &json_string("view"));
3347 let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
3348 let l = dt_uuid
3349 .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
3350 .unwrap();
3351 assert_eq!(
3352 l,
3353 AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
3354 );
3355 let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
3356 let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
3357 assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
3358 let err = dt_bin
3359 .parse_and_store_default(&json_string("€")) .unwrap_err();
3361 assert!(format!("{err}").contains("Invalid codepoint"));
3362 let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
3363 let ld = dt_date
3364 .parse_and_store_default(&serde_json::json!(1))
3365 .unwrap();
3366 assert_eq!(ld, AvroLiteral::Int(1));
3367 let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
3368 let lt = dt_tmill
3369 .parse_and_store_default(&serde_json::json!(86_400_000))
3370 .unwrap();
3371 assert_eq!(lt, AvroLiteral::Int(86_400_000));
3372 let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
3373 let ltm = dt_tmicros
3374 .parse_and_store_default(&serde_json::json!(1_000_000))
3375 .unwrap();
3376 assert_eq!(ltm, AvroLiteral::Long(1_000_000));
3377 let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(None), HashMap::new(), None);
3378 let l1 = dt_ts_milli
3379 .parse_and_store_default(&serde_json::json!(123))
3380 .unwrap();
3381 assert_eq!(l1, AvroLiteral::Long(123));
3382 let mut dt_ts_micro = AvroDataType::new(Codec::TimestampMicros(None), HashMap::new(), None);
3383 let l2 = dt_ts_micro
3384 .parse_and_store_default(&serde_json::json!(456))
3385 .unwrap();
3386 assert_eq!(l2, AvroLiteral::Long(456));
3387 }
3388
3389 #[cfg(feature = "avro_custom_types")]
3390 #[test]
3391 fn test_validate_and_store_default_custom_integer_ranges() {
3392 let mut dt_i8 = AvroDataType::new(Codec::Int8, HashMap::new(), None);
3393 let lit_i8 = dt_i8
3394 .parse_and_store_default(&serde_json::json!(i8::MAX))
3395 .unwrap();
3396 assert_eq!(lit_i8, AvroLiteral::Int(i8::MAX as i32));
3397 let err_i8_high = dt_i8
3398 .parse_and_store_default(&serde_json::json!(i8::MAX as i64 + 1))
3399 .unwrap_err();
3400 assert!(err_i8_high.to_string().contains("out of i8 range"));
3401 let err_i8_low = dt_i8
3402 .parse_and_store_default(&serde_json::json!(i8::MIN as i64 - 1))
3403 .unwrap_err();
3404 assert!(err_i8_low.to_string().contains("out of i8 range"));
3405
3406 let mut dt_i16 = AvroDataType::new(Codec::Int16, HashMap::new(), None);
3407 let lit_i16 = dt_i16
3408 .parse_and_store_default(&serde_json::json!(i16::MIN))
3409 .unwrap();
3410 assert_eq!(lit_i16, AvroLiteral::Int(i16::MIN as i32));
3411 let err_i16_high = dt_i16
3412 .parse_and_store_default(&serde_json::json!(i16::MAX as i64 + 1))
3413 .unwrap_err();
3414 assert!(err_i16_high.to_string().contains("out of i16 range"));
3415 let err_i16_low = dt_i16
3416 .parse_and_store_default(&serde_json::json!(i16::MIN as i64 - 1))
3417 .unwrap_err();
3418 assert!(err_i16_low.to_string().contains("out of i16 range"));
3419
3420 let mut dt_u8 = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
3421 let lit_u8 = dt_u8
3422 .parse_and_store_default(&serde_json::json!(u8::MAX))
3423 .unwrap();
3424 assert_eq!(lit_u8, AvroLiteral::Int(u8::MAX as i32));
3425 let err_u8_neg = dt_u8
3426 .parse_and_store_default(&serde_json::json!(-1))
3427 .unwrap_err();
3428 assert!(err_u8_neg.to_string().contains("out of u8 range"));
3429 let err_u8_high = dt_u8
3430 .parse_and_store_default(&serde_json::json!(u8::MAX as i64 + 1))
3431 .unwrap_err();
3432 assert!(err_u8_high.to_string().contains("out of u8 range"));
3433
3434 let mut dt_u16 = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
3435 let lit_u16 = dt_u16
3436 .parse_and_store_default(&serde_json::json!(u16::MAX))
3437 .unwrap();
3438 assert_eq!(lit_u16, AvroLiteral::Int(u16::MAX as i32));
3439 let err_u16_neg = dt_u16
3440 .parse_and_store_default(&serde_json::json!(-1))
3441 .unwrap_err();
3442 assert!(err_u16_neg.to_string().contains("out of u16 range"));
3443 let err_u16_high = dt_u16
3444 .parse_and_store_default(&serde_json::json!(u16::MAX as i64 + 1))
3445 .unwrap_err();
3446 assert!(err_u16_high.to_string().contains("out of u16 range"));
3447
3448 let mut dt_u32 = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
3449 let lit_u32 = dt_u32
3450 .parse_and_store_default(&serde_json::json!(u32::MAX as i64))
3451 .unwrap();
3452 assert_eq!(lit_u32, AvroLiteral::Long(u32::MAX as i64));
3453 let err_u32_neg = dt_u32
3454 .parse_and_store_default(&serde_json::json!(-1))
3455 .unwrap_err();
3456 assert!(err_u32_neg.to_string().contains("out of u32 range"));
3457 let err_u32_high = dt_u32
3458 .parse_and_store_default(&serde_json::json!(u32::MAX as i64 + 1))
3459 .unwrap_err();
3460 assert!(err_u32_high.to_string().contains("out of u32 range"));
3461 }
3462
3463 #[test]
3464 fn test_validate_and_store_default_fixed_decimal_interval() {
3465 let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
3466 let l = dt_fixed
3467 .parse_and_store_default(&json_string("WXYZ"))
3468 .unwrap();
3469 assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
3470 let err = dt_fixed
3471 .parse_and_store_default(&json_string("TOO LONG"))
3472 .unwrap_err();
3473 assert!(err.to_string().contains("Default length"));
3474 let mut dt_dec_fixed =
3475 AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
3476 let l = dt_dec_fixed
3477 .parse_and_store_default(&json_string("abc"))
3478 .unwrap();
3479 assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
3480 let err = dt_dec_fixed
3481 .parse_and_store_default(&json_string("toolong"))
3482 .unwrap_err();
3483 assert!(err.to_string().contains("Default length"));
3484 let mut dt_dec_bytes =
3485 AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
3486 let l = dt_dec_bytes
3487 .parse_and_store_default(&json_string("freeform"))
3488 .unwrap();
3489 assert_eq!(
3490 l,
3491 AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
3492 );
3493 let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
3494 let l = dt_interval
3495 .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
3496 .unwrap();
3497 assert_eq!(
3498 l,
3499 AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
3500 );
3501 let err = dt_interval
3502 .parse_and_store_default(&json_string("short"))
3503 .unwrap_err();
3504 assert!(err.to_string().contains("Default length"));
3505 }
3506
3507 #[test]
3508 fn test_validate_and_store_default_enum_list_map_struct() {
3509 let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
3510 .into_iter()
3511 .collect();
3512 let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
3513 let l = dt_enum
3514 .parse_and_store_default(&json_string("GREEN"))
3515 .unwrap();
3516 assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
3517 let err = dt_enum
3518 .parse_and_store_default(&json_string("YELLOW"))
3519 .unwrap_err();
3520 assert!(err.to_string().contains("Default enum symbol"));
3521 let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3522 let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
3523 let val = serde_json::json!([1, 2, 3]);
3524 let l = dt_list.parse_and_store_default(&val).unwrap();
3525 assert_eq!(
3526 l,
3527 AvroLiteral::Array(vec![
3528 AvroLiteral::Long(1),
3529 AvroLiteral::Long(2),
3530 AvroLiteral::Long(3)
3531 ])
3532 );
3533 let err = dt_list
3534 .parse_and_store_default(&serde_json::json!({"not":"array"}))
3535 .unwrap_err();
3536 assert!(err.to_string().contains("JSON array"));
3537 let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3538 let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
3539 let mv = serde_json::json!({"x": 1.5, "y": 2.5});
3540 let l = dt_map.parse_and_store_default(&mv).unwrap();
3541 let mut expected = IndexMap::new();
3542 expected.insert("x".into(), AvroLiteral::Double(1.5));
3543 expected.insert("y".into(), AvroLiteral::Double(2.5));
3544 assert_eq!(l, AvroLiteral::Map(expected));
3545 let err = dt_map
3547 .parse_and_store_default(&serde_json::json!(123))
3548 .unwrap_err();
3549 assert!(err.to_string().contains("JSON object"));
3550 let mut field_a = AvroField {
3551 name: "a".into(),
3552 data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
3553 };
3554 let field_b = AvroField {
3555 name: "b".into(),
3556 data_type: AvroDataType::new(
3557 Codec::Int64,
3558 HashMap::new(),
3559 Some(Nullability::NullFirst),
3560 ),
3561 };
3562 let mut c_md = HashMap::new();
3563 c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
3564 let field_c = AvroField {
3565 name: "c".into(),
3566 data_type: AvroDataType::new(Codec::Utf8, c_md, None),
3567 };
3568 field_a.data_type.metadata.insert("doc".into(), "na".into());
3569 let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
3570 let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
3571 let default_obj = serde_json::json!({"a": 7});
3572 let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
3573 let mut expected = IndexMap::new();
3574 expected.insert("a".into(), AvroLiteral::Int(7));
3575 expected.insert("b".into(), AvroLiteral::Null);
3576 expected.insert("c".into(), AvroLiteral::String("xyz".into()));
3577 assert_eq!(l, AvroLiteral::Map(expected));
3578 assert_default_stored(&dt_struct, &default_obj);
3579 let req_field = AvroField {
3580 name: "req".into(),
3581 data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
3582 };
3583 let mut dt_bad = AvroDataType::new(
3584 Codec::Struct(Arc::from(vec![req_field])),
3585 HashMap::new(),
3586 None,
3587 );
3588 let err = dt_bad
3589 .parse_and_store_default(&serde_json::json!({}))
3590 .unwrap_err();
3591 assert!(
3592 err.to_string().contains("missing required subfield 'req'"),
3593 "unexpected error: {err}"
3594 );
3595 let err = dt_struct
3596 .parse_and_store_default(&serde_json::json!(10))
3597 .unwrap_err();
3598 err.to_string().contains("must be a JSON object");
3599 }
3600
3601 #[test]
3602 fn test_resolve_array_promotion_and_reader_metadata() {
3603 let mut w_add: HashMap<&str, Value> = HashMap::new();
3604 w_add.insert("who", json_string("writer"));
3605 let mut r_add: HashMap<&str, Value> = HashMap::new();
3606 r_add.insert("who", json_string("reader"));
3607 let writer_schema = Schema::Complex(ComplexType::Array(Array {
3608 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3609 attributes: Attributes {
3610 logical_type: None,
3611 additional: w_add,
3612 },
3613 }));
3614 let reader_schema = Schema::Complex(ComplexType::Array(Array {
3615 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
3616 attributes: Attributes {
3617 logical_type: None,
3618 additional: r_add,
3619 },
3620 }));
3621 let mut maker = Maker::new(false, false, Tz::default());
3622 let dt = maker
3623 .make_data_type(&writer_schema, Some(&reader_schema), None)
3624 .unwrap();
3625 assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
3626 if let Codec::List(inner) = dt.codec() {
3627 assert!(matches!(inner.codec(), Codec::Int64));
3628 assert_eq!(
3629 inner.resolution,
3630 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3631 );
3632 } else {
3633 panic!("expected list codec");
3634 }
3635 }
3636
3637 #[test]
3638 fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
3639 let writer_schema = Schema::Complex(ComplexType::Array(Array {
3640 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3641 attributes: Attributes::default(),
3642 }));
3643 let reader_schema = Schema::Complex(ComplexType::Array(Array {
3644 items: Box::new(mk_union(vec![
3645 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3646 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3647 ])),
3648 attributes: Attributes::default(),
3649 }));
3650 let mut maker = Maker::new(false, false, Tz::default());
3651 let dt = maker
3652 .make_data_type(&writer_schema, Some(&reader_schema), None)
3653 .unwrap();
3654 if let Codec::List(inner) = dt.codec() {
3655 assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
3656 assert!(matches!(inner.codec(), Codec::Int32));
3657 match inner.resolution.as_ref() {
3658 Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
3659 other => panic!("expected Union resolution, got {other:?}"),
3660 }
3661 } else {
3662 panic!("expected List codec");
3663 }
3664 }
3665
3666 #[test]
3667 fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3668 let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3669 name: "MD5",
3670 namespace: None,
3671 aliases: vec!["Hash16"],
3672 size: 16,
3673 attributes: Attributes::default(),
3674 }));
3675 let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3676 name: "Hash16",
3677 namespace: None,
3678 aliases: vec![],
3679 size: 16,
3680 attributes: Attributes::default(),
3681 }));
3682 let mut maker = Maker::new(false, false, Tz::default());
3683 let dt = maker
3684 .make_data_type(&writer_schema, Some(&reader_schema), None)
3685 .unwrap();
3686 assert!(matches!(dt.codec(), Codec::Fixed(16)));
3687 }
3688
3689 #[cfg(feature = "avro_custom_types")]
3690 #[test]
3691 fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3692 let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3693 name: "ArrowIntervalMDN",
3694 namespace: None,
3695 aliases: vec![],
3696 size: 16,
3697 attributes: Attributes {
3698 logical_type: Some("arrow.interval-month-day-nano"),
3699 additional: Default::default(),
3700 },
3701 }));
3702 let mut maker = Maker::new(false, false, Default::default());
3703 let dt = maker.make_data_type(&schema, None, None).unwrap();
3704 assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3705 assert_eq!(
3706 dt.codec.data_type(),
3707 DataType::Interval(IntervalUnit::MonthDayNano)
3708 );
3709 }
3710
3711 #[test]
3712 fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3713 let writer = Schema::Complex(ComplexType::Record(Record {
3714 name: "R",
3715 namespace: None,
3716 doc: None,
3717 aliases: vec![],
3718 fields: vec![
3719 crate::schema::Field {
3720 name: "a",
3721 doc: None,
3722 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3723 default: None,
3724 aliases: vec![],
3725 },
3726 crate::schema::Field {
3727 name: "skipme",
3728 doc: None,
3729 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3730 default: None,
3731 aliases: vec![],
3732 },
3733 crate::schema::Field {
3734 name: "b",
3735 doc: None,
3736 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3737 default: None,
3738 aliases: vec![],
3739 },
3740 ],
3741 attributes: Attributes::default(),
3742 }));
3743 let reader = Schema::Complex(ComplexType::Record(Record {
3744 name: "R",
3745 namespace: None,
3746 doc: None,
3747 aliases: vec![],
3748 fields: vec![
3749 crate::schema::Field {
3750 name: "b",
3751 doc: None,
3752 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3753 default: None,
3754 aliases: vec![],
3755 },
3756 crate::schema::Field {
3757 name: "a",
3758 doc: None,
3759 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3760 default: None,
3761 aliases: vec![],
3762 },
3763 crate::schema::Field {
3764 name: "name",
3765 doc: None,
3766 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3767 default: Some(json_string("anon")),
3768 aliases: vec![],
3769 },
3770 crate::schema::Field {
3771 name: "opt",
3772 doc: None,
3773 r#type: Schema::Union(vec![
3774 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3775 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3776 ]),
3777 default: None, aliases: vec![],
3779 },
3780 ],
3781 attributes: Attributes::default(),
3782 }));
3783 let mut maker = Maker::new(false, false, Tz::default());
3784 let dt = maker
3785 .make_data_type(&writer, Some(&reader), None)
3786 .expect("record resolution");
3787 let fields = match dt.codec() {
3788 Codec::Struct(f) => f,
3789 other => panic!("expected struct, got {other:?}"),
3790 };
3791 assert_eq!(fields.len(), 4);
3792 assert_eq!(fields[0].name(), "b");
3793 assert_eq!(fields[1].name(), "a");
3794 assert_eq!(fields[2].name(), "name");
3795 assert_eq!(fields[3].name(), "opt");
3796 assert!(matches!(
3797 fields[1].data_type().resolution,
3798 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3799 ));
3800 let rec = match dt.resolution {
3801 Some(ResolutionInfo::Record(ref r)) => r.clone(),
3802 other => panic!("expected record resolution, got {other:?}"),
3803 };
3804 assert!(matches!(
3805 &rec.writer_fields[..],
3806 &[
3807 ResolvedField::ToReader(1, _),
3808 ResolvedField::Skip(_),
3809 ResolvedField::ToReader(0, _),
3810 ]
3811 ));
3812 assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3813 let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
3814 panic!("should skip field 1")
3815 };
3816 assert!(matches!(skip1.codec(), Codec::Utf8));
3817 let name_md = &fields[2].data_type().metadata;
3818 assert_eq!(
3819 name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3820 Some(&"\"anon\"".to_string())
3821 );
3822 let opt_md = &fields[3].data_type().metadata;
3823 assert_eq!(
3824 opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3825 Some(&"null".to_string())
3826 );
3827 }
3828
3829 #[test]
3830 fn test_named_type_alias_resolution_record_cross_namespace() {
3831 let writer_record = Record {
3832 name: "PersonV2",
3833 namespace: Some("com.example.v2"),
3834 doc: None,
3835 aliases: vec!["com.example.Person"],
3836 fields: vec![
3837 AvroFieldSchema {
3838 name: "name",
3839 doc: None,
3840 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3841 default: None,
3842 aliases: vec![],
3843 },
3844 AvroFieldSchema {
3845 name: "age",
3846 doc: None,
3847 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3848 default: None,
3849 aliases: vec![],
3850 },
3851 ],
3852 attributes: Attributes::default(),
3853 };
3854 let reader_record = Record {
3855 name: "Person",
3856 namespace: Some("com.example"),
3857 doc: None,
3858 aliases: vec![],
3859 fields: writer_record.fields.clone(),
3860 attributes: Attributes::default(),
3861 };
3862 let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3863 let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3864 let mut maker = Maker::new(false, false, Tz::default());
3865 let result = maker
3866 .make_data_type(&writer_schema, Some(&reader_schema), None)
3867 .expect("record alias resolution should succeed");
3868 match result.codec {
3869 Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3870 other => panic!("expected struct, got {other:?}"),
3871 }
3872 }
3873
3874 #[test]
3875 fn test_named_type_alias_resolution_enum_cross_namespace() {
3876 let writer_enum = Enum {
3877 name: "ColorV2",
3878 namespace: Some("org.example.v2"),
3879 doc: None,
3880 aliases: vec!["org.example.Color"],
3881 symbols: vec!["RED", "GREEN", "BLUE"],
3882 default: None,
3883 attributes: Attributes::default(),
3884 };
3885 let reader_enum = Enum {
3886 name: "Color",
3887 namespace: Some("org.example"),
3888 doc: None,
3889 aliases: vec![],
3890 symbols: vec!["RED", "GREEN", "BLUE"],
3891 default: None,
3892 attributes: Attributes::default(),
3893 };
3894 let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3895 let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3896 let mut maker = Maker::new(false, false, Tz::default());
3897 maker
3898 .make_data_type(&writer_schema, Some(&reader_schema), None)
3899 .expect("enum alias resolution should succeed");
3900 }
3901
3902 #[test]
3903 fn test_named_type_alias_resolution_fixed_cross_namespace() {
3904 let writer_fixed = Fixed {
3905 name: "Fx10V2",
3906 namespace: Some("ns.v2"),
3907 aliases: vec!["ns.Fx10"],
3908 size: 10,
3909 attributes: Attributes::default(),
3910 };
3911 let reader_fixed = Fixed {
3912 name: "Fx10",
3913 namespace: Some("ns"),
3914 aliases: vec![],
3915 size: 10,
3916 attributes: Attributes::default(),
3917 };
3918 let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3919 let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3920 let mut maker = Maker::new(false, false, Tz::default());
3921 maker
3922 .make_data_type(&writer_schema, Some(&reader_schema), None)
3923 .expect("fixed alias resolution should succeed");
3924 }
3925}