1use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName};
19use arrow_schema::{
20 ArrowError, DataType, Field, FieldRef, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit,
21};
22use std::borrow::Cow;
23use std::collections::HashMap;
24use std::sync::Arc;
25
/// Records where the `null` branch sits in a two-branch Avro union.
///
/// A union of exactly two schemas, one of which is `null`, is treated as a
/// nullable version of the other schema; this enum remembers the branch order.
#[derive(Debug, Clone, Copy)]
pub enum Nullability {
    /// `null` is the first branch of the union, e.g. `["null", T]`.
    NullFirst,
    /// `null` is the second branch of the union, e.g. `[T, "null"]`.
    NullSecond,
}
38
39#[derive(Debug, Clone)]
41pub struct AvroDataType {
42 nullability: Option<Nullability>,
43 metadata: HashMap<String, String>,
44 codec: Codec,
45}
46
47impl AvroDataType {
48 pub fn field_with_name(&self, name: &str) -> Field {
50 let d = self.codec.data_type();
51 Field::new(name, d, self.nullability.is_some()).with_metadata(self.metadata.clone())
52 }
53
54 pub fn codec(&self) -> &Codec {
59 &self.codec
60 }
61
62 pub fn nullability(&self) -> Option<Nullability> {
70 self.nullability
71 }
72}
73
74#[derive(Debug, Clone)]
76pub struct AvroField {
77 name: String,
78 data_type: AvroDataType,
79}
80
81impl AvroField {
82 pub fn field(&self) -> Field {
84 self.data_type.field_with_name(&self.name)
85 }
86
87 pub fn data_type(&self) -> &AvroDataType {
89 &self.data_type
90 }
91
92 pub fn name(&self) -> &str {
97 &self.name
98 }
99}
100
101impl<'a> TryFrom<&Schema<'a>> for AvroField {
102 type Error = ArrowError;
103
104 fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
105 match schema {
106 Schema::Complex(ComplexType::Record(r)) => {
107 let mut resolver = Resolver::default();
108 let data_type = make_data_type(schema, None, &mut resolver)?;
109 Ok(AvroField {
110 data_type,
111 name: r.name.to_string(),
112 })
113 }
114 _ => Err(ArrowError::ParseError(format!(
115 "Expected record got {schema:?}"
116 ))),
117 }
118 }
119}
120
121#[derive(Debug, Clone)]
125pub enum Codec {
126 Null,
128 Boolean,
130 Int32,
132 Int64,
134 Float32,
136 Float64,
138 Binary,
140 Utf8,
142 Date32,
144 TimeMillis,
146 TimeMicros,
148 TimestampMillis(bool),
153 TimestampMicros(bool),
158 Fixed(i32),
161 List(Arc<AvroDataType>),
163 Struct(Arc<[AvroField]>),
165 Interval,
167}
168
169impl Codec {
170 fn data_type(&self) -> DataType {
171 match self {
172 Self::Null => DataType::Null,
173 Self::Boolean => DataType::Boolean,
174 Self::Int32 => DataType::Int32,
175 Self::Int64 => DataType::Int64,
176 Self::Float32 => DataType::Float32,
177 Self::Float64 => DataType::Float64,
178 Self::Binary => DataType::Binary,
179 Self::Utf8 => DataType::Utf8,
180 Self::Date32 => DataType::Date32,
181 Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
182 Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
183 Self::TimestampMillis(is_utc) => {
184 DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
185 }
186 Self::TimestampMicros(is_utc) => {
187 DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
188 }
189 Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
190 Self::Fixed(size) => DataType::FixedSizeBinary(*size),
191 Self::List(f) => {
192 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
193 }
194 Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
195 }
196 }
197}
198
199impl From<PrimitiveType> for Codec {
200 fn from(value: PrimitiveType) -> Self {
201 match value {
202 PrimitiveType::Null => Self::Null,
203 PrimitiveType::Boolean => Self::Boolean,
204 PrimitiveType::Int => Self::Int32,
205 PrimitiveType::Long => Self::Int64,
206 PrimitiveType::Float => Self::Float32,
207 PrimitiveType::Double => Self::Float64,
208 PrimitiveType::Bytes => Self::Binary,
209 PrimitiveType::String => Self::Utf8,
210 }
211 }
212}
213
214#[derive(Debug, Default)]
218struct Resolver<'a> {
219 map: HashMap<(&'a str, &'a str), AvroDataType>,
220}
221
222impl<'a> Resolver<'a> {
223 fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) {
224 self.map.insert((name, namespace.unwrap_or("")), schema);
225 }
226
227 fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroDataType, ArrowError> {
228 let (namespace, name) = name
229 .rsplit_once('.')
230 .unwrap_or_else(|| (namespace.unwrap_or(""), name));
231
232 self.map
233 .get(&(namespace, name))
234 .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
235 .cloned()
236 }
237}
238
239fn make_data_type<'a>(
246 schema: &Schema<'a>,
247 namespace: Option<&'a str>,
248 resolver: &mut Resolver<'a>,
249) -> Result<AvroDataType, ArrowError> {
250 match schema {
251 Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType {
252 nullability: None,
253 metadata: Default::default(),
254 codec: (*p).into(),
255 }),
256 Schema::TypeName(TypeName::Ref(name)) => resolver.resolve(name, namespace),
257 Schema::Union(f) => {
258 let null = f
260 .iter()
261 .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
262 match (f.len() == 2, null) {
263 (true, Some(0)) => {
264 let mut field = make_data_type(&f[1], namespace, resolver)?;
265 field.nullability = Some(Nullability::NullFirst);
266 Ok(field)
267 }
268 (true, Some(1)) => {
269 let mut field = make_data_type(&f[0], namespace, resolver)?;
270 field.nullability = Some(Nullability::NullSecond);
271 Ok(field)
272 }
273 _ => Err(ArrowError::NotYetImplemented(format!(
274 "Union of {f:?} not currently supported"
275 ))),
276 }
277 }
278 Schema::Complex(c) => match c {
279 ComplexType::Record(r) => {
280 let namespace = r.namespace.or(namespace);
281 let fields = r
282 .fields
283 .iter()
284 .map(|field| {
285 Ok(AvroField {
286 name: field.name.to_string(),
287 data_type: make_data_type(&field.r#type, namespace, resolver)?,
288 })
289 })
290 .collect::<Result<_, ArrowError>>()?;
291
292 let field = AvroDataType {
293 nullability: None,
294 codec: Codec::Struct(fields),
295 metadata: r.attributes.field_metadata(),
296 };
297 resolver.register(r.name, namespace, field.clone());
298 Ok(field)
299 }
300 ComplexType::Array(a) => {
301 let mut field = make_data_type(a.items.as_ref(), namespace, resolver)?;
302 Ok(AvroDataType {
303 nullability: None,
304 metadata: a.attributes.field_metadata(),
305 codec: Codec::List(Arc::new(field)),
306 })
307 }
308 ComplexType::Fixed(f) => {
309 let size = f.size.try_into().map_err(|e| {
310 ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
311 })?;
312
313 let field = AvroDataType {
314 nullability: None,
315 metadata: f.attributes.field_metadata(),
316 codec: Codec::Fixed(size),
317 };
318 resolver.register(f.name, namespace, field.clone());
319 Ok(field)
320 }
321 ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!(
322 "Enum of {e:?} not currently supported"
323 ))),
324 ComplexType::Map(m) => Err(ArrowError::NotYetImplemented(format!(
325 "Map of {m:?} not currently supported"
326 ))),
327 },
328 Schema::Type(t) => {
329 let mut field =
330 make_data_type(&Schema::TypeName(t.r#type.clone()), namespace, resolver)?;
331
332 match (t.attributes.logical_type, &mut field.codec) {
334 (Some("decimal"), c @ Codec::Fixed(_)) => {
335 return Err(ArrowError::NotYetImplemented(
336 "Decimals are not currently supported".to_string(),
337 ))
338 }
339 (Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
340 (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
341 (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
342 (Some("timestamp-millis"), c @ Codec::Int64) => *c = Codec::TimestampMillis(true),
343 (Some("timestamp-micros"), c @ Codec::Int64) => *c = Codec::TimestampMicros(true),
344 (Some("local-timestamp-millis"), c @ Codec::Int64) => {
345 *c = Codec::TimestampMillis(false)
346 }
347 (Some("local-timestamp-micros"), c @ Codec::Int64) => {
348 *c = Codec::TimestampMicros(false)
349 }
350 (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
351 (Some(logical), _) => {
352 field.metadata.insert("logicalType".into(), logical.into());
354 }
355 (None, _) => {}
356 }
357
358 if !t.attributes.additional.is_empty() {
359 for (k, v) in &t.attributes.additional {
360 field.metadata.insert(k.to_string(), v.to_string());
361 }
362 }
363 Ok(field)
364 }
365 }
366}