1use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20
/// The metadata key `"avro.schema"`.
///
/// NOTE(review): presumably the key under which the JSON-encoded [`Schema`] is stored
/// in metadata (it matches the key the Avro object container file header uses for the
/// writer schema); the code that reads or writes it is outside this file — confirm.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(untagged)]
29pub enum TypeName<'a> {
30 Primitive(PrimitiveType),
31 Ref(&'a str),
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub enum PrimitiveType {
40 Null,
41 Boolean,
42 Int,
43 Long,
44 Float,
45 Double,
46 Bytes,
47 String,
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
54#[serde(rename_all = "camelCase")]
55pub struct Attributes<'a> {
56 #[serde(default)]
60 pub logical_type: Option<&'a str>,
61
62 #[serde(flatten)]
64 pub additional: HashMap<&'a str, serde_json::Value>,
65}
66
67impl Attributes<'_> {
68 pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
70 self.additional
71 .iter()
72 .map(|(k, v)| (k.to_string(), v.to_string()))
73 .collect()
74 }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
79#[serde(rename_all = "camelCase")]
80pub struct Type<'a> {
81 #[serde(borrow)]
82 pub r#type: TypeName<'a>,
83 #[serde(flatten)]
84 pub attributes: Attributes<'a>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(untagged)]
90pub enum Schema<'a> {
91 #[serde(borrow)]
92 TypeName(TypeName<'a>),
93 #[serde(borrow)]
94 Union(Vec<Schema<'a>>),
95 #[serde(borrow)]
96 Complex(ComplexType<'a>),
97 #[serde(borrow)]
98 Type(Type<'a>),
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
105#[serde(tag = "type", rename_all = "camelCase")]
106pub enum ComplexType<'a> {
107 #[serde(borrow)]
108 Record(Record<'a>),
109 #[serde(borrow)]
110 Enum(Enum<'a>),
111 #[serde(borrow)]
112 Array(Array<'a>),
113 #[serde(borrow)]
114 Map(Map<'a>),
115 #[serde(borrow)]
116 Fixed(Fixed<'a>),
117}
118
119#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
123pub struct Record<'a> {
124 #[serde(borrow)]
125 pub name: &'a str,
126 #[serde(borrow, default)]
127 pub namespace: Option<&'a str>,
128 #[serde(borrow, default)]
129 pub doc: Option<&'a str>,
130 #[serde(borrow, default)]
131 pub aliases: Vec<&'a str>,
132 #[serde(borrow)]
133 pub fields: Vec<Field<'a>>,
134 #[serde(flatten)]
135 pub attributes: Attributes<'a>,
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140pub struct Field<'a> {
141 #[serde(borrow)]
142 pub name: &'a str,
143 #[serde(borrow, default)]
144 pub doc: Option<&'a str>,
145 #[serde(borrow)]
146 pub r#type: Schema<'a>,
147 #[serde(borrow, default)]
148 pub default: Option<&'a str>,
149}
150
151#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
155pub struct Enum<'a> {
156 #[serde(borrow)]
157 pub name: &'a str,
158 #[serde(borrow, default)]
159 pub namespace: Option<&'a str>,
160 #[serde(borrow, default)]
161 pub doc: Option<&'a str>,
162 #[serde(borrow, default)]
163 pub aliases: Vec<&'a str>,
164 #[serde(borrow)]
165 pub symbols: Vec<&'a str>,
166 #[serde(borrow, default)]
167 pub default: Option<&'a str>,
168 #[serde(flatten)]
169 pub attributes: Attributes<'a>,
170}
171
172#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
176pub struct Array<'a> {
177 #[serde(borrow)]
178 pub items: Box<Schema<'a>>,
179 #[serde(flatten)]
180 pub attributes: Attributes<'a>,
181}
182
183#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
187pub struct Map<'a> {
188 #[serde(borrow)]
189 pub values: Box<Schema<'a>>,
190 #[serde(flatten)]
191 pub attributes: Attributes<'a>,
192}
193
194#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
198pub struct Fixed<'a> {
199 #[serde(borrow)]
200 pub name: &'a str,
201 #[serde(borrow, default)]
202 pub namespace: Option<&'a str>,
203 #[serde(borrow, default)]
204 pub aliases: Vec<&'a str>,
205 pub size: usize,
206 #[serde(flatten)]
207 pub attributes: Attributes<'a>,
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::AvroField;
    use arrow_schema::{DataType, Fields, TimeUnit};
    use serde_json::json;

    /// Shorthand for a schema that is just a primitive type name.
    fn primitive(ty: PrimitiveType) -> Schema<'static> {
        Schema::TypeName(TypeName::Primitive(ty))
    }

    /// Expected parse of `{"type": "long", "logicalType": "timestamp-micros"}`.
    fn timestamp_micros() -> Type<'static> {
        Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        }
    }

    /// Expected parse of the 11-byte `fixed` decimal(25, 2) used in several tests.
    fn decimal_fixed() -> ComplexType<'static> {
        ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        })
    }

    /// A bare string is a type name; a JSON array is a union.
    #[test]
    fn test_deserialize_type_names_and_union() {
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(t, primitive(PrimitiveType::String));

        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                primitive(PrimitiveType::Int),
                primitive(PrimitiveType::Null),
            ])
        );
    }

    /// An object with a primitive `type` plus `logicalType` parses as `Type`.
    #[test]
    fn test_deserialize_logical_type() {
        let t: Type = serde_json::from_str(
            r#"{
                "type":"long",
                "logicalType":"timestamp-micros"
            }"#,
        )
        .unwrap();

        assert_eq!(t, timestamp_micros());
    }

    /// Unknown keys (`precision`, `scale`) end up in `Attributes::additional`.
    #[test]
    fn test_deserialize_fixed_decimal() {
        let t: ComplexType = serde_json::from_str(
            r#"{
                "type":"fixed",
                "name":"fixed",
                "namespace":"topLevelRecord.value",
                "size":11,
                "logicalType":"decimal",
                "precision":25,
                "scale":2
            }"#,
        )
        .unwrap();

        assert_eq!(t, decimal_fixed());
    }

    /// A record whose single field is a union of a complex type and `null`.
    #[test]
    fn test_deserialize_record_with_decimal_union() {
        let schema: Schema = serde_json::from_str(
            r#"{
                "type":"record",
                "name":"topLevelRecord",
                "fields":[
                    {
                        "name":"value",
                        "type":[
                            {
                                "type":"fixed",
                                "name":"fixed",
                                "namespace":"topLevelRecord.value",
                                "size":11,
                                "logicalType":"decimal",
                                "precision":25,
                                "scale":2
                            },
                            "null"
                        ]
                    }
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal_fixed()),
                        primitive(PrimitiveType::Null),
                    ]),
                    default: None,
                }],
                attributes: Default::default(),
            }))
        );
    }

    /// A self-referential record parses, but converting it to a codec fails.
    #[test]
    fn test_deserialize_recursive_record() {
        let schema: Schema = serde_json::from_str(
            r#"{
                "type": "record",
                "name": "LongList",
                "aliases": ["LinkedLongs"],
                "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: primitive(PrimitiveType::Long),
                        default: None,
                    },
                    Field {
                        name: "next",
                        doc: None,
                        r#type: Schema::Union(vec![
                            primitive(PrimitiveType::Null),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                    },
                ],
                attributes: Attributes::default(),
            }))
        );

        let err = AvroField::try_from(&schema).unwrap_err().to_string();
        assert_eq!(err, "Parser error: Failed to resolve .LongList");
    }

    /// A record with nullable int and timestamp fields, and its Arrow field mapping.
    #[test]
    fn test_deserialize_record_with_timestamp() {
        let schema: Schema = serde_json::from_str(
            r#"{
                "type":"record",
                "name":"topLevelRecord",
                "fields":[
                    {
                        "name":"id",
                        "type":[
                            "int",
                            "null"
                        ]
                    },
                    {
                        "name":"timestamp_col",
                        "type":[
                            {
                                "type":"long",
                                "logicalType":"timestamp-micros"
                            },
                            "null"
                        ]
                    }
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            primitive(PrimitiveType::Int),
                            primitive(PrimitiveType::Null),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp_micros()),
                            primitive(PrimitiveType::Null),
                        ]),
                        default: None,
                    },
                ],
                attributes: Default::default(),
            }))
        );

        let codec = AvroField::try_from(&schema).unwrap();
        assert_eq!(
            codec.field(),
            arrow_schema::Field::new(
                "topLevelRecord",
                DataType::Struct(Fields::from(vec![
                    arrow_schema::Field::new("id", DataType::Int32, true),
                    arrow_schema::Field::new(
                        "timestamp_col",
                        DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                        true
                    ),
                ])),
                false
            )
        );
    }

    /// A record mixing an inline `fixed`, a named reference to it, and a nullable map.
    #[test]
    fn test_deserialize_handshake_request() {
        let schema: Schema = serde_json::from_str(
            r#"{
                "type": "record",
                "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            primitive(PrimitiveType::Null),
                            primitive(PrimitiveType::String),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            primitive(PrimitiveType::Null),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(primitive(PrimitiveType::Bytes)),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                    },
                ],
                attributes: Default::default(),
            }))
        );
    }
}