1use crate::schema::{
21 AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY,
22 AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability,
23 PrimitiveType, Record, Schema, Type, TypeName, make_full_name,
24};
25use arrow_schema::{
26 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field, Fields,
27 IntervalUnit, TimeUnit, UnionFields, UnionMode,
28};
29#[cfg(feature = "small_decimals")]
30use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::fmt;
36use std::fmt::Display;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39
/// Resolution details that may be attached to an [`AvroDataType`] through its
/// `resolution` field.
///
/// NOTE(review): the variant payloads suggest this records how data written with
/// one schema is adapted to another (writer vs. reader) — confirm against the
/// code that constructs these values.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolutionInfo {
    /// A primitive conversion, described by a [`Promotion`].
    Promotion(Promotion),
    /// A literal default value, described by an [`AvroLiteral`].
    DefaultValue(AvroLiteral),
    /// An enum symbol mapping, described by an [`EnumMapping`].
    EnumMapping(EnumMapping),
    /// Record-level resolution, described by a [`ResolvedRecord`].
    Record(ResolvedRecord),
    /// Union-level resolution, described by a [`ResolvedUnion`].
    Union(ResolvedUnion),
}
54
/// A parsed default value, as produced by `AvroDataType::parse_default_literal`
/// from the JSON form of a default.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum AvroLiteral {
    /// The JSON `null` default.
    Null,
    /// A boolean default.
    Boolean(bool),
    /// A 32-bit integer default (also used for `Date32` and `TimeMillis` codecs).
    Int(i32),
    /// A 64-bit integer default (also used for `TimeMicros` and timestamp codecs).
    Long(i64),
    /// A 32-bit floating point default.
    Float(f32),
    /// A 64-bit floating point default.
    Double(f64),
    /// A raw byte default (binary, fixed, decimal and interval codecs).
    Bytes(Vec<u8>),
    /// A string default (string and UUID codecs).
    String(String),
    /// An enum default, stored as the symbol name.
    Enum(String),
    /// An array default; one literal per item.
    Array(Vec<AvroLiteral>),
    /// A keyed default in insertion order; used for both map and record defaults.
    Map(IndexMap<String, AvroLiteral>),
}
83
/// Record-level resolution data carried by [`ResolutionInfo::Record`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedRecord {
    /// One [`ResolvedField`] entry per field.
    /// NOTE(review): presumably indexed by writer field position — confirm with the builder.
    pub(crate) writer_fields: Arc<[ResolvedField]>,
    /// Field indices.
    /// NOTE(review): presumably the reader fields that must be filled from their defaults — confirm.
    pub(crate) default_fields: Arc<[usize]>,
}
92
/// How a single field is handled inside a [`ResolvedRecord`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolvedField {
    /// The field is routed to the index carried here.
    /// NOTE(review): presumably a reader field index — confirm with the decoder.
    ToReader(usize),
    /// The field is skipped; the carried [`AvroDataType`] describes the skipped value.
    /// NOTE(review): presumably the writer's type, needed to step over encoded bytes — confirm.
    Skip(AvroDataType),
}
102
/// A primitive type conversion.
///
/// The [`Display`] implementation renders each variant as `Source->Target`
/// (for example `Int->Long`), with `Direct` rendered as `Direct`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Promotion {
    /// No conversion.
    Direct,
    /// `int` to `long`.
    IntToLong,
    /// `int` to `float`.
    IntToFloat,
    /// `int` to `double`.
    IntToDouble,
    /// `long` to `float`.
    LongToFloat,
    /// `long` to `double`.
    LongToDouble,
    /// `float` to `double`.
    FloatToDouble,
    /// `string` to `bytes`.
    StringToBytes,
    /// `bytes` to `string`.
    BytesToString,
}

impl Display for Promotion {
    /// Writes the fixed label of the promotion; formatter width/padding flags are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Promotion::Direct => "Direct",
            Promotion::IntToLong => "Int->Long",
            Promotion::IntToFloat => "Int->Float",
            Promotion::IntToDouble => "Int->Double",
            Promotion::LongToFloat => "Long->Float",
            Promotion::LongToDouble => "Long->Double",
            Promotion::FloatToDouble => "Float->Double",
            Promotion::StringToBytes => "String->Bytes",
            Promotion::BytesToString => "Bytes->String",
        };
        formatter.write_str(label)
    }
}
144
/// Union-level resolution data carried by [`ResolutionInfo::Union`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ResolvedUnion {
    /// One entry per branch: `Some((index, info))` or `None`.
    /// NOTE(review): presumably indexed by writer branch, with `index` the chosen
    /// reader branch and `None` meaning "no compatible branch" — confirm with the builder.
    pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
    /// Whether the writer side is a union.
    pub(crate) writer_is_union: bool,
    /// Whether the reader side is a union.
    pub(crate) reader_is_union: bool,
}
156
/// Enum resolution data carried by [`ResolutionInfo::EnumMapping`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct EnumMapping {
    /// Index translation table.
    /// NOTE(review): presumably writer symbol index -> reader symbol index — confirm with the decoder.
    pub(crate) mapping: Arc<[i32]>,
    /// Index used as the default.
    /// NOTE(review): presumably the reader's default symbol, with a sentinel when absent — confirm.
    pub(crate) default_index: i32,
}
168
/// Attaches the Arrow `Uuid` extension type to `field` when `codec` is
/// [`Codec::Uuid`]; for every other codec `field` is returned unchanged.
#[cfg(feature = "canonical_extension_types")]
fn with_extension_type(codec: &Codec, field: Field) -> Field {
    if matches!(codec, Codec::Uuid) {
        field.with_extension_type(arrow_schema::extension::Uuid)
    } else {
        field
    }
}
176
/// An Avro data type paired with the information needed to represent it in Arrow.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroDataType {
    /// Nullability of the type; `Some(_)` makes the resulting Arrow field nullable.
    nullability: Option<Nullability>,
    /// Metadata copied onto the Arrow [`Field`] built by `field_with_name`.
    metadata: HashMap<String, String>,
    /// The codec that determines the Arrow [`DataType`].
    codec: Codec,
    /// Optional resolution details; `None` when constructed through `new`.
    pub(crate) resolution: Option<ResolutionInfo>,
}
185
186impl AvroDataType {
187 pub(crate) fn new(
189 codec: Codec,
190 metadata: HashMap<String, String>,
191 nullability: Option<Nullability>,
192 ) -> Self {
193 AvroDataType {
194 codec,
195 metadata,
196 nullability,
197 resolution: None,
198 }
199 }
200
201 #[inline]
202 fn new_with_resolution(
203 codec: Codec,
204 metadata: HashMap<String, String>,
205 nullability: Option<Nullability>,
206 resolution: Option<ResolutionInfo>,
207 ) -> Self {
208 Self {
209 codec,
210 metadata,
211 nullability,
212 resolution,
213 }
214 }
215
216 pub(crate) fn field_with_name(&self, name: &str) -> Field {
218 let mut nullable = self.nullability.is_some();
219 if !nullable {
220 if let Codec::Union(children, _, _) = self.codec() {
221 if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
223 nullable = true;
224 }
225 }
226 }
227 let data_type = self.codec.data_type();
228 let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
229 #[cfg(feature = "canonical_extension_types")]
230 return with_extension_type(&self.codec, field);
231 #[cfg(not(feature = "canonical_extension_types"))]
232 field
233 }
234
    /// Returns a reference to the [`Codec`] of this data type.
    pub(crate) fn codec(&self) -> &Codec {
        &self.codec
    }
242
    /// Returns the nullability recorded for this data type, or `None` when none is set.
    pub(crate) fn nullability(&self) -> Option<Nullability> {
        self.nullability
    }
253
    /// Converts the JSON form of a default value into an [`AvroLiteral`] that
    /// fits this data type's [`Codec`].
    ///
    /// A JSON `null` is accepted only when the codec is `Null`, when the codec is
    /// a `Union` whose first branch is `Null`, or when `nullability()` is
    /// `Some(Nullability::NullFirst)`. Any other JSON value is validated against
    /// the codec (JSON type, integer/float range, fixed byte length, enum symbol
    /// membership), recursing into array items, map values, record fields, the
    /// first union branch and run-end-encoded values.
    ///
    /// # Errors
    /// Returns `ArrowError::SchemaError` when the JSON value does not fit the codec.
    #[inline]
    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
        // Returns the inner string of a JSON string value; `data_type` is only used in the error text.
        fn expect_string<'v>(
            default_json: &'v Value,
            data_type: &str,
        ) -> Result<&'v str, ArrowError> {
            match default_json {
                Value::String(s) => Ok(s.as_str()),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default value must be a JSON string for {data_type}"
                ))),
            }
        }

        // Decodes a JSON string into bytes, one byte per character: every code
        // point must be <= 0xFF. When `expected_len` is set the decoded length
        // must match it exactly.
        fn parse_bytes_default(
            default_json: &Value,
            expected_len: Option<usize>,
        ) -> Result<Vec<u8>, ArrowError> {
            let s = expect_string(default_json, "bytes/fixed logical types")?;
            let mut out = Vec::with_capacity(s.len());
            for ch in s.chars() {
                let cp = ch as u32;
                if cp > 0xFF {
                    return Err(ArrowError::SchemaError(format!(
                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
                    )));
                }
                out.push(cp as u8);
            }
            if let Some(len) = expected_len {
                if out.len() != len {
                    return Err(ArrowError::SchemaError(format!(
                        "Default length {} does not match expected fixed size {len}",
                        out.len(),
                    )));
                }
            }
            Ok(out)
        }

        // Reads a JSON number as `i64`; non-numbers and numbers not representable as `i64` are errors.
        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_i64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON integer"
                ))),
            }
        }

        // Reads a JSON number as `f64`; non-numbers are errors.
        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
            match default_json {
                Value::Number(n) => n.as_f64().ok_or_else(|| {
                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
                }),
                _ => Err(ArrowError::SchemaError(format!(
                    "Default {data_type} must be a JSON number"
                ))),
            }
        }

        // JSON null is handled up front so the per-codec arms below only see non-null values.
        if default_json.is_null() {
            return match self.codec() {
                Codec::Null => Ok(AvroLiteral::Null),
                Codec::Union(encodings, _, _) if !encodings.is_empty()
                    && matches!(encodings[0].codec(), Codec::Null) =>
                {
                    Ok(AvroLiteral::Null)
                }
                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
                _ => Err(ArrowError::SchemaError(
                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
                        .to_string(),
                )),
            };
        }
        let lit = match self.codec() {
            Codec::Null => {
                return Err(ArrowError::SchemaError(
                    "Default for `null` type must be JSON null".to_string(),
                ));
            }
            Codec::Boolean => match default_json {
                Value::Bool(b) => AvroLiteral::Boolean(*b),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Boolean default must be a JSON boolean".to_string(),
                    ));
                }
            },
            // 32-bit integer codecs: parse as i64, then range-check before narrowing.
            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            Codec::Int64
            | Codec::TimeMicros
            | Codec::TimestampMillis(_)
            | Codec::TimestampMicros(_)
            | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
            // Narrow integer codecs are range-checked against their own width but stored as `Int`.
            #[cfg(feature = "avro_custom_types")]
            Codec::Int8 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i8::MIN as i64 || i > i8::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int8 {i} out of i8 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::Int16 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i16::MIN as i64 || i > i16::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default int16 {i} out of i16 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt8 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < 0 || i > u8::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default uint8 {i} out of u8 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt16 => {
                let i = parse_json_i64(default_json, "int")?;
                if i < 0 || i > u16::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default uint16 {i} out of u16 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            // `u32` does not fit in `i32`, so the literal is stored as `Long`.
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt32 => {
                let i = parse_json_i64(default_json, "long")?;
                if i < 0 || i > u32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default uint32 {i} out of u32 range"
                    )));
                }
                AvroLiteral::Long(i)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
                AvroLiteral::Long(parse_json_i64(default_json, "long")?)
            }
            // The following custom codecs take fixed-length byte defaults.
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt64 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?),
            #[cfg(feature = "avro_custom_types")]
            Codec::Float16 => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(2))?),
            #[cfg(feature = "avro_custom_types")]
            Codec::Time32Secs => {
                let i = parse_json_i64(default_json, "int")?;
                if i < i32::MIN as i64 || i > i32::MAX as i64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default time32-secs {i} out of i32 range"
                    )));
                }
                AvroLiteral::Int(i as i32)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalYearMonth => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalMonthDayNano => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalDayTime => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
            }
            // Floats are parsed as f64 and must be finite and within f32 range before narrowing.
            Codec::Float32 => {
                let f = parse_json_f64(default_json, "float")?;
                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
                    return Err(ArrowError::SchemaError(format!(
                        "Default float {f} out of f32 range or not finite"
                    )));
                }
                AvroLiteral::Float(f as f32)
            }
            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
            }
            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
            Codec::Fixed(sz) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
            }
            // The length is only enforced when the decimal carries a fixed size.
            Codec::Decimal(_, _, fixed_size) => {
                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
            }
            // The default symbol must be one of this codec's symbols.
            Codec::Enum(symbols) => {
                let s = expect_string(default_json, "enum")?;
                if symbols.iter().any(|sym| sym == s) {
                    AvroLiteral::Enum(s.to_string())
                } else {
                    return Err(ArrowError::SchemaError(format!(
                        "Default enum symbol {s:?} not found in reader enum symbols"
                    )));
                }
            }
            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
            // Each array item is parsed with the item type; the first failure aborts.
            Codec::List(item_dt) => match default_json {
                Value::Array(items) => AvroLiteral::Array(
                    items
                        .iter()
                        .map(|v| item_dt.parse_default_literal(v))
                        .collect::<Result<_, _>>()?,
                ),
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON array for Avro array type".to_string(),
                    ));
                }
            },
            Codec::Map(val_dt) => match default_json {
                Value::Object(map) => {
                    let mut out = IndexMap::with_capacity(map.len());
                    for (k, v) in map {
                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value must be a JSON object for Avro map type".to_string(),
                    ));
                }
            },
            // Record defaults are stored as a `Map` literal keyed by field name, in field order.
            Codec::Struct(fields) => match default_json {
                Value::Object(obj) => {
                    let mut out: IndexMap<String, AvroLiteral> =
                        IndexMap::with_capacity(fields.len());
                    for f in fields.as_ref() {
                        let name = f.name().to_string();
                        if let Some(sub) = obj.get(&name) {
                            out.insert(name, f.data_type().parse_default_literal(sub)?);
                        } else {
                            // The subfield is absent from the JSON object: fall back to
                            // the default stored in the subfield's own metadata, if any.
                            let stored_default =
                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
                            if stored_default.is_none()
                                && f.data_type().nullability() == Some(Nullability::default())
                            {
                                // No stored default, but the subfield has the default nullability: use null.
                                out.insert(name, AvroLiteral::Null);
                            } else if let Some(default_json) = stored_default {
                                let v: Value =
                                    serde_json::from_str(default_json).map_err(|e| {
                                        ArrowError::SchemaError(format!(
                                            "Failed to parse stored subfield default JSON for '{}': {e}",
                                            f.name(),
                                        ))
                                    })?;
                                out.insert(name, f.data_type().parse_default_literal(&v)?);
                            } else {
                                return Err(ArrowError::SchemaError(format!(
                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
                                    f.name(),
                                    f.data_type().codec()
                                )));
                            }
                        }
                    }
                    AvroLiteral::Map(out)
                }
                _ => {
                    return Err(ArrowError::SchemaError(
                        "Default value for record/struct must be a JSON object".to_string(),
                    ));
                }
            },
            // A union default is always parsed against the union's first branch.
            Codec::Union(encodings, _, _) => {
                let Some(default_encoding) = encodings.first() else {
                    return Err(ArrowError::SchemaError(
                        "Union with no branches cannot have a default".to_string(),
                    ));
                };
                default_encoding.parse_default_literal(default_json)?
            }
            // Run-end-encoded defaults are parsed against the values type.
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?,
        };
        Ok(lit)
    }
558
559 fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
560 let json_text = serde_json::to_string(default_json).map_err(|e| {
561 ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
562 })?;
563 self.metadata
564 .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
565 Ok(())
566 }
567
568 fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
569 let lit = self.parse_default_literal(default_json)?;
570 self.store_default(default_json)?;
571 Ok(lit)
572 }
573}
574
/// A named [`AvroDataType`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct AvroField {
    /// The field name; used as the name of the Arrow [`Field`] built by `field()`.
    name: String,
    /// The data type of the field.
    data_type: AvroDataType,
}
581
582impl AvroField {
583 pub(crate) fn field(&self) -> Field {
585 self.data_type.field_with_name(&self.name)
586 }
587
588 pub(crate) fn data_type(&self) -> &AvroDataType {
590 &self.data_type
591 }
592
593 pub(crate) fn with_utf8view(&self) -> Self {
602 let mut field = self.clone();
603 if let Codec::Utf8 = field.data_type.codec {
604 field.data_type.codec = Codec::Utf8View;
605 }
606 field
607 }
608
609 pub(crate) fn name(&self) -> &str {
614 &self.name
615 }
616}
617
618impl<'a> TryFrom<&Schema<'a>> for AvroField {
619 type Error = ArrowError;
620
621 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
622 match schema {
623 Schema::Complex(ComplexType::Record(r)) => {
624 let mut resolver = Maker::new(false, false, Tz::default());
625 let data_type = resolver.make_data_type(schema, None, None)?;
626 Ok(AvroField {
627 data_type,
628 name: r.name.to_string(),
629 })
630 }
631 _ => Err(ArrowError::ParseError(format!(
632 "Expected record got {schema:?}"
633 ))),
634 }
635 }
636}
637
/// Builder that produces an [`AvroField`] from a writer schema and an optional
/// reader schema.
#[derive(Debug)]
pub(crate) struct AvroFieldBuilder<'a> {
    /// The writer schema; `build` requires it to be a record.
    writer_schema: &'a Schema<'a>,
    /// Optional reader schema, passed to `Maker::make_data_type` alongside the writer schema.
    reader_schema: Option<&'a Schema<'a>>,
    /// Forwarded to `Maker::new` as its first argument.
    use_utf8view: bool,
    /// Forwarded to `Maker::new` as its second argument.
    /// NOTE(review): exact strictness rules live in `Maker` — not visible here.
    strict_mode: bool,
    /// Time zone representation forwarded to `Maker::new`.
    tz: Tz,
}
647
648impl<'a> AvroFieldBuilder<'a> {
649 pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
651 Self {
652 writer_schema,
653 reader_schema: None,
654 use_utf8view: false,
655 strict_mode: false,
656 tz: Tz::default(),
657 }
658 }
659
660 #[inline]
665 pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
666 self.reader_schema = Some(reader_schema);
667 self
668 }
669
670 pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
672 self.use_utf8view = use_utf8view;
673 self
674 }
675
676 pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
678 self.strict_mode = strict_mode;
679 self
680 }
681
682 pub(crate) fn with_tz(mut self, tz: Tz) -> Self {
684 self.tz = tz;
685 self
686 }
687
688 pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
690 match self.writer_schema {
691 Schema::Complex(ComplexType::Record(r)) => {
692 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode, self.tz);
693 let data_type =
694 resolver.make_data_type(self.writer_schema, self.reader_schema, None)?;
695 Ok(AvroField {
696 name: r.name.to_string(),
697 data_type,
698 })
699 }
700 _ => Err(ArrowError::ParseError(format!(
701 "Expected a Record schema to build an AvroField, but got {:?}",
702 self.writer_schema
703 ))),
704 }
705 }
706}
707
/// The textual form used for a time zone.
///
/// The default is [`Tz::OffsetZero`].
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub enum Tz {
    /// Rendered as the offset string `"+00:00"`.
    #[default]
    OffsetZero,
    /// Rendered as the name `"UTC"`.
    Utc,
}

impl Tz {
    /// Returns the time zone string for this variant.
    pub fn as_str(&self) -> &'static str {
        const OFFSET_ZERO: &str = "+00:00";
        const UTC: &str = "UTC";
        match *self {
            Tz::OffsetZero => OFFSET_ZERO,
            Tz::Utc => UTC,
        }
    }
}

impl Display for Tz {
    /// Writes the same text that [`Tz::as_str`] returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
737
/// The encoding of an Avro value, which determines the Arrow [`DataType`]
/// returned by `Codec::data_type`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Codec {
    /// Maps to Arrow `Null`.
    Null,
    /// Maps to Arrow `Boolean`.
    Boolean,
    /// Maps to Arrow `Int32`.
    Int32,
    /// Maps to Arrow `Int64`.
    Int64,
    /// Maps to Arrow `Float32`.
    Float32,
    /// Maps to Arrow `Float64`.
    Float64,
    /// Maps to Arrow `Binary`.
    Binary,
    /// Maps to Arrow `Utf8`.
    Utf8,
    /// Maps to Arrow `Utf8View`; `with_utf8view` swaps `Utf8` for this variant.
    Utf8View,
    /// Maps to Arrow `Date32`.
    Date32,
    /// Maps to Arrow `Time32(Millisecond)`.
    TimeMillis,
    /// Maps to Arrow `Time64(Microsecond)`.
    TimeMicros,
    /// Maps to Arrow `Timestamp(Millisecond, _)`; the optional [`Tz`] supplies the time zone string.
    TimestampMillis(Option<Tz>),
    /// Maps to Arrow `Timestamp(Microsecond, _)`; the optional [`Tz`] supplies the time zone string.
    TimestampMicros(Option<Tz>),
    /// Maps to Arrow `Timestamp(Nanosecond, _)`; the optional [`Tz`] supplies the time zone string.
    TimestampNanos(Option<Tz>),
    /// Maps to Arrow `FixedSizeBinary` of the given byte size.
    Fixed(i32),
    /// Decimal with `(precision, scale, fixed size)`; maps to an Arrow decimal
    /// type chosen by precision. A missing scale is treated as 0.
    Decimal(usize, Option<usize>, Option<usize>),
    /// Maps to Arrow `FixedSizeBinary(16)`.
    Uuid,
    /// Enum with the given symbols; maps to Arrow `Dictionary(Int32, Utf8)`.
    Enum(Arc<[String]>),
    /// List of the given item type; maps to Arrow `List`.
    List(Arc<AvroDataType>),
    /// Record with the given fields; maps to Arrow `Struct`.
    Struct(Arc<[AvroField]>),
    /// Map with `Utf8` keys and the given value type; maps to Arrow `Map`.
    Map(Arc<AvroDataType>),
    /// Maps to Arrow `Interval(MonthDayNano)`.
    Interval,
    /// Union of the given branch types, with the Arrow union fields and mode to use.
    Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
    /// Maps to Arrow `Duration(Nanosecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationNanos,
    /// Maps to Arrow `Duration(Microsecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMicros,
    /// Maps to Arrow `Duration(Millisecond)`.
    #[cfg(feature = "avro_custom_types")]
    DurationMillis,
    /// Maps to Arrow `Duration(Second)`.
    #[cfg(feature = "avro_custom_types")]
    DurationSeconds,
    /// Run-end-encoded values of the given type; the `u8` is the run-end bit
    /// width and must be 16, 32 or 64.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Arc<AvroDataType>, u8),
    /// Maps to Arrow `Int8`.
    #[cfg(feature = "avro_custom_types")]
    Int8,
    /// Maps to Arrow `Int16`.
    #[cfg(feature = "avro_custom_types")]
    Int16,
    /// Maps to Arrow `UInt8`.
    #[cfg(feature = "avro_custom_types")]
    UInt8,
    /// Maps to Arrow `UInt16`.
    #[cfg(feature = "avro_custom_types")]
    UInt16,
    /// Maps to Arrow `UInt32`.
    #[cfg(feature = "avro_custom_types")]
    UInt32,
    /// Maps to Arrow `UInt64`.
    #[cfg(feature = "avro_custom_types")]
    UInt64,
    /// Maps to Arrow `Float16`.
    #[cfg(feature = "avro_custom_types")]
    Float16,
    /// Maps to Arrow `Date64`.
    #[cfg(feature = "avro_custom_types")]
    Date64,
    /// Maps to Arrow `Time64(Nanosecond)`.
    #[cfg(feature = "avro_custom_types")]
    TimeNanos,
    /// Maps to Arrow `Time32(Second)`.
    #[cfg(feature = "avro_custom_types")]
    Time32Secs,
    /// Maps to Arrow `Timestamp(Second, _)`; `true` attaches the `"+00:00"` time zone.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool),
    /// Maps to Arrow `Interval(YearMonth)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth,
    /// Maps to Arrow `Interval(MonthDayNano)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano,
    /// Maps to Arrow `Interval(DayTime)`.
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime,
}
869
870impl Codec {
871 fn data_type(&self) -> DataType {
872 match self {
873 Self::Null => DataType::Null,
874 Self::Boolean => DataType::Boolean,
875 Self::Int32 => DataType::Int32,
876 Self::Int64 => DataType::Int64,
877 Self::Float32 => DataType::Float32,
878 Self::Float64 => DataType::Float64,
879 Self::Binary => DataType::Binary,
880 Self::Utf8 => DataType::Utf8,
881 Self::Utf8View => DataType::Utf8View,
882 Self::Date32 => DataType::Date32,
883 Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
884 Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
885 Self::TimestampMillis(tz) => DataType::Timestamp(
886 TimeUnit::Millisecond,
887 tz.as_ref().map(|tz| tz.as_str().into()),
888 ),
889 Self::TimestampMicros(tz) => DataType::Timestamp(
890 TimeUnit::Microsecond,
891 tz.as_ref().map(|tz| tz.as_str().into()),
892 ),
893 Self::TimestampNanos(tz) => DataType::Timestamp(
894 TimeUnit::Nanosecond,
895 tz.as_ref().map(|tz| tz.as_str().into()),
896 ),
897 Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
898 Self::Fixed(size) => DataType::FixedSizeBinary(*size),
899 Self::Decimal(precision, scale, _size) => {
900 let p = *precision as u8;
901 let s = scale.unwrap_or(0) as i8;
902 #[cfg(feature = "small_decimals")]
903 {
904 if *precision <= DECIMAL32_MAX_PRECISION as usize {
905 DataType::Decimal32(p, s)
906 } else if *precision <= DECIMAL64_MAX_PRECISION as usize {
907 DataType::Decimal64(p, s)
908 } else if *precision <= DECIMAL128_MAX_PRECISION as usize {
909 DataType::Decimal128(p, s)
910 } else {
911 DataType::Decimal256(p, s)
912 }
913 }
914 #[cfg(not(feature = "small_decimals"))]
915 {
916 if *precision <= DECIMAL128_MAX_PRECISION as usize {
917 DataType::Decimal128(p, s)
918 } else {
919 DataType::Decimal256(p, s)
920 }
921 }
922 }
923 Self::Uuid => DataType::FixedSizeBinary(16),
924 Self::Enum(_) => {
925 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
926 }
927 Self::List(f) => {
928 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
929 }
930 Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
931 Self::Map(value_type) => {
932 let val_field = value_type.field_with_name("value");
933 DataType::Map(
934 Arc::new(Field::new(
935 "entries",
936 DataType::Struct(Fields::from(vec![
937 Field::new("key", DataType::Utf8, false),
938 val_field,
939 ])),
940 false,
941 )),
942 false,
943 )
944 }
945 Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
946 #[cfg(feature = "avro_custom_types")]
947 Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
948 #[cfg(feature = "avro_custom_types")]
949 Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
950 #[cfg(feature = "avro_custom_types")]
951 Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
952 #[cfg(feature = "avro_custom_types")]
953 Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
954 #[cfg(feature = "avro_custom_types")]
955 Self::RunEndEncoded(values, bits) => {
956 let run_ends_dt = match *bits {
957 16 => DataType::Int16,
958 32 => DataType::Int32,
959 64 => DataType::Int64,
960 _ => unreachable!(),
961 };
962 DataType::RunEndEncoded(
963 Arc::new(Field::new("run_ends", run_ends_dt, false)),
964 Arc::new(Field::new("values", values.codec().data_type(), true)),
965 )
966 }
967 #[cfg(feature = "avro_custom_types")]
968 Self::Int8 => DataType::Int8,
969 #[cfg(feature = "avro_custom_types")]
970 Self::Int16 => DataType::Int16,
971 #[cfg(feature = "avro_custom_types")]
972 Self::UInt8 => DataType::UInt8,
973 #[cfg(feature = "avro_custom_types")]
974 Self::UInt16 => DataType::UInt16,
975 #[cfg(feature = "avro_custom_types")]
976 Self::UInt32 => DataType::UInt32,
977 #[cfg(feature = "avro_custom_types")]
978 Self::UInt64 => DataType::UInt64,
979 #[cfg(feature = "avro_custom_types")]
980 Self::Float16 => DataType::Float16,
981 #[cfg(feature = "avro_custom_types")]
982 Self::Date64 => DataType::Date64,
983 #[cfg(feature = "avro_custom_types")]
984 Self::TimeNanos => DataType::Time64(TimeUnit::Nanosecond),
985 #[cfg(feature = "avro_custom_types")]
986 Self::Time32Secs => DataType::Time32(TimeUnit::Second),
987 #[cfg(feature = "avro_custom_types")]
988 Self::TimestampSecs(is_utc) => {
989 DataType::Timestamp(TimeUnit::Second, is_utc.then(|| "+00:00".into()))
990 }
991 #[cfg(feature = "avro_custom_types")]
992 Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
993 #[cfg(feature = "avro_custom_types")]
994 Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
995 #[cfg(feature = "avro_custom_types")]
996 Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
997 }
998 }
999
1000 pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
1006 if use_utf8view && matches!(self, Self::Utf8) {
1007 Self::Utf8View
1008 } else {
1009 self
1010 }
1011 }
1012
1013 #[inline]
1014 fn union_field_name(&self) -> String {
1015 UnionFieldKind::from(self).as_ref().to_owned()
1016 }
1017}
1018
1019impl From<PrimitiveType> for Codec {
1020 fn from(value: PrimitiveType) -> Self {
1021 match value {
1022 PrimitiveType::Null => Self::Null,
1023 PrimitiveType::Boolean => Self::Boolean,
1024 PrimitiveType::Int => Self::Int32,
1025 PrimitiveType::Long => Self::Int64,
1026 PrimitiveType::Float => Self::Float32,
1027 PrimitiveType::Double => Self::Float64,
1028 PrimitiveType::Bytes => Self::Binary,
1029 PrimitiveType::String => Self::Utf8,
1030 }
1031 }
1032}
1033
/// Returns the maximum decimal precision that fits in `n` bytes, or `None`
/// when `n` is outside `1..=32`.
///
/// The table holds one entry per byte width, from 1 byte (precision 2) up to
/// 32 bytes (precision 76).
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
    const MAX_P: [usize; 32] = [
        2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
        59, 62, 64, 67, 69, 71, 74, 76,
    ];
    if n == 0 || n > MAX_P.len() {
        return None;
    }
    Some(MAX_P[n - 1])
}
1056
1057fn parse_decimal_attributes(
1058 attributes: &Attributes,
1059 fallback_size: Option<usize>,
1060 precision_required: bool,
1061) -> Result<(usize, usize, Option<usize>), ArrowError> {
1062 let precision = attributes
1063 .additional
1064 .get("precision")
1065 .and_then(|v| v.as_u64())
1066 .or(if precision_required { None } else { Some(10) })
1067 .ok_or_else(|| ArrowError::ParseError("Decimal requires precision".to_string()))?
1068 as usize;
1069 let scale = attributes
1070 .additional
1071 .get("scale")
1072 .and_then(|v| v.as_u64())
1073 .unwrap_or(0) as usize;
1074 let size = attributes
1075 .additional
1076 .get("size")
1077 .and_then(|v| v.as_u64())
1078 .map(|s| s as usize)
1079 .or(fallback_size);
1080 if precision == 0 {
1081 return Err(ArrowError::ParseError(
1082 "Decimal requires precision > 0".to_string(),
1083 ));
1084 }
1085 if scale > precision {
1086 return Err(ArrowError::ParseError(format!(
1087 "Decimal has invalid scale > precision: scale={scale}, precision={precision}"
1088 )));
1089 }
1090 if precision > DECIMAL256_MAX_PRECISION as usize {
1091 return Err(ArrowError::ParseError(format!(
1092 "Decimal precision {precision} exceeds maximum supported by Arrow ({})",
1093 DECIMAL256_MAX_PRECISION
1094 )));
1095 }
1096 if let Some(sz) = size {
1097 let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
1098 ArrowError::ParseError(format!(
1099 "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
1100 ))
1101 })?;
1102 if precision > max_p {
1103 return Err(ArrowError::ParseError(format!(
1104 "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
1105 )));
1106 }
1107 }
1108 Ok((precision, scale, size))
1109}
1110
/// The kind label of a union branch.
///
/// `AsRefStr` with `serialize_all = "snake_case"` provides the string form of
/// each variant, which `Codec::union_field_name` returns as the branch name.
/// See `From<&Codec>` for how codecs are assigned to kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
#[strum(serialize_all = "snake_case")]
enum UnionFieldKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Date,
    TimeMillis,
    TimeMicros,
    // Timestamps with a time zone use the `Utc` kinds; those without use `Local`.
    TimestampMillisUtc,
    TimestampMillisLocal,
    TimestampMicrosUtc,
    TimestampMicrosLocal,
    TimestampNanosUtc,
    TimestampNanosLocal,
    Duration,
    Fixed,
    Decimal,
    Enum,
    Array,
    Record,
    Map,
    Uuid,
    Union,
}
1141
1142impl From<&Codec> for UnionFieldKind {
1143 fn from(c: &Codec) -> Self {
1144 match c {
1145 Codec::Null => Self::Null,
1146 Codec::Boolean => Self::Boolean,
1147 Codec::Int32 => Self::Int,
1148 Codec::Int64 => Self::Long,
1149 Codec::Float32 => Self::Float,
1150 Codec::Float64 => Self::Double,
1151 Codec::Binary => Self::Bytes,
1152 Codec::Utf8 | Codec::Utf8View => Self::String,
1153 Codec::Date32 => Self::Date,
1154 Codec::TimeMillis => Self::TimeMillis,
1155 Codec::TimeMicros => Self::TimeMicros,
1156 Codec::TimestampMillis(Some(Tz::OffsetZero)) => Self::TimestampMillisUtc,
1157 Codec::TimestampMillis(Some(Tz::Utc)) => Self::TimestampMillisUtc,
1158 Codec::TimestampMillis(None) => Self::TimestampMillisLocal,
1159 Codec::TimestampMicros(Some(Tz::OffsetZero)) => Self::TimestampMicrosUtc,
1160 Codec::TimestampMicros(Some(Tz::Utc)) => Self::TimestampMicrosUtc,
1161 Codec::TimestampMicros(None) => Self::TimestampMicrosLocal,
1162 Codec::TimestampNanos(Some(Tz::OffsetZero)) => Self::TimestampNanosUtc,
1163 Codec::TimestampNanos(Some(Tz::Utc)) => Self::TimestampNanosUtc,
1164 Codec::TimestampNanos(None) => Self::TimestampNanosLocal,
1165 Codec::Interval => Self::Duration,
1166 Codec::Fixed(_) => Self::Fixed,
1167 Codec::Decimal(..) => Self::Decimal,
1168 Codec::Enum(_) => Self::Enum,
1169 Codec::List(_) => Self::Array,
1170 Codec::Struct(_) => Self::Record,
1171 Codec::Map(_) => Self::Map,
1172 Codec::Uuid => Self::Uuid,
1173 Codec::Union(..) => Self::Union,
1174 #[cfg(feature = "avro_custom_types")]
1175 Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()),
1176 #[cfg(feature = "avro_custom_types")]
1177 Codec::DurationNanos
1178 | Codec::DurationMicros
1179 | Codec::DurationMillis
1180 | Codec::DurationSeconds => Self::Duration,
1181 #[cfg(feature = "avro_custom_types")]
1182 Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 => Self::Int,
1183 #[cfg(feature = "avro_custom_types")]
1184 Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
1185 Self::Long
1186 }
1187 #[cfg(feature = "avro_custom_types")]
1188 Codec::Time32Secs => Self::TimeMillis, #[cfg(feature = "avro_custom_types")]
1190 Codec::UInt64
1191 | Codec::Float16
1192 | Codec::IntervalYearMonth
1193 | Codec::IntervalMonthDayNano
1194 | Codec::IntervalDayTime => Self::Fixed,
1195 }
1196 }
1197}
1198
1199fn union_branch_name(dt: &AvroDataType) -> String {
1200 if let Some(name) = dt.metadata.get(AVRO_NAME_METADATA_KEY) {
1201 if name.contains(".") {
1202 return name.to_string();
1204 }
1205 if let Some(ns) = dt.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1206 return format!("{ns}.{name}");
1207 }
1208 return name.to_string();
1209 }
1210 dt.codec.union_field_name()
1211}
1212
1213fn build_union_fields(encodings: &[AvroDataType]) -> Result<UnionFields, ArrowError> {
1214 let arrow_fields: Vec<Field> = encodings
1215 .iter()
1216 .map(|encoding| encoding.field_with_name(&union_branch_name(encoding)))
1217 .collect();
1218 let type_ids: Vec<i8> = (0..arrow_fields.len()).map(|i| i as i8).collect();
1219 UnionFields::try_new(type_ids, arrow_fields)
1220}
1221
/// A registry of named data types, used to look up types by name.
#[derive(Debug, Default)]
struct Resolver<'a> {
    /// Registered types keyed by `(namespace, name)`; a missing namespace is stored as `""`.
    map: HashMap<(&'a str, &'a str), AvroDataType>,
}
1229
1230impl<'a> Resolver<'a> {
1231 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
1232 self.map.insert((namespace.unwrap_or(""), name), schema);
1233 }
1234
1235 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
1236 let (namespace, name) = name
1237 .rsplit_once('.')
1238 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
1239 self.map
1240 .get(&(namespace, name))
1241 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
1242 .cloned()
1243 }
1244}
1245
1246fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
1247 let mut out = HashSet::with_capacity(1 + aliases.len());
1248 let (full, _) = make_full_name(name, ns, None);
1249 out.insert(full);
1250 for a in aliases {
1251 let (fa, _) = make_full_name(a, None, ns);
1252 out.insert(fa);
1253 }
1254 out
1255}
1256
1257fn names_match(
1258 writer_name: &str,
1259 writer_namespace: Option<&str>,
1260 writer_aliases: &[&str],
1261 reader_name: &str,
1262 reader_namespace: Option<&str>,
1263 reader_aliases: &[&str],
1264) -> bool {
1265 let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
1266 let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
1267 !writer_set.is_disjoint(&reader_set)
1269}
1270
1271fn ensure_names_match(
1272 data_type: &str,
1273 writer_name: &str,
1274 writer_namespace: Option<&str>,
1275 writer_aliases: &[&str],
1276 reader_name: &str,
1277 reader_namespace: Option<&str>,
1278 reader_aliases: &[&str],
1279) -> Result<(), ArrowError> {
1280 if names_match(
1281 writer_name,
1282 writer_namespace,
1283 writer_aliases,
1284 reader_name,
1285 reader_namespace,
1286 reader_aliases,
1287 ) {
1288 Ok(())
1289 } else {
1290 Err(ArrowError::ParseError(format!(
1291 "{data_type} name mismatch writer={writer_name}, reader={reader_name}"
1292 )))
1293 }
1294}
1295
1296fn primitive_of(schema: &Schema) -> Option<PrimitiveType> {
1297 match schema {
1298 Schema::TypeName(TypeName::Primitive(primitive)) => Some(*primitive),
1299 Schema::Type(Type {
1300 r#type: TypeName::Primitive(primitive),
1301 ..
1302 }) => Some(*primitive),
1303 _ => None,
1304 }
1305}
1306
1307fn nullable_union_variants<'x, 'y>(
1308 variant: &'y [Schema<'x>],
1309) -> Option<(Nullability, &'y Schema<'x>)> {
1310 if variant.len() != 2 {
1311 return None;
1312 }
1313 let is_null = |schema: &Schema<'x>| {
1314 matches!(
1315 schema,
1316 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
1317 )
1318 };
1319 match (is_null(&variant[0]), is_null(&variant[1])) {
1320 (true, false) => Some((Nullability::NullFirst, &variant[1])),
1321 (false, true) => Some((Nullability::NullSecond, &variant[0])),
1322 _ => None,
1323 }
1324}
1325
/// Identity of a union branch, used to detect duplicate branches in a union.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UnionBranchKey {
    /// A named type (or a reference to one), identified by its full name.
    Named(String),
    /// A primitive type.
    Primitive(PrimitiveType),
    /// Any array type; the item type is not part of the key.
    Array,
    /// Any map type; the value type is not part of the key.
    Map,
}
1333
1334fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> Option<UnionBranchKey> {
1335 let (name, namespace) = match s {
1336 Schema::TypeName(TypeName::Primitive(p))
1337 | Schema::Type(Type {
1338 r#type: TypeName::Primitive(p),
1339 ..
1340 }) => return Some(UnionBranchKey::Primitive(*p)),
1341 Schema::TypeName(TypeName::Ref(name))
1342 | Schema::Type(Type {
1343 r#type: TypeName::Ref(name),
1344 ..
1345 }) => (name, None),
1346 Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
1347 Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
1348 Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
1349 Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
1350 Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
1351 Schema::Union(_) => return None,
1352 };
1353 let (full, _) = make_full_name(name, namespace, enclosing_ns);
1354 Some(UnionBranchKey::Named(full))
1355}
1356
1357fn union_first_duplicate<'a>(
1358 branches: &'a [Schema<'a>],
1359 enclosing_ns: Option<&'a str>,
1360) -> Option<String> {
1361 let mut seen = HashSet::with_capacity(branches.len());
1362 for schema in branches {
1363 if let Some(key) = branch_key_of(schema, enclosing_ns) {
1364 if !seen.insert(key.clone()) {
1365 let msg = match key {
1366 UnionBranchKey::Named(full) => format!("named type {full}"),
1367 UnionBranchKey::Primitive(p) => format!("primitive {}", p.as_ref()),
1368 UnionBranchKey::Array => "array".to_string(),
1369 UnionBranchKey::Map => "map".to_string(),
1370 };
1371 return Some(msg);
1372 }
1373 }
1374 }
1375 None
1376}
1377
/// Builds [`AvroDataType`]s from Avro schemas, optionally resolving a writer
/// schema against a reader schema.
struct Maker<'a> {
    /// Named types registered so far, used to resolve type references.
    resolver: Resolver<'a>,
    /// Passed to `Codec::with_utf8view` when building primitive codecs.
    use_utf8view: bool,
    /// Enables stricter validation (e.g. rejects `['T','null']` unions and
    /// ambiguous field or enum mappings).
    strict_mode: bool,
    /// Time zone setting attached to the non-local timestamp codecs.
    tz: Tz,
}
1387
1388impl<'a> Maker<'a> {
1389 fn new(use_utf8view: bool, strict_mode: bool, tz: Tz) -> Self {
1390 Self {
1391 resolver: Default::default(),
1392 use_utf8view,
1393 strict_mode,
1394 tz,
1395 }
1396 }
1397
1398 #[cfg(feature = "avro_custom_types")]
1399 #[inline]
1400 fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) {
1401 if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() {
1402 let mut inner = (*values).clone();
1403 inner.nullability = Some(nb);
1404 dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits);
1405 }
1406 }
1407
1408 fn make_data_type<'s>(
1409 &mut self,
1410 writer_schema: &'s Schema<'a>,
1411 reader_schema: Option<&'s Schema<'a>>,
1412 namespace: Option<&'a str>,
1413 ) -> Result<AvroDataType, ArrowError> {
1414 match reader_schema {
1415 Some(reader_schema) => self.resolve_type(writer_schema, reader_schema, namespace),
1416 None => self.parse_type(writer_schema, namespace),
1417 }
1418 }
1419
    /// Parses a single Avro schema into an [`AvroDataType`], without any
    /// reader/writer resolution.
    ///
    /// * Primitive type names become the codec derived from the primitive.
    /// * Type references are looked up in the resolver.
    /// * A two-branch union containing `null` becomes the other branch with its
    ///   nullability set; any other union becomes a dense `Codec::Union`.
    /// * Records, enums and fixed types are parsed and then registered with the
    ///   resolver under their name and effective namespace.
    /// * Attributed types (`Schema::Type`) are parsed by type name and then
    ///   adjusted according to their `logicalType` and additional attributes.
    ///
    /// `namespace` is the enclosing namespace used for named types that do not
    /// declare their own.
    fn parse_type<'s>(
        &mut self,
        schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        match schema {
            Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType::new(
                Codec::from(*p).with_utf8view(self.use_utf8view),
                Default::default(),
                None,
            )),
            Schema::TypeName(TypeName::Ref(name)) => self.resolver.resolve(name, namespace),
            Schema::Union(f) => {
                // Position of the first bare `null` branch, if any.
                let null = f
                    .iter()
                    .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
                // Two-branch unions with a null branch are represented as the
                // non-null branch plus a nullability marker, not as a union.
                match (f.len() == 2, null) {
                    (true, Some(0)) => {
                        let mut field = self.parse_type(&f[1], namespace)?;
                        field.nullability = Some(Nullability::NullFirst);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst);
                        return Ok(field);
                    }
                    (true, Some(1)) => {
                        if self.strict_mode {
                            return Err(ArrowError::SchemaError(
                                "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
                                    .to_string(),
                            ));
                        }
                        let mut field = self.parse_type(&f[0], namespace)?;
                        field.nullability = Some(Nullability::NullSecond);
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond);
                        return Ok(field);
                    }
                    _ => {}
                }
                // General union: reject directly nested unions.
                if f.iter().any(|s| matches!(s, Schema::Union(_))) {
                    return Err(ArrowError::SchemaError(
                        "Avro unions may not immediately contain other unions".to_string(),
                    ));
                }
                // Reject branches that share the same branch key.
                if let Some(dup) = union_first_duplicate(f, namespace) {
                    return Err(ArrowError::SchemaError(format!(
                        "Avro union contains duplicate branch type: {dup}"
                    )));
                }
                // Parse each branch in order; the first failure aborts.
                let children: Vec<AvroDataType> = f
                    .iter()
                    .map(|s| self.parse_type(s, namespace))
                    .collect::<Result<_, _>>()?;
                let union_fields = build_union_fields(&children)?;
                Ok(AvroDataType::new(
                    Codec::Union(Arc::from(children), union_fields, UnionMode::Dense),
                    Default::default(),
                    None,
                ))
            }
            Schema::Complex(c) => match c {
                ComplexType::Record(r) => {
                    // The record's own namespace takes precedence over the enclosing one.
                    let namespace = r.namespace.or(namespace);
                    let mut metadata = r.attributes.field_metadata();
                    let fields = r
                        .fields
                        .iter()
                        .map(|field| {
                            Ok(AvroField {
                                name: field.name.to_string(),
                                data_type: self.parse_type(&field.r#type, namespace)?,
                            })
                        })
                        .collect::<Result<_, ArrowError>>()?;
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), r.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        codec: Codec::Struct(fields),
                        metadata,
                        resolution: None,
                    };
                    // Registered only after the fields have been parsed.
                    self.resolver.register(r.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Array(a) => {
                    let field = self.parse_type(a.items.as_ref(), namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: a.attributes.field_metadata(),
                        codec: Codec::List(Arc::new(field)),
                        resolution: None,
                    })
                }
                ComplexType::Fixed(f) => {
                    // The declared size must fit the integer type used by the codec.
                    let size = f.size.try_into().map_err(|e| {
                        ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
                    })?;
                    let namespace = f.namespace.or(namespace);
                    let mut metadata = f.attributes.field_metadata();
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), f.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    // The logical type (and, for custom types, the size) selects the codec;
                    // anything unrecognised stays a plain fixed.
                    let field = match f.attributes.logical_type {
                        Some("decimal") => {
                            let (precision, scale, _) =
                                parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
                                resolution: None,
                            }
                        }
                        Some("duration") => {
                            if size != 12 {
                                return Err(ArrowError::ParseError(format!(
                                    "Invalid fixed size for Duration: {size}, must be 12"
                                )));
                            };
                            AvroDataType {
                                nullability: None,
                                metadata,
                                codec: Codec::Interval,
                                resolution: None,
                            }
                        }
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.uint64") if size == 8 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::UInt64,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.float16") if size == 2 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::Float16,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-year-month") if size == 4 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalYearMonth,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalMonthDayNano,
                            resolution: None,
                        },
                        #[cfg(feature = "avro_custom_types")]
                        Some("arrow.interval-day-time") if size == 8 => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::IntervalDayTime,
                            resolution: None,
                        },
                        _ => AvroDataType {
                            nullability: None,
                            metadata,
                            codec: Codec::Fixed(size),
                            resolution: None,
                        },
                    };
                    self.resolver.register(f.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Enum(e) => {
                    let namespace = e.namespace.or(namespace);
                    let symbols = e
                        .symbols
                        .iter()
                        .map(|s| s.to_string())
                        .collect::<Arc<[String]>>();
                    let mut metadata = e.attributes.field_metadata();
                    // The symbol list is also stored in the metadata as JSON.
                    let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
                        ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
                    })?;
                    metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
                    metadata.insert(AVRO_NAME_METADATA_KEY.to_string(), e.name.to_string());
                    if let Some(ns) = namespace {
                        metadata.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
                    }
                    let field = AvroDataType {
                        nullability: None,
                        metadata,
                        codec: Codec::Enum(symbols),
                        resolution: None,
                    };
                    self.resolver.register(e.name, namespace, field.clone());
                    Ok(field)
                }
                ComplexType::Map(m) => {
                    let val = self.parse_type(&m.values, namespace)?;
                    Ok(AvroDataType {
                        nullability: None,
                        metadata: m.attributes.field_metadata(),
                        codec: Codec::Map(Arc::new(val)),
                        resolution: None,
                    })
                }
            },
            Schema::Type(t) => {
                // Parse the underlying type name first, then refine the codec
                // from the logical type.
                let mut field = self.parse_type(&Schema::TypeName(t.r#type.clone()), namespace)?;
                match (t.attributes.logical_type, &mut field.codec) {
                    (Some("decimal"), c @ Codec::Binary) => {
                        let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
                        *c = Codec::Decimal(prec, Some(sc), None);
                    }
                    (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
                    (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
                    (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
                    // Non-local timestamps carry the maker's tz; local ones carry none.
                    (Some("timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(Some(self.tz))
                    }
                    (Some("timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(Some(self.tz))
                    }
                    (Some("local-timestamp-millis"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMillis(None)
                    }
                    (Some("local-timestamp-micros"), c @ Codec::Int64) => {
                        *c = Codec::TimestampMicros(None)
                    }
                    (Some("timestamp-nanos"), c @ Codec::Int64) => {
                        *c = Codec::TimestampNanos(Some(self.tz))
                    }
                    (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
                        *c = Codec::TimestampNanos(None)
                    }
                    (Some("uuid"), c @ Codec::Utf8) => {
                        *c = Codec::Uuid;
                        // The logical type is additionally recorded in the metadata.
                        field.metadata.insert("logicalType".into(), "uuid".into());
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.duration-seconds"), c @ Codec::Int64) => {
                        *c = Codec::DurationSeconds
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.run-end-encoded"), _) => {
                        // The run-end index width must be present and fit in a u8.
                        let bits_u8: u8 = t
                            .attributes
                            .additional
                            .get("arrow.runEndIndexBits")
                            .and_then(|v| v.as_u64())
                            .and_then(|n| u8::try_from(n).ok())
                            .ok_or_else(|| ArrowError::ParseError(
                                "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)"
                                    .to_string(),
                            ))?;
                        if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 {
                            return Err(ArrowError::ParseError(format!(
                                "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64"
                            )));
                        }
                        // The type parsed so far becomes the values type of the REE codec.
                        let values_site = field.clone();
                        field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8);
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.int8"), c @ Codec::Int32) => *c = Codec::Int8,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.int16"), c @ Codec::Int32) => *c = Codec::Int16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint8"), c @ Codec::Int32) => *c = Codec::UInt8,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint16"), c @ Codec::Int32) => *c = Codec::UInt16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint32"), c @ Codec::Int64) => *c = Codec::UInt32,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.uint64"), c @ Codec::Fixed(8)) => *c = Codec::UInt64,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.float16"), c @ Codec::Fixed(2)) => *c = Codec::Float16,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.date64"), c @ Codec::Int64) => *c = Codec::Date64,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.time64-nanosecond"), c @ Codec::Int64) => *c = Codec::TimeNanos,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.time32-second"), c @ Codec::Int32) => *c = Codec::Time32Secs,
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.timestamp-second"), c @ Codec::Int64) => {
                        *c = Codec::TimestampSecs(true)
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.local-timestamp-second"), c @ Codec::Int64) => {
                        *c = Codec::TimestampSecs(false)
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-year-month"), c @ Codec::Fixed(4)) => {
                        *c = Codec::IntervalYearMonth
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
                        *c = Codec::IntervalMonthDayNano
                    }
                    #[cfg(feature = "avro_custom_types")]
                    (Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
                        *c = Codec::IntervalDayTime
                    }
                    // Any other logical type (or a known one on a non-matching
                    // codec) leaves the codec alone and is kept in the metadata.
                    (Some(logical), _) => {
                        field.metadata.insert("logicalType".into(), logical.into());
                    }
                    (None, _) => {}
                }
                // A codec that is still plain Int64 is treated as a nanosecond
                // timestamp when the `arrowTimeUnit` attribute is "nanosecond".
                if matches!(field.codec, Codec::Int64) {
                    if let Some(unit) = t
                        .attributes
                        .additional
                        .get("arrowTimeUnit")
                        .and_then(|v| v.as_str())
                    {
                        if unit == "nanosecond" {
                            field.codec = Codec::TimestampNanos(Some(self.tz));
                        }
                    }
                }
                // Copy every additional attribute into the metadata, using the
                // value's `to_string` rendering.
                if !t.attributes.additional.is_empty() {
                    for (k, v) in &t.attributes.additional {
                        field.metadata.insert(k.to_string(), v.to_string());
                    }
                }
                Ok(field)
            }
        }
    }
1785
    /// Resolves a writer schema against a reader schema and returns the
    /// reader-shaped data type, annotated with how writer data maps onto it.
    ///
    /// Dispatch order:
    /// 1. If both sides are primitives, delegate to `resolve_primitives`.
    /// 2. Union vs. union, writer union vs. non-union reader, and non-union
    ///    writer vs. reader union are handled here.
    /// 3. Matching complex kinds (array, map, fixed, record, enum) delegate to
    ///    their dedicated `resolve_*` method.
    /// 4. If either side is a type reference, the reader schema is parsed as is.
    /// 5. Every other combination returns `ArrowError::NotYetImplemented`.
    fn resolve_type<'s>(
        &mut self,
        writer_schema: &'s Schema<'a>,
        reader_schema: &'s Schema<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        if let (Some(write_primitive), Some(read_primitive)) =
            (primitive_of(writer_schema), primitive_of(reader_schema))
        {
            return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
        }
        match (writer_schema, reader_schema) {
            (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
                let writer_variants = writer_variants.as_slice();
                let reader_variants = reader_variants.as_slice();
                match (
                    nullable_union_variants(writer_variants),
                    nullable_union_variants(reader_variants),
                ) {
                    // Both sides are two-branch nullable unions: resolve the
                    // non-null branches against each other.
                    (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
                        let mut dt = self.resolve_type(w_nonnull, r_nonnull, namespace)?;
                        // Only the writer's non-null branch gets a mapping; the
                        // writer's null branch stays `None`.
                        let mut writer_to_reader = vec![None, None];
                        writer_to_reader[w_nb.non_null_index()] = Some((
                            r_nb.non_null_index(),
                            dt.resolution
                                .take()
                                .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
                        ));
                        // The resulting nullability is the writer's.
                        dt.nullability = Some(w_nb);
                        dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                            writer_to_reader: Arc::from(writer_to_reader),
                            writer_is_union: true,
                            reader_is_union: true,
                        }));
                        #[cfg(feature = "avro_custom_types")]
                        Self::propagate_nullability_into_ree(&mut dt, w_nb);
                        Ok(dt)
                    }
                    _ => self.resolve_unions(writer_variants, reader_variants, namespace),
                }
            }
            (Schema::Union(writer_variants), reader_non_union) => {
                // Try every writer branch against the single reader type.
                // Branches that fail to resolve are recorded as `None`
                // rather than failing the whole resolution.
                let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
                    .iter()
                    .map(|writer| {
                        self.resolve_type(writer, reader_non_union, namespace)
                            .ok()
                            .map(|tmp| {
                                let resolution = tmp
                                    .resolution
                                    .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                                (0usize, resolution)
                            })
                    })
                    .collect();
                // The resulting type is the reader type parsed on its own.
                let mut dt = self.parse_type(reader_non_union, namespace)?;
                dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                    writer_to_reader: Arc::from(writer_to_reader),
                    writer_is_union: true,
                    reader_is_union: false,
                }));
                Ok(dt)
            }
            (writer_non_union, Schema::Union(reader_variants)) => {
                if let Some((nullability, non_null_branch)) =
                    nullable_union_variants(reader_variants)
                {
                    // Reader is a nullable wrapper: resolve against its non-null
                    // branch and take the reader's nullability.
                    let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
                    #[cfg(feature = "avro_custom_types")]
                    Self::propagate_nullability_into_ree(&mut dt, nullability);
                    dt.nullability = Some(nullability);
                    if dt.resolution.is_none() {
                        dt.resolution = Some(ResolutionInfo::Promotion(Promotion::Direct));
                    }
                    Ok(dt)
                } else {
                    // General reader union: pick the best matching branch.
                    let Some((match_idx, mut match_dt)) =
                        self.find_best_union_match(writer_non_union, reader_variants, namespace)
                    else {
                        return Err(ArrowError::SchemaError(
                            "Writer schema does not match any reader union branch".to_string(),
                        ));
                    };
                    // The branch's resolution moves up into the union-level mapping.
                    let resolution = match_dt
                        .resolution
                        .take()
                        .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
                    let mut match_dt = Some(match_dt);
                    // Reuse the already-resolved type for the matched branch and
                    // parse the remaining reader branches normally.
                    let children = reader_variants
                        .iter()
                        .enumerate()
                        .map(|(idx, variant)| {
                            if idx == match_idx {
                                Ok(match_dt.take().unwrap())
                            } else {
                                self.parse_type(variant, namespace)
                            }
                        })
                        .collect::<Result<Vec<_>, _>>()?;
                    let union_fields = build_union_fields(&children)?;
                    let mut dt = AvroDataType::new(
                        Codec::Union(children.into(), union_fields, UnionMode::Dense),
                        Default::default(),
                        None,
                    );
                    dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                        writer_to_reader: Arc::from(vec![Some((match_idx, resolution))]),
                        writer_is_union: false,
                        reader_is_union: true,
                    }));
                    Ok(dt)
                }
            }
            (
                Schema::Complex(ComplexType::Array(writer_array)),
                Schema::Complex(ComplexType::Array(reader_array)),
            ) => self.resolve_array(writer_array, reader_array, namespace),
            (
                Schema::Complex(ComplexType::Map(writer_map)),
                Schema::Complex(ComplexType::Map(reader_map)),
            ) => self.resolve_map(writer_map, reader_map, namespace),
            (
                Schema::Complex(ComplexType::Fixed(writer_fixed)),
                Schema::Complex(ComplexType::Fixed(reader_fixed)),
            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
            (
                Schema::Complex(ComplexType::Record(writer_record)),
                Schema::Complex(ComplexType::Record(reader_record)),
            ) => self.resolve_records(writer_record, reader_record, namespace),
            (
                Schema::Complex(ComplexType::Enum(writer_enum)),
                Schema::Complex(ComplexType::Enum(reader_enum)),
            ) => self.resolve_enums(writer_enum, reader_enum, reader_schema, namespace),
            // When either side is a type reference, the reader schema is parsed
            // on its own without comparing against the writer.
            (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
            (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
            _ => Err(ArrowError::NotYetImplemented(
                "Other resolutions not yet implemented".to_string(),
            )),
        }
    }
1933
1934 fn find_best_union_match(
1935 &mut self,
1936 writer: &Schema<'a>,
1937 reader_variants: &[Schema<'a>],
1938 namespace: Option<&'a str>,
1939 ) -> Option<(usize, AvroDataType)> {
1940 let mut first_resolution = None;
1941 for (reader_index, reader) in reader_variants.iter().enumerate() {
1942 if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
1943 match &dt.resolution {
1944 None | Some(ResolutionInfo::Promotion(Promotion::Direct)) => {
1945 return Some((reader_index, dt));
1947 }
1948 Some(_) => {
1949 if first_resolution.is_none() {
1950 first_resolution = Some((reader_index, dt));
1952 }
1953 }
1954 };
1955 }
1956 }
1957 first_resolution
1958 }
1959
1960 fn resolve_unions<'s>(
1961 &mut self,
1962 writer_variants: &'s [Schema<'a>],
1963 reader_variants: &'s [Schema<'a>],
1964 namespace: Option<&'a str>,
1965 ) -> Result<AvroDataType, ArrowError> {
1966 let mut resolved_reader_encodings = HashMap::new();
1967 let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> = writer_variants
1968 .iter()
1969 .map(|writer| {
1970 self.find_best_union_match(writer, reader_variants, namespace)
1971 .map(|(match_idx, mut match_dt)| {
1972 let resolution = match_dt
1973 .resolution
1974 .take()
1975 .unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
1976 resolved_reader_encodings.insert(match_idx, match_dt);
1979 (match_idx, resolution)
1980 })
1981 })
1982 .collect();
1983 let reader_encodings: Vec<AvroDataType> = reader_variants
1984 .iter()
1985 .enumerate()
1986 .map(|(reader_idx, reader_schema)| {
1987 if let Some(resolved) = resolved_reader_encodings.remove(&reader_idx) {
1988 Ok(resolved)
1989 } else {
1990 self.parse_type(reader_schema, namespace)
1991 }
1992 })
1993 .collect::<Result<_, _>>()?;
1994 let union_fields = build_union_fields(&reader_encodings)?;
1995 let mut dt = AvroDataType::new(
1996 Codec::Union(reader_encodings.into(), union_fields, UnionMode::Dense),
1997 Default::default(),
1998 None,
1999 );
2000 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
2001 writer_to_reader: Arc::from(writer_to_reader),
2002 writer_is_union: true,
2003 reader_is_union: true,
2004 }));
2005 Ok(dt)
2006 }
2007
2008 fn resolve_array(
2009 &mut self,
2010 writer_array: &Array<'a>,
2011 reader_array: &Array<'a>,
2012 namespace: Option<&'a str>,
2013 ) -> Result<AvroDataType, ArrowError> {
2014 Ok(AvroDataType {
2015 nullability: None,
2016 metadata: reader_array.attributes.field_metadata(),
2017 codec: Codec::List(Arc::new(self.make_data_type(
2018 writer_array.items.as_ref(),
2019 Some(reader_array.items.as_ref()),
2020 namespace,
2021 )?)),
2022 resolution: None,
2023 })
2024 }
2025
2026 fn resolve_map(
2027 &mut self,
2028 writer_map: &Map<'a>,
2029 reader_map: &Map<'a>,
2030 namespace: Option<&'a str>,
2031 ) -> Result<AvroDataType, ArrowError> {
2032 Ok(AvroDataType {
2033 nullability: None,
2034 metadata: reader_map.attributes.field_metadata(),
2035 codec: Codec::Map(Arc::new(self.make_data_type(
2036 &writer_map.values,
2037 Some(&reader_map.values),
2038 namespace,
2039 )?)),
2040 resolution: None,
2041 })
2042 }
2043
2044 fn resolve_fixed<'s>(
2045 &mut self,
2046 writer_fixed: &Fixed<'a>,
2047 reader_fixed: &Fixed<'a>,
2048 reader_schema: &'s Schema<'a>,
2049 namespace: Option<&'a str>,
2050 ) -> Result<AvroDataType, ArrowError> {
2051 ensure_names_match(
2052 "Fixed",
2053 writer_fixed.name,
2054 writer_fixed.namespace,
2055 &writer_fixed.aliases,
2056 reader_fixed.name,
2057 reader_fixed.namespace,
2058 &reader_fixed.aliases,
2059 )?;
2060 if writer_fixed.size != reader_fixed.size {
2061 return Err(ArrowError::SchemaError(format!(
2062 "Fixed size mismatch for {}: writer={}, reader={}",
2063 reader_fixed.name, writer_fixed.size, reader_fixed.size
2064 )));
2065 }
2066 self.parse_type(reader_schema, namespace)
2067 }
2068
2069 fn resolve_primitives(
2070 &mut self,
2071 write_primitive: PrimitiveType,
2072 read_primitive: PrimitiveType,
2073 reader_schema: &Schema<'a>,
2074 ) -> Result<AvroDataType, ArrowError> {
2075 if write_primitive == read_primitive {
2076 return self.parse_type(reader_schema, None);
2077 }
2078 let promotion = match (write_primitive, read_primitive) {
2079 (PrimitiveType::Int, PrimitiveType::Long) => Promotion::IntToLong,
2080 (PrimitiveType::Int, PrimitiveType::Float) => Promotion::IntToFloat,
2081 (PrimitiveType::Int, PrimitiveType::Double) => Promotion::IntToDouble,
2082 (PrimitiveType::Long, PrimitiveType::Float) => Promotion::LongToFloat,
2083 (PrimitiveType::Long, PrimitiveType::Double) => Promotion::LongToDouble,
2084 (PrimitiveType::Float, PrimitiveType::Double) => Promotion::FloatToDouble,
2085 (PrimitiveType::String, PrimitiveType::Bytes) => Promotion::StringToBytes,
2086 (PrimitiveType::Bytes, PrimitiveType::String) => Promotion::BytesToString,
2087 _ => {
2088 return Err(ArrowError::ParseError(format!(
2089 "Illegal promotion {write_primitive:?} to {read_primitive:?}"
2090 )));
2091 }
2092 };
2093 let mut datatype = self.parse_type(reader_schema, None)?;
2094 datatype.resolution = Some(ResolutionInfo::Promotion(promotion));
2095 Ok(datatype)
2096 }
2097
2098 fn resolve_enums(
2154 &mut self,
2155 writer_enum: &Enum<'a>,
2156 reader_enum: &Enum<'a>,
2157 reader_schema: &Schema<'a>,
2158 namespace: Option<&'a str>,
2159 ) -> Result<AvroDataType, ArrowError> {
2160 ensure_names_match(
2161 "Enum",
2162 writer_enum.name,
2163 writer_enum.namespace,
2164 &writer_enum.aliases,
2165 reader_enum.name,
2166 reader_enum.namespace,
2167 &reader_enum.aliases,
2168 )?;
2169 if writer_enum.symbols == reader_enum.symbols {
2170 return self.parse_type(reader_schema, namespace);
2171 }
2172 let reader_index: HashMap<&str, i32> = reader_enum
2173 .symbols
2174 .iter()
2175 .enumerate()
2176 .map(|(index, &symbol)| (symbol, index as i32))
2177 .collect();
2178 let default_index: i32 = match reader_enum.default {
2179 Some(symbol) => *reader_index.get(symbol).ok_or_else(|| {
2180 ArrowError::SchemaError(format!(
2181 "Reader enum '{}' default symbol '{symbol}' not found in symbols list",
2182 reader_enum.name,
2183 ))
2184 })?,
2185 None => -1,
2186 };
2187 let mapping: Vec<i32> = writer_enum
2188 .symbols
2189 .iter()
2190 .map(|&write_symbol| {
2191 reader_index
2192 .get(write_symbol)
2193 .copied()
2194 .unwrap_or(default_index)
2195 })
2196 .collect();
2197 if self.strict_mode && mapping.iter().any(|&m| m < 0) {
2198 return Err(ArrowError::SchemaError(format!(
2199 "Reader enum '{}' does not cover all writer symbols and no default is provided",
2200 reader_enum.name
2201 )));
2202 }
2203 let mut dt = self.parse_type(reader_schema, namespace)?;
2204 dt.resolution = Some(ResolutionInfo::EnumMapping(EnumMapping {
2205 mapping: Arc::from(mapping),
2206 default_index,
2207 }));
2208 let reader_ns = reader_enum.namespace.or(namespace);
2209 self.resolver
2210 .register(reader_enum.name, reader_ns, dt.clone());
2211 Ok(dt)
2212 }
2213
2214 #[inline]
2215 fn build_writer_lookup(
2216 writer_record: &Record<'a>,
2217 ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
2218 let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
2219 for (idx, wf) in writer_record.fields.iter().enumerate() {
2220 map.insert(wf.name, idx);
2222 }
2223 let mut ambiguous: HashSet<&str> = HashSet::new();
2225 for (idx, wf) in writer_record.fields.iter().enumerate() {
2226 for &alias in &wf.aliases {
2227 match map.entry(alias) {
2228 Entry::Occupied(e) if *e.get() != idx => {
2229 ambiguous.insert(alias);
2230 }
2231 Entry::Vacant(e) => {
2232 e.insert(idx);
2233 }
2234 _ => {}
2235 }
2236 }
2237 }
2238 (map, ambiguous)
2239 }
2240
    /// Resolves a writer record against a reader record.
    ///
    /// After checking that the record names match, every reader field is matched
    /// to a writer field by name, then by the reader field's aliases. Matched
    /// fields are resolved type-against-type. Unmatched reader fields must have
    /// a default, or be nullable with null first, in which case they default to
    /// null. Writer fields with no reader counterpart are recorded as skipped.
    ///
    /// The result is a struct shaped like the reader record carrying
    /// `ResolutionInfo::Record`, and it is registered with the resolver under
    /// the reader's name.
    ///
    /// # Errors
    ///
    /// Returns an error when the names do not match, when an unmatched reader
    /// field has no usable default, or — in strict mode — when an alias is
    /// ambiguous or two reader fields map to the same writer field.
    fn resolve_records(
        &mut self,
        writer_record: &Record<'a>,
        reader_record: &Record<'a>,
        namespace: Option<&'a str>,
    ) -> Result<AvroDataType, ArrowError> {
        ensure_names_match(
            "Record",
            writer_record.name,
            writer_record.namespace,
            &writer_record.aliases,
            reader_record.name,
            reader_record.namespace,
            &reader_record.aliases,
        )?;
        // Each record's own namespace takes precedence over the enclosing one.
        let writer_ns = writer_record.namespace.or(namespace);
        let reader_ns = reader_record.namespace.or(namespace);
        let mut reader_md = reader_record.attributes.field_metadata();
        reader_md.insert(
            AVRO_NAME_METADATA_KEY.to_string(),
            reader_record.name.to_string(),
        );
        if let Some(ns) = reader_ns {
            reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(), ns.to_string());
        }
        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
        // writer field index -> reader field index, for matched fields.
        let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
        // Reader field indices whose value comes from a default.
        let mut default_fields: Vec<usize> = Vec::new();
        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
            // First try the reader field's own name.
            let mut match_idx = writer_lookup.get(r_field.name).copied();
            let mut matched_via_alias: Option<&str> = None;
            if match_idx.is_none() {
                // Then try the reader field's aliases; the first hit wins.
                for &alias in &r_field.aliases {
                    if let Some(i) = writer_lookup.get(alias).copied() {
                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
                            return Err(ArrowError::SchemaError(format!(
                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
                                r_field.name
                            )));
                        }
                        match_idx = Some(i);
                        matched_via_alias = Some(alias);
                        break;
                    }
                }
            }
            if let Some(wi) = match_idx {
                if writer_to_reader[wi].is_none() {
                    // First reader field to claim this writer field.
                    let w_schema = &writer_record.fields[wi].r#type;
                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
                    writer_to_reader[wi] = Some(reader_idx);
                    reader_fields.push(AvroField {
                        name: r_field.name.to_owned(),
                        data_type: dt,
                    });
                    continue;
                } else if self.strict_mode {
                    // The writer field is already claimed by another reader field.
                    let existing_reader = writer_to_reader[wi].unwrap();
                    let via = matched_via_alias
                        .map(|a| format!("alias '{a}'"))
                        .unwrap_or_else(|| "name match".to_string());
                    return Err(ArrowError::SchemaError(format!(
                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
                        writer_record.fields[wi].name
                    )));
                }
                // Outside strict mode, an already-claimed writer field falls
                // through to the default handling below.
            }
            // No usable writer field: the reader field must be defaultable.
            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
            if let Some(default_json) = r_field.default.as_ref() {
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(default_json)?,
                ));
                default_fields.push(reader_idx);
            } else if dt.nullability() == Some(Nullability::NullFirst) {
                // No explicit default, but null-first nullable: default to null.
                dt.resolution = Some(ResolutionInfo::DefaultValue(
                    dt.parse_and_store_default(&Value::Null)?,
                ));
                default_fields.push(reader_idx);
            } else {
                return Err(ArrowError::SchemaError(format!(
                    "Reader field '{}' not present in writer schema must have a default value",
                    r_field.name
                )));
            }
            reader_fields.push(AvroField {
                name: r_field.name.to_owned(),
                data_type: dt,
            });
        }
        // Describe every writer field in writer order: either where it lands in
        // the reader, or the writer type to skip.
        let writer_fields = writer_record
            .fields
            .iter()
            .enumerate()
            .map(|(writer_index, writer_field)| {
                if let Some(reader_index) = writer_to_reader[writer_index] {
                    Ok(ResolvedField::ToReader(reader_index))
                } else {
                    let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
                    Ok(ResolvedField::Skip(dt))
                }
            })
            .collect::<Result<_, ArrowError>>()?;
        let resolved = AvroDataType::new_with_resolution(
            Codec::Struct(Arc::from(reader_fields)),
            reader_md,
            None,
            Some(ResolutionInfo::Record(ResolvedRecord {
                writer_fields,
                default_fields: Arc::from(default_fields),
            })),
        );
        // Register the resolved record under the reader's name.
        self.resolver
            .register(reader_record.name, reader_ns, resolved.clone());
        Ok(resolved)
    }
2366}
2367
2368#[cfg(test)]
2369mod tests {
2370 use super::*;
2371 use crate::schema::{
2372 AVRO_ROOT_RECORD_DEFAULT_NAME, Array, Attributes, ComplexType, Field as AvroFieldSchema,
2373 Fixed, PrimitiveType, Record, Schema, Type, TypeName,
2374 };
2375 use indexmap::IndexMap;
2376 use serde_json::{self, Value};
2377
2378 fn create_schema_with_logical_type(
2379 primitive_type: PrimitiveType,
2380 logical_type: &'static str,
2381 ) -> Schema<'static> {
2382 let attributes = Attributes {
2383 logical_type: Some(logical_type),
2384 additional: Default::default(),
2385 };
2386
2387 Schema::Type(Type {
2388 r#type: TypeName::Primitive(primitive_type),
2389 attributes,
2390 })
2391 }
2392
2393 fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
2394 let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
2395 let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
2396 let mut maker = Maker::new(false, false, Tz::default());
2397 maker
2398 .make_data_type(&writer_schema, Some(&reader_schema), None)
2399 .expect("promotion should resolve")
2400 }
2401
2402 fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
2403 Schema::TypeName(TypeName::Primitive(pt))
2404 }
/// Wraps `branches` in a `Schema::Union`, preserving their order.
fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
    Schema::Union(branches)
}
2408
/// An `int` annotated with the `date` logical type must yield `Codec::Date32`.
#[test]
fn test_date_logical_type() {
    let date_schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&date_schema, None, None)
        .unwrap();

    let is_date32 = matches!(data_type.codec, Codec::Date32);
    assert!(is_date32);
}
2418
/// An `int` annotated with `time-millis` must yield `Codec::TimeMillis`.
#[test]
fn test_time_millis_logical_type() {
    let time_schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&time_schema, None, None)
        .unwrap();

    let is_time_millis = matches!(data_type.codec, Codec::TimeMillis);
    assert!(is_time_millis);
}
2428
/// A `long` annotated with `time-micros` must yield `Codec::TimeMicros`.
#[test]
fn test_time_micros_logical_type() {
    let time_schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&time_schema, None, None)
        .unwrap();

    let is_time_micros = matches!(data_type.codec, Codec::TimeMicros);
    assert!(is_time_micros);
}
2438
/// `timestamp-millis` must yield `Codec::TimestampMillis` carrying the
/// timezone setting the `Maker` was constructed with.
#[test]
fn test_timestamp_millis_logical_type() {
    for expected_tz in [Tz::OffsetZero, Tz::Utc] {
        let ts_schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
        let mut maker = Maker::new(false, false, expected_tz);
        let data_type = maker.make_data_type(&ts_schema, None, None).unwrap();

        match data_type.codec {
            Codec::TimestampMillis(Some(actual_tz)) => assert_eq!(actual_tz, expected_tz),
            _ => panic!("Expected TimestampMillis codec"),
        }
    }
}
2453
/// `timestamp-micros` must yield `Codec::TimestampMicros` carrying the
/// timezone setting the `Maker` was constructed with.
#[test]
fn test_timestamp_micros_logical_type() {
    for expected_tz in [Tz::OffsetZero, Tz::Utc] {
        let ts_schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
        let mut maker = Maker::new(false, false, expected_tz);
        let data_type = maker.make_data_type(&ts_schema, None, None).unwrap();

        match data_type.codec {
            Codec::TimestampMicros(Some(actual_tz)) => assert_eq!(actual_tz, expected_tz),
            _ => panic!("Expected TimestampMicros codec"),
        }
    }
}
2468
/// `timestamp-nanos` must yield `Codec::TimestampNanos` carrying the
/// timezone setting the `Maker` was constructed with.
#[test]
fn test_timestamp_nanos_logical_type() {
    for expected_tz in [Tz::OffsetZero, Tz::Utc] {
        let ts_schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-nanos");
        let mut maker = Maker::new(false, false, expected_tz);
        let data_type = maker.make_data_type(&ts_schema, None, None).unwrap();

        match data_type.codec {
            Codec::TimestampNanos(Some(actual_tz)) => assert_eq!(actual_tz, expected_tz),
            _ => panic!("Expected TimestampNanos codec"),
        }
    }
}
2483
/// `local-timestamp-millis` must yield `Codec::TimestampMillis(None)`.
#[test]
fn test_local_timestamp_millis_logical_type() {
    let local_schema =
        create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&local_schema, None, None)
        .unwrap();

    let is_local_millis = matches!(data_type.codec, Codec::TimestampMillis(None));
    assert!(is_local_millis);
}
2493
/// `local-timestamp-micros` must yield `Codec::TimestampMicros(None)`.
#[test]
fn test_local_timestamp_micros_logical_type() {
    let local_schema =
        create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&local_schema, None, None)
        .unwrap();

    let is_local_micros = matches!(data_type.codec, Codec::TimestampMicros(None));
    assert!(is_local_micros);
}
2503
/// `local-timestamp-nanos` must yield `Codec::TimestampNanos(None)`.
#[test]
fn test_local_timestamp_nanos_logical_type() {
    let local_schema =
        create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-nanos");
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&local_schema, None, None)
        .unwrap();

    let is_local_nanos = matches!(data_type.codec, Codec::TimestampNanos(None));
    assert!(is_local_nanos);
}
2513
/// Replacing a 16-byte fixed codec in place leaves `Codec::Uuid`.
///
/// Note: this only exercises the `Codec` enum directly, not the schema parser.
#[test]
fn test_uuid_type() {
    let mut codec = Codec::Fixed(16);
    if matches!(codec, Codec::Fixed(16)) {
        codec = Codec::Uuid;
    }
    assert!(matches!(codec, Codec::Uuid));
}
2522
/// Replacing a 12-byte fixed codec in place leaves `Codec::Interval`.
///
/// Note: this only exercises the `Codec` enum directly, not the schema parser.
#[test]
fn test_duration_logical_type() {
    let mut codec = Codec::Fixed(12);
    if matches!(codec, Codec::Fixed(12)) {
        codec = Codec::Interval;
    }
    assert!(matches!(codec, Codec::Interval));
}
2533
/// Checks that a locally defined closure rejects a fixed codec with a
/// `NotYetImplemented` error mentioning decimals.
///
/// Note: the closure is defined inside the test; no parser code is invoked.
#[test]
fn test_decimal_logical_type_not_implemented() {
    let codec = Codec::Fixed(16);

    let process_decimal = || -> Result<(), ArrowError> {
        match codec {
            Codec::Fixed(_) => Err(ArrowError::NotYetImplemented(
                "Decimals are not currently supported".to_string(),
            )),
            _ => Ok(()),
        }
    };

    let outcome = process_decimal();
    assert!(outcome.is_err());

    match outcome {
        Err(ArrowError::NotYetImplemented(msg)) => {
            assert!(msg.contains("Decimals are not currently supported"));
        }
        _ => panic!("Expected NotYetImplemented error"),
    }
}
/// An unrecognized logical type must be kept in the data type's metadata
/// under the `logicalType` key.
#[test]
fn test_unknown_logical_type_added_to_metadata() {
    let custom_schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&custom_schema, None, None)
        .unwrap();

    let stored = data_type.metadata.get("logicalType");
    let expected = "custom-type".to_string();
    assert_eq!(stored, Some(&expected));
}
2568
/// With the first `Maker` flag set, a string schema must yield `Codec::Utf8View`.
#[test]
fn test_string_with_utf8view_enabled() {
    let string_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut maker = Maker::new(true, false, Tz::default());
    let data_type = maker
        .make_data_type(&string_schema, None, None)
        .unwrap();

    let is_view = matches!(data_type.codec, Codec::Utf8View);
    assert!(is_view);
}
2578
/// With the first `Maker` flag cleared, a string schema must yield `Codec::Utf8`.
#[test]
fn test_string_without_utf8view_enabled() {
    let string_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut maker = Maker::new(false, false, Tz::default());
    let data_type = maker
        .make_data_type(&string_schema, None, None)
        .unwrap();

    let is_utf8 = matches!(data_type.codec, Codec::Utf8);
    assert!(is_utf8);
}
2588
/// A string field nested in a record must also use `Codec::Utf8View` when the
/// first `Maker` flag is set.
#[test]
fn test_record_with_string_and_utf8view_enabled() {
    let record = Record {
        name: "test_record",
        namespace: None,
        aliases: vec![],
        doc: None,
        fields: vec![crate::schema::Field {
            name: "string_field",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
            default: None,
            doc: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    };
    let schema = Schema::Complex(ComplexType::Record(record));

    let mut maker = Maker::new(true, false, Tz::default());
    let data_type = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = &data_type.codec else {
        panic!("Expected Struct codec");
    };
    let first_field_codec = &fields[0].data_type().codec;
    assert!(matches!(first_field_codec, Codec::Utf8View));
}
2622
/// With the second `Maker` flag set, a `[T, null]` union must be rejected
/// with a `SchemaError` carrying the strict-mode message.
#[test]
fn test_union_with_strict_mode() {
    let branches = vec![
        Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
    ];
    let schema = Schema::Union(branches);

    let mut maker = Maker::new(false, true, Tz::default());
    let outcome = maker.make_data_type(&schema, None, None);

    assert!(outcome.is_err());
    let Err(ArrowError::SchemaError(msg)) = outcome else {
        panic!("Expected SchemaError");
    };
    assert!(msg.contains(
        "Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
    ));
}
2643
/// Writer `int` read as `float` resolves to `Float32` with an `IntToFloat` promotion.
#[test]
fn test_resolve_int_to_float_promotion() {
    let promoted = resolve_promotion(PrimitiveType::Int, PrimitiveType::Float);
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToFloat));
    assert!(matches!(promoted.codec, Codec::Float32));
    assert_eq!(promoted.resolution, expected);
}
2653
/// Writer `int` read as `double` resolves to `Float64` with an `IntToDouble` promotion.
#[test]
fn test_resolve_int_to_double_promotion() {
    let promoted = resolve_promotion(PrimitiveType::Int, PrimitiveType::Double);
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToDouble));
    assert!(matches!(promoted.codec, Codec::Float64));
    assert_eq!(promoted.resolution, expected);
}
2663
/// Writer `long` read as `float` resolves to `Float32` with a `LongToFloat` promotion.
#[test]
fn test_resolve_long_to_float_promotion() {
    let promoted = resolve_promotion(PrimitiveType::Long, PrimitiveType::Float);
    let expected = Some(ResolutionInfo::Promotion(Promotion::LongToFloat));
    assert!(matches!(promoted.codec, Codec::Float32));
    assert_eq!(promoted.resolution, expected);
}
2673
/// Writer `long` read as `double` resolves to `Float64` with a `LongToDouble` promotion.
#[test]
fn test_resolve_long_to_double_promotion() {
    let promoted = resolve_promotion(PrimitiveType::Long, PrimitiveType::Double);
    let expected = Some(ResolutionInfo::Promotion(Promotion::LongToDouble));
    assert!(matches!(promoted.codec, Codec::Float64));
    assert_eq!(promoted.resolution, expected);
}
2683
/// Writer `float` read as `double` resolves to `Float64` with a `FloatToDouble` promotion.
#[test]
fn test_resolve_float_to_double_promotion() {
    let promoted = resolve_promotion(PrimitiveType::Float, PrimitiveType::Double);
    let expected = Some(ResolutionInfo::Promotion(Promotion::FloatToDouble));
    assert!(matches!(promoted.codec, Codec::Float64));
    assert_eq!(promoted.resolution, expected);
}
2693
/// Writer `string` read as `bytes` resolves to `Binary` with a `StringToBytes` promotion.
#[test]
fn test_resolve_string_to_bytes_promotion() {
    let promoted = resolve_promotion(PrimitiveType::String, PrimitiveType::Bytes);
    let expected = Some(ResolutionInfo::Promotion(Promotion::StringToBytes));
    assert!(matches!(promoted.codec, Codec::Binary));
    assert_eq!(promoted.resolution, expected);
}
2703
/// Writer `bytes` read as `string` resolves to `Utf8` with a `BytesToString` promotion.
#[test]
fn test_resolve_bytes_to_string_promotion() {
    let promoted = resolve_promotion(PrimitiveType::Bytes, PrimitiveType::String);
    let expected = Some(ResolutionInfo::Promotion(Promotion::BytesToString));
    assert!(matches!(promoted.codec, Codec::Utf8));
    assert_eq!(promoted.resolution, expected);
}
2713
/// Narrowing a writer `double` to a reader `float` must fail with a
/// `ParseError` mentioning "Illegal promotion".
#[test]
fn test_resolve_illegal_promotion_double_to_float_errors() {
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Float));
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Double));
    let mut maker = Maker::new(false, false, Tz::default());
    let outcome = maker.make_data_type(&writer_schema, Some(&reader_schema), None);

    assert!(outcome.is_err());
    let Err(ArrowError::ParseError(msg)) = outcome else {
        panic!("Expected ParseError for illegal promotion Double -> Float");
    };
    assert!(msg.contains("Illegal promotion"));
}
2728
/// Resolving writer `[null, int]` against reader `[double, null]` must promote
/// the value branch and report the writer's null-first ordering.
#[test]
fn test_promotion_within_nullable_union_keeps_writer_null_ordering() {
    let null_schema = || Schema::TypeName(TypeName::Primitive(PrimitiveType::Null));
    let writer = Schema::Union(vec![
        null_schema(),
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
    ]);
    let reader = Schema::Union(vec![
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
        null_schema(),
    ]);

    let mut maker = Maker::new(false, false, Tz::default());
    let resolved = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    assert!(matches!(resolved.codec, Codec::Float64));
    let expected_union = ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    };
    assert_eq!(
        resolved.resolution,
        Some(ResolutionInfo::Union(expected_union))
    );
    assert_eq!(resolved.nullability, Some(Nullability::NullFirst));
}
2756
/// A writer union `[string, long]` read as plain `bytes` maps only the string
/// branch (via `StringToBytes`); the long branch has no reader mapping.
#[test]
fn test_resolve_writer_union_to_reader_non_union_partial_coverage() {
    let reader = mk_primitive(PrimitiveType::Bytes);
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Long),
    ]);

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
    assert!(matches!(dt.codec(), Codec::Binary));

    let union_info = match dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(union_info.writer_is_union && !union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[
            Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
            None
        ]
    );
}
2780
/// A writer `long` read as `[long, double]` must pick the exact-match branch
/// (index 0, `Direct`) rather than the promotable `double` branch.
#[test]
fn test_resolve_writer_non_union_to_reader_union_prefers_direct_over_promotion() {
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::Double),
    ]);
    let writer = mk_primitive(PrimitiveType::Long);

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    let union_info = match dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(!union_info.writer_is_union && union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
    );
}
2800
/// A writer `int` read as `[null, long, string]` has no exact branch, so it
/// must map to the `long` branch (index 1) via `IntToLong`.
#[test]
fn test_resolve_writer_non_union_to_reader_union_uses_promotion_when_needed() {
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Long),
        mk_primitive(PrimitiveType::String),
    ]);
    let writer = mk_primitive(PrimitiveType::Int);

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    let union_info = match dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert_eq!(
        union_info.writer_to_reader.as_ref(),
        &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
    );
}
2820
/// A writer record `Inner { a }` read as the union `[Inner { a, b = 42 }, string]`
/// must resolve to reader branch 0 with record-level resolution, and the
/// reader-only field `b` must keep its `DefaultValue(Int(42))` resolution.
#[test]
fn test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults() {
    let writer_record = Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![AvroFieldSchema {
            name: "a",
            doc: None,
            r#type: mk_primitive(PrimitiveType::Int),
            default: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    };
    let writer = Schema::Complex(ComplexType::Record(writer_record));

    let reader_record = Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![
            AvroFieldSchema {
                name: "a",
                doc: None,
                r#type: mk_primitive(PrimitiveType::Int),
                default: None,
                aliases: vec![],
            },
            AvroFieldSchema {
                name: "b",
                doc: None,
                r#type: mk_primitive(PrimitiveType::Int),
                default: Some(Value::Number(serde_json::Number::from(42))),
                aliases: vec![],
            },
        ],
        attributes: Attributes::default(),
    };
    let reader = mk_union(vec![
        Schema::Complex(ComplexType::Record(reader_record)),
        mk_primitive(PrimitiveType::String),
    ]);

    let mut maker = Maker::new(false, false, Default::default());
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("resolution should succeed");

    // The top-level resolution describes a non-union writer mapped into a reader union.
    let union_info = match &dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(!union_info.writer_is_union && union_info.reader_is_union);
    assert_eq!(
        union_info.writer_to_reader.len(),
        1,
        "expected the non-union record to resolve to a union variant"
    );

    // The single writer entry must point at reader branch 0 with record resolution.
    let branch_resolution = match union_info.writer_to_reader.first().unwrap() {
        Some((0, inner)) => inner,
        other => panic!("unexpected writer-to-reader table value {other:?}"),
    };
    let record_info = match branch_resolution {
        ResolutionInfo::Record(record_info) => record_info,
        other => panic!("unexpected resolution {other:?}"),
    };
    assert_eq!(record_info.writer_fields.len(), 1);
    assert_eq!(record_info.writer_fields[0], ResolvedField::ToReader(0));
    assert_eq!(record_info.default_fields.len(), 1);
    assert_eq!(record_info.default_fields[0], 1);

    // The reader-side struct must still expose field `b` with its default.
    let union_children = match dt.codec() {
        Codec::Union(children, _, _) => children,
        other => panic!("expected union codec, got {other:?}"),
    };
    let inner_fields = match union_children[0].codec() {
        Codec::Struct(struct_fields) => struct_fields,
        other => panic!("expected struct codec for Inner, got {other:?}"),
    };
    assert_eq!(inner_fields.len(), 2);
    let field_b = &inner_fields[1];
    assert_eq!(field_b.name(), "b");
    assert_eq!(
        field_b.data_type().resolution,
        Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
        "field b should have DefaultValue(Int(42)) from schema resolution"
    );
}
2914
/// A writer union `[string, Inner { a }]` read as `[Inner { a, b = 42 }, string]`
/// must map writer branch 1 to reader branch 0 with record-level resolution,
/// and the reader-only field `b` must keep its `DefaultValue(Int(42))` resolution.
#[test]
fn test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
    let writer_record = Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![AvroFieldSchema {
            name: "a",
            doc: None,
            r#type: mk_primitive(PrimitiveType::Int),
            default: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    };
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        Schema::Complex(ComplexType::Record(writer_record)),
    ]);

    let reader_record = Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![
            AvroFieldSchema {
                name: "a",
                doc: None,
                r#type: mk_primitive(PrimitiveType::Int),
                default: None,
                aliases: vec![],
            },
            AvroFieldSchema {
                name: "b",
                doc: None,
                r#type: mk_primitive(PrimitiveType::Int),
                default: Some(Value::Number(serde_json::Number::from(42))),
                aliases: vec![],
            },
        ],
        attributes: Attributes::default(),
    };
    let reader = mk_union(vec![
        Schema::Complex(ComplexType::Record(reader_record)),
        mk_primitive(PrimitiveType::String),
    ]);

    let mut maker = Maker::new(false, false, Default::default());
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("resolution should succeed");

    // Both sides are unions, with one table entry per writer branch.
    let union_info = match &dt.resolution {
        Some(ResolutionInfo::Union(info)) => info,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(union_info.writer_is_union && union_info.reader_is_union);
    assert_eq!(union_info.writer_to_reader.len(), 2);

    // Writer branch 1 (the record) must point at reader branch 0.
    let branch_resolution = match &union_info.writer_to_reader[1] {
        Some((0, inner)) => inner,
        other => panic!("unexpected writer-to-reader table value {other:?}"),
    };
    let record_info = match branch_resolution {
        ResolutionInfo::Record(record_info) => record_info,
        other => panic!("unexpected resolution {other:?}"),
    };
    assert_eq!(record_info.writer_fields.len(), 1);
    assert_eq!(record_info.writer_fields[0], ResolvedField::ToReader(0));
    assert_eq!(record_info.default_fields.len(), 1);
    assert_eq!(record_info.default_fields[0], 1);

    // The reader-side struct must still expose field `b` with its default.
    let union_children = match dt.codec() {
        Codec::Union(children, _, _) => children,
        other => panic!("expected union codec, got {other:?}"),
    };
    let inner_fields = match union_children[0].codec() {
        Codec::Struct(struct_fields) => struct_fields,
        other => panic!("expected struct codec for Inner, got {other:?}"),
    };
    assert_eq!(inner_fields.len(), 2);
    let field_b = &inner_fields[1];
    assert_eq!(field_b.name(), "b");
    assert_eq!(
        field_b.data_type().resolution,
        Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
        "field b should have DefaultValue(Int(42)) from schema resolution"
    );
}
3007
/// Writer `[null, string]` against reader `[string, null]` resolves to `Utf8`
/// with a direct mapping and the writer's null-first ordering.
#[test]
fn test_resolve_both_nullable_unions_direct_match() {
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::String),
        mk_primitive(PrimitiveType::Null),
    ]);
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::String),
    ]);

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    assert!(matches!(dt.codec(), Codec::Utf8));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));

    let expected_union = ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::Direct))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    };
    assert_eq!(dt.resolution, Some(ResolutionInfo::Union(expected_union)));
}
3035
/// Writer `[null, int]` against reader `[double, null]` resolves to `Float64`
/// with an `IntToDouble` promotion and the writer's null-first ordering.
#[test]
fn test_resolve_both_nullable_unions_with_promotion() {
    let reader = mk_union(vec![
        mk_primitive(PrimitiveType::Double),
        mk_primitive(PrimitiveType::Null),
    ]);
    let writer = mk_union(vec![
        mk_primitive(PrimitiveType::Null),
        mk_primitive(PrimitiveType::Int),
    ]);

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();

    assert!(matches!(dt.codec(), Codec::Float64));
    assert_eq!(dt.nullability, Some(Nullability::NullFirst));

    let expected_union = ResolvedUnion {
        writer_to_reader: [
            None,
            Some((0, ResolutionInfo::Promotion(Promotion::IntToDouble))),
        ]
        .into(),
        writer_is_union: true,
        reader_is_union: true,
    };
    assert_eq!(dt.resolution, Some(ResolutionInfo::Union(expected_union)));
}
3063
/// Writer `int` read as `long` resolves to `Int64` with an `IntToLong` promotion.
#[test]
fn test_resolve_type_promotion() {
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let mut maker = Maker::new(false, false, Tz::default());
    let outcome = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
    let promoted = outcome.unwrap();

    assert!(matches!(promoted.codec, Codec::Int64));
    let expected = Some(ResolutionInfo::Promotion(Promotion::IntToLong));
    assert_eq!(promoted.resolution, expected);
}
3078
/// A record defined inline once and then referenced by name (directly, as
/// array items, and as map values) must resolve to the same Arrow data type
/// at every use site.
#[test]
fn test_nested_record_type_reuse_without_namespace() {
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "fields": [
        {
          "name": "nested",
          "type": {
            "type": "record",
            "name": "Nested",
            "fields": [
              { "name": "nested_int", "type": "int" }
            ]
          }
        },
        { "name": "nestedRecord", "type": "Nested" },
        { "name": "nestedArray", "type": { "type": "array", "items": "Nested" } },
        { "name": "nestedMap", "type": { "type": "map", "values": "Nested" } }
      ]
    }
    "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();

    let mut maker = Maker::new(false, false, Tz::default());
    let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = avro_data_type.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0: the inline definition of `Nested`.
    assert_eq!(fields[0].name(), "nested");
    let nested_data_type = fields[0].data_type();
    match nested_data_type.codec() {
        Codec::Struct(nested_fields) => {
            assert_eq!(nested_fields.len(), 1);
            assert_eq!(nested_fields[0].name(), "nested_int");
            assert!(matches!(nested_fields[0].data_type().codec(), Codec::Int32));
        }
        _ => panic!(
            "'nested' field is not a struct but {:?}",
            nested_data_type.codec()
        ),
    }

    // Field 1: a by-name reference to `Nested`.
    assert_eq!(fields[1].name(), "nestedRecord");
    let nested_record_data_type = fields[1].data_type();
    assert_eq!(
        nested_record_data_type.codec().data_type(),
        nested_data_type.codec().data_type()
    );

    // Field 2: an array whose items reference `Nested`.
    assert_eq!(fields[2].name(), "nestedArray");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'nestedArray' field is not a list");
    };
    assert_eq!(
        item_type.codec().data_type(),
        nested_data_type.codec().data_type()
    );

    // Field 3: a map whose values reference `Nested`.
    assert_eq!(fields[3].name(), "nestedMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'nestedMap' field is not a map");
    };
    assert_eq!(
        value_type.codec().data_type(),
        nested_data_type.codec().data_type()
    );
}
3158
/// An enum defined inline in its own namespace and then referenced by its
/// full name (directly, as array items, and as map values) must resolve to
/// the same Arrow data type at every use site.
#[test]
fn test_nested_enum_type_reuse_with_namespace() {
    let schema_str = r#"
    {
      "type": "record",
      "name": "Record",
      "namespace": "record_ns",
      "fields": [
        {
          "name": "status",
          "type": {
            "type": "enum",
            "name": "Status",
            "namespace": "enum_ns",
            "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
          }
        },
        { "name": "backupStatus", "type": "enum_ns.Status" },
        { "name": "statusHistory", "type": { "type": "array", "items": "enum_ns.Status" } },
        { "name": "statusMap", "type": { "type": "map", "values": "enum_ns.Status" } }
      ]
    }
    "#;

    let schema: Schema = serde_json::from_str(schema_str).unwrap();

    let mut maker = Maker::new(false, false, Tz::default());
    let avro_data_type = maker.make_data_type(&schema, None, None).unwrap();

    let Codec::Struct(fields) = avro_data_type.codec() else {
        panic!("Top-level schema is not a struct");
    };
    assert_eq!(fields.len(), 4);

    // Field 0: the inline definition of `enum_ns.Status`.
    assert_eq!(fields[0].name(), "status");
    let status_data_type = fields[0].data_type();
    match status_data_type.codec() {
        Codec::Enum(symbols) => {
            assert_eq!(symbols.as_ref(), &["ACTIVE", "INACTIVE", "PENDING"]);
        }
        _ => panic!(
            "'status' field is not an enum but {:?}",
            status_data_type.codec()
        ),
    }

    // Field 1: a by-name reference to the enum.
    assert_eq!(fields[1].name(), "backupStatus");
    let backup_status_data_type = fields[1].data_type();
    assert_eq!(
        backup_status_data_type.codec().data_type(),
        status_data_type.codec().data_type()
    );

    // Field 2: an array whose items reference the enum.
    assert_eq!(fields[2].name(), "statusHistory");
    let Codec::List(item_type) = fields[2].data_type().codec() else {
        panic!("'statusHistory' field is not a list");
    };
    assert_eq!(
        item_type.codec().data_type(),
        status_data_type.codec().data_type()
    );

    // Field 3: a map whose values reference the enum.
    assert_eq!(fields[3].name(), "statusMap");
    let Codec::Map(value_type) = fields[3].data_type().codec() else {
        panic!("'statusMap' field is not a map");
    };
    assert_eq!(
        value_type.codec().data_type(),
        status_data_type.codec().data_type()
    );
}
3236
/// A resolved non-record (string) type can be wrapped in an `AvroField` named
/// with the default root record name, and keeps its `Utf8` codec.
#[test]
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
    let mut maker = Maker::new(false, false, Tz::default());
    let outcome = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
    let data_type = outcome.expect("resolution should succeed");

    let root_field = AvroField {
        name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
        data_type,
    };
    assert_eq!(root_field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
    assert!(matches!(root_field.data_type().codec(), Codec::Utf8));
}
3252
3253 fn json_string(s: &str) -> Value {
3254 Value::String(s.to_string())
3255 }
3256
3257 fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
3258 let stored = dt
3259 .metadata
3260 .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
3261 .cloned()
3262 .unwrap_or_default();
3263 let expected = serde_json::to_string(default_json).unwrap();
3264 assert_eq!(stored, expected, "stored default metadata should match");
3265 }
3266
/// A JSON `null` default is accepted for the `null` codec and for a
/// null-first nullable type, and rejected for a non-nullable or null-second
/// type.
#[test]
fn test_validate_and_store_default_null_and_nullability_rules() {
    let null_json = Value::Null;

    // Plain `null` type: accepted and stored.
    {
        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
        let literal = dt_null.parse_and_store_default(&null_json).unwrap();
        assert_eq!(literal, AvroLiteral::Null);
        assert_default_stored(&dt_null, &null_json);
    }

    // Non-nullable int: rejected.
    {
        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
        let err = dt_int.parse_and_store_default(&null_json).unwrap_err();
        assert!(
            err.to_string()
                .contains("JSON null default is only valid for `null` type"),
            "unexpected error: {err}"
        );
    }

    // Null-first nullable int: accepted and stored.
    {
        let mut dt_int_nf =
            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
        let literal = dt_int_nf.parse_and_store_default(&null_json).unwrap();
        assert_eq!(literal, AvroLiteral::Null);
        assert_default_stored(&dt_int_nf, &null_json);
    }

    // Null-second nullable int: rejected.
    {
        let mut dt_int_ns =
            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
        let err2 = dt_int_ns.parse_and_store_default(&null_json).unwrap_err();
        assert!(
            err2.to_string()
                .contains("JSON null default is only valid for `null` type"),
            "unexpected error: {err2}"
        );
    }
}
3294
/// Exercises default parsing for primitive, string-like, binary and temporal codecs,
/// including the range and codepoint rejections.
#[test]
fn test_validate_and_store_default_primitives_and_temporal() {
    // boolean
    let mut boolean = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
    let parsed = boolean
        .parse_and_store_default(&Value::Bool(true))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Boolean(true));
    assert_default_stored(&boolean, &Value::Bool(true));

    // int: in-range value is stored, one past i32::MAX is rejected
    let mut int32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
    let json_123 = serde_json::json!(123);
    let parsed = int32.parse_and_store_default(&json_123).unwrap();
    assert_eq!(parsed, AvroLiteral::Int(123));
    assert_default_stored(&int32, &json_123);
    let past_i32 = serde_json::json!(i64::from(i32::MAX) + 1);
    let failure = int32.parse_and_store_default(&past_i32).unwrap_err();
    assert!(failure.to_string().contains("out of i32 range"));

    // long
    let mut int64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let json_long = serde_json::json!(1234567890);
    let parsed = int64.parse_and_store_default(&json_long).unwrap();
    assert_eq!(parsed, AvroLiteral::Long(1234567890));
    assert_default_stored(&int64, &json_long);

    // float: representable value is stored, 1e39 is rejected
    let mut float32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
    let json_float = serde_json::json!(1.25);
    let parsed = float32.parse_and_store_default(&json_float).unwrap();
    assert_eq!(parsed, AvroLiteral::Float(1.25));
    assert_default_stored(&float32, &json_float);
    let past_f32 = serde_json::json!(1e39);
    let failure = float32.parse_and_store_default(&past_f32).unwrap_err();
    assert!(failure.to_string().contains("out of f32 range"));

    // double
    let mut float64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let json_pi = serde_json::json!(std::f64::consts::PI);
    let parsed = float64.parse_and_store_default(&json_pi).unwrap();
    assert_eq!(parsed, AvroLiteral::Double(std::f64::consts::PI));
    assert_default_stored(&float64, &json_pi);

    // string (Utf8)
    let mut utf8 = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
    let json_hello = json_string("hello");
    let parsed = utf8.parse_and_store_default(&json_hello).unwrap();
    assert_eq!(parsed, AvroLiteral::String("hello".into()));
    assert_default_stored(&utf8, &json_hello);

    // string (Utf8View)
    let mut utf8_view = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
    let json_view = json_string("view");
    let parsed = utf8_view.parse_and_store_default(&json_view).unwrap();
    assert_eq!(parsed, AvroLiteral::String("view".into()));
    assert_default_stored(&utf8_view, &json_view);

    // uuid
    let mut uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
    let json_uuid = json_string("00000000-0000-0000-0000-000000000000");
    let parsed = uuid.parse_and_store_default(&json_uuid).unwrap();
    assert_eq!(
        parsed,
        AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
    );

    // bytes: "ABC" parses to its byte values, "€" is rejected
    let mut binary = AvroDataType::new(Codec::Binary, HashMap::new(), None);
    let parsed = binary
        .parse_and_store_default(&json_string("ABC"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(vec![65, 66, 67]));
    let failure = binary
        .parse_and_store_default(&json_string("€"))
        .unwrap_err();
    assert!(failure.to_string().contains("Invalid codepoint"));

    // date
    let mut date32 = AvroDataType::new(Codec::Date32, HashMap::new(), None);
    let parsed = date32
        .parse_and_store_default(&serde_json::json!(1))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Int(1));

    // time-millis
    let mut time_millis = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
    let parsed = time_millis
        .parse_and_store_default(&serde_json::json!(86_400_000))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Int(86_400_000));

    // time-micros
    let mut time_micros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
    let parsed = time_micros
        .parse_and_store_default(&serde_json::json!(1_000_000))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(1_000_000));

    // timestamp-millis
    let mut ts_millis = AvroDataType::new(Codec::TimestampMillis(None), HashMap::new(), None);
    let parsed = ts_millis
        .parse_and_store_default(&serde_json::json!(123))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(123));

    // timestamp-micros
    let mut ts_micros = AvroDataType::new(Codec::TimestampMicros(None), HashMap::new(), None);
    let parsed = ts_micros
        .parse_and_store_default(&serde_json::json!(456))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Long(456));
}
3386
/// Checks the accepted boundary value and the out-of-range rejections for each of the
/// custom narrow/unsigned integer codecs.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_validate_and_store_default_custom_integer_ranges() {
    /// Parses `value` as a default for `dt`, expecting failure, and returns the error text.
    fn rejection(dt: &mut AvroDataType, value: Value) -> String {
        dt.parse_and_store_default(&value).unwrap_err().to_string()
    }

    // Int8
    let mut int8 = AvroDataType::new(Codec::Int8, HashMap::new(), None);
    let accepted = int8
        .parse_and_store_default(&serde_json::json!(i8::MAX))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(i8::MAX)));
    let above = rejection(&mut int8, serde_json::json!(i64::from(i8::MAX) + 1));
    assert!(above.contains("out of i8 range"));
    let below = rejection(&mut int8, serde_json::json!(i64::from(i8::MIN) - 1));
    assert!(below.contains("out of i8 range"));

    // Int16
    let mut int16 = AvroDataType::new(Codec::Int16, HashMap::new(), None);
    let accepted = int16
        .parse_and_store_default(&serde_json::json!(i16::MIN))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(i16::MIN)));
    let above = rejection(&mut int16, serde_json::json!(i64::from(i16::MAX) + 1));
    assert!(above.contains("out of i16 range"));
    let below = rejection(&mut int16, serde_json::json!(i64::from(i16::MIN) - 1));
    assert!(below.contains("out of i16 range"));

    // UInt8
    let mut uint8 = AvroDataType::new(Codec::UInt8, HashMap::new(), None);
    let accepted = uint8
        .parse_and_store_default(&serde_json::json!(u8::MAX))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(u8::MAX)));
    let negative = rejection(&mut uint8, serde_json::json!(-1));
    assert!(negative.contains("out of u8 range"));
    let above = rejection(&mut uint8, serde_json::json!(i64::from(u8::MAX) + 1));
    assert!(above.contains("out of u8 range"));

    // UInt16
    let mut uint16 = AvroDataType::new(Codec::UInt16, HashMap::new(), None);
    let accepted = uint16
        .parse_and_store_default(&serde_json::json!(u16::MAX))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Int(i32::from(u16::MAX)));
    let negative = rejection(&mut uint16, serde_json::json!(-1));
    assert!(negative.contains("out of u16 range"));
    let above = rejection(&mut uint16, serde_json::json!(i64::from(u16::MAX) + 1));
    assert!(above.contains("out of u16 range"));

    // UInt32 (the accepted literal is a Long)
    let mut uint32 = AvroDataType::new(Codec::UInt32, HashMap::new(), None);
    let accepted = uint32
        .parse_and_store_default(&serde_json::json!(i64::from(u32::MAX)))
        .unwrap();
    assert_eq!(accepted, AvroLiteral::Long(i64::from(u32::MAX)));
    let negative = rejection(&mut uint32, serde_json::json!(-1));
    assert!(negative.contains("out of u32 range"));
    let above = rejection(&mut uint32, serde_json::json!(i64::from(u32::MAX) + 1));
    assert!(above.contains("out of u32 range"));
}
3460
/// Defaults for fixed-width codecs must match the declared length; a decimal without a
/// fixed size accepts a string of any length.
#[test]
fn test_validate_and_store_default_fixed_decimal_interval() {
    // fixed(4)
    let mut fixed4 = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
    let parsed = fixed4
        .parse_and_store_default(&json_string("WXYZ"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
    let failure = fixed4
        .parse_and_store_default(&json_string("TOO LONG"))
        .unwrap_err();
    assert!(failure.to_string().contains("Default length"));

    // decimal with a size of 3
    let sized_decimal = Codec::Decimal(10, Some(2), Some(3));
    let mut decimal_fixed = AvroDataType::new(sized_decimal, HashMap::new(), None);
    let parsed = decimal_fixed
        .parse_and_store_default(&json_string("abc"))
        .unwrap();
    assert_eq!(parsed, AvroLiteral::Bytes(vec![97, 98, 99]));
    let failure = decimal_fixed
        .parse_and_store_default(&json_string("toolong"))
        .unwrap_err();
    assert!(failure.to_string().contains("Default length"));

    // decimal without a size
    let unsized_decimal = Codec::Decimal(10, Some(2), None);
    let mut decimal_bytes = AvroDataType::new(unsized_decimal, HashMap::new(), None);
    let parsed = decimal_bytes
        .parse_and_store_default(&json_string("freeform"))
        .unwrap();
    let freeform_bytes = "freeform".bytes().collect::<Vec<_>>();
    assert_eq!(parsed, AvroLiteral::Bytes(freeform_bytes));

    // interval: a 12-character default is accepted, a 5-character one is rejected
    let mut interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
    let parsed = interval
        .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
        .unwrap();
    let interval_bytes = "ABCDEFGHIJKL".bytes().collect::<Vec<_>>();
    assert_eq!(parsed, AvroLiteral::Bytes(interval_bytes));
    let failure = interval
        .parse_and_store_default(&json_string("short"))
        .unwrap_err();
    assert!(failure.to_string().contains("Default length"));
}
3504
/// Exercises default parsing for enum, list, map and struct codecs, covering both the
/// accepted literal and the rejection path for each.
#[test]
fn test_validate_and_store_default_enum_list_map_struct() {
    // Enum: a declared symbol is accepted, an undeclared one is rejected.
    let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
        .into_iter()
        .collect();
    let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
    let l = dt_enum
        .parse_and_store_default(&json_string("GREEN"))
        .unwrap();
    assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
    let err = dt_enum
        .parse_and_store_default(&json_string("YELLOW"))
        .unwrap_err();
    assert!(err.to_string().contains("Default enum symbol"));

    // List of long: a JSON array is parsed element-wise; a non-array is rejected.
    let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
    let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
    let val = serde_json::json!([1, 2, 3]);
    let l = dt_list.parse_and_store_default(&val).unwrap();
    assert_eq!(
        l,
        AvroLiteral::Array(vec![
            AvroLiteral::Long(1),
            AvroLiteral::Long(2),
            AvroLiteral::Long(3)
        ])
    );
    let err = dt_list
        .parse_and_store_default(&serde_json::json!({"not":"array"}))
        .unwrap_err();
    assert!(err.to_string().contains("JSON array"));

    // Map of double: a JSON object is parsed value-wise; a non-object is rejected.
    let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
    let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
    let mv = serde_json::json!({"x": 1.5, "y": 2.5});
    let l = dt_map.parse_and_store_default(&mv).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("x".into(), AvroLiteral::Double(1.5));
    expected.insert("y".into(), AvroLiteral::Double(2.5));
    assert_eq!(l, AvroLiteral::Map(expected));
    let err = dt_map
        .parse_and_store_default(&serde_json::json!(123))
        .unwrap_err();
    assert!(err.to_string().contains("JSON object"));

    // Struct: the default object supplies only `a`. The expected literal below still
    // contains `b` (null-first nullable, expected as Null) and `c` (carries a stored
    // default of "xyz" in its metadata, expected as that string).
    let mut field_a = AvroField {
        name: "a".into(),
        data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
    };
    let field_b = AvroField {
        name: "b".into(),
        data_type: AvroDataType::new(
            Codec::Int64,
            HashMap::new(),
            Some(Nullability::NullFirst),
        ),
    };
    let mut c_md = HashMap::new();
    c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
    let field_c = AvroField {
        name: "c".into(),
        data_type: AvroDataType::new(Codec::Utf8, c_md, None),
    };
    field_a.data_type.metadata.insert("doc".into(), "na".into());
    let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
    let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
    let default_obj = serde_json::json!({"a": 7});
    let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
    let mut expected = IndexMap::new();
    expected.insert("a".into(), AvroLiteral::Int(7));
    expected.insert("b".into(), AvroLiteral::Null);
    expected.insert("c".into(), AvroLiteral::String("xyz".into()));
    assert_eq!(l, AvroLiteral::Map(expected));
    assert_default_stored(&dt_struct, &default_obj);

    // Struct with a non-nullable subfield that has no default: an empty object is rejected.
    let req_field = AvroField {
        name: "req".into(),
        data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
    };
    let mut dt_bad = AvroDataType::new(
        Codec::Struct(Arc::from(vec![req_field])),
        HashMap::new(),
        None,
    );
    let err = dt_bad
        .parse_and_store_default(&serde_json::json!({}))
        .unwrap_err();
    assert!(
        err.to_string().contains("missing required subfield 'req'"),
        "unexpected error: {err}"
    );

    // Struct given a non-object default. The message check was previously computed and
    // discarded, so it could never fail; it is now asserted.
    let err = dt_struct
        .parse_and_store_default(&serde_json::json!(10))
        .unwrap_err();
    assert!(
        err.to_string().contains("must be a JSON object"),
        "unexpected error: {err}"
    );
}
3598
/// Resolving a writer `array<int>` against a reader `array<long>` keeps the reader's
/// extra attributes and marks the items as an int-to-long promotion.
#[test]
fn test_resolve_array_promotion_and_reader_metadata() {
    let writer_extra: HashMap<&str, Value> = HashMap::from([("who", json_string("writer"))]);
    let reader_extra: HashMap<&str, Value> = HashMap::from([("who", json_string("reader"))]);

    let int_items = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        attributes: Attributes {
            additional: writer_extra,
            logical_type: None,
        },
        items: Box::new(int_items),
    }));
    let long_items = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        attributes: Attributes {
            additional: reader_extra,
            logical_type: None,
        },
        items: Box::new(long_items),
    }));

    let mut maker = Maker::new(false, false, Tz::default());
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();

    // The "who" attribute must come from the reader, stored as JSON text.
    assert_eq!(
        resolved.metadata.get("who"),
        Some(&"\"reader\"".to_string())
    );
    let Codec::List(items) = resolved.codec() else {
        panic!("expected list codec");
    };
    assert!(matches!(items.codec(), Codec::Int64));
    assert_eq!(
        items.resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    );
}
3634
/// Resolving a writer `array<int>` against a reader `array<[null, int]>` yields
/// null-first nullable `Int32` items whose resolution is a direct promotion.
#[test]
fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(mk_union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
        ])),
        attributes: Attributes::default(),
    }));
    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    if let Codec::List(inner) = dt.codec() {
        assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
        assert!(matches!(inner.codec(), Codec::Int32));
        match inner.resolution.as_ref() {
            Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
            // The failure message names the resolution this test actually expects;
            // it previously said "Union", contradicting the arm above.
            other => panic!("expected Promotion::Direct resolution, got {other:?}"),
        }
    } else {
        panic!("expected List codec");
    }
}
3663
/// A writer fixed named `MD5` with alias `Hash16` resolves against a reader fixed named
/// `Hash16` of the same size.
#[test]
fn test_resolve_fixed_success_name_and_size_match_and_alias() {
    let writer_fixed = Fixed {
        name: "MD5",
        aliases: vec!["Hash16"],
        namespace: None,
        size: 16,
        attributes: Attributes::default(),
    };
    let reader_fixed = Fixed {
        name: "Hash16",
        aliases: Vec::new(),
        namespace: None,
        size: 16,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
    let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));

    let mut maker = Maker::new(false, false, Tz::default());
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .unwrap();
    assert!(matches!(resolved.codec(), Codec::Fixed(16)));
}
3686
/// A 16-byte fixed with logical type `arrow.interval-month-day-nano` maps to the
/// month-day-nano interval codec and Arrow data type.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_logical_type_fixed16() {
    let interval_fixed = Fixed {
        name: "ArrowIntervalMDN",
        aliases: Vec::new(),
        namespace: None,
        size: 16,
        attributes: Attributes {
            additional: Default::default(),
            logical_type: Some("arrow.interval-month-day-nano"),
        },
    };
    let schema = Schema::Complex(ComplexType::Fixed(interval_fixed));

    let mut maker = Maker::new(false, false, Default::default());
    let resolved = maker.make_data_type(&schema, None, None).unwrap();
    assert!(matches!(resolved.codec(), Codec::IntervalMonthDayNano));
    let arrow_type = resolved.codec.data_type();
    assert_eq!(arrow_type, DataType::Interval(IntervalUnit::MonthDayNano));
}
3708
/// Resolves a writer record (`a`, `skipme`, `b`) against a reader record
/// (`b`, `a`, `name`, `opt`) and checks field order, promotion, the writer-to-reader
/// mapping, skipped fields, defaulted fields and stored default metadata.
#[test]
fn test_resolve_records_mapping_default_fields_and_skip_fields() {
    // Writer side: a:int, skipme:string, b:long.
    let writer_fields = vec![
        crate::schema::Field {
            name: "a",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
        crate::schema::Field {
            name: "skipme",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
        crate::schema::Field {
            name: "b",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
    ];
    let writer = Schema::Complex(ComplexType::Record(Record {
        name: "R",
        fields: writer_fields,
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        attributes: Attributes::default(),
    }));

    // Reader side: b:long, a:long, name:string (default "anon"), opt:[null, int].
    let reader_fields = vec![
        crate::schema::Field {
            name: "b",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
        crate::schema::Field {
            name: "a",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
        crate::schema::Field {
            name: "name",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
            default: Some(json_string("anon")),
            doc: None,
            aliases: Vec::new(),
        },
        crate::schema::Field {
            name: "opt",
            r#type: Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            ]),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
    ];
    let reader = Schema::Complex(ComplexType::Record(Record {
        name: "R",
        fields: reader_fields,
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false, Tz::default());
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("record resolution");

    // The resolved struct follows the reader's field order.
    let fields = match dt.codec() {
        Codec::Struct(resolved_fields) => resolved_fields,
        other => panic!("expected struct, got {other:?}"),
    };
    assert_eq!(fields.len(), 4);
    for (index, expected_name) in ["b", "a", "name", "opt"].into_iter().enumerate() {
        assert_eq!(fields[index].name(), expected_name);
    }
    assert!(matches!(
        fields[1].data_type().resolution,
        Some(ResolutionInfo::Promotion(Promotion::IntToLong))
    ));

    // Writer fields map to reader slots 1 and 0, with `skipme` skipped in between.
    let rec = match &dt.resolution {
        Some(ResolutionInfo::Record(record)) => record.clone(),
        other => panic!("expected record resolution, got {other:?}"),
    };
    assert!(matches!(
        &rec.writer_fields[..],
        [
            ResolvedField::ToReader(1),
            ResolvedField::Skip(_),
            ResolvedField::ToReader(0),
        ]
    ));
    assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
    let ResolvedField::Skip(skipped) = &rec.writer_fields[1] else {
        panic!("should skip field 1")
    };
    assert!(matches!(skipped.codec(), Codec::Utf8));

    // Defaults recorded in metadata for the two reader-only fields.
    let name_default = fields[2]
        .data_type()
        .metadata
        .get(AVRO_FIELD_DEFAULT_METADATA_KEY);
    assert_eq!(name_default, Some(&"\"anon\"".to_string()));
    let opt_default = fields[3]
        .data_type()
        .metadata
        .get(AVRO_FIELD_DEFAULT_METADATA_KEY);
    assert_eq!(opt_default, Some(&"null".to_string()));
}
3826
/// A writer record `com.example.v2.PersonV2` carrying the alias `com.example.Person`
/// resolves against a reader record `com.example.Person` with the same fields.
#[test]
fn test_named_type_alias_resolution_record_cross_namespace() {
    let shared_fields = vec![
        AvroFieldSchema {
            name: "name",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
        AvroFieldSchema {
            name: "age",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        },
    ];
    let writer_record = Record {
        name: "PersonV2",
        namespace: Some("com.example.v2"),
        aliases: vec!["com.example.Person"],
        fields: shared_fields,
        doc: None,
        attributes: Attributes::default(),
    };
    let reader_record = Record {
        name: "Person",
        namespace: Some("com.example"),
        aliases: Vec::new(),
        fields: writer_record.fields.clone(),
        doc: None,
        attributes: Attributes::default(),
    };
    let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
    let reader_schema = Schema::Complex(ComplexType::Record(reader_record));

    let mut maker = Maker::new(false, false, Tz::default());
    let resolved = maker
        .make_data_type(&writer_schema, Some(&reader_schema), None)
        .expect("record alias resolution should succeed");
    match &resolved.codec {
        Codec::Struct(fields) => assert_eq!(fields.len(), 2),
        other => panic!("expected struct, got {other:?}"),
    }
}
3871
/// A writer enum `org.example.v2.ColorV2` carrying the alias `org.example.Color`
/// resolves against a reader enum `org.example.Color` with the same symbols.
#[test]
fn test_named_type_alias_resolution_enum_cross_namespace() {
    let writer_schema = Schema::Complex(ComplexType::Enum(Enum {
        name: "ColorV2",
        namespace: Some("org.example.v2"),
        aliases: vec!["org.example.Color"],
        symbols: vec!["RED", "GREEN", "BLUE"],
        doc: None,
        default: None,
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Enum(Enum {
        name: "Color",
        namespace: Some("org.example"),
        aliases: Vec::new(),
        symbols: vec!["RED", "GREEN", "BLUE"],
        doc: None,
        default: None,
        attributes: Attributes::default(),
    }));

    let mut maker = Maker::new(false, false, Tz::default());
    let outcome = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
    outcome.expect("enum alias resolution should succeed");
}
3899
/// A writer fixed `ns.v2.Fx10V2` carrying the alias `ns.Fx10` resolves against a reader
/// fixed `ns.Fx10` of the same size.
#[test]
fn test_named_type_alias_resolution_fixed_cross_namespace() {
    let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
        name: "Fx10V2",
        namespace: Some("ns.v2"),
        aliases: vec!["ns.Fx10"],
        attributes: Attributes::default(),
        size: 10,
    }));
    let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
        name: "Fx10",
        namespace: Some("ns"),
        aliases: Vec::new(),
        attributes: Attributes::default(),
        size: 10,
    }));

    let mut maker = Maker::new(false, false, Tz::default());
    let outcome = maker.make_data_type(&writer_schema, Some(&reader_schema), None);
    outcome.expect("fixed alias resolution should succeed");
}
3923}