arrow_avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20
/// Metadata key under which the JSON-encoded Avro [`Schema`] is stored
pub const SCHEMA_METADATA_KEY: &'static str = "avro.schema";
23
24/// Either a [`PrimitiveType`] or a reference to a previously defined named type
25///
26/// <https://avro.apache.org/docs/1.11.1/specification/#names>
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(untagged)]
29pub enum TypeName<'a> {
30    Primitive(PrimitiveType),
31    Ref(&'a str),
32}
33
34/// A primitive type
35///
36/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub enum PrimitiveType {
40    Null,
41    Boolean,
42    Int,
43    Long,
44    Float,
45    Double,
46    Bytes,
47    String,
48}
49
50/// Additional attributes within a [`Schema`]
51///
52/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
53#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
54#[serde(rename_all = "camelCase")]
55pub struct Attributes<'a> {
56    /// A logical type name
57    ///
58    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
59    #[serde(default)]
60    pub logical_type: Option<&'a str>,
61
62    /// Additional JSON attributes
63    #[serde(flatten)]
64    pub additional: HashMap<&'a str, serde_json::Value>,
65}
66
67impl Attributes<'_> {
68    /// Returns the field metadata for this [`Attributes`]
69    pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
70        self.additional
71            .iter()
72            .map(|(k, v)| (k.to_string(), v.to_string()))
73            .collect()
74    }
75}
76
77/// A type definition that is not a variant of [`ComplexType`]
78#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
79#[serde(rename_all = "camelCase")]
80pub struct Type<'a> {
81    #[serde(borrow)]
82    pub r#type: TypeName<'a>,
83    #[serde(flatten)]
84    pub attributes: Attributes<'a>,
85}
86
87/// An Avro schema
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(untagged)]
90pub enum Schema<'a> {
91    #[serde(borrow)]
92    TypeName(TypeName<'a>),
93    #[serde(borrow)]
94    Union(Vec<Schema<'a>>),
95    #[serde(borrow)]
96    Complex(ComplexType<'a>),
97    #[serde(borrow)]
98    Type(Type<'a>),
99}
100
101/// A complex type
102///
103/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
105#[serde(tag = "type", rename_all = "camelCase")]
106pub enum ComplexType<'a> {
107    #[serde(borrow)]
108    Record(Record<'a>),
109    #[serde(borrow)]
110    Enum(Enum<'a>),
111    #[serde(borrow)]
112    Array(Array<'a>),
113    #[serde(borrow)]
114    Map(Map<'a>),
115    #[serde(borrow)]
116    Fixed(Fixed<'a>),
117}
118
119/// A record
120///
121/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
122#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
123pub struct Record<'a> {
124    #[serde(borrow)]
125    pub name: &'a str,
126    #[serde(borrow, default)]
127    pub namespace: Option<&'a str>,
128    #[serde(borrow, default)]
129    pub doc: Option<&'a str>,
130    #[serde(borrow, default)]
131    pub aliases: Vec<&'a str>,
132    #[serde(borrow)]
133    pub fields: Vec<Field<'a>>,
134    #[serde(flatten)]
135    pub attributes: Attributes<'a>,
136}
137
138/// A field within a [`Record`]
139#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140pub struct Field<'a> {
141    #[serde(borrow)]
142    pub name: &'a str,
143    #[serde(borrow, default)]
144    pub doc: Option<&'a str>,
145    #[serde(borrow)]
146    pub r#type: Schema<'a>,
147    #[serde(borrow, default)]
148    pub default: Option<&'a str>,
149}
150
151/// An enumeration
152///
153/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
154#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
155pub struct Enum<'a> {
156    #[serde(borrow)]
157    pub name: &'a str,
158    #[serde(borrow, default)]
159    pub namespace: Option<&'a str>,
160    #[serde(borrow, default)]
161    pub doc: Option<&'a str>,
162    #[serde(borrow, default)]
163    pub aliases: Vec<&'a str>,
164    #[serde(borrow)]
165    pub symbols: Vec<&'a str>,
166    #[serde(borrow, default)]
167    pub default: Option<&'a str>,
168    #[serde(flatten)]
169    pub attributes: Attributes<'a>,
170}
171
172/// An array
173///
174/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
175#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
176pub struct Array<'a> {
177    #[serde(borrow)]
178    pub items: Box<Schema<'a>>,
179    #[serde(flatten)]
180    pub attributes: Attributes<'a>,
181}
182
183/// A map
184///
185/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
186#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
187pub struct Map<'a> {
188    #[serde(borrow)]
189    pub values: Box<Schema<'a>>,
190    #[serde(flatten)]
191    pub attributes: Attributes<'a>,
192}
193
194/// A fixed length binary array
195///
196/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
197#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
198pub struct Fixed<'a> {
199    #[serde(borrow)]
200    pub name: &'a str,
201    #[serde(borrow, default)]
202    pub namespace: Option<&'a str>,
203    #[serde(borrow, default)]
204    pub aliases: Vec<&'a str>,
205    pub size: usize,
206    #[serde(flatten)]
207    pub attributes: Attributes<'a>,
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::{AvroDataType, AvroField};
    use arrow_schema::{DataType, Fields, TimeUnit};
    use serde_json::json;

    /// Wraps a primitive in the [`Schema`] a bare primitive name parses to
    fn primitive<'a>(kind: PrimitiveType) -> Schema<'a> {
        Schema::TypeName(TypeName::Primitive(kind))
    }

    /// Builds a [`Field`] that has neither documentation nor a default
    fn plain_field<'a>(name: &'a str, schema: Schema<'a>) -> Field<'a> {
        Field {
            name,
            doc: None,
            r#type: schema,
            default: None,
        }
    }

    /// Builds a record [`Schema`] without documentation or extra attributes
    fn record<'a>(
        name: &'a str,
        namespace: Option<&'a str>,
        aliases: Vec<&'a str>,
        fields: Vec<Field<'a>>,
    ) -> Schema<'a> {
        Schema::Complex(ComplexType::Record(Record {
            name,
            namespace,
            doc: None,
            aliases,
            fields,
            attributes: Attributes::default(),
        }))
    }

    #[test]
    fn test_deserialize() {
        // A bare primitive name
        let parsed: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(parsed, primitive(PrimitiveType::String));

        // A union of primitive names
        let parsed: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            parsed,
            Schema::Union(vec![
                primitive(PrimitiveType::Int),
                primitive(PrimitiveType::Null),
            ])
        );

        // A primitive annotated with a logical type
        let parsed: Type = serde_json::from_str(
            r#"{
                   "type":"long",
                   "logicalType":"timestamp-micros"
                }"#,
        )
        .unwrap();

        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                ..Default::default()
            },
        };

        assert_eq!(parsed, timestamp);

        // A fixed type whose unknown attributes land in `additional`
        let parsed: ComplexType = serde_json::from_str(
            r#"{
                   "type":"fixed",
                   "name":"fixed",
                   "namespace":"topLevelRecord.value",
                   "size":11,
                   "logicalType":"decimal",
                   "precision":25,
                   "scale":2
                }"#,
        )
        .unwrap();

        let mut decimal_attributes = Attributes {
            logical_type: Some("decimal"),
            ..Default::default()
        };
        decimal_attributes.additional.insert("precision", json!(25));
        decimal_attributes.additional.insert("scale", json!(2));

        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: decimal_attributes,
        });

        assert_eq!(parsed, decimal);

        // A record with a nullable decimal field
        let decimal_record: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"value",
                     "type":[
                        {
                           "type":"fixed",
                           "name":"fixed",
                           "namespace":"topLevelRecord.value",
                           "size":11,
                           "logicalType":"decimal",
                           "precision":25,
                           "scale":2
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            decimal_record,
            record(
                "topLevelRecord",
                None,
                vec![],
                vec![plain_field(
                    "value",
                    Schema::Union(vec![
                        Schema::Complex(decimal),
                        primitive(PrimitiveType::Null),
                    ]),
                )],
            )
        );

        // A record that refers to itself by name
        let recursive: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "LongList",
                  "aliases": ["LinkedLongs"],
                  "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                  ]
                }"#,
        )
        .unwrap();

        assert_eq!(
            recursive,
            record(
                "LongList",
                None,
                vec!["LinkedLongs"],
                vec![
                    plain_field("value", primitive(PrimitiveType::Long)),
                    plain_field(
                        "next",
                        Schema::Union(vec![
                            primitive(PrimitiveType::Null),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                    ),
                ],
            )
        );

        // Recursive schema are not supported
        let message = AvroField::try_from(&recursive).unwrap_err().to_string();
        assert_eq!(message, "Parser error: Failed to resolve .LongList");

        // A record with nullable int and timestamp fields
        let nullable_record: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"id",
                     "type":[
                        "int",
                        "null"
                     ]
                  },
                  {
                     "name":"timestamp_col",
                     "type":[
                        {
                           "type":"long",
                           "logicalType":"timestamp-micros"
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            nullable_record,
            record(
                "topLevelRecord",
                None,
                vec![],
                vec![
                    plain_field(
                        "id",
                        Schema::Union(vec![
                            primitive(PrimitiveType::Int),
                            primitive(PrimitiveType::Null),
                        ]),
                    ),
                    plain_field(
                        "timestamp_col",
                        Schema::Union(vec![
                            Schema::Type(timestamp),
                            primitive(PrimitiveType::Null),
                        ]),
                    ),
                ],
            )
        );

        // The same record converted to an arrow field
        let codec = AvroField::try_from(&nullable_record).unwrap();
        let expected_children = Fields::from(vec![
            arrow_schema::Field::new("id", DataType::Int32, true),
            arrow_schema::Field::new(
                "timestamp_col",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                true,
            ),
        ]);
        let expected_field = arrow_schema::Field::new(
            "topLevelRecord",
            DataType::Struct(expected_children),
            false,
        );
        assert_eq!(codec.field(), expected_field);

        // A record mixing an inline fixed, a named reference and a map
        let handshake: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                  "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                  ]
            }"#,
        )
        .unwrap();

        let md5 = Schema::Complex(ComplexType::Fixed(Fixed {
            name: "MD5",
            namespace: None,
            aliases: vec![],
            size: 16,
            attributes: Attributes::default(),
        }));
        let bytes_map = Schema::Complex(ComplexType::Map(Map {
            values: Box::new(primitive(PrimitiveType::Bytes)),
            attributes: Attributes::default(),
        }));

        assert_eq!(
            handshake,
            record(
                "HandshakeRequest",
                Some("org.apache.avro.ipc"),
                vec![],
                vec![
                    plain_field("clientHash", md5),
                    plain_field(
                        "clientProtocol",
                        Schema::Union(vec![
                            primitive(PrimitiveType::Null),
                            primitive(PrimitiveType::String),
                        ]),
                    ),
                    plain_field("serverHash", Schema::TypeName(TypeName::Ref("MD5"))),
                    plain_field(
                        "meta",
                        Schema::Union(vec![primitive(PrimitiveType::Null), bytes_map]),
                    ),
                ],
            )
        );
    }
}