arrow_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName};
19use arrow_schema::{
20    ArrowError, DataType, Field, FieldRef, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit,
21};
22use std::borrow::Cow;
23use std::collections::HashMap;
24use std::sync::Arc;
25
/// Avro types are not nullable, with nullability instead encoded as a union
/// where one of the variants is the null type.
///
/// To accommodate this we special case two-variant unions where one of the
/// variants is the null type (e.g. `["null", "int"]` or `["int", "null"]`), and
/// use this to derive arrow's notion of nullability. The variant records which
/// union branch index encodes the null, which a decoder needs to interpret the
/// branch tag it reads.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Nullability {
    /// The nulls are encoded as the first union variant (`["null", T]`)
    NullFirst,
    /// The nulls are encoded as the second union variant (`[T, "null"]`)
    NullSecond,
}
38
39/// An Avro datatype mapped to the arrow data model
40#[derive(Debug, Clone)]
41pub struct AvroDataType {
42    nullability: Option<Nullability>,
43    metadata: HashMap<String, String>,
44    codec: Codec,
45}
46
47impl AvroDataType {
48    /// Returns an arrow [`Field`] with the given name
49    pub fn field_with_name(&self, name: &str) -> Field {
50        let d = self.codec.data_type();
51        Field::new(name, d, self.nullability.is_some()).with_metadata(self.metadata.clone())
52    }
53
54    /// Returns a reference to the codec used by this data type
55    ///
56    /// The codec determines how Avro data is encoded and mapped to Arrow data types.
57    /// This is useful when we need to inspect or use the specific encoding of a field.
58    pub fn codec(&self) -> &Codec {
59        &self.codec
60    }
61
62    /// Returns the nullability status of this data type
63    ///
64    /// In Avro, nullability is represented through unions with null types.
65    /// The returned value indicates how nulls are encoded in the Avro format:
66    /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
67    /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
68    /// - `None` - The type is not nullable
69    pub fn nullability(&self) -> Option<Nullability> {
70        self.nullability
71    }
72}
73
74/// A named [`AvroDataType`]
75#[derive(Debug, Clone)]
76pub struct AvroField {
77    name: String,
78    data_type: AvroDataType,
79}
80
81impl AvroField {
82    /// Returns the arrow [`Field`]
83    pub fn field(&self) -> Field {
84        self.data_type.field_with_name(&self.name)
85    }
86
87    /// Returns the [`AvroDataType`]
88    pub fn data_type(&self) -> &AvroDataType {
89        &self.data_type
90    }
91
92    /// Returns the name of this Avro field
93    ///
94    /// This is the field name as defined in the Avro schema.
95    /// It's used to identify fields within a record structure.
96    pub fn name(&self) -> &str {
97        &self.name
98    }
99}
100
101impl<'a> TryFrom<&Schema<'a>> for AvroField {
102    type Error = ArrowError;
103
104    fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
105        match schema {
106            Schema::Complex(ComplexType::Record(r)) => {
107                let mut resolver = Resolver::default();
108                let data_type = make_data_type(schema, None, &mut resolver)?;
109                Ok(AvroField {
110                    data_type,
111                    name: r.name.to_string(),
112                })
113            }
114            _ => Err(ArrowError::ParseError(format!(
115                "Expected record got {schema:?}"
116            ))),
117        }
118    }
119}
120
121/// An Avro encoding
122///
123/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
124#[derive(Debug, Clone)]
125pub enum Codec {
126    /// Represents Avro null type, maps to Arrow's Null data type
127    Null,
128    /// Represents Avro boolean type, maps to Arrow's Boolean data type
129    Boolean,
130    /// Represents Avro int type, maps to Arrow's Int32 data type
131    Int32,
132    /// Represents Avro long type, maps to Arrow's Int64 data type
133    Int64,
134    /// Represents Avro float type, maps to Arrow's Float32 data type
135    Float32,
136    /// Represents Avro double type, maps to Arrow's Float64 data type
137    Float64,
138    /// Represents Avro bytes type, maps to Arrow's Binary data type
139    Binary,
140    /// String data represented as UTF-8 encoded bytes, corresponding to Arrow's StringArray
141    Utf8,
142    /// Represents Avro date logical type, maps to Arrow's Date32 data type
143    Date32,
144    /// Represents Avro time-millis logical type, maps to Arrow's Time32(TimeUnit::Millisecond) data type
145    TimeMillis,
146    /// Represents Avro time-micros logical type, maps to Arrow's Time64(TimeUnit::Microsecond) data type
147    TimeMicros,
148    /// Represents Avro timestamp-millis or local-timestamp-millis logical type
149    ///
150    /// Maps to Arrow's Timestamp(TimeUnit::Millisecond) data type
151    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
152    TimestampMillis(bool),
153    /// Represents Avro timestamp-micros or local-timestamp-micros logical type
154    ///
155    /// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
156    /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
157    TimestampMicros(bool),
158    /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
159    /// The i32 parameter indicates the fixed binary size
160    Fixed(i32),
161    /// Represents Avro array type, maps to Arrow's List data type
162    List(Arc<AvroDataType>),
163    /// Represents Avro record type, maps to Arrow's Struct data type
164    Struct(Arc<[AvroField]>),
165    /// Represents Avro duration logical type, maps to Arrow's Interval(IntervalUnit::MonthDayNano) data type
166    Interval,
167}
168
169impl Codec {
170    fn data_type(&self) -> DataType {
171        match self {
172            Self::Null => DataType::Null,
173            Self::Boolean => DataType::Boolean,
174            Self::Int32 => DataType::Int32,
175            Self::Int64 => DataType::Int64,
176            Self::Float32 => DataType::Float32,
177            Self::Float64 => DataType::Float64,
178            Self::Binary => DataType::Binary,
179            Self::Utf8 => DataType::Utf8,
180            Self::Date32 => DataType::Date32,
181            Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
182            Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
183            Self::TimestampMillis(is_utc) => {
184                DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
185            }
186            Self::TimestampMicros(is_utc) => {
187                DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
188            }
189            Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
190            Self::Fixed(size) => DataType::FixedSizeBinary(*size),
191            Self::List(f) => {
192                DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
193            }
194            Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
195        }
196    }
197}
198
199impl From<PrimitiveType> for Codec {
200    fn from(value: PrimitiveType) -> Self {
201        match value {
202            PrimitiveType::Null => Self::Null,
203            PrimitiveType::Boolean => Self::Boolean,
204            PrimitiveType::Int => Self::Int32,
205            PrimitiveType::Long => Self::Int64,
206            PrimitiveType::Float => Self::Float32,
207            PrimitiveType::Double => Self::Float64,
208            PrimitiveType::Bytes => Self::Binary,
209            PrimitiveType::String => Self::Utf8,
210        }
211    }
212}
213
214/// Resolves Avro type names to [`AvroDataType`]
215///
216/// See <https://avro.apache.org/docs/1.11.1/specification/#names>
217#[derive(Debug, Default)]
218struct Resolver<'a> {
219    map: HashMap<(&'a str, &'a str), AvroDataType>,
220}
221
222impl<'a> Resolver<'a> {
223    fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
224        self.map.insert((name, namespace.unwrap_or("")), schema);
225    }
226
227    fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
228        let (namespace, name) = name
229            .rsplit_once('.')
230            .unwrap_or_else(|| (namespace.unwrap_or(""), name));
231
232        self.map
233            .get(&(namespace, name))
234            .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
235            .cloned()
236    }
237}
238
239/// Parses a [`AvroDataType`] from the provided [`Schema`] and the given `name` and `namespace`
240///
241/// `name`: is name used to refer to `schema` in its parent
242/// `namespace`: an optional qualifier used as part of a type hierarchy
243///
244/// See [`Resolver`] for more information
245fn make_data_type<'a>(
246    schema: &Schema<'a>,
247    namespace: Option<&'a str>,
248    resolver: &mut Resolver<'a>,
249) -> Result<AvroDataType, ArrowError> {
250    match schema {
251        Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType {
252            nullability: None,
253            metadata: Default::default(),
254            codec: (*p).into(),
255        }),
256        Schema::TypeName(TypeName::Ref(name)) => resolver.resolve(name, namespace),
257        Schema::Union(f) => {
258            // Special case the common case of nullable primitives
259            let null = f
260                .iter()
261                .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
262            match (f.len() == 2, null) {
263                (true, Some(0)) => {
264                    let mut field = make_data_type(&f[1], namespace, resolver)?;
265                    field.nullability = Some(Nullability::NullFirst);
266                    Ok(field)
267                }
268                (true, Some(1)) => {
269                    let mut field = make_data_type(&f[0], namespace, resolver)?;
270                    field.nullability = Some(Nullability::NullSecond);
271                    Ok(field)
272                }
273                _ => Err(ArrowError::NotYetImplemented(format!(
274                    "Union of {f:?} not currently supported"
275                ))),
276            }
277        }
278        Schema::Complex(c) => match c {
279            ComplexType::Record(r) => {
280                let namespace = r.namespace.or(namespace);
281                let fields = r
282                    .fields
283                    .iter()
284                    .map(|field| {
285                        Ok(AvroField {
286                            name: field.name.to_string(),
287                            data_type: make_data_type(&field.r#type, namespace, resolver)?,
288                        })
289                    })
290                    .collect::<Result<_, ArrowError>>()?;
291
292                let field = AvroDataType {
293                    nullability: None,
294                    codec: Codec::Struct(fields),
295                    metadata: r.attributes.field_metadata(),
296                };
297                resolver.register(r.name, namespace, field.clone());
298                Ok(field)
299            }
300            ComplexType::Array(a) => {
301                let mut field = make_data_type(a.items.as_ref(), namespace, resolver)?;
302                Ok(AvroDataType {
303                    nullability: None,
304                    metadata: a.attributes.field_metadata(),
305                    codec: Codec::List(Arc::new(field)),
306                })
307            }
308            ComplexType::Fixed(f) => {
309                let size = f.size.try_into().map_err(|e| {
310                    ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
311                })?;
312
313                let field = AvroDataType {
314                    nullability: None,
315                    metadata: f.attributes.field_metadata(),
316                    codec: Codec::Fixed(size),
317                };
318                resolver.register(f.name, namespace, field.clone());
319                Ok(field)
320            }
321            ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!(
322                "Enum of {e:?} not currently supported"
323            ))),
324            ComplexType::Map(m) => Err(ArrowError::NotYetImplemented(format!(
325                "Map of {m:?} not currently supported"
326            ))),
327        },
328        Schema::Type(t) => {
329            let mut field =
330                make_data_type(&Schema::TypeName(t.r#type.clone()), namespace, resolver)?;
331
332            // https://avro.apache.org/docs/1.11.1/specification/#logical-types
333            match (t.attributes.logical_type, &mut field.codec) {
334                (Some("decimal"), c @ Codec::Fixed(_)) => {
335                    return Err(ArrowError::NotYetImplemented(
336                        "Decimals are not currently supported".to_string(),
337                    ))
338                }
339                (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
340                (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
341                (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
342                (Some("timestamp-millis"), c @ Codec::Int64) => *c = Codec::TimestampMillis(true),
343                (Some("timestamp-micros"), c @ Codec::Int64) => *c = Codec::TimestampMicros(true),
344                (Some("local-timestamp-millis"), c @ Codec::Int64) => {
345                    *c = Codec::TimestampMillis(false)
346                }
347                (Some("local-timestamp-micros"), c @ Codec::Int64) => {
348                    *c = Codec::TimestampMicros(false)
349                }
350                (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
351                (Some(logical), _) => {
352                    // Insert unrecognized logical type into metadata map
353                    field.metadata.insert("logicalType".into(), logical.into());
354                }
355                (None, _) => {}
356            }
357
358            if !t.attributes.additional.is_empty() {
359                for (k, v) in &t.attributes.additional {
360                    field.metadata.insert(k.to_string(), v.to_string());
361                }
362            }
363            Ok(field)
364        }
365    }
366}