1use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20
/// The metadata key `avro.schema`.
///
/// The Avro object container file format stores the writer schema, encoded as JSON,
/// under this key in the file header metadata.
///
/// NOTE(review): the users of this constant are outside this file; presumably it is the
/// key under which a JSON-encoded [`Schema`] is read or written — confirm against callers.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(untagged)]
29pub enum TypeName<'a> {
33 Primitive(PrimitiveType),
35 Ref(&'a str),
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub enum PrimitiveType {
45 Null,
47 Boolean,
49 Int,
51 Long,
53 Float,
55 Double,
57 Bytes,
59 String,
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
67#[serde(rename_all = "camelCase")]
68pub struct Attributes<'a> {
69 #[serde(default)]
73 pub logical_type: Option<&'a str>,
74
75 #[serde(flatten)]
77 pub additional: HashMap<&'a str, serde_json::Value>,
78}
79
80impl Attributes<'_> {
81 pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
83 self.additional
84 .iter()
85 .map(|(k, v)| (k.to_string(), v.to_string()))
86 .collect()
87 }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
92#[serde(rename_all = "camelCase")]
93pub struct Type<'a> {
94 #[serde(borrow)]
96 pub r#type: TypeName<'a>,
97 #[serde(flatten)]
99 pub attributes: Attributes<'a>,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(untagged)]
108pub enum Schema<'a> {
109 #[serde(borrow)]
111 TypeName(TypeName<'a>),
112 #[serde(borrow)]
114 Union(Vec<Schema<'a>>),
115 #[serde(borrow)]
117 Complex(ComplexType<'a>),
118 #[serde(borrow)]
120 Type(Type<'a>),
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
127#[serde(tag = "type", rename_all = "camelCase")]
128pub enum ComplexType<'a> {
129 #[serde(borrow)]
131 Record(Record<'a>),
132 #[serde(borrow)]
134 Enum(Enum<'a>),
135 #[serde(borrow)]
137 Array(Array<'a>),
138 #[serde(borrow)]
140 Map(Map<'a>),
141 #[serde(borrow)]
143 Fixed(Fixed<'a>),
144}
145
146#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
150pub struct Record<'a> {
151 #[serde(borrow)]
153 pub name: &'a str,
154 #[serde(borrow, default)]
156 pub namespace: Option<&'a str>,
157 #[serde(borrow, default)]
159 pub doc: Option<&'a str>,
160 #[serde(borrow, default)]
162 pub aliases: Vec<&'a str>,
163 #[serde(borrow)]
165 pub fields: Vec<Field<'a>>,
166 #[serde(flatten)]
168 pub attributes: Attributes<'a>,
169}
170
171#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
173pub struct Field<'a> {
174 #[serde(borrow)]
176 pub name: &'a str,
177 #[serde(borrow, default)]
179 pub doc: Option<&'a str>,
180 #[serde(borrow)]
182 pub r#type: Schema<'a>,
183 #[serde(borrow, default)]
185 pub default: Option<&'a str>,
186}
187
188#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
192pub struct Enum<'a> {
193 #[serde(borrow)]
195 pub name: &'a str,
196 #[serde(borrow, default)]
198 pub namespace: Option<&'a str>,
199 #[serde(borrow, default)]
201 pub doc: Option<&'a str>,
202 #[serde(borrow, default)]
204 pub aliases: Vec<&'a str>,
205 #[serde(borrow)]
207 pub symbols: Vec<&'a str>,
208 #[serde(borrow, default)]
210 pub default: Option<&'a str>,
211 #[serde(flatten)]
213 pub attributes: Attributes<'a>,
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
220pub struct Array<'a> {
221 #[serde(borrow)]
223 pub items: Box<Schema<'a>>,
224 #[serde(flatten)]
226 pub attributes: Attributes<'a>,
227}
228
229#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
233pub struct Map<'a> {
234 #[serde(borrow)]
236 pub values: Box<Schema<'a>>,
237 #[serde(flatten)]
239 pub attributes: Attributes<'a>,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
246pub struct Fixed<'a> {
247 #[serde(borrow)]
249 pub name: &'a str,
250 #[serde(borrow, default)]
252 pub namespace: Option<&'a str>,
253 #[serde(borrow, default)]
255 pub aliases: Vec<&'a str>,
256 pub size: usize,
258 #[serde(flatten)]
260 pub attributes: Attributes<'a>,
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::AvroField;
    use arrow_schema::{DataType, Fields, TimeUnit};
    use serde_json::json;

    /// Deserializes a series of JSON schemas and checks the resulting model, and for two
    /// of them the outcome of converting the model with `AvroField::try_from`.
    #[test]
    fn test_deserialize() {
        // A bare string naming a primitive.
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(
            t,
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
        );

        // A JSON array is a union of its members.
        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            ])
        );

        // A primitive in object form with a logical type.
        let t: Type = serde_json::from_str(
            r#"{
                "type":"long",
                "logicalType":"timestamp-micros"
            }"#,
        )
        .unwrap();

        // Reused further down as a member of a union.
        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        };

        assert_eq!(t, timestamp);

        // A fixed with a logical type: unknown keys end up in `additional`.
        let t: ComplexType = serde_json::from_str(
            r#"{
                "type":"fixed",
                "name":"fixed",
                "namespace":"topLevelRecord.value",
                "size":11,
                "logicalType":"decimal",
                "precision":25,
                "scale":2
            }"#,
        )
        .unwrap();

        // Reused further down as a member of a union.
        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        });

        assert_eq!(t, decimal);

        // A record whose only field is a union of the fixed above and null.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type":"record",
                "name":"topLevelRecord",
                "fields":[
                    {
                        "name":"value",
                        "type":[
                            {
                                "type":"fixed",
                                "name":"fixed",
                                "namespace":"topLevelRecord.value",
                                "size":11,
                                "logicalType":"decimal",
                                "precision":25,
                                "scale":2
                            },
                            "null"
                        ]
                    }
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal),
                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    ]),
                    default: None,
                }],
                attributes: Default::default(),
            }))
        );

        // A record that refers to itself by name: the reference parses as `TypeName::Ref`.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type": "record",
                "name": "LongList",
                "aliases": ["LinkedLongs"],
                "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                        default: None,
                    },
                    Field {
                        name: "next",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                    }
                ],
                attributes: Attributes::default(),
            }))
        );

        // Converting the self-referential record is expected to fail with this message.
        let err = AvroField::try_from(&schema).unwrap_err().to_string();
        assert_eq!(err, "Parser error: Failed to resolve .LongList");

        // A record with a nullable int and a nullable timestamp.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type":"record",
                "name":"topLevelRecord",
                "fields":[
                    {
                        "name":"id",
                        "type":[
                            "int",
                            "null"
                        ]
                    },
                    {
                        "name":"timestamp_col",
                        "type":[
                            {
                                "type":"long",
                                "logicalType":"timestamp-micros"
                            },
                            "null"
                        ]
                    }
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );

        // The same record converted to an Arrow field: a non-nullable struct whose
        // children are both nullable.
        let codec = AvroField::try_from(&schema).unwrap();
        assert_eq!(
            codec.field(),
            arrow_schema::Field::new(
                "topLevelRecord",
                DataType::Struct(Fields::from(vec![
                    arrow_schema::Field::new("id", DataType::Int32, true),
                    arrow_schema::Field::new(
                        "timestamp_col",
                        DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                        true
                    ),
                ])),
                false
            )
        );

        // A record mixing an inline fixed, a reference to it by name, and a nullable map.
        let schema: Schema = serde_json::from_str(
            r#"{
                "type": "record",
                "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(Schema::TypeName(TypeName::Primitive(
                                    PrimitiveType::Bytes
                                ))),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );
    }
}