1use crate::schema::{
21 AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22 AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23 PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27 IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
/// Information describing how a value written with the writer schema is adapted to
/// the reader schema during schema resolution.
///
/// NOTE(review): the consumers of this type are outside this section; the per-variant
/// notes below only describe the payload types that are visible here.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolutionInfo {
    /// A primitive type promotion (see [`Promotion`]).
    Promotion(Promotion),
    /// A default value, parsed from a JSON `default` attribute.
    DefaultValue(AvroLiteral),
    /// An enum symbol mapping (see [`EnumMapping`]).
    EnumMapping(EnumMapping),
    /// Field-level resolution for a record (see [`ResolvedRecord`]).
    Record(ResolvedRecord),
    /// Branch-level resolution for a union (see [`ResolvedUnion`]).
    Union(ResolvedUnion),
}
54
/// A parsed Avro default value, produced from a JSON `default` attribute by
/// `AvroDataType::parse_default_literal`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum AvroLiteral {
    /// JSON `null`.
    Null,
    /// A JSON boolean.
    Boolean(bool),
    /// An integer default for `int`-encoded codecs (e.g. `Int32`, `Date32`, `TimeMillis`).
    Int(i32),
    /// An integer default for `long`-encoded codecs (e.g. `Int64`, timestamps).
    Long(i64),
    /// A `float` default (checked to be finite and within `f32` range).
    Float(f32),
    /// A `double` default.
    Double(f64),
    /// Raw bytes for `bytes`/`fixed`-backed codecs; each character of the JSON string
    /// (all of which must be `<= U+00FF`) becomes one byte.
    Bytes(Vec<u8>),
    /// A string default (also used for UUID codecs).
    String(String),
    /// An enum default, holding the symbol name.
    Enum(String),
    /// An array default, with one literal per element.
    Array(Vec<AvroLiteral>),
    /// A map default, or a record default keyed by field name in field order.
    Map(IndexMap<String, AvroLiteral>),
}
83
/// Record-level schema-resolution information.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedRecord {
    /// One entry per writer field (presumably in writer order — confirm against the
    /// resolver) describing what to do with that field's value.
    pub(crate) writer_fields: Arc<[ResolvedField]>,
    /// Indices of reader fields that are presumably populated from their defaults
    /// because the writer does not provide them — TODO confirm.
    pub(crate) default_fields: Arc<[usize]>,
}
92
/// How a single writer record field is handled when reading with a reader schema.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolvedField {
    /// The writer field maps to the reader field at this index.
    ToReader(usize),
    /// The writer field has no reader counterpart; presumably its value (encoded with
    /// this type) is decoded and discarded — confirm against the decoder.
    Skip(AvroDataType),
}
102
/// A writer-to-reader primitive type promotion.
///
/// The non-`Direct` variants correspond to the promotions allowed by the Avro
/// schema-resolution rules (int -> long/float/double, long -> float/double,
/// float -> double, string <-> bytes). `Display` renders them as e.g. `Int->Long`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No promotion: the writer and reader types are the same.
    Direct,
    /// Writer `int`, reader `long`.
    IntToLong,
    /// Writer `int`, reader `float`.
    IntToFloat,
    /// Writer `int`, reader `double`.
    IntToDouble,
    /// Writer `long`, reader `float`.
    LongToFloat,
    /// Writer `long`, reader `double`.
    LongToDouble,
    /// Writer `float`, reader `double`.
    FloatToDouble,
    /// Writer `string`, reader `bytes`.
    StringToBytes,
    /// Writer `bytes`, reader `string`.
    BytesToString,
}
128
129impl Display for Promotion {
130 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
131 match self {
132 Self::Direct => write!(formatter, "Direct"),
133 Self::IntToLong => write!(formatter, "Int->Long"),
134 Self::IntToFloat => write!(formatter, "Int->Float"),
135 Self::IntToDouble => write!(formatter, "Int->Double"),
136 Self::LongToFloat => write!(formatter, "Long->Float"),
137 Self::LongToDouble => write!(formatter, "Long->Double"),
138 Self::FloatToDouble => write!(formatter, "Float->Double"),
139 Self::StringToBytes => write!(formatter, "String->Bytes"),
140 Self::BytesToString => write!(formatter, "Bytes->String"),
141 }
142 }
143}
144
/// Union-related schema-resolution information, where the writer side, the reader
/// side, or both are unions.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedUnion {
    /// Presumably one entry per writer branch: the matching reader branch index and
    /// the resolution to apply, or `None` when no reader branch matches — confirm
    /// against the resolver (including how a non-union writer is represented).
    pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
    /// Whether the writer schema is a union.
    pub(crate) writer_is_union: bool,
    /// Whether the reader schema is a union.
    pub(crate) reader_is_union: bool,
}
156
/// Mapping between writer and reader enum symbols.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Presumably indexed by writer symbol position, yielding the reader symbol
    /// index — confirm against the resolver (including any sentinel for unmapped
    /// symbols).
    pub(crate) mapping: Arc<[i32]>,
    /// The reader enum's default symbol index.
    /// NOTE(review): the encoding used when the reader has no default is not visible
    /// here — confirm.
    pub(crate) default_index: i32,
}
168
/// Attaches a canonical Arrow extension type to `field` when `codec` has one.
///
/// Only [`Codec::Uuid`] maps to an extension type (`arrow_schema::extension::Uuid`);
/// every other codec returns `field` unchanged.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        return field.with_extension_type(arrow_schema::extension::Uuid);
    }
    field
}
176
/// An Avro type prepared for decoding into Arrow: its [`Codec`], optional nullability,
/// field metadata, and optional schema-resolution information.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroDataType {
    /// `Some` when values may be null, in which case `field_with_name` marks the Arrow
    /// field as nullable. The [`Nullability`] value records which union position
    /// (first or second) holds the `null` branch.
    nullability: Option<Nullability>,
    /// Metadata copied onto the Arrow field. It may include the Avro name and namespace
    /// (`AVRO_NAME_METADATA_KEY`, `AVRO_NAMESPACE_METADATA_KEY`) and the stored JSON
    /// default (`AVRO_FIELD_DEFAULT_METADATA_KEY`).
    metadata: HashMap<String, String>,
    /// How values are decoded, and which Arrow type they produce.
    codec: Codec,
    /// Writer/reader schema-resolution information, if any.
    pub(crate) resolution: Option<ResolutionInfo>,
}
185
impl AvroDataType {
    /// Creates a new [`AvroDataType`] with no schema-resolution information.
    pub(crate) fn new(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
    ) -> Self {
        AvroDataType {
            codec,
            metadata,
            nullability,
            resolution: None,
        }
    }

    /// Creates a new [`AvroDataType`] with the given (optional) resolution information.
    #[inline]
    fn new_with_resolution(
        codec: Codec,
        metadata: HashMap<String, String>,
        nullability: Option<Nullability>,
        resolution: Option<ResolutionInfo>,
    ) -> Self {
        Self {
            codec,
            metadata,
            nullability,
            resolution,
        }
    }

    /// Returns an Arrow [`Field`] named `name`.
    ///
    /// The field's data type comes from this type's codec, and its metadata is a copy
    /// of this type's metadata. The field is nullable when this type has a
    /// [`Nullability`], or when it is a union with at least one `null` branch.
    ///
    /// With the `canonical_extension_types` feature enabled, a canonical extension type
    /// may also be attached (see `with_extension_type`).
    pub(crate) fn field_with_name(&self, name: &str) -> Field {
        let mut nullable = self.nullability.is_some();
        if !nullable {
            if let Codec::Union(children, _, _) = self.codec() {
                // A union that contains a `null` branch can yield nulls even without
                // an explicit nullability marker.
                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
                    nullable = true;
                }
            }
        }
        let data_type = self.codec.data_type();
        let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
        #[cfg(feature = "canonical_extension_types")]
        return with_extension_type(&self.codec, field);
        #[cfg(not(feature = "canonical_extension_types"))]
        field
    }

    /// Returns the codec used to decode this type.
    pub(crate) fn codec(&self) -> &Codec {
        &self.codec
    }

    /// Returns this type's nullability marker, if any.
    pub(crate) fn nullability(&self) -> Option<Nullability> {
        self.nullability
    }

    /// Parses a JSON `default` value into an [`AvroLiteral`] according to this type's codec.
    ///
    /// * JSON `null` is accepted only for the `null` codec, for a union whose first
    ///   branch is `null`, or for a type whose nullability is `NullFirst`.
    /// * `bytes`/`fixed`-backed codecs expect a JSON string whose characters are all
    ///   `<= U+00FF`; each character becomes one byte. Fixed-size codecs also check the
    ///   resulting length.
    /// * Integer codecs are range-checked against the width of their Arrow type.
    /// * Arrays, maps and records are parsed recursively, and records are returned as
    ///   [`AvroLiteral::Map`] keyed by field name.
    /// * A record subfield missing from the JSON object falls back to its stored default.
    ///   If it has none, it becomes `null` when its nullability equals
    ///   `Nullability::default()`.
    /// * Non-null union defaults are parsed against the union's first branch.
    ///
    /// # Errors
    /// Returns [`ArrowError::SchemaError`] when the JSON value's shape or range does not
    /// fit the codec.
    #[inline]
    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        // Borrows the string contents of `default_json`, or errors if it is not a JSON string.
        fn expect_string<'v>(
            default_json: &'v Value,
            data_type: &str,
        ) -> Result<&'v str, ArrowError> {
            match default_json {
                Value::String(s) => Ok(s.as_str()),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default value must be a JSON string for {data_type}"
                ))),
            }
        }

        // Decodes a bytes/fixed default: one byte per character (each must be <= U+00FF),
        // optionally checking the resulting length.
        fn parse_bytes_default(
            default_json: &Value,
            expected_len: Option<usize>,
        ) -> Result<Vec<u8>, ArrowError> {
            let s = expect_string(default_json, "bytes/fixed logical types")?;
            let mut out = Vec::with_capacity(s.len());
            for ch in s.chars() {
                let cp = ch as u32;
                if cp > 0xFF {
                    return Err(ArrowError::SchemaError(format!(
                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
                    )));
                }
                out.push(cp as u8);
            }
            if let Some(len) = expected_len {
                if out.len() != len {
                    return Err(ArrowError::SchemaError(format!(
                        "Default length {} does not match expected fixed size {len}",
                        out.len(),
                    )));
                }
            }
            Ok(out)
        }

        // Reads a JSON number that is representable as an `i64`.
        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_i64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON integer"
                ))),
            }
        }

        // Reads any JSON number as an `f64`.
        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_f64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON number"
                ))),
            }
        }

        if default_json.is_null() {
            // JSON `null` is only a valid default for `null` itself, for a union whose
            // first branch is `null`, or for a null-first nullable type.
            return match self.codec() {
                Codec::Null => Ok(AvroLiteral::Null),
                Codec::Union(encodings, _, _) if !encodings.is_empty()
                    && matches!(encodings[0].codec(), Codec::Null) =>
                {
                    Ok(AvroLiteral::Null)
                }
                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
                _ => Err(ArrowError::SchemaError(
                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
                        .to_string(),
                )),
            };
        }
        let lit = match self.codec() {
            Codec::Null => {
                return Err(ArrowError::SchemaError(
                    "Default for `null` type must be JSON null".to_string(),
                ));
            }
            Codec::Boolean => match default_json {
                Value::Bool(b) => AvroLiteral::Boolean(*b),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Boolean default must be a JSON boolean".to_string(),
                    ));
                }
            },
            // `int`-encoded codecs: must fit in an i32.
            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            // `long`-encoded codecs.
            Codec::Int64
            | Codec::TimeMicros
            | Codec::TimestampMillis(_)
            | Codec::TimestampMicros(_)
            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            // Narrow integer custom types are stored as `int` but range-checked
            // against their Arrow width.
            #[cfg(feature = "avro_custom_types")]
            Codec::Int8 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i8::MIN as i64 || i > i8::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int8 {i} out of i8 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::Int16 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i16::MIN as i64 || i > i16::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int16 {i} out of i16 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt8 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < 0 || i > u8::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default uint8 {i} out of u8 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt16 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < 0 || i > u16::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default uint16 {i} out of u16 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            // u32 does not fit in an i32, so it is carried as a `long`.
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt32 => {
                let i = parse_json_i64(default_json, "long")?;
                if i < 0 || i > u32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default uint32 {i} out of u32 range"
                    )));
                }
                AvroLiteral::Long(i)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
                AvroLiteral::Long(parse_json_i64(default_json, "long")?)
            }
            // Fixed-width custom types take their defaults as byte strings of that width.
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
            #[cfg(feature = "avro_custom_types")]
            Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
            #[cfg(feature = "avro_custom_types")]
            Codec::Time32Secs => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default time32-secs {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalYearMonth => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalMonthDayNano => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalDayTime => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
            }
            Codec::Float32 => {
                let f = parse_json_f64(default_json, "float")?;
                // Reject values that would become infinite (or are NaN) as an f32.
                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default float {f} out of f32 range or not finite"
                    )));
                }
                AvroLiteral::Float(f as f32)
            }
            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
            }
            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
            Codec::Fixed(sz) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
            }
            // The length is only checked for fixed-backed decimals (size is `Some`).
            Codec::Decimal(_, _, fixed_size) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
            }
            Codec::Enum(symbols) => {
                let s = expect_string(default_json, "enum")?;
                if symbols.iter().any(|sym| sym == s) {
                    AvroLiteral::Enum(s.to_string())
                } else {
                    return Err(ArrowError::SchemaError(format!(
                        "Default enum symbol {s:?} not found in reader enum symbols"
                    )));
                }
            }
            // The `Interval` codec's default is a 12-byte fixed value.
            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
            Codec::List(item_dt) => match default_json {
                Value::Array(items) => AvroLiteral::Array(
                    items
                        .iter()
                        .map(|v| item_dt.parse_default_literal(v))
                        .collect::<Result<_, _>>()?,
                ),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON array for Avro array type".to_string(),
                    ));
                }
            },
            Codec::Map(val_dt) => match default_json {
                Value::Object(map) => {
                    let mut out = IndexMap::with_capacity(map.len());
                    for (k, v) in map {
                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON object for Avro map type".to_string(),
                    ));
                }
            },
            Codec::Struct(fields) => match default_json {
                Value::Object(obj) => {
                    // Built in the record's field order; extra JSON keys are ignored.
                    let mut out: IndexMap<String, AvroLiteral> =
                        IndexMap::with_capacity(fields.len());
                    for f in fields.as_ref() {
                        let name = f.name().to_string();
                        if let Some(sub) = obj.get(&name) {
                            out.insert(name, f.data_type().parse_default_literal(sub)?);
                        } else {
                            // The subfield is absent from the record default, so fall back
                            // to the subfield's own default stored in its metadata.
                            let stored_default =
                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
                            // NOTE(review): assumes `Nullability::default()` is the null-first
                            // variant, making `null` an acceptable implicit default — confirm.
                            if stored_default.is_none()
                                && f.data_type().nullability() == Some(Nullability::default())
                            {
                                out.insert(name, AvroLiteral::Null);
                            } else if let Some(default_json) = stored_default {
                                let v: Value =
                                    serde_json::from_str(default_json).map_err(|e| {
                                        ArrowError::SchemaError(format!(
                                            "Failed to parse stored subfield default JSON for '{}': {e}",
                                            f.name(),
                                        ))
                                    })?;
                                out.insert(name, f.data_type().parse_default_literal(&v)?);
                            } else {
                                return Err(ArrowError::SchemaError(format!(
                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
                                    f.name(),
                                    f.data_type().codec()
                                )));
                            }
                        }
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value for record/struct must be a JSON object".to_string(),
                    ));
                }
            },
            // A non-null union default must match the union's first branch.
            Codec::Union(encodings, _, _) => {
                let Some(default_encoding) = encodings.first() else {
                    return Err(ArrowError::SchemaError(
                        "Union with no branches cannot have a default".to_string(),
                    ));
                };
                default_encoding.parse_default_literal(default_json)?
            }
            // Run-end encoding is transparent to defaults: parse against the value type.
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
        };
        Ok(lit)
    }

    /// Serializes `default_json` and stores it in this type's metadata under
    /// `AVRO_FIELD_DEFAULT_METADATA_KEY`, replacing any previously stored default.
    ///
    /// # Errors
    /// Returns [`ArrowError::ParseError`] if serialization fails.
    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
        let json_text = serde_json::to_string(default_json).map_err(|e| {
            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
        })?;
        self.metadata
            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
        Ok(())
    }

    /// Parses `default_json` with `parse_default_literal` and, only if that succeeds,
    /// stores its JSON text in this type's metadata. Returns the parsed literal.
    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        let lit = self.parse_default_literal(default_json)?;
        self.store_default(default_json)?;
        Ok(lit)
    }
}
574
/// A named Avro field together with its resolved [`AvroDataType`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroField {
    /// The Avro name, also used as the Arrow field name by `AvroField::field`.
    name: String,
    /// The field's resolved data type.
    data_type: AvroDataType,
}
581
582impl AvroField {
583 pub(crate) fn field(&self) -> Field {
585 self.data_type.field_with_name(&self.name)
586 }
587
588 pub(crate) fn data_type(&self) -> &AvroDataType {
590 &self.data_type
591 }
592
593 pub(crate) fn with_utf8view(&self) -> Self {
602 let mut field = self.clone();
603 if let Codec::Utf8 = field.data_type.codec {
604 field.data_type.codec = Codec::Utf8View;
605 }
606 field
607 }
608
609 pub(crate) fn name(&self) -> &str {
614 &self.name
615 }
616}
617
618impl<'a> TryFrom<&Schema<'a>> for AvroField {
619 type Error = ArrowError;
620
621 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
622 match schema {
623 Schema::Complex(ComplexType::Record(r)) => {
624 let mut resolver = Maker::new(false, false);
625 let data_type = resolver.make_data_type(schema, None, None)?;
626 Ok(AvroField {
627 data_type,
628 name: r.name.to_string(),
629 })
630 }
631 _ => Err(ArrowError::ParseError(format!(
632 "Expected record got {schema:?}"
633 ))),
634 }
635 }
636}
637
/// Builder for an [`AvroField`] from a writer record schema, with an optional
/// reader schema and decoding options.
#[derive(Debug)]
pub(crate) struct AvroFieldBuilder<'a> {
    /// The writer schema; `build` requires it to be a record.
    writer_schema: &'a Schema<'a>,
    /// Optional reader schema, passed to `Maker::make_data_type` alongside the writer schema.
    reader_schema: Option<&'a Schema<'a>>,
    /// Forwarded to `Maker::new`; presumably selects `Utf8View` for strings — confirm.
    use_utf8view: bool,
    /// Forwarded to `Maker::new`; its exact effect is defined there.
    strict_mode: bool,
}
646
647impl<'a> AvroFieldBuilder<'a> {
648 pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
650 Self {
651 writer_schema,
652 reader_schema: None,
653 use_utf8view: false,
654 strict_mode: false,
655 }
656 }
657
658 #[inline]
663 pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
664 self.reader_schema = Some(reader_schema);
665 self
666 }
667
668 pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
670 self.use_utf8view = use_utf8view;
671 self
672 }
673
674 pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
676 self.strict_mode = strict_mode;
677 self
678 }
679
680 pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
682 match self.writer_schema {
683 Schema::Complex(ComplexType::Record(r)) => {
684 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
685 let data_type =
686 resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
687 Ok(AvroField {
688 name: r.name.to_string(),
689 data_type,
690 })
691 }
692 _ => Err(ArrowError::ParseError(format!(
693 "Expected a Record schema to build an AvroField, but got {:?}",
694 self.writer_schema
695 ))),
696 }
697 }
698}
699
/// How an Avro type is decoded, together with the Arrow [`DataType`] it produces
/// (see `Codec::data_type`).
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Codec {
    /// Avro `null`; Arrow `Null`.
    Null,
    /// Avro `boolean`; Arrow `Boolean`.
    Boolean,
    /// Avro `int`; Arrow `Int32`.
    Int32,
    /// Avro `long`; Arrow `Int64`.
    Int64,
    /// Avro `float`; Arrow `Float32`.
    Float32,
    /// Avro `double`; Arrow `Float64`.
    Float64,
    /// Avro `bytes`; Arrow `Binary`.
    Binary,
    /// Avro `string`; Arrow `Utf8`.
    Utf8,
    /// Avro `string` decoded to Arrow `Utf8View` (see `Codec::with_utf8view`).
    Utf8View,
    /// `int`-encoded date; Arrow `Date32`.
    Date32,
    /// `int`-encoded time of day in milliseconds; Arrow `Time32(Millisecond)`.
    TimeMillis,
    /// `long`-encoded time of day in microseconds; Arrow `Time64(Microsecond)`.
    TimeMicros,
    /// `long`-encoded millisecond timestamp. If the flag is `true`, the Arrow timezone is
    /// `+00:00`; otherwise there is no timezone.
    TimestampMillis(bool),
    /// `long`-encoded microsecond timestamp. If the flag is `true`, the Arrow timezone is
    /// `+00:00`; otherwise there is no timezone.
    TimestampMicros(bool),
    /// `long`-encoded nanosecond timestamp. If the flag is `true`, the Arrow timezone is
    /// `+00:00`; otherwise there is no timezone.
    TimestampNanos(bool),
    /// Avro `fixed` of the given byte size; Arrow `FixedSizeBinary(size)`.
    Fixed(i32),
    /// Decimal as `(precision, scale, fixed byte size)`. A missing scale is treated as 0,
    /// and the size is `Some` for fixed-backed decimals. The Arrow decimal width is chosen
    /// from the precision.
    Decimal(usize, Option<usize>, Option<usize>),
    /// UUID with string defaults; Arrow `FixedSizeBinary(16)`.
    Uuid,
    /// Enum with the given symbols; Arrow `Dictionary(Int32, Utf8)`.
    Enum(Arc<[String]>),
    /// Avro array of the given item type; Arrow `List`.
    List(Arc<AvroDataType>),
    /// Avro record with the given fields; Arrow `Struct`.
    Struct(Arc<[AvroField]>),
    /// Avro map with string keys and the given value type; Arrow `Map`.
    Map(Arc<AvroDataType>),
    /// Duration whose defaults are 12-byte fixed values; Arrow `Interval(MonthDayNano)`.
    Interval,
    /// Union with its branch types, Arrow union fields and union mode.
    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
    /// `long`-encoded duration in nanoseconds; Arrow `Duration(Nanosecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationNanos,
    /// `long`-encoded duration in microseconds; Arrow `Duration(Microsecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMicros,
    /// `long`-encoded duration in milliseconds; Arrow `Duration(Millisecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMillis,
    /// `long`-encoded duration in seconds; Arrow `Duration(Second)`.
    #[cfg(feature = "avro_custom_types")]
    DurationSeconds,
    /// Run-end encoded values with the given run-end bit width (16, 32 or 64).
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Arc<AvroDataType>, u8),
    /// `int`-encoded, range-checked to `i8`; Arrow `Int8`.
    #[cfg(feature = "avro_custom_types")]
    Int8,
    /// `int`-encoded, range-checked to `i16`; Arrow `Int16`.
    #[cfg(feature = "avro_custom_types")]
    Int16,
    /// `int`-encoded, range-checked to `u8`; Arrow `UInt8`.
    #[cfg(feature = "avro_custom_types")]
    UInt8,
    /// `int`-encoded, range-checked to `u16`; Arrow `UInt16`.
    #[cfg(feature = "avro_custom_types")]
    UInt16,
    /// `long`-encoded, range-checked to `u32`; Arrow `UInt32`.
    #[cfg(feature = "avro_custom_types")]
    UInt32,
    /// Unsigned 64-bit with 8-byte defaults; Arrow `UInt64`.
    #[cfg(feature = "avro_custom_types")]
    UInt64,
    /// Half-precision float with 2-byte defaults; Arrow `Float16`.
    #[cfg(feature = "avro_custom_types")]
    Float16,
    /// `long`-encoded date; Arrow `Date64`.
    #[cfg(feature = "avro_custom_types")]
    Date64,
    /// `long`-encoded time of day in nanoseconds; Arrow `Time64(Nanosecond)`.
    #[cfg(feature = "avro_custom_types")]
    TimeNanos,
    /// `int`-encoded time of day in seconds; Arrow `Time32(Second)`.
    #[cfg(feature = "avro_custom_types")]
    Time32Secs,
    /// `long`-encoded second timestamp. If the flag is `true`, the Arrow timezone is
    /// `+00:00`; otherwise there is no timezone.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool),
    /// Interval with 4-byte defaults; Arrow `Interval(YearMonth)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth,
    /// Interval with 16-byte defaults; Arrow `Interval(MonthDayNano)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano,
    /// Interval with 8-byte defaults; Arrow `Interval(DayTime)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime,
}
831
832impl Codec {
833 fn data_type(&self) -> DataType {
834 match self {
835 Self::Null => DataType::Null,
836 Self::Boolean => DataType::Boolean,
837 Self::Int32 => DataType::Int32,
838 Self::Int64 => DataType::Int64,
839 Self::Float32 => DataType::Float32,
840 Self::Float64 => DataType::Float64,
841 Self::Binary => DataType::Binary,
842 Self::Utf8 => DataType::Utf8,
843 Self::Utf8View => DataType::Utf8View,
844 Self::Date32 => DataType::Date32,
845 Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
846 Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
847 Self::TimestampMillis(is_utc) => {
848 DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
849 }
850 Self::TimestampMicros(is_utc) => {
851 DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
852 }
853 Self::TimestampNanos(is_utc) => {
854 DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
855 }
856 Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
857 Self::Fixed(size) => DataType::FixedSizeBinary(*size),
858 Self::Decimal(precision, scale, _size) => {
859 let p = *precision as u8;
860 let s = scale.unwrap_or(0) as i8;
861 #[cfg(feature = "small_decimals")]
862 {
863 if *precision <= DECIMAL32_MAX_PRECISION as usize {
864 DataType::Decimal32(p, s)
865 } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
866 DataType::Decimal64(p, s)
867 } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
868 DataType::Decimal128(p, s)
869 } else {
870 DataType::Decimal256(p, s)
871 }
872 }
873 #[cfg(not(feature = "small_decimals"))]
874 {
875 if *precision <= DECIMAL128_MAX_PRECISION as usize {
876 DataType::Decimal128(p, s)
877 } else {
878 DataType::Decimal256(p, s)
879 }
880 }
881 }
882 Self::Uuid => DataType::FixedSizeBinary(16),
883 Self::Enum(_) => {
884 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
885 }
886 Self::List(f) => {
887 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
888 }
889 Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
890 Self::Map(value_type) => {
891 let val_field = value_type.field_with_name("value");
892 DataType::Map(
893 Arc::new(Field::new(
894 "entries",
895 DataType::Struct(Fields::from(vec![
896 Field::new("key", DataType::Utf8, false),
897 val_field,
898 ])),
899 false,
900 )),
901 false,
902 )
903 }
904 Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
905 #[cfg(feature = "avro_custom_types")]
906 Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
907 #[cfg(feature = "avro_custom_types")]
908 Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
909 #[cfg(feature = "avro_custom_types")]
910 Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
911 #[cfg(feature = "avro_custom_types")]
912 Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
913 #[cfg(feature = "avro_custom_types")]
914 Self::RunEndEncoded(values, bits) => {
915 let run_ends_dt = match *bits {
916 16 => DataType::Int16,
917 32 => DataType::Int32,
918 64 => DataType::Int64,
919 _ => unreachable!(),
920 };
921 DataType::RunEndEncoded(
922 Arc::new(Field::new("run_ends", run_ends_dt, false)),
923 Arc::new(Field::new("values", values.codec().data_type(), true)),
924 )
925 }
926 #[cfg(feature = "avro_custom_types")]
927 Self::Int8 => DataType::Int8,
928 #[cfg(feature = "avro_custom_types")]
929 Self::Int16 => DataType::Int16,
930 #[cfg(feature = "avro_custom_types")]
931 Self::UInt8 => DataType::UInt8,
932 #[cfg(feature = "avro_custom_types")]
933 Self::UInt16 => DataType::UInt16,
934 #[cfg(feature = "avro_custom_types")]
935 Self::UInt32 => DataType::UInt32,
936 #[cfg(feature = "avro_custom_types")]
937 Self::UInt64 => DataType::UInt64,
938 #[cfg(feature = "avro_custom_types")]
939 Self::Float16 => DataType::Float16,
940 #[cfg(feature = "avro_custom_types")]
941 Self::Date64 => DataType::Date64,
942 #[cfg(feature = "avro_custom_types")]
943 Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
944 #[cfg(feature = "avro_custom_types")]
945 Self::Time32Secs => DataType::Time32(TimeUnit::Second),
946 #[cfg(feature = "avro_custom_types")]
947 Self::TimestampSecs(is_utc) => {
948 DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
949 }
950 #[cfg(feature = "avro_custom_types")]
951 Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
952 #[cfg(feature = "avro_custom_types")]
953 Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
954 #[cfg(feature = "avro_custom_types")]
955 Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
956 }
957 }
958
959 pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
965 if use_utf8view && matches!(self, Self::Utf8) {
966 Self::Utf8View
967 } else {
968 self
969 }
970 }
971
972 #[inline]
973 fn union_field_name(&self) -> String {
974 UnionFieldKind::from(self).as_ref().to_owned()
975 }
976}
977
978impl From<PrimitiveType> for Codec {
979 fn from(value: PrimitiveType) -> Self {
980 match value {
981 PrimitiveType::Null => Self::Null,
982 PrimitiveType::Boolean => Self::Boolean,
983 PrimitiveType::Int => Self::Int32,
984 PrimitiveType::Long => Self::Int64,
985 PrimitiveType::Float => Self::Float32,
986 PrimitiveType::Double => Self::Float64,
987 PrimitiveType::Bytes => Self::Binary,
988 PrimitiveType::String => Self::Utf8,
989 }
990 }
991}
992
/// Returns the largest decimal precision whose unscaled values always fit in an
/// `n`-byte two's-complement integer, or `None` when `n` is outside `1..=32`.
///
/// Entry `i` of the table is `floor(log10(2^(8*(i+1) - 1) - 1))`: the number of decimal
/// digits that always fit in the positive range of a signed `(i+1)`-byte integer.
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    const DIGITS_BY_BYTE_WIDTH: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    if n == 0 || n > 32 {
        None
    } else {
        Some(DIGITS_BY_BYTE_WIDTH[n - 1])
    }
}
1015
1016fn parse_decimal_attributes(
1017 attributes: &Attributes,
1018 fallback_size: Option<usize>,
1019 precision_required: bool,
1020) -> Result<(usize, usize, Option<usize>), ArrowError> {
1021 let precision = attributes
1022 .additional
1023 .get("precision")
1024 .and_then(|v| v.as_u64())
1025 .or(if precision_required { None } else { Some(10) })
1026 .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1027 as usize;
1028 let scale = attributes
1029 .additional
1030 .get("scale")
1031 .and_then(|v| v.as_u64())
1032 .unwrap_or(0) as usize;
1033 let size = attributes
1034 .additional
1035 .get("size")
1036 .and_then(|v| v.as_u64())
1037 .map(|s| s as usize)
1038 .or(fallback_size);
1039 if precision == 0 {
1040 return Err(ArrowError::ParseError(
1041 "Decimal requires precision > 0".to_string(),
1042 ));
1043 }
1044 if scale > precision {
1045 return Err(ArrowError::ParseError(format!(
1046 "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1047 )));
1048 }
1049 if precision > DECIMAL256_MAX_PRECISION as usize {
1050 return Err(ArrowError::ParseError(format!(
1051 "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1052 DECIMAL256_MAX_PRECISION
1053 )));
1054 }
1055 if let Some(sz) = size {
1056 let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1057 ArrowError::ParseError(format!(
1058 "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1059 ))
1060 })?;
1061 if precision > max_p {
1062 return Err(ArrowError::ParseError(format!(
1063 "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1064 )));
1065 }
1066 }
1067 Ok((precision, scale, size))
1068}
1069
/// The category of a [`Codec`], used to name union branches that have no Avro name.
///
/// `as_ref()` (derived via `AsRefStr`) yields the snake_case variant name,
/// e.g. `TimestampMillisUtc` becomes `"timestamp_millis_utc"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Date,
    TimeMillis,
    TimeMicros,
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    TimestampNanosUtc,
    TimestampNanosLocal,
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
1100
1101impl From<&Codec> for UnionFieldKind {
1102 fn from(c: &Codec) -> Self {
1103 match c {
1104 Codec::Null => Self::Null,
1105 Codec::Boolean => Self::Boolean,
1106 Codec::Int32 => Self::Int,
1107 Codec::Int64 => Self::Long,
1108 Codec::Float32 => Self::Float,
1109 Codec::Float64 => Self::Double,
1110 Codec::Binary => Self::Bytes,
1111 Codec::Utf8 | Codec::Utf8View => Self::String,
1112 Codec::Date32 => Self::Date,
1113 Codec::TimeMillis => Self::TimeMillis,
1114 Codec::TimeMicros => Self::TimeMicros,
1115 Codec::TimestampMillis(true) => Self::TimestampMillisUtc,
1116 Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
1117 Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
1118 Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
1119 Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
1120 Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
1121 Codec::Interval => Self::Duration,
1122 Codec::Fixed(_) => Self::Fixed,
1123 Codec::Decimal(..) => Self::Decimal,
1124 Codec::Enum(_) => Self::Enum,
1125 Codec::List(_) => Self::Array,
1126 Codec::Struct(_) => Self::Record,
1127 Codec::Map(_) => Self::Map,
1128 Codec::Uuid => Self::Uuid,
1129 Codec::Union(..) => Self::Union,
1130 #[cfg(feature = "avro_custom_types")]
1131 Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
1132 #[cfg(feature = "avro_custom_types")]
1133 Codec::DurationNanos
1134 | Codec::DurationMicros
1135 | Codec::DurationMillis
1136 | Codec::DurationSeconds => Self::Duration,
1137 #[cfg(feature = "avro_custom_types")]
1138 Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
1139 #[cfg(feature = "avro_custom_types")]
1140 Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
1141 Self::Long
1142 }
1143 #[cfg(feature = "avro_custom_types")]
1144 Codec::Time32Secs => Self::TimeMillis, #[cfg(feature = "avro_custom_types")]
1146 Codec::UInt64
1147 | Codec::Float16
1148 | Codec::IntervalYearMonth
1149 | Codec::IntervalMonthDayNano
1150 | Codec::IntervalDayTime => Self::Fixed,
1151 }
1152 }
1153}
1154
1155fn union_branch_name(dt: &AvroDataType) -> String {
1156 if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1157 if name.contains(".") {
1158 return name.to_string();
1160 }
1161 if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1162 return format!("{ns}.{name}");
1163 }
1164 return name.to_string();
1165 }
1166 dt.codec.union_field_name()
1167}
1168
1169fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1170 let arrow_fields: Vec<Field> = encodings
1171 .iter()
1172 .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1173 .collect();
1174 let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1175 UnionFields::try_new(type_ids, arrow_fields)
1176}
1177
/// A registry of named Avro types, used to look up references to previously
/// registered named types.
#[derive(Debug, Default)]
struct Resolver<'a> {
    /// Keyed by `(namespace, name)`, with `""` standing in for a missing namespace.
    map: HashMap<(&'a str, &'a str), AvroDataType>,
}
1185
1186impl<'a> Resolver<'a> {
1187 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1188 self.map.insert((namespace.unwrap_or(""), name), schema);
1189 }
1190
1191 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1192 let (namespace, name) = name
1193 .rsplit_once('.')
1194 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1195 self.map
1196 .get(&(namespace, name))
1197 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1198 .cloned()
1199 }
1200}
1201
1202fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1203 let mut out = HashSet::with_capacity(1 + aliases.len());
1204 let (full, _) = make_full_name(name, ns, None);
1205 out.insert(full);
1206 for a in aliases {
1207 let (fa, _) = make_full_name(a, None, ns);
1208 out.insert(fa);
1209 }
1210 out
1211}
1212
1213fn names_match(
1214 writer_name: &str,
1215 writer_namespace: Option<&str>,
1216 writer_aliases: &[&str],
1217 reader_name: &str,
1218 reader_namespace: Option<&str>,
1219 reader_aliases: &[&str],
1220) -> bool {
1221 let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1222 let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1223 !writer_set.is_disjoint(&reader_set)
1225}
1226
1227fn ensure_names_match(
1228 data_type: &str,
1229 writer_name: &str,
1230 writer_namespace: Option<&str>,
1231 writer_aliases: &[&str],
1232 reader_name: &str,
1233 reader_namespace: Option<&str>,
1234 reader_aliases: &[&str],
1235) -> Result<(), ArrowError> {
1236 if names_match(
1237 writer_name,
1238 writer_namespace,
1239 writer_aliases,
1240 reader_name,
1241 reader_namespace,
1242 reader_aliases,
1243 ) {
1244 Ok(())
1245 } else {
1246 Err(ArrowError::ParseError(format!(
1247 "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1248 )))
1249 }
1250}
1251
1252fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1253 match schema {
1254 Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1255 Schema::Type(Type {
1256 r#type: TypeName::Primitive(primitive),
1257 ..
1258 }) => Some(*primitive),
1259 _ => None,
1260 }
1261}
1262
1263fn nullable_union_variants<'x, 'y>(
1264 variant: &'y [Schema<'x>],
1265) -> Option<(Nullability, &'y Schema<'x>)> {
1266 if variant.len() != 2 {
1267 return None;
1268 }
1269 let is_null = |schema: &Schema<'x>| {
1270 matches!(
1271 schema,
1272 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1273 )
1274 };
1275 match (is_null(&variant[0]), is_null(&variant[1])) {
1276 (true, false) => Some((Nullability::NullFirst, &variant[1])),
1277 (false, true) => Some((Nullability::NullSecond, &variant[0])),
1278 _ => None,
1279 }
1280}
1281
/// Identity of a union branch used when checking a union for duplicate branches.
///
/// Two branches with equal keys are reported as duplicates by
/// `union_first_duplicate`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    /// Fully qualified name of a record, enum, fixed, or named reference.
    Named(String),
    /// A primitive type; attribute-wrapped primitives (e.g. with a logical type)
    /// map to the same key as the bare primitive.
    Primitive(PrimitiveType),
    /// Any array branch, regardless of item type.
    Array,
    /// Any map branch, regardless of value type.
    Map,
}
1289
1290fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1291 let (name, namespace) = match s {
1292 Schema::TypeName(TypeName::Primitive(p))
1293 | Schema::Type(Type {
1294 r#type: TypeName::Primitive(p),
1295 ..
1296 }) => return Some(UnionBranchKey::Primitive(*p)),
1297 Schema::TypeName(TypeName::Ref(name))
1298 | Schema::Type(Type {
1299 r#type: TypeName::Ref(name),
1300 ..
1301 }) => (name, None),
1302 Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1303 Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1304 Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1305 Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1306 Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1307 Schema::Union(_) => return None,
1308 };
1309 let (full, _) = make_full_name(name, namespace, enclosing_ns);
1310 Some(UnionBranchKey::Named(full))
1311}
1312
1313fn union_first_duplicate<'a>(
1314 branches: &'a [Schema<'a>],
1315 enclosing_ns: Option<&'a str>,
1316) -> Option<String> {
1317 let mut seen = HashSet::with_capacity(branches.len());
1318 for schema in branches {
1319 if let Some(key) = branch_key_of(schema, enclosing_ns) {
1320 if !seen.insert(key.clone()) {
1321 let msg = match key {
1322 UnionBranchKey::Named(full) => format!("named type {full}"),
1323 UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1324 UnionBranchKey::Array => "array".to_string(),
1325 UnionBranchKey::Map => "map".to_string(),
1326 };
1327 return Some(msg);
1328 }
1329 }
1330 }
1331 None
1332}
1333
/// Builds [`AvroDataType`]s from Avro schemas, optionally resolving a writer
/// schema against a reader schema.
struct Maker<'a> {
    /// Named types registered so far; used to resolve by-name references.
    resolver: Resolver<'a>,
    /// Forwarded to `Codec::with_utf8view` when parsing primitive types.
    use_utf8view: bool,
    /// Enables stricter validation. Among other things it rejects `[T, "null"]`
    /// unions, ambiguous aliases, several reader fields mapping to one writer
    /// field, and enum resolutions that leave writer symbols unmapped.
    strict_mode: bool,
}
1342
1343impl<'a> Maker<'a> {
1344 fn new(use_utf8view: bool, strict_mode: bool) -> Self {
1345 Self {
1346 resolver: Default::default(),
1347 use_utf8view,
1348 strict_mode,
1349 }
1350 }
1351
1352 #[cfg(feature = "avro_custom_types")]
1353 #[inline]
1354 fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1355 if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1356 let mut inner = (*values).clone();
1357 inner.nullability = Some(nb);
1358 dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1359 }
1360 }
1361
1362 fn make_data_type<'s>(
1363 &mut self,
1364 writer_schema: &'s Schema<'a>,
1365 reader_schema: Option<&'s Schema<'a>>,
1366 namespace: Option<&'a str>,
1367 ) -> Result<AvroDataType, ArrowError> {
1368 match reader_schema {
1369 Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1370 None => self.parse_type(writer_schema, namespace),
1371 }
1372 }
1373
1374 fn parse_type<'s>(
1387 &mut self,
1388 schema: &'s Schema<'a>,
1389 namespace: Option<&'a str>,
1390 ) -> Result<AvroDataType, ArrowError> {
1391 match schema {
1392 Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
1393 Codec::from(*p).with_utf8view(self.use_utf8view),
1394 Default::default(),
1395 None,
1396 )),
1397 Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
1398 Schema::Union(f) => {
1399 let null = f
1400 .iter()
1401 .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
1402 match (f.len() == 2, null) {
1403 (true, Some(0)) => {
1404 let mut field = self.parse_type(&f[1], namespace)?;
1405 field.nullability = Some(Nullability::NullFirst);
1406 #[cfg(feature = "avro_custom_types")]
1407 Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
1408 return Ok(field);
1409 }
1410 (true, Some(1)) => {
1411 if self.strict_mode {
1412 return Err(ArrowError::SchemaError(
1413 "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
1414 .to_string(),
1415 ));
1416 }
1417 let mut field = self.parse_type(&f[0], namespace)?;
1418 field.nullability = Some(Nullability::NullSecond);
1419 #[cfg(feature = "avro_custom_types")]
1420 Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
1421 return Ok(field);
1422 }
1423 _ => {}
1424 }
1425 if f.iter().any(|s| matches!(s, Schema::Union(_))) {
1427 return Err(ArrowError::SchemaError(
1428 "Avro unions may not immediately contain other unions".to_string(),
1429 ));
1430 }
1431 if let Some(dup) = union_first_duplicate(f, namespace) {
1433 return Err(ArrowError::SchemaError(format!(
1434 "Avro union contains duplicate branch type: {dup}"
1435 )));
1436 }
1437 let children: Vec<AvroDataType> = f
1439 .iter()
1440 .map(|s| self.parse_type(s, namespace))
1441 .collect::<Result<_, _>>()?;
1442 let union_fields = build_union_fields(&children)?;
1444 Ok(AvroDataType::new(
1445 Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
1446 Default::default(),
1447 None,
1448 ))
1449 }
1450 Schema::Complex(c) => match c {
1451 ComplexType::Record(r) => {
1452 let namespace = r.namespace.or(namespace);
1453 let mut metadata = r.attributes.field_metadata();
1454 let fields = r
1455 .fields
1456 .iter()
1457 .map(|field| {
1458 Ok(AvroField {
1459 name: field.name.to_string(),
1460 data_type: self.parse_type(&field.r#type, namespace)?,
1461 })
1462 })
1463 .collect::<Result<_, ArrowError>>()?;
1464 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
1465 if let Some(ns) = namespace {
1466 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1467 }
1468 let field = AvroDataType {
1469 nullability: None,
1470 codec: Codec::Struct(fields),
1471 metadata,
1472 resolution: None,
1473 };
1474 self.resolver.register(r.name, namespace, field.clone());
1475 Ok(field)
1476 }
1477 ComplexType::Array(a) => {
1478 let field = self.parse_type(a.items.as_ref(), namespace)?;
1479 Ok(AvroDataType {
1480 nullability: None,
1481 metadata: a.attributes.field_metadata(),
1482 codec: Codec::List(Arc::new(field)),
1483 resolution: None,
1484 })
1485 }
1486 ComplexType::Fixed(f) => {
1487 let size = f.size.try_into().map_err(|e| {
1488 ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
1489 })?;
1490 let namespace = f.namespace.or(namespace);
1491 let mut metadata = f.attributes.field_metadata();
1492 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
1493 if let Some(ns) = namespace {
1494 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1495 }
1496 let field = match f.attributes.logical_type {
1497 Some("decimal") => {
1498 let (precision, scale, _) =
1499 parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
1500 AvroDataType {
1501 nullability: None,
1502 metadata,
1503 codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
1504 resolution: None,
1505 }
1506 }
1507 Some("duration") => {
1508 if size != 12 {
1509 return Err(ArrowError::ParseError(format!(
1510 "Invalid fixed size for Duration: {size}, must be 12"
1511 )));
1512 };
1513 AvroDataType {
1514 nullability: None,
1515 metadata,
1516 codec: Codec::Interval,
1517 resolution: None,
1518 }
1519 }
1520 #[cfg(feature = "avro_custom_types")]
1521 Some("arrow.uint64") if size == 8 => AvroDataType {
1522 nullability: None,
1523 metadata,
1524 codec: Codec::UInt64,
1525 resolution: None,
1526 },
1527 #[cfg(feature = "avro_custom_types")]
1528 Some("arrow.float16") if size == 2 => AvroDataType {
1529 nullability: None,
1530 metadata,
1531 codec: Codec::Float16,
1532 resolution: None,
1533 },
1534 #[cfg(feature = "avro_custom_types")]
1535 Some("arrow.interval-year-month") if size == 4 => AvroDataType {
1536 nullability: None,
1537 metadata,
1538 codec: Codec::IntervalYearMonth,
1539 resolution: None,
1540 },
1541 #[cfg(feature = "avro_custom_types")]
1542 Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
1543 nullability: None,
1544 metadata,
1545 codec: Codec::IntervalMonthDayNano,
1546 resolution: None,
1547 },
1548 #[cfg(feature = "avro_custom_types")]
1549 Some("arrow.interval-day-time") if size == 8 => AvroDataType {
1550 nullability: None,
1551 metadata,
1552 codec: Codec::IntervalDayTime,
1553 resolution: None,
1554 },
1555 _ => AvroDataType {
1556 nullability: None,
1557 metadata,
1558 codec: Codec::Fixed(size),
1559 resolution: None,
1560 },
1561 };
1562 self.resolver.register(f.name, namespace, field.clone());
1563 Ok(field)
1564 }
1565 ComplexType::Enum(e) => {
1566 let namespace = e.namespace.or(namespace);
1567 let symbols = e
1568 .symbols
1569 .iter()
1570 .map(|s| s.to_string())
1571 .collect::<Arc<[String]>>();
1572 let mut metadata = e.attributes.field_metadata();
1573 let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
1574 ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
1575 })?;
1576 metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
1577 metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
1578 if let Some(ns) = namespace {
1579 metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
1580 }
1581 let field = AvroDataType {
1582 nullability: None,
1583 metadata,
1584 codec: Codec::Enum(symbols),
1585 resolution: None,
1586 };
1587 self.resolver.register(e.name, namespace, field.clone());
1588 Ok(field)
1589 }
1590 ComplexType::Map(m) => {
1591 let val = self.parse_type(&m.values, namespace)?;
1592 Ok(AvroDataType {
1593 nullability: None,
1594 metadata: m.attributes.field_metadata(),
1595 codec: Codec::Map(Arc::new(val)),
1596 resolution: None,
1597 })
1598 }
1599 },
1600 Schema::Type(t) => {
1601 let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
1602 match (t.attributes.logical_type, &mut field.codec) {
1604 (Some("decimal"), c @ Codec::Binary) => {
1605 let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
1606 *c = Codec::Decimal(prec, Some(sc), None);
1607 }
1608 (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
1609 (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
1610 (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
1611 (Some("timestamp-millis"), c @ Codec::Int64) => {
1612 *c = Codec::TimestampMillis(true)
1613 }
1614 (Some("timestamp-micros"), c @ Codec::Int64) => {
1615 *c = Codec::TimestampMicros(true)
1616 }
1617 (Some("local-timestamp-millis"), c @ Codec::Int64) => {
1618 *c = Codec::TimestampMillis(false)
1619 }
1620 (Some("local-timestamp-micros"), c @ Codec::Int64) => {
1621 *c = Codec::TimestampMicros(false)
1622 }
1623 (Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
1624 (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
1625 *c = Codec::TimestampNanos(false)
1626 }
1627 (Some("uuid"), c @ Codec::Utf8) => {
1628 *c = Codec::Uuid;
1632 field.metadata.insert("logicalType".into(), "uuid".into());
1633 }
1634 #[cfg(feature = "avro_custom_types")]
1635 (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1636 #[cfg(feature = "avro_custom_types")]
1637 (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1638 #[cfg(feature = "avro_custom_types")]
1639 (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1640 #[cfg(feature = "avro_custom_types")]
1641 (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1642 *c = Codec::DurationSeconds
1643 }
1644 #[cfg(feature = "avro_custom_types")]
1645 (Some("arrow.run-end-encoded"), _) => {
1646 let bits_u8: u8 = t
1647 .attributes
1648 .additional
1649 .get("arrow.runEndIndexBits")
1650 .and_then(|v| v.as_u64())
1651 .and_then(|n| u8::try_from(n).ok())
1652 .ok_or_else(|| ArrowError::ParseError(
1653 "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
1654 .to_string(),
1655 ))?;
1656 if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
1657 return Err(ArrowError::ParseError(format!(
1658 "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
1659 )));
1660 }
1661 let values_site = field.clone();
1663 field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
1664 }
1665 #[cfg(feature = "avro_custom_types")]
1667 (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
1668 #[cfg(feature = "avro_custom_types")]
1669 (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
1670 #[cfg(feature = "avro_custom_types")]
1671 (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
1672 #[cfg(feature = "avro_custom_types")]
1673 (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
1674 #[cfg(feature = "avro_custom_types")]
1675 (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
1676 #[cfg(feature = "avro_custom_types")]
1677 (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
1678 #[cfg(feature = "avro_custom_types")]
1680 (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
1681 #[cfg(feature = "avro_custom_types")]
1683 (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
1684 #[cfg(feature = "avro_custom_types")]
1686 (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
1687 #[cfg(feature = "avro_custom_types")]
1688 (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
1689 #[cfg(feature = "avro_custom_types")]
1690 (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
1691 *c = Codec::TimestampSecs(true)
1692 }
1693 #[cfg(feature = "avro_custom_types")]
1694 (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
1695 *c = Codec::TimestampSecs(false)
1696 }
1697 #[cfg(feature = "avro_custom_types")]
1699 (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
1700 *c = Codec::IntervalYearMonth
1701 }
1702 #[cfg(feature = "avro_custom_types")]
1703 (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
1704 *c = Codec::IntervalMonthDayNano
1705 }
1706 #[cfg(feature = "avro_custom_types")]
1707 (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
1708 *c = Codec::IntervalDayTime
1709 }
1710 (Some(logical), _) => {
1711 field.metadata.insert("logicalType".into(), logical.into());
1713 }
1714 (None, _) => {}
1715 }
1716 if matches!(field.codec, Codec::Int64) {
1717 if let Some(unit) = t
1718 .attributes
1719 .additional
1720 .get("arrowTimeUnit")
1721 .and_then(|v| v.as_str())
1722 {
1723 if unit == "nanosecond" {
1724 field.codec = Codec::TimestampNanos(false);
1725 }
1726 }
1727 }
1728 if !t.attributes.additional.is_empty() {
1729 for (k, v) in &t.attributes.additional {
1730 field.metadata.insert(k.to_string(), v.to_string());
1731 }
1732 }
1733 Ok(field)
1734 }
1735 }
1736 }
1737
    /// Resolves `writer_schema` against `reader_schema`.
    ///
    /// Returns the reader-side [`AvroDataType`] together with the
    /// [`ResolutionInfo`] that describes how writer-encoded values map onto it.
    ///
    /// Dispatch order:
    /// 1. Both sides primitive, including attribute-wrapped primitives →
    ///    `resolve_primitives`.
    /// 2. Union/union, union/non-union and non-union/union combinations.
    /// 3. Same-kind array, map, fixed, record and enum pairs → the matching
    ///    `resolve_*` helper.
    /// 4. A bare named reference on either side → the reader schema is parsed
    ///    without attaching resolution info.
    /// 5. Anything else → [`ArrowError::NotYetImplemented`].
    ///
    /// # Errors
    /// Propagates helper errors (illegal promotion, name or size mismatch, missing
    /// defaults, ...). Also fails when a non-union writer matches no branch of a
    /// non-nullable reader union.
    fn resolve_type<'s>(
        &mut self,
        writer_schema: &'s Schema<'a>,
        reader_schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        // Primitive on both sides: identity or a numeric/string promotion.
        if let (Some(write_primitive), Some(read_primitive)) =
            (primitive_of(writer_schema), primitive_of(reader_schema))
        {
            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
        }
        match (writer_schema, reader_schema) {
            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
                let writer_variants = writer_variants.as_slice();
                let reader_variants = reader_variants.as_slice();
                match (
                    nullable_union_variants(writer_variants),
                    nullable_union_variants(reader_variants),
                ) {
                    // Both sides are two-branch nullable unions: resolve the non-null
                    // branches directly.
                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
                        // Only the writer's non-null slot gets a mapping entry; the
                        // null slot stays `None`.
                        // NOTE(review): null is presumably handled by the decoder via
                        // `nullability` rather than this mapping — verify.
                        let mut writer_to_reader = vec![None, None];
                        writer_to_reader[w_nb.non_null_index()] = Some((
                            r_nb.non_null_index(),
                            dt.resolution
                                .take()
                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
                        ));
                        // The writer's null position is recorded, not the reader's.
                        dt.nullability = Some(w_nb);
                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                            writer_to_reader: Arc::from(writer_to_reader),
                            writer_is_union: true,
                            reader_is_union: true,
                        }));
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
                        Ok(dt)
                    }
                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
                }
            }
            (Schema::Union(writer_variants), reader_non_union) => {
                // Try every writer branch against the single reader type. Branches
                // that fail to resolve map to `None`; all others target index 0.
                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
                    .iter()
                    .map(|writer| {
                        self.resolve_type(writer, reader_non_union, namespace)
                            .ok()
                            .map(|tmp| {
                                let resolution = tmp
                                    .resolution
                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                                (0usize, resolution)
                            })
                    })
                    .collect();
                let mut dt = self.parse_type(reader_non_union, namespace)?;
                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                    writer_to_reader: Arc::from(writer_to_reader),
                    writer_is_union: true,
                    reader_is_union: false,
                }));
                Ok(dt)
            }
            (writer_non_union, Schema::Union(reader_variants)) => {
                if let Some((nullability, non_null_branch)) =
                    nullable_union_variants(reader_variants)
                {
                    // Reader is a nullable union: resolve against its non-null branch
                    // and mark the result nullable.
                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
                    #[cfg(feature = "avro_custom_types")]
                    Self::propagate_nullability_into_ree(&mut dt, nullability);
                    dt.nullability = Some(nullability);
                    if dt.resolution.is_none() {
                        // Force a non-union resolution. Presumably this tells the
                        // decoder that the writer data carries no union branch index
                        // even though `nullability` is set — verify against the decoder.
                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
                    }
                    Ok(dt)
                } else {
                    // General reader union: pick the best-matching branch and parse
                    // the remaining branches as-is.
                    let Some((match_idx, mut match_dt)) =
                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
                    else {
                        return Err(ArrowError::SchemaError(
                            "Writer schema does not match any reader union branch".to_string(),
                        ));
                    };
                    let resolution = match_dt
                        .resolution
                        .take()
                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                    // Wrapped in `Option` so the closure below can move it out once.
                    let mut match_dt = Some(match_dt);
                    let children = reader_variants
                        .iter()
                        .enumerate()
                        .map(|(idx, variant)| {
                            if idx == match_idx {
                                Ok(match_dt.take().unwrap())
                            } else {
                                self.parse_type(variant, namespace)
                            }
                        })
                        .collect::<Result<Vec<_>, _>>()?;
                    let union_fields = build_union_fields(&children)?;
                    let mut dt = AvroDataType::new(
                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
                        Default::default(),
                        None,
                    );
                    // The single writer "branch" maps onto the chosen reader branch.
                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
                        writer_is_union: false,
                        reader_is_union: true,
                    }));
                    Ok(dt)
                }
            }
            (
                Schema::Complex(ComplexType::Array(writer_array)),
                Schema::Complex(ComplexType::Array(reader_array)),
            ) => self.resolve_array(writer_array, reader_array, namespace),
            (
                Schema::Complex(ComplexType::Map(writer_map)),
                Schema::Complex(ComplexType::Map(reader_map)),
            ) => self.resolve_map(writer_map, reader_map, namespace),
            (
                Schema::Complex(ComplexType::Fixed(writer_fixed)),
                Schema::Complex(ComplexType::Fixed(reader_fixed)),
            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
            (
                Schema::Complex(ComplexType::Record(writer_record)),
                Schema::Complex(ComplexType::Record(reader_record)),
            ) => self.resolve_records(writer_record, reader_record, namespace),
            (
                Schema::Complex(ComplexType::Enum(writer_enum)),
                Schema::Complex(ComplexType::Enum(reader_enum)),
            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
            // A bare named reference on either side: the reader schema is parsed as-is.
            // NOTE(review): no writer/reader resolution info is attached in this case.
            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
            _ => Err(ArrowError::NotYetImplemented(
                "Other resolutions not yet implemented".to_string(),
            )),
        }
    }
1885
1886 fn find_best_union_match(
1887 &mut self,
1888 writer: &Schema<'a>,
1889 reader_variants: &[Schema<'a>],
1890 namespace: Option<&'a str>,
1891 ) -> Option<(usize, AvroDataType)> {
1892 let mut first_resolution = None;
1893 for (reader_index, reader) in reader_variants.iter().enumerate() {
1894 if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1895 match &dt.resolution {
1896 None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1897 return Some((reader_index, dt));
1899 }
1900 Some(_) => {
1901 if first_resolution.is_none() {
1902 first_resolution = Some((reader_index, dt));
1904 }
1905 }
1906 };
1907 }
1908 }
1909 first_resolution
1910 }
1911
1912 fn resolve_unions<'s>(
1913 &mut self,
1914 writer_variants: &'s [Schema<'a>],
1915 reader_variants: &'s [Schema<'a>],
1916 namespace: Option<&'a str>,
1917 ) -> Result<AvroDataType, ArrowError> {
1918 let mut resolved_reader_encodings = HashMap::new();
1919 let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1920 .iter()
1921 .map(|writer| {
1922 self.find_best_union_match(writer, reader_variants, namespace)
1923 .map(|(match_idx, mut match_dt)| {
1924 let resolution = match_dt
1925 .resolution
1926 .take()
1927 .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1928 resolved_reader_encodings.insert(match_idx, match_dt);
1931 (match_idx, resolution)
1932 })
1933 })
1934 .collect();
1935 let reader_encodings: Vec<AvroDataType> = reader_variants
1936 .iter()
1937 .enumerate()
1938 .map(|(reader_idx, reader_schema)| {
1939 if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1940 Ok(resolved)
1941 } else {
1942 self.parse_type(reader_schema, namespace)
1943 }
1944 })
1945 .collect::<Result<_, _>>()?;
1946 let union_fields = build_union_fields(&reader_encodings)?;
1947 let mut dt = AvroDataType::new(
1948 Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1949 Default::default(),
1950 None,
1951 );
1952 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1953 writer_to_reader: Arc::from(writer_to_reader),
1954 writer_is_union: true,
1955 reader_is_union: true,
1956 }));
1957 Ok(dt)
1958 }
1959
1960 fn resolve_array(
1961 &mut self,
1962 writer_array: &Array<'a>,
1963 reader_array: &Array<'a>,
1964 namespace: Option<&'a str>,
1965 ) -> Result<AvroDataType, ArrowError> {
1966 Ok(AvroDataType {
1967 nullability: None,
1968 metadata: reader_array.attributes.field_metadata(),
1969 codec: Codec::List(Arc::new(self.make_data_type(
1970 writer_array.items.as_ref(),
1971 Some(reader_array.items.as_ref()),
1972 namespace,
1973 )?)),
1974 resolution: None,
1975 })
1976 }
1977
1978 fn resolve_map(
1979 &mut self,
1980 writer_map: &Map<'a>,
1981 reader_map: &Map<'a>,
1982 namespace: Option<&'a str>,
1983 ) -> Result<AvroDataType, ArrowError> {
1984 Ok(AvroDataType {
1985 nullability: None,
1986 metadata: reader_map.attributes.field_metadata(),
1987 codec: Codec::Map(Arc::new(self.make_data_type(
1988 &writer_map.values,
1989 Some(&reader_map.values),
1990 namespace,
1991 )?)),
1992 resolution: None,
1993 })
1994 }
1995
1996 fn resolve_fixed<'s>(
1997 &mut self,
1998 writer_fixed: &Fixed<'a>,
1999 reader_fixed: &Fixed<'a>,
2000 reader_schema: &'s Schema<'a>,
2001 namespace: Option<&'a str>,
2002 ) -> Result<AvroDataType, ArrowError> {
2003 ensure_names_match(
2004 "Fixed",
2005 writer_fixed.name,
2006 writer_fixed.namespace,
2007 &writer_fixed.aliases,
2008 reader_fixed.name,
2009 reader_fixed.namespace,
2010 &reader_fixed.aliases,
2011 )?;
2012 if writer_fixed.size != reader_fixed.size {
2013 return Err(ArrowError::SchemaError(format!(
2014 "Fixed size mismatch for {}: writer={}, reader={}",
2015 reader_fixed.name, writer_fixed.size, reader_fixed.size
2016 )));
2017 }
2018 self.parse_type(reader_schema, namespace)
2019 }
2020
2021 fn resolve_primitives(
2022 &mut self,
2023 write_primitive: PrimitiveType,
2024 read_primitive: PrimitiveType,
2025 reader_schema: &Schema<'a>,
2026 ) -> Result<AvroDataType, ArrowError> {
2027 if write_primitive == read_primitive {
2028 return self.parse_type(reader_schema, None);
2029 }
2030 let promotion = match (write_primitive, read_primitive) {
2031 (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2032 (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2033 (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2034 (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2035 (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2036 (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2037 (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2038 (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2039 _ => {
2040 return Err(ArrowError::ParseError(format!(
2041 "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2042 )));
2043 }
2044 };
2045 let mut datatype = self.parse_type(reader_schema, None)?;
2046 datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2047 Ok(datatype)
2048 }
2049
2050 fn resolve_enums(
2106 &mut self,
2107 writer_enum: &Enum<'a>,
2108 reader_enum: &Enum<'a>,
2109 reader_schema: &Schema<'a>,
2110 namespace: Option<&'a str>,
2111 ) -> Result<AvroDataType, ArrowError> {
2112 ensure_names_match(
2113 "Enum",
2114 writer_enum.name,
2115 writer_enum.namespace,
2116 &writer_enum.aliases,
2117 reader_enum.name,
2118 reader_enum.namespace,
2119 &reader_enum.aliases,
2120 )?;
2121 if writer_enum.symbols == reader_enum.symbols {
2122 return self.parse_type(reader_schema, namespace);
2123 }
2124 let reader_index: HashMap<&str, i32> = reader_enum
2125 .symbols
2126 .iter()
2127 .enumerate()
2128 .map(|(index, &symbol)| (symbol, index as i32))
2129 .collect();
2130 let default_index: i32 = match reader_enum.default {
2131 Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2132 ArrowError::SchemaError(format!(
2133 "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2134 reader_enum.name,
2135 ))
2136 })?,
2137 None => -1,
2138 };
2139 let mapping: Vec<i32> = writer_enum
2140 .symbols
2141 .iter()
2142 .map(|&write_symbol| {
2143 reader_index
2144 .get(write_symbol)
2145 .copied()
2146 .unwrap_or(default_index)
2147 })
2148 .collect();
2149 if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2150 return Err(ArrowError::SchemaError(format!(
2151 "Reader enum '{}' does not cover all writer symbols and no default is provided",
2152 reader_enum.name
2153 )));
2154 }
2155 let mut dt = self.parse_type(reader_schema, namespace)?;
2156 dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2157 mapping: Arc::from(mapping),
2158 default_index,
2159 }));
2160 let reader_ns = reader_enum.namespace.or(namespace);
2161 self.resolver
2162 .register(reader_enum.name, reader_ns, dt.clone());
2163 Ok(dt)
2164 }
2165
2166 #[inline]
2167 fn build_writer_lookup(
2168 writer_record: &Record<'a>,
2169 ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2170 let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2171 for (idx, wf) in writer_record.fields.iter().enumerate() {
2172 map.insert(wf.name, idx);
2174 }
2175 let mut ambiguous: HashSet<&str> = HashSet::new();
2177 for (idx, wf) in writer_record.fields.iter().enumerate() {
2178 for &alias in &wf.aliases {
2179 match map.entry(alias) {
2180 Entry::Occupied(e) if *e.get() != idx => {
2181 ambiguous.insert(alias);
2182 }
2183 Entry::Vacant(e) => {
2184 e.insert(idx);
2185 }
2186 _ => {}
2187 }
2188 }
2189 }
2190 (map, ambiguous)
2191 }
2192
    /// Resolves a writer record against a reader record.
    ///
    /// Each reader field is matched to a writer field by its name, looked up
    /// among writer field names and writer aliases, and then by each of its own
    /// aliases in order.
    /// * Matched fields are resolved recursively.
    /// * Unmatched reader fields need an explicit default, or must be a
    ///   `["null", T]` union, which implicitly defaults to null.
    /// * Writer fields with no reader counterpart become `ResolvedField::Skip`
    ///   entries, presumably consumed and discarded while decoding.
    ///
    /// The resolved type is registered under the reader record's name.
    ///
    /// # Errors
    /// Fails on:
    /// * a record name mismatch;
    /// * a reader field without a usable default;
    /// * in `strict_mode`, an ambiguous writer alias or several reader fields
    ///   mapping onto the same writer field.
    ///
    /// Errors from nested resolution propagate.
    fn resolve_records(
        &mut self,
        writer_record: &Record<'a>,
        reader_record: &Record<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        ensure_names_match(
            "Record",
            writer_record.name,
            writer_record.namespace,
            &writer_record.aliases,
            reader_record.name,
            reader_record.namespace,
            &reader_record.aliases,
        )?;
        let writer_ns = writer_record.namespace.or(namespace);
        let reader_ns = reader_record.namespace.or(namespace);
        let mut reader_md = reader_record.attributes.field_metadata();
        reader_md.insert(
            AVRO_NAME_METADATA_KEY.to_string(),
            reader_record.name.to_string(),
        );
        if let Some(ns) = reader_ns {
            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
        }
        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
        // writer_to_reader[w] = index of the reader field that consumed writer field w.
        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
        let mut default_fields: Vec<usize> = Vec::new();
        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
            // Try the reader field's name first, then each of its aliases in order.
            let mut match_idx = writer_lookup.get(r_field.name).copied();
            let mut matched_via_alias: Option<&str> = None;
            if match_idx.is_none() {
                for &alias in &r_field.aliases {
                    if let Some(i) = writer_lookup.get(alias).copied() {
                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
                            return Err(ArrowError::SchemaError(format!(
                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
                                r_field.name
                            )));
                        }
                        match_idx = Some(i);
                        matched_via_alias = Some(alias);
                        break;
                    }
                }
            }
            if let Some(wi) = match_idx {
                if writer_to_reader[wi].is_none() {
                    // First reader field to claim this writer field: resolve its type.
                    let w_schema = &writer_record.fields[wi].r#type;
                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
                    writer_to_reader[wi] = Some(reader_idx);
                    reader_fields.push(AvroField {
                        name: r_field.name.to_owned(),
                        data_type: dt,
                    });
                    continue;
                } else if self.strict_mode {
                    // This branch is only reached when writer_to_reader[wi] is Some.
                    let existing_reader = writer_to_reader[wi].unwrap();
                    let via = matched_via_alias
                        .map(|a| format!("alias '{a}'"))
                        .unwrap_or_else(|| "name match".to_string());
                    return Err(ArrowError::SchemaError(format!(
                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
                        writer_record.fields[wi].name
                    )));
                }
            }
            // No usable writer field (absent, or already claimed outside strict
            // mode): the reader field must be filled from a default.
            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
            if let Some(default_json) = r_field.default.as_ref() {
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(default_json)?,
                ));
                default_fields.push(reader_idx);
            } else if dt.nullability() == Some(Nullability::NullFirst) {
                // A `["null", T]` field without an explicit default defaults to null.
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(&Value::Null)?,
                ));
                default_fields.push(reader_idx);
            } else {
                return Err(ArrowError::SchemaError(format!(
                    "Reader field '{}' not present in writer schema must have a default value",
                    r_field.name
                )));
            }
            reader_fields.push(AvroField {
                name: r_field.name.to_owned(),
                data_type: dt,
            });
        }
        // Every writer field either feeds a reader field or is parsed, with the
        // writer's namespace, as a type to skip.
        let writer_fields = writer_record
            .fields
            .iter()
            .enumerate()
            .map(|(writer_index, writer_field)| {
                if let Some(reader_index) = writer_to_reader[writer_index] {
                    Ok(ResolvedField::ToReader(reader_index))
                } else {
                    let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
                    Ok(ResolvedField::Skip(dt))
                }
            })
            .collect::<Result<_, ArrowError>>()?;
        let resolved = AvroDataType::new_with_resolution(
            Codec::Struct(Arc::from(reader_fields)),
            reader_md,
            None,
            Some(ResolutionInfo::Record(ResolvedRecord {
                writer_fields,
                default_fields: Arc::from(default_fields),
            })),
        );
        self.resolver
            .register(reader_record.name, reader_ns, resolved.clone());
        Ok(resolved)
    }
2318}
2319
2320#[cfg(test)]
2321mod tests {
2322 use super::*;
2323 use crate::schema::{
2324 AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2325 Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2326 };
2327 use indexmap::IndexMap;
2328 use serde_json::{self, Value};
2329
2330 fn create_schema_with_logical_type(
2331 primitive_type: PrimitiveType,
2332 logical_type: &'static str,
2333 ) -> Schema<'static> {
2334 let attributes = Attributes {
2335 logical_type: Some(logical_type),
2336 additional: Default::default(),
2337 };
2338
2339 Schema::Type(Type {
2340 r#type: TypeName::Primitive(primitive_type),
2341 attributes,
2342 })
2343 }
2344
2345 fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2346 let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2347 let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2348 let mut maker = Maker::new(false, false);
2349 maker
2350 .make_data_type(&writer_schema, Some(&reader_schema), None)
2351 .expect("promotion should resolve")
2352 }
2353
2354 fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2355 Schema::TypeName(TypeName::Primitive(pt))
2356 }
2357 fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
2358 Schema::Union(branches)
2359 }
2360
/// `int` annotated with `logicalType: "date"` must map to `Codec::Date32`.
#[test]
fn test_date_logical_type() {
    let date_schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&date_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::Date32));
}
2370
/// `int` annotated with `logicalType: "time-millis"` must map to `Codec::TimeMillis`.
#[test]
fn test_time_millis_logical_type() {
    let time_schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&time_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::TimeMillis));
}
2380
/// `long` annotated with `logicalType: "time-micros"` must map to `Codec::TimeMicros`.
#[test]
fn test_time_micros_logical_type() {
    let time_schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&time_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::TimeMicros));
}
2390
/// `long` + `logicalType: "timestamp-millis"` must map to `Codec::TimestampMillis(true)`.
#[test]
fn test_timestamp_millis_logical_type() {
    let ts_schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&ts_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::TimestampMillis(true)));
}
2400
/// `long` + `logicalType: "timestamp-micros"` must map to `Codec::TimestampMicros(true)`.
#[test]
fn test_timestamp_micros_logical_type() {
    let ts_schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&ts_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::TimestampMicros(true)));
}
2410
/// `long` + `logicalType: "local-timestamp-millis"` must map to
/// `Codec::TimestampMillis(false)`.
#[test]
fn test_local_timestamp_millis_logical_type() {
    let local_ts_schema =
        create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&local_ts_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::TimestampMillis(false)));
}
2420
/// `long` + `logicalType: "local-timestamp-micros"` must map to
/// `Codec::TimestampMicros(false)`.
#[test]
fn test_local_timestamp_micros_logical_type() {
    let local_ts_schema =
        create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&local_ts_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::TimestampMicros(false)));
}
2430
/// A `string` annotated with `logicalType: "uuid"` must resolve to `Codec::Uuid`.
///
/// The previous body only reassigned a local `Codec::Fixed(16)` to `Codec::Uuid` and
/// asserted the result, so it never exercised `Maker` and could not catch regressions
/// in logical-type handling. This version drives the real schema-to-codec path.
#[test]
fn test_uuid_type() {
    let schema = create_schema_with_logical_type(PrimitiveType::String, "uuid");
    // `use_utf8view = false`, so the un-annotated string codec would be plain Utf8.
    let mut maker = Maker::new(false, false);
    let result = maker.make_data_type(&schema, None, None).unwrap();
    assert!(
        matches!(result.codec(), Codec::Uuid),
        "expected Uuid codec, got {:?}",
        result.codec()
    );
}
2439
/// A 12-byte `fixed` annotated with `logicalType: "duration"` must resolve to
/// `Codec::Interval`.
///
/// The previous body only rewrote a local `Codec::Fixed(12)` into `Codec::Interval`
/// without calling `Maker`, so it tested nothing. This version parses a real Avro
/// `fixed` schema and resolves it.
#[test]
fn test_duration_logical_type() {
    let schema: Schema = serde_json::from_str(
        r#"{"type":"fixed","name":"Duration","size":12,"logicalType":"duration"}"#,
    )
    .unwrap();
    let mut maker = Maker::new(false, false);
    let result = maker.make_data_type(&schema, None, None).unwrap();
    assert!(
        matches!(result.codec(), Codec::Interval),
        "expected Interval codec, got {:?}",
        result.codec()
    );
}
2450
/// A `bytes` schema annotated with `logicalType: "decimal"` (with `precision` and
/// `scale` attributes) must resolve to a `Codec::Decimal` codec.
///
/// This replaces a test that built a local closure returning a hand-written
/// "Decimals are not currently supported" error and then asserted that same error.
/// That test never called `Maker`, so it verified nothing about production code. It
/// has been renamed because the old name asserted a limitation that the test never
/// actually checked.
#[test]
fn test_bytes_decimal_logical_type_is_supported() {
    let schema: Schema = serde_json::from_str(
        r#"{"type":"bytes","logicalType":"decimal","precision":10,"scale":2}"#,
    )
    .unwrap();
    let mut maker = Maker::new(false, false);
    let result = maker.make_data_type(&schema, None, None).unwrap();
    assert!(
        matches!(result.codec(), Codec::Decimal(..)),
        "expected Decimal codec, got {:?}",
        result.codec()
    );
}
/// An unrecognised logical type does not fail resolution. The annotation is kept in
/// the data type's metadata under the `logicalType` key.
#[test]
fn test_unknown_logical_type_added_to_metadata() {
    let custom = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&custom, None, None).unwrap();
    let stored = dt.metadata.get("logicalType").map(String::as_str);
    assert_eq!(stored, Some("custom-type"));
}
2485
/// With the first `Maker::new` flag set, a plain `string` maps to `Codec::Utf8View`.
#[test]
fn test_string_with_utf8view_enabled() {
    let string_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut view_maker = Maker::new(true, false);
    let dt = view_maker
        .make_data_type(&string_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::Utf8View));
}
2495
/// With the first `Maker::new` flag cleared, a plain `string` maps to `Codec::Utf8`.
#[test]
fn test_string_without_utf8view_enabled() {
    let string_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut plain_maker = Maker::new(false, false);
    let dt = plain_maker
        .make_data_type(&string_schema, None, None)
        .unwrap();
    assert!(matches!(dt.codec(), Codec::Utf8));
}
2505
/// The `Utf8View` setting also applies to string fields nested inside a record.
#[test]
fn test_record_with_string_and_utf8view_enabled() {
    let record = Record {
        name: "test_record",
        namespace: None,
        aliases: vec![],
        doc: None,
        fields: vec![crate::schema::Field {
            name: "string_field",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
            default: None,
            doc: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    };
    let schema = Schema::Complex(ComplexType::Record(record));

    let mut maker = Maker::new(true, false);
    let result = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = &result.codec else {
        panic!("Expected Struct codec");
    };
    assert!(matches!(&fields[0].data_type().codec, Codec::Utf8View));
}
2539
/// With the strict flag set, a `["string", "null"]` union (null in second position)
/// is rejected with a `SchemaError`.
#[test]
fn test_union_with_strict_mode() {
    let string_then_null = Schema::Union(vec![
        Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
    ]);
    let mut strict_maker = Maker::new(false, true);
    let outcome = strict_maker.make_data_type(&string_then_null, None, None);

    assert!(outcome.is_err());
    let Err(ArrowError::SchemaError(msg)) = outcome else {
        panic!("Expected SchemaError");
    };
    assert!(msg.contains(
        "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
    ));
}
2560
/// `int` writer read as `float` takes the reader codec and records `IntToFloat`.
#[test]
fn test_resolve_int_to_float_promotion() {
    let resolved = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToFloat));
    assert!(matches!(resolved.codec, Codec::Float32));
    assert_eq!(resolved.resolution, expected);
}
2570
/// `int` writer read as `double` takes the reader codec and records `IntToDouble`.
#[test]
fn test_resolve_int_to_double_promotion() {
    let resolved = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToDouble));
    assert!(matches!(resolved.codec, Codec::Float64));
    assert_eq!(resolved.resolution, expected);
}
2580
/// `long` writer read as `float` takes the reader codec and records `LongToFloat`.
#[test]
fn test_resolve_long_to_float_promotion() {
    let resolved = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
    let expected = Some(ResolutionInfo::Promotion(Promotion::LongToFloat));
    assert!(matches!(resolved.codec, Codec::Float32));
    assert_eq!(resolved.resolution, expected);
}
2590
/// `long` writer read as `double` takes the reader codec and records `LongToDouble`.
#[test]
fn test_resolve_long_to_double_promotion() {
    let resolved = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
    let expected = Some(ResolutionInfo::Promotion(Promotion::LongToDouble));
    assert!(matches!(resolved.codec, Codec::Float64));
    assert_eq!(resolved.resolution, expected);
}
2600
/// `float` writer read as `double` takes the reader codec and records `FloatToDouble`.
#[test]
fn test_resolve_float_to_double_promotion() {
    let resolved = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
    let expected = Some(ResolutionInfo::Promotion(Promotion::FloatToDouble));
    assert!(matches!(resolved.codec, Codec::Float64));
    assert_eq!(resolved.resolution, expected);
}
2610
/// `string` writer read as `bytes` takes the binary codec and records `StringToBytes`.
#[test]
fn test_resolve_string_to_bytes_promotion() {
    let resolved = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
    let expected = Some(ResolutionInfo::Promotion(Promotion::StringToBytes));
    assert!(matches!(resolved.codec, Codec::Binary));
    assert_eq!(resolved.resolution, expected);
}
2620
/// `bytes` writer read as `string` takes the UTF-8 codec and records `BytesToString`.
#[test]
fn test_resolve_bytes_to_string_promotion() {
    let resolved = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
    let expected = Some(ResolutionInfo::Promotion(Promotion::BytesToString));
    assert!(matches!(resolved.codec, Codec::Utf8));
    assert_eq!(resolved.resolution, expected);
}
2630
/// Narrowing `double` to `float` is not an allowed promotion. Resolution must fail
/// with a `ParseError` mentioning "Illegal promotion".
#[test]
fn test_resolve_illegal_promotion_double_to_float_errors() {
    let wide = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
    let narrow = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
    let mut maker = Maker::new(false, false);
    let outcome = maker.make_data_type(&wide, Some(&narrow), None);

    assert!(outcome.is_err());
    if let Err(ArrowError::ParseError(msg)) = outcome {
        assert!(msg.contains("Illegal promotion"));
    } else {
        panic!("Expected ParseError for illegal promotion Double -> Float");
    }
}
2645
/// A `["null", "int"]` writer resolved against a `["double", "null"]` reader promotes
/// the int branch to double and keeps the writer's null-first nullability.
#[test]
fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Int),
    ]);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Double),
        mk_primitive(PrimitiveType::Null),
    ]);
    let mut maker = Maker::new(false, false);
    let result = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    // Writer branch 0 (null) has no reader target; branch 1 (int) promotes into reader 0.
    let expected = ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    };
    assert!(matches!(result.codec, Codec::Float64));
    assert_eq!(result.resolution, Some(ResolutionInfo::Union(expected)));
    assert_eq!(result.nullability, Some(Nullability::NullFirst));
}
2673
/// A `["string", "long"]` writer read as plain `bytes`. The string branch promotes to
/// the reader, and the long branch has no reader target (`None`).
#[test]
fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Long),
    ]);
    let reader = mk_primitive(PrimitiveType::Bytes);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    assert!(matches!(dt.codec(), Codec::Binary));

    let ResolvedUnion {
        writer_to_reader,
        writer_is_union,
        reader_is_union,
    } = match dt.resolution {
        Some(ResolutionInfo::Union(u)) => u,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(writer_is_union && !reader_is_union);
    assert_eq!(
        writer_to_reader.as_ref(),
        &[
            Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
            None
        ]
    );
}
2697
/// A `long` writer read into a `["long", "double"]` union picks the exact-type branch
/// (`Direct`) rather than promoting to `double`.
#[test]
fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
    let writer = mk_primitive(PrimitiveType::Long);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::Double),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    let ResolvedUnion {
        writer_to_reader,
        writer_is_union,
        reader_is_union,
    } = match dt.resolution {
        Some(ResolutionInfo::Union(u)) => u,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(!writer_is_union && reader_is_union);
    assert_eq!(
        writer_to_reader.as_ref(),
        &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
    );
}
2717
/// An `int` writer read into a `["null", "long", "string"]` union has no exact match.
/// It is promoted into the `long` branch (index 1).
#[test]
fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
    let writer = mk_primitive(PrimitiveType::Int);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::String),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    let ResolvedUnion { writer_to_reader, .. } = match dt.resolution {
        Some(ResolutionInfo::Union(u)) => u,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert_eq!(
        writer_to_reader.as_ref(),
        &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
    );
}
2737
/// A non-union `Inner { a }` writer read into a `[Inner { a, b = 42 }, string]` reader
/// union. The record maps to reader branch 0, `a` maps by position, and `b` is filled
/// from its schema default.
#[test]
fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
    /// An `int` field with the given name and optional JSON default.
    fn int_field(name: &'static str, default: Option<Value>) -> AvroFieldSchema<'static> {
        AvroFieldSchema {
            name,
            doc: None,
            r#type: mk_primitive(PrimitiveType::Int),
            default,
            aliases: vec![],
        }
    }
    /// A record named `Inner` (no namespace) with the given fields.
    fn inner_record(fields: Vec<AvroFieldSchema<'static>>) -> Schema<'static> {
        Schema::Complex(ComplexType::Record(Record {
            name: "Inner",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields,
            attributes: Attributes::default(),
        }))
    }

    let writer = inner_record(vec![int_field("a", None)]);
    let reader = mk_union(vec![
        inner_record(vec![
            int_field("a", None),
            int_field("b", Some(Value::Number(serde_json::Number::from(42)))),
        ]),
        mk_primitive(PrimitiveType::String),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("resolution should succeed");

    // Union-level resolution: one writer "branch" targeting reader branch 0.
    let resolved = match dt.resolution.as_ref() {
        Some(ResolutionInfo::Union(u)) => u,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(!resolved.writer_is_union && resolved.reader_is_union);
    assert_eq!(
        resolved.writer_to_reader.len(),
        1,
        "expected the non-union record to resolve to a union variant"
    );
    let resolution = match resolved.writer_to_reader.first().unwrap() {
        Some((0, resolution)) => resolution,
        other => panic!("unexpected writer-to-reader table value {other:?}"),
    };
    match resolution {
        ResolutionInfo::Record(record) => {
            assert_eq!(record.writer_fields.len(), 1);
            assert_eq!(record.writer_fields[0], ResolvedField::ToReader(0));
            assert_eq!(record.default_fields.len(), 1);
            assert_eq!(record.default_fields[0], 1);
        }
        other => panic!("unexpected resolution {other:?}"),
    }

    // Codec-level check: reader branch 0 carries the default value for `b`.
    let children = match dt.codec() {
        Codec::Union(children, _, _) => children,
        other => panic!("expected union codec, got {other:?}"),
    };
    let inner_fields = match children[0].codec() {
        Codec::Struct(f) => f,
        other => panic!("expected struct codec for Inner, got {other:?}"),
    };
    assert_eq!(inner_fields.len(), 2);
    let field_b = &inner_fields[1];
    assert_eq!(field_b.name(), "b");
    assert_eq!(
        field_b.data_type().resolution,
        Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
        "field b should have DefaultValue(Int(42)) from schema resolution"
    );
}
2831
/// A `[string, Inner { a }]` writer union read into a `[Inner { a, b = 42 }, string]`
/// reader union. Writer branch 1 maps to reader branch 0, and `b` is filled from its
/// schema default.
#[test]
fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
    /// An `int` field with the given name and optional JSON default.
    fn int_field(name: &'static str, default: Option<Value>) -> AvroFieldSchema<'static> {
        AvroFieldSchema {
            name,
            doc: None,
            r#type: mk_primitive(PrimitiveType::Int),
            default,
            aliases: vec![],
        }
    }
    /// A record named `Inner` (no namespace) with the given fields.
    fn inner_record(fields: Vec<AvroFieldSchema<'static>>) -> Schema<'static> {
        Schema::Complex(ComplexType::Record(Record {
            name: "Inner",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields,
            attributes: Attributes::default(),
        }))
    }

    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        inner_record(vec![int_field("a", None)]),
    ]);
    let reader = mk_union(vec![
        inner_record(vec![
            int_field("a", None),
            int_field("b", Some(Value::Number(serde_json::Number::from(42)))),
        ]),
        mk_primitive(PrimitiveType::String),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("resolution should succeed");

    // Union-level resolution: writer branch 1 (the record) targets reader branch 0.
    let resolved = match dt.resolution.as_ref() {
        Some(ResolutionInfo::Union(u)) => u,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(resolved.writer_is_union && resolved.reader_is_union);
    assert_eq!(resolved.writer_to_reader.len(), 2);
    let resolution = match resolved.writer_to_reader[1].as_ref() {
        Some((0, resolution)) => resolution,
        other => panic!("unexpected writer-to-reader table value {other:?}"),
    };
    match resolution {
        ResolutionInfo::Record(record) => {
            assert_eq!(record.writer_fields.len(), 1);
            assert_eq!(record.writer_fields[0], ResolvedField::ToReader(0));
            assert_eq!(record.default_fields.len(), 1);
            assert_eq!(record.default_fields[0], 1);
        }
        other => panic!("unexpected resolution {other:?}"),
    }

    // Codec-level check: reader branch 0 carries the default value for `b`.
    let children = match dt.codec() {
        Codec::Union(children, _, _) => children,
        other => panic!("expected union codec, got {other:?}"),
    };
    let inner_fields = match children[0].codec() {
        Codec::Struct(f) => f,
        other => panic!("expected struct codec for Inner, got {other:?}"),
    };
    assert_eq!(inner_fields.len(), 2);
    let field_b = &inner_fields[1];
    assert_eq!(field_b.name(), "b");
    assert_eq!(
        field_b.data_type().resolution,
        Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
        "field b should have DefaultValue(Int(42)) from schema resolution"
    );
}
2924
/// `["null", "string"]` read as `["string", "null"]`. The string branch matches
/// directly, and nullability follows the writer (null first).
#[test]
fn test_resolve_both_nullable_unions_direct_match() {
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::String),
    ]);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Null),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    let expected_union = ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::Direct))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    };
    assert!(matches!(dt.codec(), Codec::Utf8));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));
    assert_eq!(dt.resolution, Some(ResolutionInfo::Union(expected_union)));
}
2952
/// `["null", "int"]` read as `["double", "null"]`. The int branch is promoted to
/// double, and nullability follows the writer (null first).
#[test]
fn test_resolve_both_nullable_unions_with_promotion() {
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Int),
    ]);
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Double),
        mk_primitive(PrimitiveType::Null),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    let expected_union = ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    };
    assert!(matches!(dt.codec(), Codec::Float64));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));
    assert_eq!(dt.resolution, Some(ResolutionInfo::Union(expected_union)));
}
2980
/// `int` writer read as `long` takes the Int64 codec and records `IntToLong`.
#[test]
fn test_resolve_type_promotion() {
    let int_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let long_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
    let mut maker = Maker::new(false, false);
    let promoted = maker
        .make_data_type(&int_schema, Some(&long_schema), None)
        .unwrap();
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToLong));
    assert!(matches!(promoted.codec, Codec::Int64));
    assert_eq!(promoted.resolution, expected);
}
2995
/// A record type defined inline (`Nested`) and referenced by name in a plain field,
/// an array, and a map must resolve to the same Arrow data type at every use site.
#[test]
fn test_nested_record_type_reuse_without_namespace() {
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "fields": [
        {
          "name": "nested",
          "type": {
            "type": "record",
            "name": "Nested",
            "fields": [
              { "name": "nested_int", "type": "int" }
            ]
          }
        },
        { "name": "nestedRecord", "type": "Nested" },
        { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
        { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
      ]
    }
    "#;
    let schema: Schema = serde_json::from_str(schema_str).unwrap();
    let mut maker = Maker::new(false, false);
    let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = avro_data_type.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0: the inline definition of `Nested`.
    assert_eq!(fields[0].name(), "nested");
    let nested_data_type = fields[0].data_type();
    let Codec::Struct(nested_fields) = nested_data_type.codec() else {
        panic!(
            "'nested' field is not a struct but {:?}",
            nested_data_type.codec()
        );
    };
    assert_eq!(nested_fields.len(), 1);
    assert_eq!(nested_fields[0].name(), "nested_int");
    assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
    let nested_arrow_type = nested_data_type.codec().data_type();

    // Field 1: a by-name reference to `Nested`.
    assert_eq!(fields[1].name(), "nestedRecord");
    assert_eq!(fields[1].data_type().codec().data_type(), nested_arrow_type);

    // Field 2: `Nested` as array items.
    assert_eq!(fields[2].name(), "nestedArray");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'nestedArray' field is not a list");
    };
    assert_eq!(item_type.codec().data_type(), nested_arrow_type);

    // Field 3: `Nested` as map values.
    assert_eq!(fields[3].name(), "nestedMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'nestedMap' field is not a map");
    };
    assert_eq!(value_type.codec().data_type(), nested_arrow_type);
}
3075
/// An enum declared inline under its own namespace (`enum_ns.Status`) and referenced
/// by full name in a plain field, an array, and a map must resolve to the same Arrow
/// data type at every use site.
#[test]
fn test_nested_enum_type_reuse_with_namespace() {
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "namespace": "record_ns",
      "fields": [
        {
          "name": "status",
          "type": {
            "type": "enum",
            "name": "Status",
            "namespace": "enum_ns",
            "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
          }
        },
        { "name": "backupStatus", "type": "enum_ns.Status" },
        { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
        { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
      ]
    }
    "#;
    let schema: Schema = serde_json::from_str(schema_str).unwrap();
    let mut maker = Maker::new(false, false);
    let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = avro_data_type.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0: the inline definition of `enum_ns.Status`.
    assert_eq!(fields[0].name(), "status");
    let status_data_type = fields[0].data_type();
    let Codec::Enum(symbols) = status_data_type.codec() else {
        panic!(
            "'status' field is not an enum but {:?}",
            status_data_type.codec()
        );
    };
    assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
    let status_arrow_type = status_data_type.codec().data_type();

    // Field 1: a by-full-name reference to the enum.
    assert_eq!(fields[1].name(), "backupStatus");
    assert_eq!(fields[1].data_type().codec().data_type(), status_arrow_type);

    // Field 2: the enum as array items.
    assert_eq!(fields[2].name(), "statusHistory");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'statusHistory' field is not a list");
    };
    assert_eq!(item_type.codec().data_type(), status_arrow_type);

    // Field 3: the enum as map values.
    assert_eq!(fields[3].name(), "statusMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'statusMap' field is not a map");
    };
    assert_eq!(value_type.codec().data_type(), status_arrow_type);
}
3153
/// A non-record `string` -> `string` resolution wrapped in an `AvroField` named
/// `AVRO_ROOT_RECORD_DEFAULT_NAME` keeps that name and the Utf8 codec.
///
/// NOTE(review): the field is constructed by hand here, so this test does not exercise
/// whatever code path assigns the default root name. Consider driving it through that
/// API instead.
#[test]
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut maker = Maker::new(false, false);
    let field = AvroField {
        name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
        data_type: maker
            .make_data_type(&writer_schema, Some(&reader_schema), None)
            .expect("resolution should succeed"),
    };
    assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
    assert!(matches!(field.data_type().codec(), Codec::Utf8));
}
3169
3170 fn json_string(s: &str) -> Value {
3171 Value::String(s.to_string())
3172 }
3173
3174 fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3175 let stored = dt
3176 .metadata
3177 .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3178 .cloned()
3179 .unwrap_or_default();
3180 let expected = serde_json::to_string(default_json).unwrap();
3181 assert_eq!(stored, expected, "stored default metadata should match");
3182 }
3183
/// JSON `null` defaults are accepted for the `null` codec and for null-first nullable
/// types. They are rejected for non-nullable and null-second types.
#[test]
fn test_validate_and_store_default_null_and_nullability_rules() {
    const NULL_ONLY: &str = "JSON null default is only valid for `null` type";
    let null_json = Value::Null;

    // Plain `null` codec: accepted and stored.
    let mut null_codec = AvroDataType::new(Codec::Null, HashMap::new(), None);
    assert_eq!(
        null_codec.parse_and_store_default(&null_json).unwrap(),
        AvroLiteral::Null
    );
    assert_default_stored(&null_codec, &null_json);

    // Non-nullable int: rejected.
    let mut plain_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
    let err = plain_int.parse_and_store_default(&null_json).unwrap_err();
    assert!(err.to_string().contains(NULL_ONLY), "unexpected error: {err}");

    // Null-first nullable int: accepted and stored.
    let mut null_first_int =
        AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
    assert_eq!(
        null_first_int.parse_and_store_default(&null_json).unwrap(),
        AvroLiteral::Null
    );
    assert_default_stored(&null_first_int, &null_json);

    // Null-second nullable int: rejected.
    let mut null_second_int =
        AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
    let err2 = null_second_int.parse_and_store_default(&null_json).unwrap_err();
    assert!(err2.to_string().contains(NULL_ONLY), "unexpected error: {err2}");
}
3211
/// Exercises `parse_and_store_default` on primitive, string-like, binary and temporal
/// codecs. It checks the parsed literal, the stored JSON metadata where relevant, and
/// the out-of-range / bad-codepoint rejections.
#[test]
fn test_validate_and_store_default_primitives_and_temporal() {
    // boolean
    let mut boolean = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
    let bool_json = Value::Bool(true);
    assert_eq!(
        boolean.parse_and_store_default(&bool_json).unwrap(),
        AvroLiteral::Boolean(true)
    );
    assert_default_stored(&boolean, &bool_json);

    // int: in-range accepted, i32::MAX + 1 rejected
    let mut int32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
    let int_json = serde_json::json!(123);
    assert_eq!(
        int32.parse_and_store_default(&int_json).unwrap(),
        AvroLiteral::Int(123)
    );
    assert_default_stored(&int32, &int_json);
    let int_overflow = int32
        .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
        .unwrap_err();
    assert!(format!("{int_overflow}").contains("out of i32 range"));

    // long
    let mut int64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let long_json = serde_json::json!(1234567890);
    assert_eq!(
        int64.parse_and_store_default(&long_json).unwrap(),
        AvroLiteral::Long(1234567890)
    );
    assert_default_stored(&int64, &long_json);

    // float: 1.25 accepted, 1e39 overflows f32
    let mut float32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
    let float_json = serde_json::json!(1.25);
    assert_eq!(
        float32.parse_and_store_default(&float_json).unwrap(),
        AvroLiteral::Float(1.25)
    );
    assert_default_stored(&float32, &float_json);
    let float_overflow = float32
        .parse_and_store_default(&serde_json::json!(1e39))
        .unwrap_err();
    assert!(format!("{float_overflow}").contains("out of f32 range"));

    // double
    let mut float64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let pi_json = serde_json::json!(std::f64::consts::PI);
    assert_eq!(
        float64.parse_and_store_default(&pi_json).unwrap(),
        AvroLiteral::Double(std::f64::consts::PI)
    );
    assert_default_stored(&float64, &pi_json);

    // string and string-view
    let mut utf8 = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
    assert_eq!(
        utf8.parse_and_store_default(&json_string("hello")).unwrap(),
        AvroLiteral::String("hello".into())
    );
    assert_default_stored(&utf8, &json_string("hello"));
    let mut utf8_view = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
    assert_eq!(
        utf8_view.parse_and_store_default(&json_string("view")).unwrap(),
        AvroLiteral::String("view".into())
    );
    assert_default_stored(&utf8_view, &json_string("view"));

    // uuid: parsed as a string literal
    const NIL_UUID: &str = "00000000-0000-0000-0000-000000000000";
    let mut uuid_dt = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
    assert_eq!(
        uuid_dt.parse_and_store_default(&json_string(NIL_UUID)).unwrap(),
        AvroLiteral::String(NIL_UUID.into())
    );

    // bytes: "ABC" -> [65, 66, 67]; a '€' character is rejected
    let mut binary = AvroDataType::new(Codec::Binary, HashMap::new(), None);
    assert_eq!(
        binary.parse_and_store_default(&json_string("ABC")).unwrap(),
        AvroLiteral::Bytes(vec![65, 66, 67])
    );
    let bad_codepoint = binary
        .parse_and_store_default(&json_string("€"))
        .unwrap_err();
    assert!(format!("{bad_codepoint}").contains("Invalid codepoint"));

    // temporal: date / time-millis parse as Int; time-micros / timestamps parse as Long
    let mut date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
    assert_eq!(
        date.parse_and_store_default(&serde_json::json!(1)).unwrap(),
        AvroLiteral::Int(1)
    );
    let mut time_millis = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
    assert_eq!(
        time_millis
            .parse_and_store_default(&serde_json::json!(86_400_000))
            .unwrap(),
        AvroLiteral::Int(86_400_000)
    );
    let mut time_micros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
    assert_eq!(
        time_micros
            .parse_and_store_default(&serde_json::json!(1_000_000))
            .unwrap(),
        AvroLiteral::Long(1_000_000)
    );
    let mut ts_millis = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
    assert_eq!(
        ts_millis
            .parse_and_store_default(&serde_json::json!(123))
            .unwrap(),
        AvroLiteral::Long(123)
    );
    let mut ts_micros = AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
    assert_eq!(
        ts_micros
            .parse_and_store_default(&serde_json::json!(456))
            .unwrap(),
        AvroLiteral::Long(456)
    );
}
3304
3305 #[cfg(feature = "avro_custom_types")]
3306 #[test]
3307 fn test_validate_and_store_default_custom_integer_ranges() {
3308 let mut dt_i8 = AvroDataType::new(Codec::Int8, HashMap::new(), None);
3309 let lit_i8 = dt_i8
3310 .parse_and_store_default(&serde_json::json!(i8::MAX))
3311 .unwrap();
3312 assert_eq!(lit_i8, AvroLiteral::Int(i8::MAX as i32));
3313 let err_i8_high = dt_i8
3314 .parse_and_store_default(&serde_json::json!(i8::MAX as i64 + 1))
3315 .unwrap_err();
3316 assert!(err_i8_high.to_string().contains("out of i8 range"));
3317 let err_i8_low = dt_i8
3318 .parse_and_store_default(&serde_json::json!(i8::MIN as i64 - 1))
3319 .unwrap_err();
3320 assert!(err_i8_low.to_string().contains("out of i8 range"));
3321
3322 let mut dt_i16 = AvroDataType::new(Codec::Int16, HashMap::new(), None);
3323 let lit_i16 = dt_i16
3324 .parse_and_store_default(&serde_json::json!(i16::MIN))
3325 .unwrap();
3326 assert_eq!(lit_i16, AvroLiteral::Int(i16::MIN as i32));
3327 let err_i16_high = dt_i16
3328 .parse_and_store_default(&serde_json::json!(i16::MAX as i64 + 1))
3329 .unwrap_err();
3330 assert!(err_i16_high.to_string().contains("out of i16 range"));
3331 let err_i16_low = dt_i16
3332 .parse_and_store_default(&serde_json::json!(i16::MIN as i64 - 1))
3333 .unwrap_err();
3334 assert!(err_i16_low.to_string().contains("out of i16 range"));
3335
3336 let mut dt_u8 = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
3337 let lit_u8 = dt_u8
3338 .parse_and_store_default(&serde_json::json!(u8::MAX))
3339 .unwrap();
3340 assert_eq!(lit_u8, AvroLiteral::Int(u8::MAX as i32));
3341 let err_u8_neg = dt_u8
3342 .parse_and_store_default(&serde_json::json!(-1))
3343 .unwrap_err();
3344 assert!(err_u8_neg.to_string().contains("out of u8 range"));
3345 let err_u8_high = dt_u8
3346 .parse_and_store_default(&serde_json::json!(u8::MAX as i64 + 1))
3347 .unwrap_err();
3348 assert!(err_u8_high.to_string().contains("out of u8 range"));
3349
3350 let mut dt_u16 = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
3351 let lit_u16 = dt_u16
3352 .parse_and_store_default(&serde_json::json!(u16::MAX))
3353 .unwrap();
3354 assert_eq!(lit_u16, AvroLiteral::Int(u16::MAX as i32));
3355 let err_u16_neg = dt_u16
3356 .parse_and_store_default(&serde_json::json!(-1))
3357 .unwrap_err();
3358 assert!(err_u16_neg.to_string().contains("out of u16 range"));
3359 let err_u16_high = dt_u16
3360 .parse_and_store_default(&serde_json::json!(u16::MAX as i64 + 1))
3361 .unwrap_err();
3362 assert!(err_u16_high.to_string().contains("out of u16 range"));
3363
3364 let mut dt_u32 = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
3365 let lit_u32 = dt_u32
3366 .parse_and_store_default(&serde_json::json!(u32::MAX as i64))
3367 .unwrap();
3368 assert_eq!(lit_u32, AvroLiteral::Long(u32::MAX as i64));
3369 let err_u32_neg = dt_u32
3370 .parse_and_store_default(&serde_json::json!(-1))
3371 .unwrap_err();
3372 assert!(err_u32_neg.to_string().contains("out of u32 range"));
3373 let err_u32_high = dt_u32
3374 .parse_and_store_default(&serde_json::json!(u32::MAX as i64 + 1))
3375 .unwrap_err();
3376 assert!(err_u32_high.to_string().contains("out of u32 range"));
3377 }
3378
3379 #[test]
3380 fn test_validate_and_store_default_fixed_decimal_interval() {
3381 let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
3382 let l = dt_fixed
3383 .parse_and_store_default(&json_string("WXYZ"))
3384 .unwrap();
3385 assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
3386 let err = dt_fixed
3387 .parse_and_store_default(&json_string("TOO LONG"))
3388 .unwrap_err();
3389 assert!(err.to_string().contains("Default length"));
3390 let mut dt_dec_fixed =
3391 AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
3392 let l = dt_dec_fixed
3393 .parse_and_store_default(&json_string("abc"))
3394 .unwrap();
3395 assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
3396 let err = dt_dec_fixed
3397 .parse_and_store_default(&json_string("toolong"))
3398 .unwrap_err();
3399 assert!(err.to_string().contains("Default length"));
3400 let mut dt_dec_bytes =
3401 AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
3402 let l = dt_dec_bytes
3403 .parse_and_store_default(&json_string("freeform"))
3404 .unwrap();
3405 assert_eq!(
3406 l,
3407 AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
3408 );
3409 let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
3410 let l = dt_interval
3411 .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
3412 .unwrap();
3413 assert_eq!(
3414 l,
3415 AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
3416 );
3417 let err = dt_interval
3418 .parse_and_store_default(&json_string("short"))
3419 .unwrap_err();
3420 assert!(err.to_string().contains("Default length"));
3421 }
3422
3423 #[test]
3424 fn test_validate_and_store_default_enum_list_map_struct() {
3425 let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
3426 .into_iter()
3427 .collect();
3428 let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
3429 let l = dt_enum
3430 .parse_and_store_default(&json_string("GREEN"))
3431 .unwrap();
3432 assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
3433 let err = dt_enum
3434 .parse_and_store_default(&json_string("YELLOW"))
3435 .unwrap_err();
3436 assert!(err.to_string().contains("Default enum symbol"));
3437 let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
3438 let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
3439 let val = serde_json::json!([1, 2, 3]);
3440 let l = dt_list.parse_and_store_default(&val).unwrap();
3441 assert_eq!(
3442 l,
3443 AvroLiteral::Array(vec![
3444 AvroLiteral::Long(1),
3445 AvroLiteral::Long(2),
3446 AvroLiteral::Long(3)
3447 ])
3448 );
3449 let err = dt_list
3450 .parse_and_store_default(&serde_json::json!({"not":"array"}))
3451 .unwrap_err();
3452 assert!(err.to_string().contains("JSON array"));
3453 let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
3454 let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
3455 let mv = serde_json::json!({"x": 1.5, "y": 2.5});
3456 let l = dt_map.parse_and_store_default(&mv).unwrap();
3457 let mut expected = IndexMap::new();
3458 expected.insert("x".into(), AvroLiteral::Double(1.5));
3459 expected.insert("y".into(), AvroLiteral::Double(2.5));
3460 assert_eq!(l, AvroLiteral::Map(expected));
3461 let err = dt_map
3463 .parse_and_store_default(&serde_json::json!(123))
3464 .unwrap_err();
3465 assert!(err.to_string().contains("JSON object"));
3466 let mut field_a = AvroField {
3467 name: "a".into(),
3468 data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
3469 };
3470 let field_b = AvroField {
3471 name: "b".into(),
3472 data_type: AvroDataType::new(
3473 Codec::Int64,
3474 HashMap::new(),
3475 Some(Nullability::NullFirst),
3476 ),
3477 };
3478 let mut c_md = HashMap::new();
3479 c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
3480 let field_c = AvroField {
3481 name: "c".into(),
3482 data_type: AvroDataType::new(Codec::Utf8, c_md, None),
3483 };
3484 field_a.data_type.metadata.insert("doc".into(), "na".into());
3485 let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
3486 let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
3487 let default_obj = serde_json::json!({"a": 7});
3488 let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
3489 let mut expected = IndexMap::new();
3490 expected.insert("a".into(), AvroLiteral::Int(7));
3491 expected.insert("b".into(), AvroLiteral::Null);
3492 expected.insert("c".into(), AvroLiteral::String("xyz".into()));
3493 assert_eq!(l, AvroLiteral::Map(expected));
3494 assert_default_stored(&dt_struct, &default_obj);
3495 let req_field = AvroField {
3496 name: "req".into(),
3497 data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
3498 };
3499 let mut dt_bad = AvroDataType::new(
3500 Codec::Struct(Arc::from(vec![req_field])),
3501 HashMap::new(),
3502 None,
3503 );
3504 let err = dt_bad
3505 .parse_and_store_default(&serde_json::json!({}))
3506 .unwrap_err();
3507 assert!(
3508 err.to_string().contains("missing required subfield 'req'"),
3509 "unexpected error: {err}"
3510 );
3511 let err = dt_struct
3512 .parse_and_store_default(&serde_json::json!(10))
3513 .unwrap_err();
3514 err.to_string().contains("must be a JSON object");
3515 }
3516
3517 #[test]
3518 fn test_resolve_array_promotion_and_reader_metadata() {
3519 let mut w_add: HashMap<&str, Value> = HashMap::new();
3520 w_add.insert("who", json_string("writer"));
3521 let mut r_add: HashMap<&str, Value> = HashMap::new();
3522 r_add.insert("who", json_string("reader"));
3523 let writer_schema = Schema::Complex(ComplexType::Array(Array {
3524 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3525 attributes: Attributes {
3526 logical_type: None,
3527 additional: w_add,
3528 },
3529 }));
3530 let reader_schema = Schema::Complex(ComplexType::Array(Array {
3531 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
3532 attributes: Attributes {
3533 logical_type: None,
3534 additional: r_add,
3535 },
3536 }));
3537 let mut maker = Maker::new(false, false);
3538 let dt = maker
3539 .make_data_type(&writer_schema, Some(&reader_schema), None)
3540 .unwrap();
3541 assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
3542 if let Codec::List(inner) = dt.codec() {
3543 assert!(matches!(inner.codec(), Codec::Int64));
3544 assert_eq!(
3545 inner.resolution,
3546 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3547 );
3548 } else {
3549 panic!("expected list codec");
3550 }
3551 }
3552
3553 #[test]
3554 fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
3555 let writer_schema = Schema::Complex(ComplexType::Array(Array {
3556 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
3557 attributes: Attributes::default(),
3558 }));
3559 let reader_schema = Schema::Complex(ComplexType::Array(Array {
3560 items: Box::new(mk_union(vec![
3561 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3562 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3563 ])),
3564 attributes: Attributes::default(),
3565 }));
3566 let mut maker = Maker::new(false, false);
3567 let dt = maker
3568 .make_data_type(&writer_schema, Some(&reader_schema), None)
3569 .unwrap();
3570 if let Codec::List(inner) = dt.codec() {
3571 assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
3572 assert!(matches!(inner.codec(), Codec::Int32));
3573 match inner.resolution.as_ref() {
3574 Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
3575 other => panic!("expected Union resolution, got {other:?}"),
3576 }
3577 } else {
3578 panic!("expected List codec");
3579 }
3580 }
3581
3582 #[test]
3583 fn test_resolve_fixed_success_name_and_size_match_and_alias() {
3584 let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3585 name: "MD5",
3586 namespace: None,
3587 aliases: vec!["Hash16"],
3588 size: 16,
3589 attributes: Attributes::default(),
3590 }));
3591 let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
3592 name: "Hash16",
3593 namespace: None,
3594 aliases: vec![],
3595 size: 16,
3596 attributes: Attributes::default(),
3597 }));
3598 let mut maker = Maker::new(false, false);
3599 let dt = maker
3600 .make_data_type(&writer_schema, Some(&reader_schema), None)
3601 .unwrap();
3602 assert!(matches!(dt.codec(), Codec::Fixed(16)));
3603 }
3604
3605 #[cfg(feature = "avro_custom_types")]
3606 #[test]
3607 fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3608 let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3609 name: "ArrowIntervalMDN",
3610 namespace: None,
3611 aliases: vec![],
3612 size: 16,
3613 attributes: Attributes {
3614 logical_type: Some("arrow.interval-month-day-nano"),
3615 additional: Default::default(),
3616 },
3617 }));
3618 let mut maker = Maker::new(false, false);
3619 let dt = maker.make_data_type(&schema, None, None).unwrap();
3620 assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3621 assert_eq!(
3622 dt.codec.data_type(),
3623 DataType::Interval(IntervalUnit::MonthDayNano)
3624 );
3625 }
3626
3627 #[test]
3628 fn test_resolve_records_mapping_default_fields_and_skip_fields() {
3629 let writer = Schema::Complex(ComplexType::Record(Record {
3630 name: "R",
3631 namespace: None,
3632 doc: None,
3633 aliases: vec![],
3634 fields: vec![
3635 crate::schema::Field {
3636 name: "a",
3637 doc: None,
3638 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3639 default: None,
3640 aliases: vec![],
3641 },
3642 crate::schema::Field {
3643 name: "skipme",
3644 doc: None,
3645 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3646 default: None,
3647 aliases: vec![],
3648 },
3649 crate::schema::Field {
3650 name: "b",
3651 doc: None,
3652 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3653 default: None,
3654 aliases: vec![],
3655 },
3656 ],
3657 attributes: Attributes::default(),
3658 }));
3659 let reader = Schema::Complex(ComplexType::Record(Record {
3660 name: "R",
3661 namespace: None,
3662 doc: None,
3663 aliases: vec![],
3664 fields: vec![
3665 crate::schema::Field {
3666 name: "b",
3667 doc: None,
3668 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3669 default: None,
3670 aliases: vec![],
3671 },
3672 crate::schema::Field {
3673 name: "a",
3674 doc: None,
3675 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
3676 default: None,
3677 aliases: vec![],
3678 },
3679 crate::schema::Field {
3680 name: "name",
3681 doc: None,
3682 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3683 default: Some(json_string("anon")),
3684 aliases: vec![],
3685 },
3686 crate::schema::Field {
3687 name: "opt",
3688 doc: None,
3689 r#type: Schema::Union(vec![
3690 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
3691 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3692 ]),
3693 default: None, aliases: vec![],
3695 },
3696 ],
3697 attributes: Attributes::default(),
3698 }));
3699 let mut maker = Maker::new(false, false);
3700 let dt = maker
3701 .make_data_type(&writer, Some(&reader), None)
3702 .expect("record resolution");
3703 let fields = match dt.codec() {
3704 Codec::Struct(f) => f,
3705 other => panic!("expected struct, got {other:?}"),
3706 };
3707 assert_eq!(fields.len(), 4);
3708 assert_eq!(fields[0].name(), "b");
3709 assert_eq!(fields[1].name(), "a");
3710 assert_eq!(fields[2].name(), "name");
3711 assert_eq!(fields[3].name(), "opt");
3712 assert!(matches!(
3713 fields[1].data_type().resolution,
3714 Some(ResolutionInfo::Promotion(Promotion::IntToLong))
3715 ));
3716 let rec = match dt.resolution {
3717 Some(ResolutionInfo::Record(ref r)) => r.clone(),
3718 other => panic!("expected record resolution, got {other:?}"),
3719 };
3720 assert!(matches!(
3721 &rec.writer_fields[..],
3722 &[
3723 ResolvedField::ToReader(1),
3724 ResolvedField::Skip(_),
3725 ResolvedField::ToReader(0),
3726 ]
3727 ));
3728 assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3729 let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
3730 panic!("should skip field 1")
3731 };
3732 assert!(matches!(skip1.codec(), Codec::Utf8));
3733 let name_md = &fields[2].data_type().metadata;
3734 assert_eq!(
3735 name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3736 Some(&"\"anon\"".to_string())
3737 );
3738 let opt_md = &fields[3].data_type().metadata;
3739 assert_eq!(
3740 opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
3741 Some(&"null".to_string())
3742 );
3743 }
3744
3745 #[test]
3746 fn test_named_type_alias_resolution_record_cross_namespace() {
3747 let writer_record = Record {
3748 name: "PersonV2",
3749 namespace: Some("com.example.v2"),
3750 doc: None,
3751 aliases: vec!["com.example.Person"],
3752 fields: vec![
3753 AvroFieldSchema {
3754 name: "name",
3755 doc: None,
3756 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
3757 default: None,
3758 aliases: vec![],
3759 },
3760 AvroFieldSchema {
3761 name: "age",
3762 doc: None,
3763 r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
3764 default: None,
3765 aliases: vec![],
3766 },
3767 ],
3768 attributes: Attributes::default(),
3769 };
3770 let reader_record = Record {
3771 name: "Person",
3772 namespace: Some("com.example"),
3773 doc: None,
3774 aliases: vec![],
3775 fields: writer_record.fields.clone(),
3776 attributes: Attributes::default(),
3777 };
3778 let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
3779 let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
3780 let mut maker = Maker::new(false, false);
3781 let result = maker
3782 .make_data_type(&writer_schema, Some(&reader_schema), None)
3783 .expect("record alias resolution should succeed");
3784 match result.codec {
3785 Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
3786 other => panic!("expected struct, got {other:?}"),
3787 }
3788 }
3789
3790 #[test]
3791 fn test_named_type_alias_resolution_enum_cross_namespace() {
3792 let writer_enum = Enum {
3793 name: "ColorV2",
3794 namespace: Some("org.example.v2"),
3795 doc: None,
3796 aliases: vec!["org.example.Color"],
3797 symbols: vec!["RED", "GREEN", "BLUE"],
3798 default: None,
3799 attributes: Attributes::default(),
3800 };
3801 let reader_enum = Enum {
3802 name: "Color",
3803 namespace: Some("org.example"),
3804 doc: None,
3805 aliases: vec![],
3806 symbols: vec!["RED", "GREEN", "BLUE"],
3807 default: None,
3808 attributes: Attributes::default(),
3809 };
3810 let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
3811 let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
3812 let mut maker = Maker::new(false, false);
3813 maker
3814 .make_data_type(&writer_schema, Some(&reader_schema), None)
3815 .expect("enum alias resolution should succeed");
3816 }
3817
3818 #[test]
3819 fn test_named_type_alias_resolution_fixed_cross_namespace() {
3820 let writer_fixed = Fixed {
3821 name: "Fx10V2",
3822 namespace: Some("ns.v2"),
3823 aliases: vec!["ns.Fx10"],
3824 size: 10,
3825 attributes: Attributes::default(),
3826 };
3827 let reader_fixed = Fixed {
3828 name: "Fx10",
3829 namespace: Some("ns"),
3830 aliases: vec![],
3831 size: 10,
3832 attributes: Attributes::default(),
3833 };
3834 let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
3835 let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
3836 let mut maker = Maker::new(false, false);
3837 maker
3838 .make_data_type(&writer_schema, Some(&reader_schema), None)
3839 .expect("fixed alias resolution should succeed");
3840 }
3841}