arrow_avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20
/// Metadata key under which the JSON-encoded Avro [`Schema`] is stored.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
23
24/// Either a [`PrimitiveType`] or a reference to a previously defined named type
25///
26/// <https://avro.apache.org/docs/1.11.1/specification/#names>
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(untagged)]
29/// A type name in an Avro schema
30///
31/// This represents the different ways a type can be referenced in an Avro schema.
32pub enum TypeName<'a> {
33    /// A primitive type like null, boolean, int, etc.
34    Primitive(PrimitiveType),
35    /// A reference to another named type
36    Ref(&'a str),
37}
38
39/// A primitive type
40///
41/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub enum PrimitiveType {
45    /// null: no value
46    Null,
47    /// boolean: a binary value
48    Boolean,
49    /// int: 32-bit signed integer
50    Int,
51    /// long: 64-bit signed integer
52    Long,
53    /// float: single precision (32-bit) IEEE 754 floating-point number
54    Float,
55    /// double: double precision (64-bit) IEEE 754 floating-point number
56    Double,
57    /// bytes: sequence of 8-bit unsigned bytes
58    Bytes,
59    /// string: Unicode character sequence
60    String,
61}
62
63/// Additional attributes within a [`Schema`]
64///
65/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
66#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
67#[serde(rename_all = "camelCase")]
68pub struct Attributes<'a> {
69    /// A logical type name
70    ///
71    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
72    #[serde(default)]
73    pub logical_type: Option<&'a str>,
74
75    /// Additional JSON attributes
76    #[serde(flatten)]
77    pub additional: HashMap<&'a str, serde_json::Value>,
78}
79
80impl Attributes<'_> {
81    /// Returns the field metadata for this [`Attributes`]
82    pub(crate) fn field_metadata(&self) -> HashMap<String, String> {
83        self.additional
84            .iter()
85            .map(|(k, v)| (k.to_string(), v.to_string()))
86            .collect()
87    }
88}
89
90/// A type definition that is not a variant of [`ComplexType`]
91#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
92#[serde(rename_all = "camelCase")]
93pub struct Type<'a> {
94    /// The type of this Avro data structure
95    #[serde(borrow)]
96    pub r#type: TypeName<'a>,
97    /// Additional attributes associated with this type
98    #[serde(flatten)]
99    pub attributes: Attributes<'a>,
100}
101
102/// An Avro schema
103///
104/// This represents the different shapes of Avro schemas as defined in the specification.
105/// See <https://avro.apache.org/docs/1.11.1/specification/#schemas> for more details.
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(untagged)]
108pub enum Schema<'a> {
109    /// A direct type name (primitive or reference)
110    #[serde(borrow)]
111    TypeName(TypeName<'a>),
112    /// A union of multiple schemas (e.g., ["null", "string"])
113    #[serde(borrow)]
114    Union(Vec<Schema<'a>>),
115    /// A complex type such as record, array, map, etc.
116    #[serde(borrow)]
117    Complex(ComplexType<'a>),
118    /// A type with attributes
119    #[serde(borrow)]
120    Type(Type<'a>),
121}
122
123/// A complex type
124///
125/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
126#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
127#[serde(tag = "type", rename_all = "camelCase")]
128pub enum ComplexType<'a> {
129    /// Record type: a sequence of fields with names and types
130    #[serde(borrow)]
131    Record(Record<'a>),
132    /// Enum type: a set of named values
133    #[serde(borrow)]
134    Enum(Enum<'a>),
135    /// Array type: a sequence of values of the same type
136    #[serde(borrow)]
137    Array(Array<'a>),
138    /// Map type: a mapping from strings to values of the same type
139    #[serde(borrow)]
140    Map(Map<'a>),
141    /// Fixed type: a fixed-size byte array
142    #[serde(borrow)]
143    Fixed(Fixed<'a>),
144}
145
146/// A record
147///
148/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
149#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
150pub struct Record<'a> {
151    /// Name of the record
152    #[serde(borrow)]
153    pub name: &'a str,
154    /// Optional namespace for the record, provides a way to organize names
155    #[serde(borrow, default)]
156    pub namespace: Option<&'a str>,
157    /// Optional documentation string for the record
158    #[serde(borrow, default)]
159    pub doc: Option<&'a str>,
160    /// Alternative names for this record
161    #[serde(borrow, default)]
162    pub aliases: Vec<&'a str>,
163    /// The fields contained in this record
164    #[serde(borrow)]
165    pub fields: Vec<Field<'a>>,
166    /// Additional attributes for this record
167    #[serde(flatten)]
168    pub attributes: Attributes<'a>,
169}
170
171/// A field within a [`Record`]
172#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
173pub struct Field<'a> {
174    /// Name of the field within the record
175    #[serde(borrow)]
176    pub name: &'a str,
177    /// Optional documentation for this field
178    #[serde(borrow, default)]
179    pub doc: Option<&'a str>,
180    /// The field's type definition
181    #[serde(borrow)]
182    pub r#type: Schema<'a>,
183    /// Optional default value for this field
184    #[serde(borrow, default)]
185    pub default: Option<&'a str>,
186}
187
188/// An enumeration
189///
190/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
191#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
192pub struct Enum<'a> {
193    /// Name of the enum
194    #[serde(borrow)]
195    pub name: &'a str,
196    /// Optional namespace for the enum, provides organizational structure
197    #[serde(borrow, default)]
198    pub namespace: Option<&'a str>,
199    /// Optional documentation string describing the enum
200    #[serde(borrow, default)]
201    pub doc: Option<&'a str>,
202    /// Alternative names for this enum
203    #[serde(borrow, default)]
204    pub aliases: Vec<&'a str>,
205    /// The symbols (values) that this enum can have
206    #[serde(borrow)]
207    pub symbols: Vec<&'a str>,
208    /// Optional default value for this enum
209    #[serde(borrow, default)]
210    pub default: Option<&'a str>,
211    /// Additional attributes for this enum
212    #[serde(flatten)]
213    pub attributes: Attributes<'a>,
214}
215
216/// An array
217///
218/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
219#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
220pub struct Array<'a> {
221    /// The schema for items in this array
222    #[serde(borrow)]
223    pub items: Box<Schema<'a>>,
224    /// Additional attributes for this array
225    #[serde(flatten)]
226    pub attributes: Attributes<'a>,
227}
228
229/// A map
230///
231/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
232#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
233pub struct Map<'a> {
234    /// The schema for values in this map
235    #[serde(borrow)]
236    pub values: Box<Schema<'a>>,
237    /// Additional attributes for this map
238    #[serde(flatten)]
239    pub attributes: Attributes<'a>,
240}
241
242/// A fixed length binary array
243///
244/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
245#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
246pub struct Fixed<'a> {
247    /// Name of the fixed type
248    #[serde(borrow)]
249    pub name: &'a str,
250    /// Optional namespace for the fixed type
251    #[serde(borrow, default)]
252    pub namespace: Option<&'a str>,
253    /// Alternative names for this fixed type
254    #[serde(borrow, default)]
255    pub aliases: Vec<&'a str>,
256    /// The number of bytes in this fixed type
257    pub size: usize,
258    /// Additional attributes for this fixed type
259    #[serde(flatten)]
260    pub attributes: Attributes<'a>,
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::AvroField;
    use arrow_schema::{DataType, Fields, TimeUnit};
    use serde_json::json;

    /// Exercises JSON deserialization of the schema model and, for some of the
    /// parsed schemas, the conversion to Arrow via [`AvroField`].
    #[test]
    fn test_deserialize() {
        // A bare JSON string naming a primitive type
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(
            t,
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
        );

        // A JSON array is parsed as a union, preserving branch order
        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            ])
        );

        // A primitive type carrying a logical type annotation
        let t: Type = serde_json::from_str(
            r#"{
                   "type":"long",
                   "logicalType":"timestamp-micros"
                }"#,
        )
        .unwrap();

        // `timestamp` is moved into a record assertion further below
        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        };

        assert_eq!(t, timestamp);

        // A fixed-backed decimal: attributes that are not modelled explicitly
        // (`precision`, `scale`) end up in `Attributes::additional`
        let t: ComplexType = serde_json::from_str(
            r#"{
                   "type":"fixed",
                   "name":"fixed",
                   "namespace":"topLevelRecord.value",
                   "size":11,
                   "logicalType":"decimal",
                   "precision":25,
                   "scale":2
                }"#,
        )
        .unwrap();

        // `decimal` is moved into the record assertion that follows
        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        });

        assert_eq!(t, decimal);

        // A record whose only field is a nullable union containing an inline
        // complex type
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"value",
                     "type":[
                        {
                           "type":"fixed",
                           "name":"fixed",
                           "namespace":"topLevelRecord.value",
                           "size":11,
                           "logicalType":"decimal",
                           "precision":25,
                           "scale":2
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal),
                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    ]),
                    default: None,
                },],
                attributes: Default::default(),
            }))
        );

        // A self-referencing record: the recursive reference to `LongList` is
        // kept as an unresolved `TypeName::Ref`
        let schema: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "LongList",
                  "aliases": ["LinkedLongs"],
                  "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                  ]
                }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                        default: None,
                    },
                    Field {
                        name: "next",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                    }
                ],
                attributes: Attributes::default(),
            }))
        );

        // Recursive schemas are not supported when converting to Arrow
        let err = AvroField::try_from(&schema).unwrap_err().to_string();
        assert_eq!(err, "Parser error: Failed to resolve .LongList");

        // A record with nullable primitive and logical-type fields, which is
        // also converted to an Arrow field below
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"id",
                     "type":[
                        "int",
                        "null"
                     ]
                  },
                  {
                     "name":"timestamp_col",
                     "type":[
                        {
                           "type":"long",
                           "logicalType":"timestamp-micros"
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );
        // Nullable unions become nullable Arrow fields, `timestamp-micros`
        // becomes a microsecond timestamp with a "+00:00" time zone, and the
        // top-level record itself is non-nullable
        let codec = AvroField::try_from(&schema).unwrap();
        assert_eq!(
            codec.field(),
            arrow_schema::Field::new(
                "topLevelRecord",
                DataType::Struct(Fields::from(vec![
                    arrow_schema::Field::new("id", DataType::Int32, true),
                    arrow_schema::Field::new(
                        "timestamp_col",
                        DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                        true
                    ),
                ])),
                false
            )
        );

        // A named type (`MD5`) declared inline on one field and referenced by
        // name from a later field
        let schema: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                  "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                  ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(Schema::TypeName(TypeName::Primitive(
                                    PrimitiveType::Bytes
                                ))),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );
    }
}