1use crate::codec::{
21 AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, ResolvedField,
22 ResolvedRecord, ResolvedUnion, Tz,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36 DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37 Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use strum_macros::AsRefStr;
42use uuid::Uuid;
43
44use std::cmp::Ordering;
45use std::mem;
46use std::sync::Arc;
47
/// Initial capacity (in elements / bytes / offsets) passed to every value buffer and builder
/// that the decoders in this module pre-allocate.
const DEFAULT_CAPACITY: usize = 1024;
49
/// Reads one big-endian two's-complement decimal of `$N` bytes (using the optional fixed
/// `$size`) from `$buf` and appends it to `$builder` as an `$Int`.
///
/// Must be expanded inside a function whose error type accepts the error of
/// `read_decimal_bytes_be` via `?`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
        let value = <$Int>::from_be_bytes(be_bytes);
        $builder.append_value(value);
    }};
}
57
/// Finishes a decimal `$builder`, re-wraps its values as `$ArrayTy` with the validity `$nulls`,
/// applies `$precision` / `$scale` (a missing scale means 0) and yields an [`ArrayRef`].
///
/// Must be expanded inside a function whose error type accepts Arrow errors via `?`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        let values = $builder.finish().into_parts().1;
        let unscaled = <$ArrayTy>::try_new(values, $nulls)?;
        let precision = *$precision as u8;
        let scale = $scale.unwrap_or(0) as i8;
        let decimals = unscaled.with_precision_and_scale(precision, scale)?;
        Arc::new(decimals) as ArrayRef
    }};
}
67
/// Appends a decimal default literal to `$builder`, evaluating to `Result<(), AvroError>`.
///
/// The literal must be `AvroLiteral::Bytes` holding a big-endian two's-complement value; it is
/// sign-extended to `$N` bytes with `sign_cast_to` and appended as an `$Int`. Any other literal
/// kind yields `AvroError::InvalidArgument` naming the `$name` type.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            let message = concat!(
                "Default for ",
                $name,
                " must be bytes (two's-complement big-endian)"
            );
            Err(AvroError::InvalidArgument(message.to_string()))
        }
    }};
}
90
91#[derive(Debug)]
93pub(crate) struct RecordDecoder {
94 schema: SchemaRef,
95 fields: Vec<Decoder>,
96 projector: Option<Projector>,
97}
98
99impl RecordDecoder {
100 pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
111 match data_type.codec() {
112 Codec::Struct(reader_fields) => {
113 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
115 let mut encodings = Vec::with_capacity(reader_fields.len());
116 let mut field_defaults = Vec::with_capacity(reader_fields.len());
117 for avro_field in reader_fields.iter() {
118 arrow_fields.push(avro_field.field());
119 encodings.push(Decoder::try_new(avro_field.data_type())?);
120
121 if let Some(ResolutionInfo::DefaultValue(lit)) =
122 avro_field.data_type().resolution.as_ref()
123 {
124 field_defaults.push(Some(lit.clone()));
125 } else {
126 field_defaults.push(None);
127 }
128 }
129 let projector = match data_type.resolution.as_ref() {
130 Some(ResolutionInfo::Record(rec)) => {
131 Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
132 }
133 _ => None,
134 };
135 Ok(Self {
136 schema: Arc::new(ArrowSchema::new(arrow_fields)),
137 fields: encodings,
138 projector,
139 })
140 }
141 other => Err(AvroError::ParseError(format!(
142 "Expected record got {other:?}"
143 ))),
144 }
145 }
146
147 pub(crate) fn schema(&self) -> &SchemaRef {
149 &self.schema
150 }
151
152 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
154 let mut cursor = AvroCursor::new(buf);
155 match self.projector.as_mut() {
156 Some(proj) => {
157 for _ in 0..count {
158 proj.project_record(&mut cursor, &mut self.fields)?;
159 }
160 }
161 None => {
162 for _ in 0..count {
163 for field in &mut self.fields {
164 field.decode(&mut cursor)?;
165 }
166 }
167 }
168 }
169 Ok(cursor.position())
170 }
171
172 pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
174 let arrays = self
175 .fields
176 .iter_mut()
177 .map(|x| x.flush(None))
178 .collect::<Result<Vec<_>, _>>()?;
179 RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
180 }
181}
182
/// Column decoder for one Avro type: each variant accumulates decoded values into the Arrow
/// buffers or builders it owns until the column is flushed.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Count of values appended for the `null` type.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    /// Values for `Codec::DurationSeconds`.
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    /// Values for `Codec::DurationMillis`.
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    /// Values for `Codec::DurationMicros`.
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    /// Values for `Codec::DurationNanos`.
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Int8(Vec<i8>),
    #[cfg(feature = "avro_custom_types")]
    Int16(Vec<i16>),
    #[cfg(feature = "avro_custom_types")]
    UInt8(Vec<u8>),
    #[cfg(feature = "avro_custom_types")]
    UInt16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    UInt32(Vec<u32>),
    #[cfg(feature = "avro_custom_types")]
    UInt64(Vec<u64>),
    /// Raw 16-bit IEEE-754 half-precision bit patterns.
    #[cfg(feature = "avro_custom_types")]
    Float16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    Date64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    TimeNanos(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Time32Secs(Vec<i32>),
    /// `(is_utc, values)` for `Codec::TimestampSecs`.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool, Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth(Vec<i32>),
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime(Vec<IntervalDayTime>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// `(timezone, values)` copied from `Codec::TimestampMillis`.
    TimestampMillis(Option<Tz>, Vec<i64>),
    /// `(timezone, values)` copied from `Codec::TimestampMicros`.
    TimestampMicros(Option<Tz>, Vec<i64>),
    /// `(timezone, values)` copied from `Codec::TimestampNanos`.
    TimestampNanos(Option<Tz>, Vec<i64>),
    /// Reader `long` fed by a writer `int` (`Promotion::IntToLong`).
    Int32ToInt64(Vec<i64>),
    /// Reader `float` fed by a writer `int` (`Promotion::IntToFloat`).
    Int32ToFloat32(Vec<f32>),
    /// Reader `double` fed by a writer `int` (`Promotion::IntToDouble`).
    Int32ToFloat64(Vec<f64>),
    /// Reader `float` fed by a writer `long` (`Promotion::LongToFloat`).
    Int64ToFloat32(Vec<f32>),
    /// Reader `double` fed by a writer `long` (`Promotion::LongToDouble`).
    Int64ToFloat64(Vec<f64>),
    /// Reader `double` fed by a writer `float` (`Promotion::FloatToDouble`).
    Float32ToFloat64(Vec<f64>),
    /// Reader string fed by writer bytes: `(offsets, value bytes)`.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Reader bytes fed by a writer string: `(offsets, value bytes)`.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, value bytes)`.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, UTF-8 value bytes)`.
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Same buffers as `String`; selected for `Codec::Utf8View`.
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(item field named "item", list offsets, item decoder)`.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// `(arrow fields, per-field decoders, per-field default literals, optional projector)`.
    Record(
        Fields,
        Vec<Decoder>,
        Vec<Option<AvroLiteral>>,
        Option<Projector>,
    ),
    /// `(entries field, key offsets, map offsets, key bytes, value decoder)`.
    /// NOTE(review): the `Vec<u8>` is presumably the concatenated UTF-8 key bytes — confirm
    /// against the decode/flush code.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// `(byte size per value, concatenated value bytes)`.
    Fixed(i32, Vec<u8>),
    /// `(symbol indices, reader symbols, optional writer->reader enum resolution)`.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Avro `duration` values (`Codec::Interval`).
    Duration(IntervalMonthDayNanoBuilder),
    /// 16 bytes per UUID value.
    Uuid(Vec<u8>),
    /// `(precision, scale, fixed byte size, builder)`.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// `(precision, scale, fixed byte size, builder)`.
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// `(precision, scale, fixed byte size, builder)`.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// `(precision, scale, fixed byte size, builder)`.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// `(run-end byte width: 2, 4 or 8; number of logical values appended; value decoder)`.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    Union(UnionDecoder),
    /// `(plan for reading nullability, validity bitmap, decoder for the non-null values)`.
    Nullable(NullablePlan, NullBufferBuilder, Box<Decoder>),
}
275
276impl Decoder {
277 fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
278 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
279 if info.writer_is_union && !info.reader_is_union {
280 let mut clone = data_type.clone();
281 clone.resolution = None; let target = Self::try_new_internal(&clone)?;
283 let decoder = Self::Union(
284 UnionDecoderBuilder::new()
285 .with_resolved_union(info.clone())
286 .with_target(target)
287 .build()?,
288 );
289 return Ok(decoder);
290 }
291 }
292 Self::try_new_internal(data_type)
293 }
294
295 fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
296 let promotion = match data_type.resolution.as_ref() {
298 Some(ResolutionInfo::Promotion(p)) => Some(*p),
299 _ => None,
300 };
301 let decoder = match (data_type.codec(), promotion) {
302 (Codec::Int64, Some(Promotion::IntToLong)) => {
303 Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
304 }
305 (Codec::Float32, Some(Promotion::IntToFloat)) => {
306 Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
307 }
308 (Codec::Float64, Some(Promotion::IntToDouble)) => {
309 Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
310 }
311 (Codec::Float32, Some(Promotion::LongToFloat)) => {
312 Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
313 }
314 (Codec::Float64, Some(Promotion::LongToDouble)) => {
315 Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
316 }
317 (Codec::Float64, Some(Promotion::FloatToDouble)) => {
318 Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
319 }
320 (Codec::Utf8, Some(Promotion::BytesToString))
321 | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
322 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
323 Vec::with_capacity(DEFAULT_CAPACITY),
324 ),
325 (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
326 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
327 Vec::with_capacity(DEFAULT_CAPACITY),
328 ),
329 (Codec::Null, _) => Self::Null(0),
330 (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
331 (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
332 (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
333 (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
334 (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
335 (Codec::Binary, _) => Self::Binary(
336 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
337 Vec::with_capacity(DEFAULT_CAPACITY),
338 ),
339 (Codec::Utf8, _) => Self::String(
340 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
341 Vec::with_capacity(DEFAULT_CAPACITY),
342 ),
343 (Codec::Utf8View, _) => Self::StringView(
344 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
345 Vec::with_capacity(DEFAULT_CAPACITY),
346 ),
347 (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
348 (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
349 (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
350 (Codec::TimestampMillis(tz), _) => {
351 Self::TimestampMillis(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
352 }
353 (Codec::TimestampMicros(tz), _) => {
354 Self::TimestampMicros(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
355 }
356 (Codec::TimestampNanos(tz), _) => {
357 Self::TimestampNanos(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
358 }
359 #[cfg(feature = "avro_custom_types")]
360 (Codec::DurationNanos, _) => {
361 Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
362 }
363 #[cfg(feature = "avro_custom_types")]
364 (Codec::DurationMicros, _) => {
365 Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
366 }
367 #[cfg(feature = "avro_custom_types")]
368 (Codec::DurationMillis, _) => {
369 Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
370 }
371 #[cfg(feature = "avro_custom_types")]
372 (Codec::DurationSeconds, _) => {
373 Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
374 }
375 #[cfg(feature = "avro_custom_types")]
376 (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
377 #[cfg(feature = "avro_custom_types")]
378 (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
379 #[cfg(feature = "avro_custom_types")]
380 (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
381 #[cfg(feature = "avro_custom_types")]
382 (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
383 #[cfg(feature = "avro_custom_types")]
384 (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
385 #[cfg(feature = "avro_custom_types")]
386 (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
387 #[cfg(feature = "avro_custom_types")]
388 (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
389 #[cfg(feature = "avro_custom_types")]
390 (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
391 #[cfg(feature = "avro_custom_types")]
392 (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
393 #[cfg(feature = "avro_custom_types")]
394 (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
395 #[cfg(feature = "avro_custom_types")]
396 (Codec::TimestampSecs(is_utc), _) => {
397 Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
398 }
399 #[cfg(feature = "avro_custom_types")]
400 (Codec::IntervalYearMonth, _) => {
401 Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
402 }
403 #[cfg(feature = "avro_custom_types")]
404 (Codec::IntervalMonthDayNano, _) => {
405 Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
406 }
407 #[cfg(feature = "avro_custom_types")]
408 (Codec::IntervalDayTime, _) => {
409 Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
410 }
411 (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
412 (Codec::Decimal(precision, scale, size), _) => {
413 let p = *precision;
414 let s = *scale;
415 let prec = p as u8;
416 let scl = s.unwrap_or(0) as i8;
417 #[cfg(feature = "small_decimals")]
418 {
419 if p <= DECIMAL32_MAX_PRECISION as usize {
420 let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
421 .with_precision_and_scale(prec, scl)?;
422 Self::Decimal32(p, s, *size, builder)
423 } else if p <= DECIMAL64_MAX_PRECISION as usize {
424 let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
425 .with_precision_and_scale(prec, scl)?;
426 Self::Decimal64(p, s, *size, builder)
427 } else if p <= DECIMAL128_MAX_PRECISION as usize {
428 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
429 .with_precision_and_scale(prec, scl)?;
430 Self::Decimal128(p, s, *size, builder)
431 } else if p <= DECIMAL256_MAX_PRECISION as usize {
432 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
433 .with_precision_and_scale(prec, scl)?;
434 Self::Decimal256(p, s, *size, builder)
435 } else {
436 return Err(AvroError::ParseError(format!(
437 "Decimal precision {p} exceeds maximum supported"
438 )));
439 }
440 }
441 #[cfg(not(feature = "small_decimals"))]
442 {
443 if p <= DECIMAL128_MAX_PRECISION as usize {
444 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
445 .with_precision_and_scale(prec, scl)?;
446 Self::Decimal128(p, s, *size, builder)
447 } else if p <= DECIMAL256_MAX_PRECISION as usize {
448 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
449 .with_precision_and_scale(prec, scl)?;
450 Self::Decimal256(p, s, *size, builder)
451 } else {
452 return Err(AvroError::ParseError(format!(
453 "Decimal precision {p} exceeds maximum supported"
454 )));
455 }
456 }
457 }
458 (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
459 (Codec::List(item), _) => {
460 let decoder = Self::try_new(item)?;
461 Self::Array(
462 Arc::new(item.field_with_name("item")),
463 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
464 Box::new(decoder),
465 )
466 }
467 (Codec::Enum(symbols), _) => {
468 let res = match data_type.resolution.as_ref() {
469 Some(ResolutionInfo::EnumMapping(mapping)) => {
470 Some(EnumResolution::new(mapping))
471 }
472 _ => None,
473 };
474 Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
475 }
476 (Codec::Struct(fields), _) => {
477 let mut arrow_fields = Vec::with_capacity(fields.len());
478 let mut encodings = Vec::with_capacity(fields.len());
479 let mut field_defaults = Vec::with_capacity(fields.len());
480 for avro_field in fields.iter() {
481 let encoding = Self::try_new(avro_field.data_type())?;
482 arrow_fields.push(avro_field.field());
483 encodings.push(encoding);
484
485 if let Some(ResolutionInfo::DefaultValue(lit)) =
486 avro_field.data_type().resolution.as_ref()
487 {
488 field_defaults.push(Some(lit.clone()));
489 } else {
490 field_defaults.push(None);
491 }
492 }
493 let projector =
494 if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
495 Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
496 } else {
497 None
498 };
499 Self::Record(arrow_fields.into(), encodings, field_defaults, projector)
500 }
501 (Codec::Map(child), _) => {
502 let val_field = child.field_with_name("value");
503 let map_field = Arc::new(ArrowField::new(
504 "entries",
505 DataType::Struct(Fields::from(vec![
506 ArrowField::new("key", DataType::Utf8, false),
507 val_field,
508 ])),
509 false,
510 ));
511 let val_dec = Self::try_new(child)?;
512 Self::Map(
513 map_field,
514 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
515 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
516 Vec::with_capacity(DEFAULT_CAPACITY),
517 Box::new(val_dec),
518 )
519 }
520 (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
521 (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
522 let decoders = encodings
523 .iter()
524 .map(Self::try_new_internal)
525 .collect::<Result<Vec<_>, _>>()?;
526 if fields.len() != decoders.len() {
527 return Err(AvroError::SchemaError(format!(
528 "Union has {} fields but {} decoders",
529 fields.len(),
530 decoders.len()
531 )));
532 }
533 let branch_count = decoders.len();
536 let max_addr = (i32::MAX as usize) + 1;
537 if branch_count > max_addr {
538 return Err(AvroError::SchemaError(format!(
539 "Union has {branch_count} branches, which exceeds the maximum addressable \
540 branches by an Avro int tag ({} + 1).",
541 i32::MAX
542 )));
543 }
544 let mut builder = UnionDecoderBuilder::new()
545 .with_fields(fields.clone())
546 .with_branches(decoders);
547 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
548 if info.reader_is_union {
549 builder = builder.with_resolved_union(info.clone());
550 }
551 }
552 Self::Union(builder.build()?)
553 }
554 (Codec::Union(_, _, _), _) => {
555 return Err(AvroError::NYI(
556 "Sparse Arrow unions are not yet supported".to_string(),
557 ));
558 }
559 #[cfg(feature = "avro_custom_types")]
560 (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
561 let inner = Self::try_new(values_dt)?;
562 let byte_width: u8 = match *width_bits_or_bytes {
563 2 | 4 | 8 => *width_bits_or_bytes,
564 16 => 2,
565 32 => 4,
566 64 => 8,
567 other => {
568 return Err(AvroError::InvalidArgument(format!(
569 "Unsupported run-end width {other} for RunEndEncoded; \
570 expected 16/32/64 bits or 2/4/8 bytes"
571 )));
572 }
573 };
574 Self::RunEndEncoded(byte_width, 0, Box::new(inner))
575 }
576 };
577 Ok(match data_type.nullability() {
578 Some(nullability) => {
579 let plan = match &data_type.resolution {
581 None => NullablePlan::ReadTag {
582 nullability,
583 resolution: ResolutionPlan::Promotion(Promotion::Direct),
584 },
585 Some(ResolutionInfo::Promotion(_)) => {
586 NullablePlan::FromSingle {
589 resolution: ResolutionPlan::Promotion(Promotion::Direct),
590 }
591 }
592 Some(ResolutionInfo::Union(info)) if !info.writer_is_union => {
593 let Some(Some((_, resolution))) = info.writer_to_reader.first() else {
594 return Err(AvroError::SchemaError(
595 "unexpected union resolution info for non-union writer and union reader type".into(),
596 ));
597 };
598 let resolution = ResolutionPlan::try_new(&decoder, resolution)?;
599 NullablePlan::FromSingle { resolution }
600 }
601 Some(ResolutionInfo::Union(info)) => {
602 let Some((_, resolution)) =
603 info.writer_to_reader[nullability.non_null_index()].as_ref()
604 else {
605 return Err(AvroError::SchemaError(
606 "unexpected union resolution info for nullable writer type".into(),
607 ));
608 };
609 NullablePlan::ReadTag {
610 nullability,
611 resolution: ResolutionPlan::try_new(&decoder, resolution)?,
612 }
613 }
614 Some(resolution) => NullablePlan::FromSingle {
615 resolution: ResolutionPlan::try_new(&decoder, resolution)?,
616 },
617 };
618 Self::Nullable(
619 plan,
620 NullBufferBuilder::new(DEFAULT_CAPACITY),
621 Box::new(decoder),
622 )
623 }
624 None => decoder,
625 })
626 }
627
628 fn append_null(&mut self) -> Result<(), AvroError> {
630 match self {
631 Self::Null(count) => *count += 1,
632 Self::Boolean(b) => b.append(false),
633 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
634 Self::Int64(v)
635 | Self::Int32ToInt64(v)
636 | Self::TimeMicros(v)
637 | Self::TimestampMillis(_, v)
638 | Self::TimestampMicros(_, v)
639 | Self::TimestampNanos(_, v) => v.push(0),
640 #[cfg(feature = "avro_custom_types")]
641 Self::DurationSecond(v)
642 | Self::DurationMillisecond(v)
643 | Self::DurationMicrosecond(v)
644 | Self::DurationNanosecond(v) => v.push(0),
645 #[cfg(feature = "avro_custom_types")]
646 Self::Int8(v) => v.push(0),
647 #[cfg(feature = "avro_custom_types")]
648 Self::Int16(v) => v.push(0),
649 #[cfg(feature = "avro_custom_types")]
650 Self::UInt8(v) => v.push(0),
651 #[cfg(feature = "avro_custom_types")]
652 Self::UInt16(v) => v.push(0),
653 #[cfg(feature = "avro_custom_types")]
654 Self::UInt32(v) => v.push(0),
655 #[cfg(feature = "avro_custom_types")]
656 Self::UInt64(v) => v.push(0),
657 #[cfg(feature = "avro_custom_types")]
658 Self::Float16(v) => v.push(0),
659 #[cfg(feature = "avro_custom_types")]
660 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
661 #[cfg(feature = "avro_custom_types")]
662 Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
663 #[cfg(feature = "avro_custom_types")]
664 Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
665 #[cfg(feature = "avro_custom_types")]
666 Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
667 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
668 Self::Float64(v)
669 | Self::Int32ToFloat64(v)
670 | Self::Int64ToFloat64(v)
671 | Self::Float32ToFloat64(v) => v.push(0.),
672 Self::Binary(offsets, _)
673 | Self::String(offsets, _)
674 | Self::StringView(offsets, _)
675 | Self::BytesToString(offsets, _)
676 | Self::StringToBytes(offsets, _) => {
677 offsets.push_length(0);
678 }
679 Self::Uuid(v) => {
680 v.extend([0; 16]);
681 }
682 Self::Array(_, offsets, _) => {
683 offsets.push_length(0);
684 }
685 Self::Record(_, e, _, _) => {
686 for encoding in e.iter_mut() {
687 encoding.append_null()?;
688 }
689 }
690 Self::Map(_, _koff, moff, _, _) => {
691 moff.push_length(0);
692 }
693 Self::Fixed(sz, accum) => {
694 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
695 }
696 #[cfg(feature = "small_decimals")]
697 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
698 #[cfg(feature = "small_decimals")]
699 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
700 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
701 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
702 Self::Enum(indices, _, _) => indices.push(0),
703 Self::Duration(builder) => builder.append_null(),
704 #[cfg(feature = "avro_custom_types")]
705 Self::RunEndEncoded(_, len, inner) => {
706 *len += 1;
707 inner.append_null()?;
708 }
709 Self::Union(u) => u.append_null()?,
710 Self::Nullable(_, null_buffer, inner) => {
711 null_buffer.append(false);
712 inner.append_null()?;
713 }
714 }
715 Ok(())
716 }
717
718 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
720 match self {
721 Self::Nullable(_, nb, inner) => {
722 if matches!(lit, AvroLiteral::Null) {
723 nb.append(false);
724 inner.append_null()
725 } else {
726 nb.append(true);
727 inner.append_default(lit)
728 }
729 }
730 Self::Null(count) => match lit {
731 AvroLiteral::Null => {
732 *count += 1;
733 Ok(())
734 }
735 _ => Err(AvroError::InvalidArgument(
736 "Non-null default for null type".to_string(),
737 )),
738 },
739 Self::Boolean(b) => match lit {
740 AvroLiteral::Boolean(v) => {
741 b.append(*v);
742 Ok(())
743 }
744 _ => Err(AvroError::InvalidArgument(
745 "Default for boolean must be boolean".to_string(),
746 )),
747 },
748 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
749 AvroLiteral::Int(i) => {
750 v.push(*i);
751 Ok(())
752 }
753 _ => Err(AvroError::InvalidArgument(
754 "Default for int32/date32/time-millis must be int".to_string(),
755 )),
756 },
757 #[cfg(feature = "avro_custom_types")]
758 Self::DurationSecond(v)
759 | Self::DurationMillisecond(v)
760 | Self::DurationMicrosecond(v)
761 | Self::DurationNanosecond(v) => match lit {
762 AvroLiteral::Long(i) => {
763 v.push(*i);
764 Ok(())
765 }
766 _ => Err(AvroError::InvalidArgument(
767 "Default for duration long must be long".to_string(),
768 )),
769 },
770 #[cfg(feature = "avro_custom_types")]
771 Self::Int8(v) => match lit {
772 AvroLiteral::Int(i) => {
773 let x = i8::try_from(*i).map_err(|_| {
774 AvroError::InvalidArgument(format!(
775 "Default for int8 out of range for i8: {i}"
776 ))
777 })?;
778 v.push(x);
779 Ok(())
780 }
781 _ => Err(AvroError::InvalidArgument(
782 "Default for int8 must be int".to_string(),
783 )),
784 },
785 #[cfg(feature = "avro_custom_types")]
786 Self::Int16(v) => match lit {
787 AvroLiteral::Int(i) => {
788 let x = i16::try_from(*i).map_err(|_| {
789 AvroError::InvalidArgument(format!(
790 "Default for int16 out of range for i16: {i}"
791 ))
792 })?;
793 v.push(x);
794 Ok(())
795 }
796 _ => Err(AvroError::InvalidArgument(
797 "Default for int16 must be int".to_string(),
798 )),
799 },
800 #[cfg(feature = "avro_custom_types")]
801 Self::UInt8(v) => match lit {
802 AvroLiteral::Int(i) => {
803 let x = u8::try_from(*i).map_err(|_| {
804 AvroError::InvalidArgument(format!(
805 "Default for uint8 out of range for u8: {i}"
806 ))
807 })?;
808 v.push(x);
809 Ok(())
810 }
811 _ => Err(AvroError::InvalidArgument(
812 "Default for uint8 must be int".to_string(),
813 )),
814 },
815 #[cfg(feature = "avro_custom_types")]
816 Self::UInt16(v) => match lit {
817 AvroLiteral::Int(i) => {
818 let x = u16::try_from(*i).map_err(|_| {
819 AvroError::InvalidArgument(format!(
820 "Default for uint16 out of range for u16: {i}"
821 ))
822 })?;
823 v.push(x);
824 Ok(())
825 }
826 _ => Err(AvroError::InvalidArgument(
827 "Default for uint16 must be int".to_string(),
828 )),
829 },
830 #[cfg(feature = "avro_custom_types")]
831 Self::UInt32(v) => match lit {
832 AvroLiteral::Long(i) => {
833 let x = u32::try_from(*i).map_err(|_| {
834 AvroError::InvalidArgument(format!(
835 "Default for uint32 out of range for u32: {i}"
836 ))
837 })?;
838 v.push(x);
839 Ok(())
840 }
841 _ => Err(AvroError::InvalidArgument(
842 "Default for uint32 must be long".to_string(),
843 )),
844 },
845 #[cfg(feature = "avro_custom_types")]
846 Self::UInt64(v) => match lit {
847 AvroLiteral::Bytes(b) => {
848 if b.len() != 8 {
849 return Err(AvroError::InvalidArgument(format!(
850 "uint64 default must be exactly 8 bytes, got {}",
851 b.len()
852 )));
853 }
854 v.push(u64::from_le_bytes([
855 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
856 ]));
857 Ok(())
858 }
859 _ => Err(AvroError::InvalidArgument(
860 "Default for uint64 must be bytes (8-byte LE)".to_string(),
861 )),
862 },
863 #[cfg(feature = "avro_custom_types")]
864 Self::Float16(v) => match lit {
865 AvroLiteral::Bytes(b) => {
866 if b.len() != 2 {
867 return Err(AvroError::InvalidArgument(format!(
868 "float16 default must be exactly 2 bytes, got {}",
869 b.len()
870 )));
871 }
872 v.push(u16::from_le_bytes([b[0], b[1]]));
873 Ok(())
874 }
875 _ => Err(AvroError::InvalidArgument(
876 "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
877 )),
878 },
879 #[cfg(feature = "avro_custom_types")]
880 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
881 AvroLiteral::Long(i) => {
882 v.push(*i);
883 Ok(())
884 }
885 _ => Err(AvroError::InvalidArgument(
886 "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
887 )),
888 },
889 #[cfg(feature = "avro_custom_types")]
890 Self::Time32Secs(v) => match lit {
891 AvroLiteral::Int(i) => {
892 v.push(*i);
893 Ok(())
894 }
895 _ => Err(AvroError::InvalidArgument(
896 "Default for time32-secs must be int".to_string(),
897 )),
898 },
899 #[cfg(feature = "avro_custom_types")]
900 Self::IntervalYearMonth(v) => match lit {
901 AvroLiteral::Bytes(b) => {
902 if b.len() != 4 {
903 return Err(AvroError::InvalidArgument(format!(
904 "interval-year-month default must be exactly 4 bytes, got {}",
905 b.len()
906 )));
907 }
908 v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
909 Ok(())
910 }
911 _ => Err(AvroError::InvalidArgument(
912 "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
913 )),
914 },
915 #[cfg(feature = "avro_custom_types")]
916 Self::IntervalMonthDayNano(v) => match lit {
917 AvroLiteral::Bytes(b) => {
918 if b.len() != 16 {
919 return Err(AvroError::InvalidArgument(format!(
920 "interval-month-day-nano default must be exactly 16 bytes, got {}",
921 b.len()
922 )));
923 }
924 let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
925 let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
926 let nanos =
927 i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
928 v.push(IntervalMonthDayNano::new(months, days, nanos));
929 Ok(())
930 }
931 _ => Err(AvroError::InvalidArgument(
932 "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
933 )),
934 },
935 #[cfg(feature = "avro_custom_types")]
936 Self::IntervalDayTime(v) => match lit {
937 AvroLiteral::Bytes(b) => {
938 if b.len() != 8 {
939 return Err(AvroError::InvalidArgument(format!(
940 "interval-day-time default must be exactly 8 bytes, got {}",
941 b.len()
942 )));
943 }
944 let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
945 let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
946 v.push(IntervalDayTime::new(days, milliseconds));
947 Ok(())
948 }
949 _ => Err(AvroError::InvalidArgument(
950 "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
951 )),
952 },
953 Self::Int64(v)
954 | Self::Int32ToInt64(v)
955 | Self::TimeMicros(v)
956 | Self::TimestampMillis(_, v)
957 | Self::TimestampMicros(_, v)
958 | Self::TimestampNanos(_, v) => match lit {
959 AvroLiteral::Long(i) => {
960 v.push(*i);
961 Ok(())
962 }
963 AvroLiteral::Int(i) => {
964 v.push(*i as i64);
965 Ok(())
966 }
967 _ => Err(AvroError::InvalidArgument(
968 "Default for long/time-micros/timestamp must be long or int".to_string(),
969 )),
970 },
971 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
972 AvroLiteral::Float(f) => {
973 v.push(*f);
974 Ok(())
975 }
976 _ => Err(AvroError::InvalidArgument(
977 "Default for float must be float".to_string(),
978 )),
979 },
980 Self::Float64(v)
981 | Self::Int32ToFloat64(v)
982 | Self::Int64ToFloat64(v)
983 | Self::Float32ToFloat64(v) => match lit {
984 AvroLiteral::Double(f) => {
985 v.push(*f);
986 Ok(())
987 }
988 _ => Err(AvroError::InvalidArgument(
989 "Default for double must be double".to_string(),
990 )),
991 },
992 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
993 AvroLiteral::Bytes(b) => {
994 offsets.push_length(b.len());
995 values.extend_from_slice(b);
996 Ok(())
997 }
998 _ => Err(AvroError::InvalidArgument(
999 "Default for bytes must be bytes".to_string(),
1000 )),
1001 },
1002 Self::BytesToString(offsets, values)
1003 | Self::String(offsets, values)
1004 | Self::StringView(offsets, values) => match lit {
1005 AvroLiteral::String(s) => {
1006 let b = s.as_bytes();
1007 offsets.push_length(b.len());
1008 values.extend_from_slice(b);
1009 Ok(())
1010 }
1011 _ => Err(AvroError::InvalidArgument(
1012 "Default for string must be string".to_string(),
1013 )),
1014 },
1015 Self::Uuid(values) => match lit {
1016 AvroLiteral::String(s) => {
1017 let uuid = Uuid::try_parse(s).map_err(|e| {
1018 AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
1019 })?;
1020 values.extend_from_slice(uuid.as_bytes());
1021 Ok(())
1022 }
1023 _ => Err(AvroError::InvalidArgument(
1024 "Default for uuid must be string".to_string(),
1025 )),
1026 },
1027 Self::Fixed(sz, accum) => match lit {
1028 AvroLiteral::Bytes(b) => {
1029 if b.len() != *sz as usize {
1030 return Err(AvroError::InvalidArgument(format!(
1031 "Fixed default length {} does not match size {sz}",
1032 b.len(),
1033 )));
1034 }
1035 accum.extend_from_slice(b);
1036 Ok(())
1037 }
1038 _ => Err(AvroError::InvalidArgument(
1039 "Default for fixed must be bytes".to_string(),
1040 )),
1041 },
1042 #[cfg(feature = "small_decimals")]
1043 Self::Decimal32(_, _, _, builder) => {
1044 append_decimal_default!(lit, builder, 4, i32, "decimal32")
1045 }
1046 #[cfg(feature = "small_decimals")]
1047 Self::Decimal64(_, _, _, builder) => {
1048 append_decimal_default!(lit, builder, 8, i64, "decimal64")
1049 }
1050 Self::Decimal128(_, _, _, builder) => {
1051 append_decimal_default!(lit, builder, 16, i128, "decimal128")
1052 }
1053 Self::Decimal256(_, _, _, builder) => {
1054 append_decimal_default!(lit, builder, 32, i256, "decimal256")
1055 }
1056 Self::Duration(builder) => match lit {
1057 AvroLiteral::Bytes(b) => {
1058 if b.len() != 12 {
1059 return Err(AvroError::InvalidArgument(format!(
1060 "Duration default must be exactly 12 bytes, got {}",
1061 b.len()
1062 )));
1063 }
1064 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1065 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1066 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1067 let nanos = (millis as i64) * 1_000_000;
1068 builder.append_value(IntervalMonthDayNano::new(
1069 months as i32,
1070 days as i32,
1071 nanos,
1072 ));
1073 Ok(())
1074 }
1075 _ => Err(AvroError::InvalidArgument(
1076 "Default for duration must be 12-byte little-endian months/days/millis"
1077 .to_string(),
1078 )),
1079 },
1080 Self::Array(_, offsets, inner) => match lit {
1081 AvroLiteral::Array(items) => {
1082 offsets.push_length(items.len());
1083 for item in items {
1084 inner.append_default(item)?;
1085 }
1086 Ok(())
1087 }
1088 _ => Err(AvroError::InvalidArgument(
1089 "Default for array must be an array literal".to_string(),
1090 )),
1091 },
1092 Self::Map(_, koff, moff, kdata, valdec) => match lit {
1093 AvroLiteral::Map(entries) => {
1094 moff.push_length(entries.len());
1095 for (k, v) in entries {
1096 let kb = k.as_bytes();
1097 koff.push_length(kb.len());
1098 kdata.extend_from_slice(kb);
1099 valdec.append_default(v)?;
1100 }
1101 Ok(())
1102 }
1103 _ => Err(AvroError::InvalidArgument(
1104 "Default for map must be a map/object literal".to_string(),
1105 )),
1106 },
1107 Self::Enum(indices, symbols, _) => match lit {
1108 AvroLiteral::Enum(sym) => {
1109 let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1110 AvroError::InvalidArgument(format!(
1111 "Enum default symbol {sym:?} not in reader symbols"
1112 ))
1113 })?;
1114 indices.push(pos as i32);
1115 Ok(())
1116 }
1117 _ => Err(AvroError::InvalidArgument(
1118 "Default for enum must be a symbol".to_string(),
1119 )),
1120 },
1121 #[cfg(feature = "avro_custom_types")]
1122 Self::RunEndEncoded(_, len, inner) => {
1123 *len += 1;
1124 inner.append_default(lit)
1125 }
1126 Self::Union(u) => u.append_default(lit),
1127 Self::Record(field_meta, decoders, field_defaults, _) => match lit {
1128 AvroLiteral::Map(entries) => {
1129 for (i, dec) in decoders.iter_mut().enumerate() {
1130 let name = field_meta[i].name();
1131 if let Some(sub) = entries.get(name) {
1132 dec.append_default(sub)?;
1133 } else if let Some(default_literal) = field_defaults[i].as_ref() {
1134 dec.append_default(default_literal)?;
1135 } else {
1136 dec.append_null()?;
1137 }
1138 }
1139 Ok(())
1140 }
1141 AvroLiteral::Null => {
1142 for (i, dec) in decoders.iter_mut().enumerate() {
1143 if let Some(default_literal) = field_defaults[i].as_ref() {
1144 dec.append_default(default_literal)?;
1145 } else {
1146 dec.append_null()?;
1147 }
1148 }
1149 Ok(())
1150 }
1151 _ => Err(AvroError::InvalidArgument(
1152 "Default for record must be a map/object or null".to_string(),
1153 )),
1154 },
1155 }
1156 }
1157
    /// Reads one Avro value for this decoder's variant from `buf` and appends it to the
    /// decoder's accumulated buffers.
    ///
    /// Values are read in encoding order through the cursor's `get_*` / `read_vlq` helpers;
    /// nested decoders (arrays, maps, records, unions, nullable wrappers) recurse.
    ///
    /// # Errors
    /// Propagates cursor read errors, and returns `AvroError::ParseError` when a narrowed
    /// custom integer is out of range, when UUID bytes are not valid UTF-8 or do not parse,
    /// or when an enum index cannot be resolved.
    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
        match self {
            // The null type has no payload; only the row count is tracked.
            Self::Null(x) => *x += 1,
            Self::Boolean(values) => values.append(buf.get_bool()?),
            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
                values.push(buf.get_int()?)
            }
            Self::Int64(values)
            | Self::TimeMicros(values)
            | Self::TimestampMillis(_, values)
            | Self::TimestampMicros(_, values)
            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
            #[cfg(feature = "avro_custom_types")]
            Self::DurationSecond(values)
            | Self::DurationMillisecond(values)
            | Self::DurationMicrosecond(values)
            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
            // Narrow custom integer types are read as an Avro `int` and range-checked.
            #[cfg(feature = "avro_custom_types")]
            Self::Int8(values) => {
                let raw = buf.get_int()?;
                let x = i8::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
                })?;
                values.push(x);
            }
            #[cfg(feature = "avro_custom_types")]
            Self::Int16(values) => {
                let raw = buf.get_int()?;
                let x = i16::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
                })?;
                values.push(x);
            }
            #[cfg(feature = "avro_custom_types")]
            Self::UInt8(values) => {
                let raw = buf.get_int()?;
                let x = u8::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
                })?;
                values.push(x);
            }
            #[cfg(feature = "avro_custom_types")]
            Self::UInt16(values) => {
                let raw = buf.get_int()?;
                let x = u16::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
                })?;
                values.push(x);
            }
            // u32 is read from a `long`, because an `int` cannot hold the upper half of its range.
            #[cfg(feature = "avro_custom_types")]
            Self::UInt32(values) => {
                let raw = buf.get_long()?;
                let x = u32::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
                })?;
                values.push(x);
            }
            // u64 is stored as 8 raw little-endian bytes.
            #[cfg(feature = "avro_custom_types")]
            Self::UInt64(values) => {
                let b = buf.get_fixed(8)?;
                values.push(u64::from_le_bytes([
                    b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
                ]));
            }
            // Float16 keeps the raw 2-byte little-endian bit pattern as u16; `flush`
            // reinterprets the buffer as a Float16Array.
            #[cfg(feature = "avro_custom_types")]
            Self::Float16(values) => {
                let b = buf.get_fixed(2)?;
                values.push(u16::from_le_bytes([b[0], b[1]]));
            }
            #[cfg(feature = "avro_custom_types")]
            Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
                values.push(buf.get_long()?)
            }
            #[cfg(feature = "avro_custom_types")]
            Self::Time32Secs(values) => values.push(buf.get_int()?),
            // Fixed 4 bytes: little-endian i32 month count.
            #[cfg(feature = "avro_custom_types")]
            Self::IntervalYearMonth(values) => {
                let b = buf.get_fixed(4)?;
                values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
            }
            // Fixed 16 bytes: LE i32 months, LE i32 days, LE i64 nanoseconds.
            #[cfg(feature = "avro_custom_types")]
            Self::IntervalMonthDayNano(values) => {
                let b = buf.get_fixed(16)?;
                let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
                let nanos =
                    i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
                values.push(IntervalMonthDayNano::new(months, days, nanos));
            }
            // Fixed 8 bytes: LE i32 days, LE i32 milliseconds.
            #[cfg(feature = "avro_custom_types")]
            Self::IntervalDayTime(values) => {
                let b = buf.get_fixed(8)?;
                let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
                values.push(IntervalDayTime::new(days, milliseconds));
            }
            Self::Float32(values) => values.push(buf.get_float()?),
            Self::Float64(values) => values.push(buf.get_double()?),
            // Writer-to-reader numeric promotions: read the writer's type, widen with `as`.
            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
            // Strings and bytes share one layout: append the raw bytes and record their
            // length. No UTF-8 check happens here.
            Self::StringToBytes(offsets, values)
            | Self::BytesToString(offsets, values)
            | Self::Binary(offsets, values)
            | Self::String(offsets, values)
            | Self::StringView(offsets, values) => {
                let data = buf.get_bytes()?;
                offsets.push_length(data.len());
                values.extend_from_slice(data);
            }
            // The UUID arrives in string form and is stored as its 16 raw bytes.
            Self::Uuid(values) => {
                let s_bytes = buf.get_bytes()?;
                let s = std::str::from_utf8(s_bytes).map_err(|e| {
                    AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
                })?;
                let uuid = Uuid::try_parse(s)
                    .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
                values.extend_from_slice(uuid.as_bytes());
            }
            // `read_blocks` runs the item decoder once per element. Its return value
            // (`total_items`) becomes this row's list length.
            Self::Array(_, off, encoding) => {
                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
                off.push_length(total_items);
            }
            // Without a projector, record fields are decoded in declaration order.
            Self::Record(_, encodings, _, None) => {
                for encoding in encodings {
                    encoding.decode(buf)?;
                }
            }
            // With a projector, it handles writer/reader field differences.
            Self::Record(_, encodings, _, Some(proj)) => {
                proj.project_record(buf, encodings)?;
            }
            // Each map entry is a key string followed by one value.
            Self::Map(_, koff, moff, kdata, valdec) => {
                let newly_added = read_blocks(buf, |cur| {
                    let kb = cur.get_bytes()?;
                    koff.push_length(kb.len());
                    kdata.extend_from_slice(kb);
                    valdec.decode(cur)
                })?;
                moff.push_length(newly_added);
            }
            Self::Fixed(sz, accum) => {
                let fx = buf.get_fixed(*sz as usize)?;
                accum.extend_from_slice(fx);
            }
            // Decimals: `read_decimal_bytes_be` reads `size` encoded bytes into an N-byte
            // big-endian array, which is then converted to the native integer.
            #[cfg(feature = "small_decimals")]
            Self::Decimal32(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 4, i32);
            }
            #[cfg(feature = "small_decimals")]
            Self::Decimal64(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 8, i64);
            }
            Self::Decimal128(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 16, i128);
            }
            Self::Decimal256(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 32, i256);
            }
            // Enum with no resolution: store the writer index as-is.
            Self::Enum(indices, _, None) => {
                indices.push(buf.get_int()?);
            }
            // Enum with resolution: remap the writer index to the reader's symbol index.
            Self::Enum(indices, _, Some(res)) => {
                let raw = buf.get_int()?;
                let resolved = res.resolve(raw)?;
                indices.push(resolved);
            }
            // Avro duration is 12 bytes holding three LE u32s: months, days, milliseconds.
            // NOTE: the `as i32` casts wrap values above i32::MAX.
            Self::Duration(builder) => {
                let b = buf.get_fixed(12)?;
                let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
                let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
                let nanos = (millis as i64) * 1_000_000;
                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
            }
            // Run-end encoding counts logical rows here; `flush` compresses them into runs.
            #[cfg(feature = "avro_custom_types")]
            Self::RunEndEncoded(_, len, inner) => {
                *len += 1;
                inner.decode(buf)?;
            }
            Self::Union(u) => u.decode(buf)?,
            Self::Nullable(plan, nb, encoding) => {
                match plan {
                    // No branch tag on the wire: the value is always present.
                    NullablePlan::FromSingle { resolution } => {
                        encoding.decode_with_resolution(buf, resolution)?;
                        nb.append(true);
                    }
                    NullablePlan::ReadTag {
                        nullability,
                        resolution,
                    } => {
                        // Only a zero vs. non-zero tag is distinguished. `nullability` says
                        // which of the two branch positions is `null`.
                        let branch = buf.read_vlq()?;
                        let is_not_null = match *nullability {
                            Nullability::NullFirst => branch != 0,
                            Nullability::NullSecond => branch == 0,
                        };
                        if is_not_null {
                            encoding.decode_with_resolution(buf, resolution)?;
                        } else {
                            encoding.append_null()?;
                        }
                        nb.append(is_not_null);
                    }
                }
            }
        }
        Ok(())
    }
1371
    /// Decodes one value from `buf`, converting it from the writer's type as described by
    /// `promotion`.
    ///
    /// `Promotion::Direct` defers to `decode`. The numeric promotions read the writer's
    /// primitive and push the widened value into the reader variant they target (`Int64`,
    /// `Float32` or `Float64`). The string/bytes promotions copy the length-prefixed bytes
    /// into a compatible offsets/values decoder.
    ///
    /// # Errors
    /// Returns `AvroError::ParseError` when `self` is not the variant the promotion targets,
    /// and propagates cursor read errors.
    fn decode_with_promotion(
        &mut self,
        buf: &mut AvroCursor<'_>,
        promotion: Promotion,
    ) -> Result<(), AvroError> {
        // Run-end-encoded columns count the row, then promote into their inner decoder.
        #[cfg(feature = "avro_custom_types")]
        if let Self::RunEndEncoded(_, len, inner) = self {
            *len += 1;
            return inner.decode_with_promotion(buf, promotion);
        }

        // Reads the writer value with `$getter`, casts it to `$to`, and pushes it into
        // `Self::$variant`. Any other target variant is reported as a mismatch.
        macro_rules! promote_numeric_to {
            ($variant:ident, $getter:ident, $to:ty) => {{
                match self {
                    Self::$variant(v) => {
                        let x = buf.$getter()?;
                        v.push(x as $to);
                        Ok(())
                    }
                    other => Err(AvroError::ParseError(format!(
                        "Promotion {promotion} target mismatch: expected {}, got {}",
                        stringify!($variant),
                        <Self as ::std::convert::AsRef<str>>::as_ref(other)
                    ))),
                }
            }};
        }
        match promotion {
            Promotion::Direct => self.decode(buf),
            Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
            Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
            Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
            Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
            Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
            Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
            // String and bytes share the same wire layout, so the bytes are copied verbatim.
            Promotion::StringToBytes => match self {
                Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
                    let data = buf.get_bytes()?;
                    offsets.push_length(data.len());
                    values.extend_from_slice(data);
                    Ok(())
                }
                other => Err(AvroError::ParseError(format!(
                    "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
                    <Self as AsRef<str>>::as_ref(other)
                ))),
            },
            Promotion::BytesToString => match self {
                Self::String(offsets, values)
                | Self::StringView(offsets, values)
                | Self::BytesToString(offsets, values) => {
                    let data = buf.get_bytes()?;
                    offsets.push_length(data.len());
                    values.extend_from_slice(data);
                    Ok(())
                }
                other => Err(AvroError::ParseError(format!(
                    "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
                    <Self as AsRef<str>>::as_ref(other)
                ))),
            },
        }
    }
1435
1436 fn decode_with_resolution<'d>(
1437 &'d mut self,
1438 buf: &mut AvroCursor<'_>,
1439 resolution: &'d ResolutionPlan,
1440 ) -> Result<(), AvroError> {
1441 #[cfg(feature = "avro_custom_types")]
1442 if let Self::RunEndEncoded(_, len, inner) = self {
1443 *len += 1;
1444 return inner.decode_with_resolution(buf, resolution);
1445 }
1446
1447 match resolution {
1448 ResolutionPlan::Promotion(promotion) => {
1449 let promotion = *promotion;
1450 self.decode_with_promotion(buf, promotion)
1451 }
1452 ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
1453 ResolutionPlan::EnumMapping(res) => {
1454 let Self::Enum(indices, _, _) = self else {
1455 return Err(AvroError::SchemaError(
1456 "enum mapping resolution provided for non-enum decoder".into(),
1457 ));
1458 };
1459 let raw = buf.get_int()?;
1460 let resolved = res.resolve(raw)?;
1461 indices.push(resolved);
1462 Ok(())
1463 }
1464 ResolutionPlan::Record(proj) => {
1465 let Self::Record(_, encodings, _, _) = self else {
1466 return Err(AvroError::SchemaError(
1467 "record projection provided for non-record decoder".into(),
1468 ));
1469 };
1470 proj.project_record(buf, encodings)
1471 }
1472 }
1473 }
1474
1475 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1477 Ok(match self {
1478 Self::Nullable(_, n, e) => e.flush(n.finish())?,
1479 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1480 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1481 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1482 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1483 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1484 Self::TimeMillis(values) => {
1485 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1486 }
1487 Self::TimeMicros(values) => {
1488 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1489 }
1490 Self::TimestampMillis(tz, values) => Arc::new(
1491 flush_primitive::<TimestampMillisecondType>(values, nulls)
1492 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1493 ),
1494 Self::TimestampMicros(tz, values) => Arc::new(
1495 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1496 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1497 ),
1498 Self::TimestampNanos(tz, values) => Arc::new(
1499 flush_primitive::<TimestampNanosecondType>(values, nulls)
1500 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1501 ),
1502 #[cfg(feature = "avro_custom_types")]
1503 Self::DurationSecond(values) => {
1504 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1505 }
1506 #[cfg(feature = "avro_custom_types")]
1507 Self::DurationMillisecond(values) => {
1508 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1509 }
1510 #[cfg(feature = "avro_custom_types")]
1511 Self::DurationMicrosecond(values) => {
1512 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1513 }
1514 #[cfg(feature = "avro_custom_types")]
1515 Self::DurationNanosecond(values) => {
1516 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1517 }
1518 #[cfg(feature = "avro_custom_types")]
1519 Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1520 #[cfg(feature = "avro_custom_types")]
1521 Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1522 #[cfg(feature = "avro_custom_types")]
1523 Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1524 #[cfg(feature = "avro_custom_types")]
1525 Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1526 #[cfg(feature = "avro_custom_types")]
1527 Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1528 #[cfg(feature = "avro_custom_types")]
1529 Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1530 #[cfg(feature = "avro_custom_types")]
1531 Self::Float16(values) => {
1532 let len = values.len();
1535 let buf: Buffer = std::mem::take(values).into();
1536 let scalar_buf = ScalarBuffer::new(buf, 0, len);
1537 Arc::new(Float16Array::new(scalar_buf, nulls))
1538 }
1539 #[cfg(feature = "avro_custom_types")]
1540 Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1541 #[cfg(feature = "avro_custom_types")]
1542 Self::TimeNanos(values) => {
1543 Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1544 }
1545 #[cfg(feature = "avro_custom_types")]
1546 Self::Time32Secs(values) => {
1547 Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1548 }
1549 #[cfg(feature = "avro_custom_types")]
1550 Self::TimestampSecs(is_utc, values) => Arc::new(
1551 flush_primitive::<TimestampSecondType>(values, nulls)
1552 .with_timezone_opt(is_utc.then(|| "+00:00")),
1553 ),
1554 #[cfg(feature = "avro_custom_types")]
1555 Self::IntervalYearMonth(values) => {
1556 Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1557 }
1558 #[cfg(feature = "avro_custom_types")]
1559 Self::IntervalMonthDayNano(values) => {
1560 Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1561 }
1562 #[cfg(feature = "avro_custom_types")]
1563 Self::IntervalDayTime(values) => {
1564 Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1565 }
1566 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1567 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1568 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1569 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1570 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1571 }
1572 Self::Int32ToFloat64(values)
1573 | Self::Int64ToFloat64(values)
1574 | Self::Float32ToFloat64(values) => {
1575 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1576 }
1577 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1578 let offsets = flush_offsets(offsets);
1579 let values = flush_values(values).into();
1580 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1581 }
1582 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1583 let offsets = flush_offsets(offsets);
1584 let values = flush_values(values).into();
1585 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1586 }
1587 Self::StringView(offsets, values) => {
1588 let offsets = flush_offsets(offsets);
1589 let values = flush_values(values);
1590 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1591 let values: Vec<&str> = (0..array.len())
1592 .map(|i| {
1593 if array.is_valid(i) {
1594 array.value(i)
1595 } else {
1596 ""
1597 }
1598 })
1599 .collect();
1600 Arc::new(StringViewArray::from(values))
1601 }
1602 Self::Array(field, offsets, values) => {
1603 let values = values.flush(None)?;
1604 let offsets = flush_offsets(offsets);
1605 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1606 }
1607 Self::Record(fields, encodings, _, _) => {
1608 let arrays = encodings
1609 .iter_mut()
1610 .map(|x| x.flush(None))
1611 .collect::<Result<Vec<_>, _>>()?;
1612 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1613 }
1614 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1615 let moff = flush_offsets(m_off);
1616 let koff = flush_offsets(k_off);
1617 let kd = flush_values(kdata).into();
1618 let val_arr = valdec.flush(None)?;
1619 let key_arr = StringArray::try_new(koff, kd, None)?;
1620 if key_arr.len() != val_arr.len() {
1621 return Err(AvroError::InvalidArgument(format!(
1622 "Map keys length ({}) != map values length ({})",
1623 key_arr.len(),
1624 val_arr.len()
1625 )));
1626 }
1627 let final_len = moff.len() - 1;
1628 if let Some(n) = &nulls {
1629 if n.len() != final_len {
1630 return Err(AvroError::InvalidArgument(format!(
1631 "Map array null buffer length {} != final map length {final_len}",
1632 n.len()
1633 )));
1634 }
1635 }
1636 let entries_fields = match map_field.data_type() {
1637 DataType::Struct(fields) => fields.clone(),
1638 other => {
1639 return Err(AvroError::InvalidArgument(format!(
1640 "Map entries field must be a Struct, got {other:?}"
1641 )));
1642 }
1643 };
1644 let entries_struct =
1645 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1646 let map_arr =
1647 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1648 Arc::new(map_arr)
1649 }
1650 Self::Fixed(sz, accum) => {
1651 let b: Buffer = flush_values(accum).into();
1652 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1653 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1654 Arc::new(arr)
1655 }
1656 Self::Uuid(values) => {
1657 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1658 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1659 Arc::new(arr)
1660 }
1661 #[cfg(feature = "small_decimals")]
1662 Self::Decimal32(precision, scale, _, builder) => {
1663 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1664 }
1665 #[cfg(feature = "small_decimals")]
1666 Self::Decimal64(precision, scale, _, builder) => {
1667 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1668 }
1669 Self::Decimal128(precision, scale, _, builder) => {
1670 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1671 }
1672 Self::Decimal256(precision, scale, _, builder) => {
1673 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1674 }
1675 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1676 Self::Duration(builder) => {
1677 let (_, vals, _) = builder.finish().into_parts();
1678 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1679 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1680 Arc::new(vals)
1681 }
1682 #[cfg(feature = "avro_custom_types")]
1683 Self::RunEndEncoded(width, len, inner) => {
1684 let values = inner.flush(nulls)?;
1685 let n = *len;
1686 let arr = values.as_ref();
1687 let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1688 if n > 0 {
1689 run_starts.push(0);
1690 for i in 1..n {
1691 if !values_equal_at(arr, i - 1, i) {
1692 run_starts.push(i);
1693 }
1694 }
1695 }
1696 if n > (u32::MAX as usize) {
1697 return Err(AvroError::InvalidArgument(format!(
1698 "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1699 )));
1700 }
1701 let run_count = run_starts.len();
1702 let take_idx: PrimitiveArray<UInt32Type> =
1703 run_starts.iter().map(|&s| s as u32).collect();
1704 let per_run_values = if run_count == 0 {
1705 values.slice(0, 0)
1706 } else {
1707 take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1708 AvroError::ParseError(format!("take() for REE values failed: {e}"))
1709 })?
1710 };
1711
1712 macro_rules! build_run_array {
1713 ($Native:ty, $ArrowTy:ty) => {{
1714 let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1715 for (idx, &_start) in run_starts.iter().enumerate() {
1716 let end = if idx + 1 < run_count {
1717 run_starts[idx + 1]
1718 } else {
1719 n
1720 };
1721 ends.push(end as $Native);
1722 }
1723 let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1724 let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1725 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1726 Arc::new(run_arr) as ArrayRef
1727 }};
1728 }
1729 match *width {
1730 2 => {
1731 if n > i16::MAX as usize {
1732 return Err(AvroError::InvalidArgument(format!(
1733 "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1734 )));
1735 }
1736 build_run_array!(i16, Int16Type)
1737 }
1738 4 => build_run_array!(i32, Int32Type),
1739 8 => build_run_array!(i64, Int64Type),
1740 other => {
1741 return Err(AvroError::InvalidArgument(format!(
1742 "Unsupported run-end width {other} for RunEndEncoded"
1743 )));
1744 }
1745 }
1746 }
1747 Self::Union(u) => u.flush(nulls)?,
1748 })
1749 }
1750}
1751
1752#[derive(Debug)]
1754enum NullablePlan {
1755 ReadTag {
1757 nullability: Nullability,
1758 resolution: ResolutionPlan,
1759 },
1760 FromSingle { resolution: ResolutionPlan },
1763}
1764
1765#[derive(Debug)]
1767enum ResolutionPlan {
1768 Promotion(Promotion),
1770 DefaultValue(AvroLiteral),
1772 EnumMapping(EnumResolution),
1774 Record(Projector),
1776}
1777
1778impl ResolutionPlan {
1779 fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, AvroError> {
1780 match (decoder, resolution) {
1781 (_, ResolutionInfo::Promotion(p)) => Ok(ResolutionPlan::Promotion(*p)),
1782 (_, ResolutionInfo::DefaultValue(lit)) => Ok(ResolutionPlan::DefaultValue(lit.clone())),
1783 (_, ResolutionInfo::EnumMapping(m)) => {
1784 Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
1785 }
1786 (Decoder::Record(_, _, field_defaults, _), ResolutionInfo::Record(r)) => Ok(
1787 ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?),
1788 ),
1789 (_, ResolutionInfo::Record(_)) => Err(AvroError::SchemaError(
1790 "record resolution on non-record decoder".into(),
1791 )),
1792 (_, ResolutionInfo::Union(_)) => Err(AvroError::SchemaError(
1793 "union variant cannot be resolved to a union type".into(),
1794 )),
1795 }
1796 }
1797}
1798
/// Maps writer enum symbol indices to reader enum symbol indices.
#[derive(Debug)]
struct EnumResolution {
    /// `mapping[w]` is the reader index for writer symbol `w`. A negative entry means the
    /// writer symbol has no reader counterpart, and `default_index` is used instead.
    mapping: Arc<[i32]>,
    /// Reader index used when `mapping` gives no usable entry. A negative value means no
    /// default exists, so resolution fails.
    default_index: i32,
}
1804
1805impl EnumResolution {
1806 fn new(mapping: &EnumMapping) -> Self {
1807 EnumResolution {
1808 mapping: mapping.mapping.clone(),
1809 default_index: mapping.default_index,
1810 }
1811 }
1812
1813 fn resolve(&self, index: i32) -> Result<i32, AvroError> {
1814 let resolved = usize::try_from(index)
1815 .ok()
1816 .and_then(|idx| self.mapping.get(idx).copied())
1817 .filter(|&idx| idx >= 0)
1818 .unwrap_or(self.default_index);
1819 if resolved >= 0 {
1820 Ok(resolved)
1821 } else {
1822 Err(AvroError::ParseError(format!(
1823 "Enum symbol index {index} not resolvable and no default provided",
1824 )))
1825 }
1826 }
1827}
1828
/// Per-writer-branch dispatch table for unions: where each writer branch goes on the reader
/// side and how to read it.
#[derive(Debug)]
struct DispatchLookupTable {
    /// `to_reader[w]` is the reader branch index for writer branch `w`, or `NO_SOURCE` (-1)
    /// when the writer branch has no reader counterpart.
    to_reader: Box<[i8]>,
    /// `resolution[w]` is the plan for writer branch `w`. Unmapped branches hold a
    /// `DefaultValue(Null)` placeholder that `resolve` never returns.
    resolution: Box<[ResolutionPlan]>,
}
1854
/// Sentinel stored in `DispatchLookupTable::to_reader` for a writer branch that has no
/// matching reader branch.
const NO_SOURCE: i8 = -1;
1858
1859impl DispatchLookupTable {
1860 fn from_writer_to_reader(
1861 reader_branches: &[Decoder],
1862 resolution_map: &[Option<(usize, ResolutionInfo)>],
1863 ) -> Result<Self, AvroError> {
1864 let mut to_reader = Vec::with_capacity(resolution_map.len());
1865 let mut resolution = Vec::with_capacity(resolution_map.len());
1866 for map in resolution_map {
1867 match map {
1868 Some((idx, res)) => {
1869 let idx = *idx;
1870 let idx_i8 = i8::try_from(idx).map_err(|_| {
1871 AvroError::SchemaError(format!(
1872 "Reader branch index {idx} exceeds i8 range (max {})",
1873 i8::MAX
1874 ))
1875 })?;
1876 let plan = ResolutionPlan::try_new(&reader_branches[idx], res)?;
1877 to_reader.push(idx_i8);
1878 resolution.push(plan);
1879 }
1880 None => {
1881 to_reader.push(NO_SOURCE);
1882 resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
1883 }
1884 }
1885 }
1886 Ok(Self {
1887 to_reader: to_reader.into_boxed_slice(),
1888 resolution: resolution.into_boxed_slice(),
1889 })
1890 }
1891
1892 #[inline]
1894 fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)> {
1895 let reader_index = *self.to_reader.get(writer_index)?;
1896 (reader_index >= 0).then(|| (reader_index as usize, &self.resolution[writer_index]))
1897 }
1898}
1899
/// Decoder for Avro union values.
///
/// NOTE(review): it records a type id and child offset per row (see
/// `UnionDecoderBranches`), which suggests a dense Arrow union on flush. The flush path is
/// not visible here, so confirm.
#[derive(Debug)]
struct UnionDecoder {
    /// Arrow union fields; their type ids become the per-branch type codes.
    fields: UnionFields,
    /// Per-branch child decoders plus the per-row type id / offset bookkeeping.
    branches: UnionDecoderBranches,
    /// Branch index chosen by `try_new` as the default (always 0 there).
    default_emit_idx: usize,
    /// Index of the first `Decoder::Null` branch, or `default_emit_idx` if there is none
    /// (as computed in `try_new`).
    null_emit_idx: usize,
    /// How writer values are mapped onto reader branches.
    plan: UnionReadPlan,
}
1908
/// Child decoders of a union, plus the per-row bookkeeping written by `emit_to`.
#[derive(Debug, Default)]
struct UnionDecoderBranches {
    /// One decoder per reader branch.
    decoders: Vec<Decoder>,
    /// Arrow type id of each reader branch (`reader_type_codes[i]` belongs to `decoders[i]`).
    reader_type_codes: Vec<i8>,
    /// Type id of the branch each emitted row went to, in row order.
    type_ids: Vec<i8>,
    /// For each emitted row, its position within the chosen branch's child values.
    offsets: Vec<i32>,
    /// Number of rows emitted so far to each branch.
    counts: Vec<i32>,
}
1917
1918impl UnionDecoderBranches {
1919 fn new(decoders: Vec<Decoder>, reader_type_codes: Vec<i8>) -> Self {
1920 let branch_len = decoders.len().max(reader_type_codes.len());
1921 Self {
1922 decoders,
1923 reader_type_codes,
1924 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1925 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1926 counts: vec![0; branch_len],
1927 }
1928 }
1929
1930 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1931 let branches_len = self.decoders.len();
1932 let Some(reader_branch) = self.decoders.get_mut(reader_idx) else {
1933 return Err(AvroError::ParseError(format!(
1934 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1935 )));
1936 };
1937 self.type_ids.push(self.reader_type_codes[reader_idx]);
1938 self.offsets.push(self.counts[reader_idx]);
1939 self.counts[reader_idx] += 1;
1940 Ok(reader_branch)
1941 }
1942}
1943
1944impl Default for UnionDecoder {
1945 fn default() -> Self {
1946 Self {
1947 fields: UnionFields::empty(),
1948 branches: Default::default(),
1949 default_emit_idx: 0,
1950 null_emit_idx: 0,
1951 plan: UnionReadPlan::Passthrough,
1952 }
1953 }
1954}
1955
1956#[derive(Debug)]
1957enum UnionReadPlan {
1958 ReaderUnion {
1959 lookup_table: DispatchLookupTable,
1960 },
1961 FromSingle {
1962 reader_idx: usize,
1963 resolution: ResolutionPlan,
1964 },
1965 ToSingle {
1966 target: Box<Decoder>,
1967 lookup_table: DispatchLookupTable,
1968 },
1969 Passthrough,
1970}
1971
1972impl UnionReadPlan {
1973 fn from_resolved(
1974 reader_branches: &[Decoder],
1975 resolved: Option<ResolvedUnion>,
1976 ) -> Result<Self, AvroError> {
1977 let Some(info) = resolved else {
1978 return Ok(Self::Passthrough);
1979 };
1980 match (info.writer_is_union, info.reader_is_union) {
1981 (true, true) => {
1982 let lookup_table =
1983 DispatchLookupTable::from_writer_to_reader(reader_branches, &info.writer_to_reader)?;
1984 Ok(Self::ReaderUnion { lookup_table })
1985 }
1986 (false, true) => {
1987 let Some((idx, resolution)) =
1988 info.writer_to_reader.first().and_then(Option::as_ref)
1989 else {
1990 return Err(AvroError::SchemaError(
1991 "Writer type does not match any reader union branch".to_string(),
1992 ));
1993 };
1994 let reader_idx = *idx;
1995 Ok(Self::FromSingle {
1996 reader_idx,
1997 resolution: ResolutionPlan::try_new(&reader_branches[reader_idx], resolution)?,
1998 })
1999 }
2000 (true, false) => Err(AvroError::InvalidArgument(
2001 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
2002 .to_string(),
2003 )),
2004 _ => Err(AvroError::SchemaError(
2006 "ResolvedUnion constructed for non-union sides; resolver should return None"
2007 .to_string(),
2008 )),
2009 }
2010 }
2011}
2012
2013impl UnionDecoder {
2014 fn try_new(
2015 fields: UnionFields,
2016 branches: Vec<Decoder>,
2017 resolved: Option<ResolvedUnion>,
2018 ) -> Result<Self, AvroError> {
2019 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
2020 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
2021 let default_emit_idx = 0;
2022 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
2023 let max_addr = (i32::MAX as usize) + 1;
2025 if branches.len() > max_addr {
2026 return Err(AvroError::SchemaError(format!(
2027 "Reader union has {} branches, which exceeds the maximum addressable \
2028 branches by an Avro int tag ({} + 1).",
2029 branches.len(),
2030 i32::MAX
2031 )));
2032 }
2033 let plan = UnionReadPlan::from_resolved(&branches, resolved)?;
2034 Ok(Self {
2035 fields,
2036 branches: UnionDecoderBranches::new(branches, reader_type_codes),
2037 default_emit_idx,
2038 null_emit_idx,
2039 plan,
2040 })
2041 }
2042
2043 fn with_single_target(target: Decoder, info: ResolvedUnion) -> Result<Self, AvroError> {
2044 debug_assert!(info.writer_is_union && !info.reader_is_union);
2046 let mut reader_branches = [target];
2047 let lookup_table =
2048 DispatchLookupTable::from_writer_to_reader(&reader_branches, &info.writer_to_reader)?;
2049 let target = Box::new(mem::replace(&mut reader_branches[0], Decoder::Null(0)));
2050 Ok(Self {
2051 plan: UnionReadPlan::ToSingle {
2052 target,
2053 lookup_table,
2054 },
2055 ..Self::default()
2056 })
2057 }
2058
2059 #[inline]
2060 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
2061 let raw = buf.get_long()?;
2066 if raw < 0 {
2067 return Err(AvroError::ParseError(format!(
2068 "Negative union branch index {raw}"
2069 )));
2070 }
2071 usize::try_from(raw).map_err(|_| {
2072 AvroError::ParseError(format!(
2073 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2074 (usize::BITS as usize)
2075 ))
2076 })
2077 }
2078
2079 #[inline]
2080 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
2081 where
2082 F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
2083 {
2084 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2085 return action(target);
2086 }
2087 let reader_idx = match &self.plan {
2088 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
2089 _ => fallback_idx,
2090 };
2091 self.branches.emit_to(reader_idx).and_then(action)
2092 }
2093
2094 fn append_null(&mut self) -> Result<(), AvroError> {
2095 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
2096 }
2097
2098 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
2099 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
2100 }
2101
2102 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2103 match &mut self.plan {
2104 UnionReadPlan::Passthrough => {
2105 let reader_idx = Self::read_tag(buf)?;
2106 let decoder = self.branches.emit_to(reader_idx)?;
2107 decoder.decode(buf)
2108 }
2109 UnionReadPlan::ReaderUnion { lookup_table } => {
2110 let idx = Self::read_tag(buf)?;
2111 let Some((reader_idx, resolution)) = lookup_table.resolve(idx) else {
2112 return Err(AvroError::ParseError(format!(
2113 "Union branch index {idx} not resolvable by reader schema"
2114 )));
2115 };
2116 let decoder = self.branches.emit_to(reader_idx)?;
2117 decoder.decode_with_resolution(buf, resolution)
2118 }
2119 UnionReadPlan::FromSingle {
2120 reader_idx,
2121 resolution,
2122 } => {
2123 let decoder = self.branches.emit_to(*reader_idx)?;
2124 decoder.decode_with_resolution(buf, resolution)
2125 }
2126 UnionReadPlan::ToSingle {
2127 target,
2128 lookup_table,
2129 } => {
2130 let idx = Self::read_tag(buf)?;
2131 let Some((_, resolution)) = lookup_table.resolve(idx) else {
2132 return Err(AvroError::ParseError(format!(
2133 "Writer union branch index {idx} not resolvable by reader schema"
2134 )));
2135 };
2136 target.decode_with_resolution(buf, resolution)
2137 }
2138 }
2139 }
2140
2141 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
2142 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2143 return target.flush(nulls);
2144 }
2145 debug_assert!(
2146 nulls.is_none(),
2147 "UnionArray does not accept a validity bitmap; \
2148 nulls should have been materialized as a Null child during decode"
2149 );
2150 let children = self
2151 .branches
2152 .decoders
2153 .iter_mut()
2154 .map(|d| d.flush(None))
2155 .collect::<Result<Vec<_>, _>>()?;
2156 let arr = UnionArray::try_new(
2157 self.fields.clone(),
2158 flush_values(&mut self.branches.type_ids)
2159 .into_iter()
2160 .collect(),
2161 Some(
2162 flush_values(&mut self.branches.offsets)
2163 .into_iter()
2164 .collect(),
2165 ),
2166 children,
2167 )
2168 .map_err(|e| AvroError::ParseError(e.to_string()))?;
2169 Ok(Arc::new(arr))
2170 }
2171}
2172
2173#[derive(Debug, Default)]
2174struct UnionDecoderBuilder {
2175 fields: Option<UnionFields>,
2176 branches: Option<Vec<Decoder>>,
2177 resolved: Option<ResolvedUnion>,
2178 target: Option<Decoder>,
2179}
2180
2181impl UnionDecoderBuilder {
2182 fn new() -> Self {
2183 Self::default()
2184 }
2185
2186 fn with_fields(mut self, fields: UnionFields) -> Self {
2187 self.fields = Some(fields);
2188 self
2189 }
2190
2191 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2192 self.branches = Some(branches);
2193 self
2194 }
2195
2196 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2197 self.resolved = Some(resolved_union);
2198 self
2199 }
2200
2201 fn with_target(mut self, target: Decoder) -> Self {
2202 self.target = Some(target);
2203 self
2204 }
2205
2206 fn build(self) -> Result<UnionDecoder, AvroError> {
2207 match (self.resolved, self.fields, self.branches, self.target) {
2208 (resolved, Some(fields), Some(branches), None) => {
2209 UnionDecoder::try_new(fields, branches, resolved)
2210 }
2211 (Some(info), None, None, Some(target))
2212 if info.writer_is_union && !info.reader_is_union =>
2213 {
2214 UnionDecoder::with_single_target(target, info)
2215 }
2216 _ => Err(AvroError::InvalidArgument(
2217 "Invalid UnionDecoderBuilder configuration: expected either \
2218 (fields + branches + resolved) with no target for reader-unions, or \
2219 (resolved + target) with no fields/branches for writer-union to single."
2220 .to_string(),
2221 )),
2222 }
2223 }
2224}
2225
/// What `process_blockwise` does with a block whose item count is negative.
/// Such a block carries its byte size before the items.
#[derive(Clone, Copy, Debug)]
enum NegativeBlockBehavior {
    /// Visit every item in the block through the item callback.
    ProcessItems,
    /// Jump over the whole block using its byte size, without visiting items.
    SkipBySize,
}
2231
2232#[inline]
2233fn skip_blocks(
2234 buf: &mut AvroCursor,
2235 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2236) -> Result<usize, AvroError> {
2237 process_blockwise(
2238 buf,
2239 move |c| skip_item(c),
2240 NegativeBlockBehavior::SkipBySize,
2241 )
2242}
2243
2244#[inline]
2245fn flush_dict(
2246 indices: &mut Vec<i32>,
2247 symbols: &[String],
2248 nulls: Option<NullBuffer>,
2249) -> Result<ArrayRef, AvroError> {
2250 let keys = flush_primitive::<Int32Type>(indices, nulls);
2251 let values = Arc::new(StringArray::from_iter_values(
2252 symbols.iter().map(|s| s.as_str()),
2253 ));
2254 DictionaryArray::try_new(keys, values)
2255 .map_err(Into::into)
2256 .map(|arr| Arc::new(arr) as ArrayRef)
2257}
2258
2259#[inline]
2260fn read_blocks(
2261 buf: &mut AvroCursor,
2262 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2263) -> Result<usize, AvroError> {
2264 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2265}
2266
2267#[inline]
2268fn process_blockwise(
2269 buf: &mut AvroCursor,
2270 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2271 negative_behavior: NegativeBlockBehavior,
2272) -> Result<usize, AvroError> {
2273 let mut total = 0usize;
2274 loop {
2275 let block_count = buf.get_long()?;
2280 match block_count.cmp(&0) {
2281 Ordering::Equal => break,
2282 Ordering::Less => {
2283 let count = (-block_count) as usize;
2284 let size_in_bytes = buf.get_long()? as usize;
2286 match negative_behavior {
2287 NegativeBlockBehavior::ProcessItems => {
2288 for _ in 0..count {
2290 on_item(buf)?;
2291 }
2292 }
2293 NegativeBlockBehavior::SkipBySize => {
2294 let _ = buf.get_fixed(size_in_bytes)?;
2296 }
2297 }
2298 total += count;
2299 }
2300 Ordering::Greater => {
2301 let count = block_count as usize;
2302 for _ in 0..count {
2303 on_item(buf)?;
2304 }
2305 total += count;
2306 }
2307 }
2308 }
2309 Ok(total)
2310}
2311
2312#[inline]
2313fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2314 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2315}
2316
2317#[inline]
2318fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2319 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2320}
2321
2322#[inline]
2323fn flush_primitive<T: ArrowPrimitiveType>(
2324 values: &mut Vec<T::Native>,
2325 nulls: Option<NullBuffer>,
2326) -> PrimitiveArray<T> {
2327 PrimitiveArray::new(flush_values(values).into(), nulls)
2328}
2329
2330#[inline]
2331fn read_decimal_bytes_be<const N: usize>(
2332 buf: &mut AvroCursor<'_>,
2333 size: &Option<usize>,
2334) -> Result<[u8; N], AvroError> {
2335 match size {
2336 Some(n) if *n == N => {
2337 let raw = buf.get_fixed(N)?;
2338 let mut arr = [0u8; N];
2339 arr.copy_from_slice(raw);
2340 Ok(arr)
2341 }
2342 Some(n) => {
2343 let raw = buf.get_fixed(*n)?;
2344 sign_cast_to::<N>(raw)
2345 }
2346 None => {
2347 let raw = buf.get_bytes()?;
2348 sign_cast_to::<N>(raw)
2349 }
2350 }
2351}
2352
2353#[inline]
2362fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2363 let len = raw.len();
2364 if len == N {
2366 let mut out = [0u8; N];
2367 out.copy_from_slice(raw);
2368 return Ok(out);
2369 }
2370 let first = raw.first().copied().unwrap_or(0u8);
2372 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2373 let mut out = [sign_byte; N];
2375 if len > N {
2376 let extra = len - N;
2379 if raw[..extra].iter().any(|&b| b != sign_byte) {
2381 return Err(AvroError::ParseError(format!(
2382 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2383 len, N
2384 )));
2385 }
2386 if N > 0 {
2387 let first_kept = raw[extra];
2388 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2389 if sign_bit_mismatch {
2390 return Err(AvroError::ParseError(format!(
2391 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2392 len, N
2393 )));
2394 }
2395 }
2396 out.copy_from_slice(&raw[extra..]);
2397 return Ok(out);
2398 }
2399 out[N - len..].copy_from_slice(raw);
2400 Ok(out)
2401}
2402
/// Returns whether slots `i` and `j` of `arr` hold equal values.
///
/// Two nulls compare equal, and a null never equals a non-null. Two non-null slots
/// are compared as one-element slices of `arr`.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (i_null, j_null) = (arr.is_null(i), arr.is_null(j));
    if i_null || j_null {
        return i_null && j_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
2416
2417#[derive(Debug)]
2418struct Projector {
2419 writer_projections: Vec<FieldProjection>,
2420 default_injections: Arc<[(usize, AvroLiteral)]>,
2421}
2422
2423#[derive(Debug)]
2424enum FieldProjection {
2425 ToReader(usize),
2426 Skip(Skipper),
2427}
2428
2429#[derive(Debug)]
2430struct ProjectorBuilder<'a> {
2431 rec: &'a ResolvedRecord,
2432 field_defaults: &'a [Option<AvroLiteral>],
2433}
2434
2435impl<'a> ProjectorBuilder<'a> {
2436 #[inline]
2437 fn try_new(rec: &'a ResolvedRecord, field_defaults: &'a [Option<AvroLiteral>]) -> Self {
2438 Self {
2439 rec,
2440 field_defaults,
2441 }
2442 }
2443
2444 #[inline]
2445 fn build(self) -> Result<Projector, AvroError> {
2446 let mut default_injections: Vec<(usize, AvroLiteral)> =
2447 Vec::with_capacity(self.rec.default_fields.len());
2448 for &idx in self.rec.default_fields.as_ref() {
2449 let lit = self
2450 .field_defaults
2451 .get(idx)
2452 .and_then(|lit| lit.clone())
2453 .unwrap_or(AvroLiteral::Null);
2454 default_injections.push((idx, lit));
2455 }
2456 let writer_projections = self
2457 .rec
2458 .writer_fields
2459 .iter()
2460 .map(|field| match field {
2461 ResolvedField::ToReader(index) => Ok(FieldProjection::ToReader(*index)),
2462 ResolvedField::Skip(datatype) => {
2463 let skipper = Skipper::from_avro(datatype)?;
2464 Ok(FieldProjection::Skip(skipper))
2465 }
2466 })
2467 .collect::<Result<_, AvroError>>()?;
2468 Ok(Projector {
2469 writer_projections,
2470 default_injections: default_injections.into(),
2471 })
2472 }
2473}
2474
2475impl Projector {
2476 #[inline]
2477 fn project_record(
2478 &self,
2479 buf: &mut AvroCursor<'_>,
2480 encodings: &mut [Decoder],
2481 ) -> Result<(), AvroError> {
2482 for field_proj in self.writer_projections.iter() {
2483 match field_proj {
2484 FieldProjection::ToReader(index) => encodings[*index].decode(buf)?,
2485 FieldProjection::Skip(skipper) => skipper.skip(buf)?,
2486 }
2487 }
2488 for (reader_index, lit) in self.default_injections.as_ref() {
2489 encodings[*reader_index].append_default(lit)?;
2490 }
2491 Ok(())
2492 }
2493}
2494
2495#[derive(Debug)]
2501enum Skipper {
2502 Null,
2503 Boolean,
2504 Int32,
2505 Int64,
2506 Float32,
2507 Float64,
2508 Bytes,
2509 String,
2510 TimeMicros,
2511 TimestampMillis,
2512 TimestampMicros,
2513 TimestampNanos,
2514 Fixed(usize),
2515 Decimal(Option<usize>),
2516 UuidString,
2517 Enum,
2518 DurationFixed12,
2519 List(Box<Skipper>),
2520 Map(Box<Skipper>),
2521 Struct(Vec<Skipper>),
2522 Union(Vec<Skipper>),
2523 Nullable(Nullability, Box<Skipper>),
2524 #[cfg(feature = "avro_custom_types")]
2525 RunEndEncoded(Box<Skipper>),
2526}
2527
2528impl Skipper {
2529 fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
2530 let mut base = match dt.codec() {
2531 Codec::Null => Self::Null,
2532 Codec::Boolean => Self::Boolean,
2533 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
2534 Codec::Int64 => Self::Int64,
2535 Codec::TimeMicros => Self::TimeMicros,
2536 Codec::TimestampMillis(_) => Self::TimestampMillis,
2537 Codec::TimestampMicros(_) => Self::TimestampMicros,
2538 Codec::TimestampNanos(_) => Self::TimestampNanos,
2539 #[cfg(feature = "avro_custom_types")]
2540 Codec::DurationNanos
2541 | Codec::DurationMicros
2542 | Codec::DurationMillis
2543 | Codec::DurationSeconds => Self::Int64,
2544 #[cfg(feature = "avro_custom_types")]
2545 Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
2546 Self::Int32
2547 }
2548 #[cfg(feature = "avro_custom_types")]
2549 Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
2550 Self::Int64
2551 }
2552 #[cfg(feature = "avro_custom_types")]
2553 Codec::UInt64 => Self::Fixed(8),
2554 #[cfg(feature = "avro_custom_types")]
2555 Codec::Float16 => Self::Fixed(2),
2556 #[cfg(feature = "avro_custom_types")]
2557 Codec::IntervalYearMonth => Self::Fixed(4),
2558 #[cfg(feature = "avro_custom_types")]
2559 Codec::IntervalMonthDayNano => Self::Fixed(16),
2560 #[cfg(feature = "avro_custom_types")]
2561 Codec::IntervalDayTime => Self::Fixed(8),
2562 Codec::Float32 => Self::Float32,
2563 Codec::Float64 => Self::Float64,
2564 Codec::Binary => Self::Bytes,
2565 Codec::Utf8 | Codec::Utf8View => Self::String,
2566 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2567 Codec::Decimal(_, _, size) => Self::Decimal(*size),
2568 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
2570 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2571 Codec::Struct(fields) => Self::Struct(
2572 fields
2573 .iter()
2574 .map(|f| Skipper::from_avro(f.data_type()))
2575 .collect::<Result<_, _>>()?,
2576 ),
2577 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2578 Codec::Interval => Self::DurationFixed12,
2579 Codec::Union(encodings, _, _) => {
2580 let max_addr = (i32::MAX as usize) + 1;
2581 if encodings.len() > max_addr {
2582 return Err(AvroError::SchemaError(format!(
2583 "Writer union has {} branches, which exceeds the maximum addressable \
2584 branches by an Avro int tag ({} + 1).",
2585 encodings.len(),
2586 i32::MAX
2587 )));
2588 }
2589 Self::Union(
2590 encodings
2591 .iter()
2592 .map(Skipper::from_avro)
2593 .collect::<Result<_, _>>()?,
2594 )
2595 }
2596 #[cfg(feature = "avro_custom_types")]
2597 Codec::RunEndEncoded(inner, _w) => {
2598 Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2599 }
2600 };
2601 if let Some(n) = dt.nullability() {
2602 base = Self::Nullable(n, Box::new(base));
2603 }
2604 Ok(base)
2605 }
2606
2607 fn skip(&self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2608 match self {
2609 Self::Null => Ok(()),
2610 Self::Boolean => {
2611 buf.get_bool()?;
2612 Ok(())
2613 }
2614 Self::Int32 => {
2615 buf.skip_int()?;
2616 Ok(())
2617 }
2618 Self::Int64
2619 | Self::TimeMicros
2620 | Self::TimestampMillis
2621 | Self::TimestampMicros
2622 | Self::TimestampNanos => {
2623 buf.skip_long()?;
2624 Ok(())
2625 }
2626 Self::Float32 => {
2627 buf.get_float()?;
2628 Ok(())
2629 }
2630 Self::Float64 => {
2631 buf.get_double()?;
2632 Ok(())
2633 }
2634 Self::Bytes | Self::String | Self::UuidString => {
2635 buf.get_bytes()?;
2636 Ok(())
2637 }
2638 Self::Fixed(sz) => {
2639 buf.get_fixed(*sz)?;
2640 Ok(())
2641 }
2642 Self::Decimal(size) => {
2643 if let Some(s) = size {
2644 buf.get_fixed(*s)
2645 } else {
2646 buf.get_bytes()
2647 }?;
2648 Ok(())
2649 }
2650 Self::Enum => {
2651 buf.skip_int()?;
2652 Ok(())
2653 }
2654 Self::DurationFixed12 => {
2655 buf.get_fixed(12)?;
2656 Ok(())
2657 }
2658 Self::List(item) => {
2659 skip_blocks(buf, |c| item.skip(c))?;
2660 Ok(())
2661 }
2662 Self::Map(value) => {
2663 skip_blocks(buf, |c| {
2664 c.get_bytes()?; value.skip(c)
2666 })?;
2667 Ok(())
2668 }
2669 Self::Struct(fields) => {
2670 for f in fields.iter() {
2671 f.skip(buf)?
2672 }
2673 Ok(())
2674 }
2675 Self::Union(encodings) => {
2676 let raw = buf.get_long()?;
2678 if raw < 0 {
2679 return Err(AvroError::ParseError(format!(
2680 "Negative union branch index {raw}"
2681 )));
2682 }
2683 let idx: usize = usize::try_from(raw).map_err(|_| {
2684 AvroError::ParseError(format!(
2685 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2686 (usize::BITS as usize)
2687 ))
2688 })?;
2689 let Some(encoding) = encodings.get(idx) else {
2690 return Err(AvroError::ParseError(format!(
2691 "Union branch index {idx} out of range for skipper ({} branches)",
2692 encodings.len()
2693 )));
2694 };
2695 encoding.skip(buf)
2696 }
2697 Self::Nullable(order, inner) => {
2698 let branch = buf.read_vlq()?;
2699 let is_not_null = match *order {
2700 Nullability::NullFirst => branch != 0,
2701 Nullability::NullSecond => branch == 0,
2702 };
2703 if is_not_null {
2704 inner.skip(buf)?;
2705 }
2706 Ok(())
2707 }
2708 #[cfg(feature = "avro_custom_types")]
2709 Self::RunEndEncoded(inner) => inner.skip(buf),
2710 }
2711 }
2712}
2713
2714#[cfg(test)]
2715mod tests {
2716 use super::*;
2717 use crate::codec::AvroFieldBuilder;
2718 use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2719 use arrow_array::cast::AsArray;
2720 use indexmap::IndexMap;
2721 use std::collections::HashMap;
2722
2723 fn encode_avro_int(value: i32) -> Vec<u8> {
2724 let mut buf = Vec::new();
2725 let mut v = (value << 1) ^ (value >> 31);
2726 while v & !0x7F != 0 {
2727 buf.push(((v & 0x7F) | 0x80) as u8);
2728 v >>= 7;
2729 }
2730 buf.push(v as u8);
2731 buf
2732 }
2733
2734 fn encode_avro_long(value: i64) -> Vec<u8> {
2735 let mut buf = Vec::new();
2736 let mut v = (value << 1) ^ (value >> 63);
2737 while v & !0x7F != 0 {
2738 buf.push(((v & 0x7F) | 0x80) as u8);
2739 v >>= 7;
2740 }
2741 buf.push(v as u8);
2742 buf
2743 }
2744
2745 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2746 let mut buf = encode_avro_long(bytes.len() as i64);
2747 buf.extend_from_slice(bytes);
2748 buf
2749 }
2750
2751 fn avro_from_codec(codec: Codec) -> AvroDataType {
2752 AvroDataType::new(codec, Default::default(), None)
2753 }
2754
2755 fn resolved_root_datatype(
2756 writer: Schema<'static>,
2757 reader: Schema<'static>,
2758 use_utf8view: bool,
2759 strict_mode: bool,
2760 ) -> AvroDataType {
2761 let writer_record = Schema::Complex(ComplexType::Record(Record {
2763 name: "Root",
2764 namespace: None,
2765 doc: None,
2766 aliases: vec![],
2767 fields: vec![Field {
2768 name: "v",
2769 r#type: writer,
2770 default: None,
2771 doc: None,
2772 aliases: vec![],
2773 }],
2774 attributes: Attributes::default(),
2775 }));
2776
2777 let reader_record = Schema::Complex(ComplexType::Record(Record {
2779 name: "Root",
2780 namespace: None,
2781 doc: None,
2782 aliases: vec![],
2783 fields: vec![Field {
2784 name: "v",
2785 r#type: reader,
2786 default: None,
2787 doc: None,
2788 aliases: vec![],
2789 }],
2790 attributes: Attributes::default(),
2791 }));
2792
2793 let field = AvroFieldBuilder::new(&writer_record)
2795 .with_reader_schema(&reader_record)
2796 .with_utf8view(use_utf8view)
2797 .with_strict_mode(strict_mode)
2798 .build()
2799 .expect("schema resolution should succeed");
2800
2801 match field.data_type().codec() {
2802 Codec::Struct(fields) => fields[0].data_type().clone(),
2803 other => panic!("expected wrapper struct, got {other:?}"),
2804 }
2805 }
2806
2807 fn decoder_for_promotion(
2808 writer: PrimitiveType,
2809 reader: PrimitiveType,
2810 use_utf8view: bool,
2811 ) -> Decoder {
2812 let ws = Schema::TypeName(TypeName::Primitive(writer));
2813 let rs = Schema::TypeName(TypeName::Primitive(reader));
2814 let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2815 Decoder::try_new(&dt).unwrap()
2816 }
2817
2818 fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2819 AvroDataType::new(codec, HashMap::new(), nullability)
2820 }
2821
2822 #[cfg(feature = "avro_custom_types")]
2823 fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2824 let mut out = Vec::with_capacity(10);
2825 while x >= 0x80 {
2826 out.push((x as u8) | 0x80);
2827 x >>= 7;
2828 }
2829 out.push(x as u8);
2830 out
2831 }
2832
2833 #[test]
2834 fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2835 let ws = Schema::Union(vec![
2836 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2837 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2838 ]);
2839 let rs = Schema::Union(vec![
2840 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2841 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2842 ]);
2843
2844 let dt = resolved_root_datatype(ws, rs, false, false);
2845 let mut dec = Decoder::try_new(&dt).unwrap();
2846
2847 let mut rec1 = encode_avro_long(0);
2848 rec1.extend(encode_avro_int(7));
2849 let mut cur1 = AvroCursor::new(&rec1);
2850 dec.decode(&mut cur1).unwrap();
2851
2852 let mut rec2 = encode_avro_long(1);
2853 rec2.extend(encode_avro_bytes("abc".as_bytes()));
2854 let mut cur2 = AvroCursor::new(&rec2);
2855 dec.decode(&mut cur2).unwrap();
2856
2857 let arr = dec.flush(None).unwrap();
2858 let ua = arr
2859 .as_any()
2860 .downcast_ref::<UnionArray>()
2861 .expect("dense union output");
2862
2863 assert_eq!(
2864 ua.type_id(0),
2865 1,
2866 "first value must select reader 'long' branch"
2867 );
2868 assert_eq!(ua.value_offset(0), 0);
2869
2870 assert_eq!(
2871 ua.type_id(1),
2872 0,
2873 "second value must select reader 'string' branch"
2874 );
2875 assert_eq!(ua.value_offset(1), 0);
2876
2877 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2878 assert_eq!(long_child.len(), 1);
2879 assert_eq!(long_child.value(0), 7);
2880
2881 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2882 assert_eq!(str_child.len(), 1);
2883 assert_eq!(str_child.value(0), "abc");
2884 }
2885
2886 #[test]
2887 fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2888 let ws = Schema::Union(vec![
2889 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2890 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2891 ]);
2892 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2893
2894 let dt = resolved_root_datatype(ws, rs, false, false);
2895 let mut dec = Decoder::try_new(&dt).unwrap();
2896
2897 let mut data = encode_avro_long(0);
2898 data.extend(encode_avro_int(5));
2899 let mut cur = AvroCursor::new(&data);
2900 dec.decode(&mut cur).unwrap();
2901
2902 let arr = dec.flush(None).unwrap();
2903 let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2904 assert_eq!(out.len(), 1);
2905 assert_eq!(out.value(0), 5);
2906 }
2907
2908 #[test]
2909 fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2910 let ws = Schema::Union(vec![
2911 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2912 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2913 ]);
2914 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2915
2916 let dt = resolved_root_datatype(ws, rs, false, false);
2917 let mut dec = Decoder::try_new(&dt).unwrap();
2918
2919 let mut data = encode_avro_long(1);
2920 data.extend(encode_avro_bytes("z".as_bytes()));
2921 let mut cur = AvroCursor::new(&data);
2922 let res = dec.decode(&mut cur);
2923 assert!(
2924 res.is_err(),
2925 "expected error when writer union branch does not resolve to reader non-union type"
2926 );
2927 }
2928
2929 #[test]
2930 fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2931 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2932 let rs = Schema::Union(vec![
2933 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2934 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2935 ]);
2936
2937 let dt = resolved_root_datatype(ws, rs, false, false);
2938 let mut dec = Decoder::try_new(&dt).unwrap();
2939
2940 let data = encode_avro_int(6);
2941 let mut cur = AvroCursor::new(&data);
2942 dec.decode(&mut cur).unwrap();
2943
2944 let arr = dec.flush(None).unwrap();
2945 let ua = arr
2946 .as_any()
2947 .downcast_ref::<UnionArray>()
2948 .expect("dense union output");
2949 assert_eq!(ua.len(), 1);
2950 assert_eq!(
2951 ua.type_id(0),
2952 1,
2953 "must resolve to reader 'long' branch (type_id 1)"
2954 );
2955 assert_eq!(ua.value_offset(0), 0);
2956
2957 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2958 assert_eq!(long_child.len(), 1);
2959 assert_eq!(long_child.value(0), 6);
2960
2961 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2962 assert_eq!(str_child.len(), 0, "string branch must be empty");
2963 }
2964
2965 #[test]
2966 fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2967 let ws = Schema::Union(vec![
2968 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2969 Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2970 ]);
2971 let rs = Schema::Union(vec![
2972 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2973 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2974 ]);
2975
2976 let dt = resolved_root_datatype(ws, rs, false, false);
2977 let mut dec = Decoder::try_new(&dt).unwrap();
2978
2979 let mut data = encode_avro_long(1);
2980 data.push(1);
2981 let mut cur = AvroCursor::new(&data);
2982 let res = dec.decode(&mut cur);
2983 assert!(
2984 res.is_err(),
2985 "expected error for unmapped writer 'boolean' branch"
2986 );
2987 }
2988
2989 #[test]
2990 fn test_schema_resolution_promotion_int_to_long() {
2991 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2992 assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2993 for v in [0, 1, -2, 123456] {
2994 let data = encode_avro_int(v);
2995 let mut cur = AvroCursor::new(&data);
2996 dec.decode(&mut cur).unwrap();
2997 }
2998 let arr = dec.flush(None).unwrap();
2999 let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
3000 assert_eq!(a.value(0), 0);
3001 assert_eq!(a.value(1), 1);
3002 assert_eq!(a.value(2), -2);
3003 assert_eq!(a.value(3), 123456);
3004 }
3005
3006 #[test]
3007 fn test_schema_resolution_promotion_int_to_float() {
3008 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
3009 assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
3010 for v in [0, 42, -7] {
3011 let data = encode_avro_int(v);
3012 let mut cur = AvroCursor::new(&data);
3013 dec.decode(&mut cur).unwrap();
3014 }
3015 let arr = dec.flush(None).unwrap();
3016 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3017 assert_eq!(a.value(0), 0.0);
3018 assert_eq!(a.value(1), 42.0);
3019 assert_eq!(a.value(2), -7.0);
3020 }
3021
3022 #[test]
3023 fn test_schema_resolution_promotion_int_to_double() {
3024 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
3025 assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
3026 for v in [1, -1, 10_000] {
3027 let data = encode_avro_int(v);
3028 let mut cur = AvroCursor::new(&data);
3029 dec.decode(&mut cur).unwrap();
3030 }
3031 let arr = dec.flush(None).unwrap();
3032 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3033 assert_eq!(a.value(0), 1.0);
3034 assert_eq!(a.value(1), -1.0);
3035 assert_eq!(a.value(2), 10_000.0);
3036 }
3037
3038 #[test]
3039 fn test_schema_resolution_promotion_long_to_float() {
3040 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
3041 assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
3042 for v in [0_i64, 1_000_000_i64, -123_i64] {
3043 let data = encode_avro_long(v);
3044 let mut cur = AvroCursor::new(&data);
3045 dec.decode(&mut cur).unwrap();
3046 }
3047 let arr = dec.flush(None).unwrap();
3048 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3049 assert_eq!(a.value(0), 0.0);
3050 assert_eq!(a.value(1), 1_000_000.0);
3051 assert_eq!(a.value(2), -123.0);
3052 }
3053
3054 #[test]
3055 fn test_schema_resolution_promotion_long_to_double() {
3056 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
3057 assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
3058 for v in [2_i64, -2_i64, 9_223_372_i64] {
3059 let data = encode_avro_long(v);
3060 let mut cur = AvroCursor::new(&data);
3061 dec.decode(&mut cur).unwrap();
3062 }
3063 let arr = dec.flush(None).unwrap();
3064 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3065 assert_eq!(a.value(0), 2.0);
3066 assert_eq!(a.value(1), -2.0);
3067 assert_eq!(a.value(2), 9_223_372.0);
3068 }
3069
3070 #[test]
3071 fn test_schema_resolution_promotion_float_to_double() {
3072 let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
3073 assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
3074 for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
3075 let data = v.to_le_bytes().to_vec();
3076 let mut cur = AvroCursor::new(&data);
3077 dec.decode(&mut cur).unwrap();
3078 }
3079 let arr = dec.flush(None).unwrap();
3080 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3081 assert_eq!(a.value(0), 0.5_f64);
3082 assert_eq!(a.value(1), -3.25_f64);
3083 assert_eq!(a.value(2), 1.0e6_f64);
3084 }
3085
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8() {
    // Writer `bytes` promoted to reader `string`, including a non-ASCII UTF-8 value.
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let words = ["hello", "world", "héllo"];
    for word in &words {
        let bytes = encode_avro_bytes(word.as_bytes());
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    for (idx, word) in words.iter().enumerate() {
        assert_eq!(strings.value(idx), *word);
    }
}
3101
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
    // Utf8View is requested, yet this test expects the bytes->string promotion
    // path to still produce a plain `StringArray`.
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let encoded = encode_avro_bytes(b"abc");
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "abc");
}
3113
#[test]
fn test_schema_resolution_promotion_string_to_bytes() {
    // Writer `string` promoted to reader `bytes`; the empty string must round-trip too.
    let mut decoder = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
    assert!(matches!(decoder, Decoder::StringToBytes(_, _)));
    let texts = ["", "abc", "data"];
    for text in &texts {
        let bytes = encode_avro_bytes(text.as_bytes());
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let binary = flushed.as_any().downcast_ref::<BinaryArray>().unwrap();
    for (idx, text) in texts.iter().enumerate() {
        assert_eq!(binary.value(idx), text.as_bytes());
    }
}
3129
#[test]
fn test_schema_resolution_no_promotion_passthrough_int() {
    // Builds `record Root { v: <field_type> }` so writer and reader differ only in `v`.
    let root_with = |field_type| {
        Schema::Complex(ComplexType::Record(Record {
            name: "Root",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields: vec![Field {
                name: "v",
                r#type: field_type,
                default: None,
                doc: None,
                aliases: vec![],
            }],
            attributes: Attributes::default(),
        }))
    };
    // Identical `int` on both sides: no promotion should be planned.
    let writer_record = root_with(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)));
    let reader_record = root_with(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)));
    let field = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    // The root record resolves to a struct codec; its first child is field `v`.
    let dt = match field.data_type().codec() {
        Codec::Struct(fields) => fields[0].data_type().clone(),
        other => panic!("expected wrapper struct, got {other:?}"),
    };
    let mut decoder = Decoder::try_new(&dt).unwrap();
    assert!(matches!(decoder, Decoder::Int32(_)));
    for value in [7, -9] {
        let bytes = encode_avro_int(value);
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.value(0), 7);
    assert_eq!(ints.value(1), -9);
}
3186
#[test]
fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
    // Builds `record Root { v: <field_type> }` so writer and reader differ only in `v`.
    let root_with = |field_type| {
        Schema::Complex(ComplexType::Record(Record {
            name: "Root",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields: vec![Field {
                name: "v",
                r#type: field_type,
                default: None,
                doc: None,
                aliases: vec![],
            }],
            attributes: Attributes::default(),
        }))
    };
    // `int` -> `boolean` is not a legal Avro promotion, so building must fail.
    let writer_record = root_with(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)));
    let reader_record =
        root_with(Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)));
    let result = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    assert!(result.is_err(), "expected error for illegal promotion");
}
3226
#[test]
fn test_map_decoding_one_entry() {
    let value_type = avro_from_codec(Codec::Utf8);
    let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    // One map block with count 1 holding the single "hello" -> "world" entry,
    // followed by the zero-count block that terminates the map.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&encode_avro_bytes(b"hello"));
    data.extend_from_slice(&encode_avro_bytes(b"world"));
    data.extend_from_slice(&encode_avro_long(0));
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    // The terminating zero-count block must be consumed as part of the row.
    assert_eq!(cursor.position(), data.len(), "all bytes should be consumed");
    let array = decoder.flush(None).unwrap();
    let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map_arr.len(), 1);
    assert_eq!(map_arr.value_length(0), 1);
    let entries = map_arr.value(0);
    let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(struct_entries.len(), 1);
    let key_arr = struct_entries
        .column_by_name("key")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let val_arr = struct_entries
        .column_by_name("value")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(key_arr.value(0), "hello");
    assert_eq!(val_arr.value(0), "world");
}
3262
#[test]
fn test_map_decoding_empty() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    // A lone zero-count block encodes one row holding an empty map.
    let terminator = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&terminator);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 0);
}
3275
#[test]
fn test_fixed_decoding() {
    let avro_type = avro_from_codec(Codec::Fixed(3));
    let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");

    // Each row is exactly three raw bytes: the input carries no length prefix.
    let rows: [([u8; 3], &str); 2] = [
        ([1, 2, 3], "Failed to decode data1"),
        ([4, 5, 6], "Failed to decode data2"),
    ];
    for &(bytes, failure) in &rows {
        let mut cursor = AvroCursor::new(&bytes);
        decoder.decode(&mut cursor).expect(failure);
        assert_eq!(cursor.position(), 3, "Cursor should advance by fixed size");
    }

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    assert_eq!(flushed.len(), 2, "Array should contain two items");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray");
    assert_eq!(fixed.value_length(), 3, "Fixed size of binary values should be 3");
    assert_eq!(fixed.value(0), &[1, 2, 3], "First item mismatch");
    assert_eq!(fixed.value(1), &[4, 5, 6], "Second item mismatch");
}
3315
#[test]
fn test_fixed_decoding_empty() {
    let fixed_type = avro_from_codec(Codec::Fixed(5));
    let mut decoder = Decoder::try_new(&fixed_type).expect("Failed to create decoder");

    // Flushing before any decode still yields a typed, zero-length array.
    let flushed = decoder
        .flush(None)
        .expect("Failed to flush decoder for empty input");
    assert_eq!(flushed.len(), 0, "Array should be empty");

    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
    assert_eq!(
        fixed.value_length(),
        5,
        "Fixed size of binary values should be 5 as per type"
    );
}
3337
#[test]
fn test_uuid_decoding() {
    let avro_type = avro_from_codec(Codec::Uuid);
    let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
    // The UUID arrives as a length-prefixed textual value.
    let text = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
    let encoded = encode_avro_bytes(text.as_bytes());
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).expect("Failed to decode data");
    assert_eq!(
        cursor.position(),
        encoded.len(),
        "Cursor should advance by varint size + data size"
    );

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Array should be a FixedSizeBinaryArray");
    assert_eq!(fixed.len(), 1);
    assert_eq!(fixed.value_length(), 16);
    // The stored value is the 16 raw bytes spelled out by the hex digits of `text`.
    let expected: [u8; 16] = [
        0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
        0x6b, 0xf6,
    ];
    assert_eq!(fixed.value(0), &expected);
}
3364
#[test]
fn test_array_decoding() {
    let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut decoder = Decoder::try_new(&list_dt).unwrap();

    // Row 1: one block with two ints, then the zero-count terminator.
    let mut first_row = encode_avro_long(2);
    for item in [10, 20] {
        first_row.extend_from_slice(&encode_avro_int(item));
    }
    first_row.extend_from_slice(&encode_avro_long(0));
    // Row 2: only the terminator, i.e. an empty list.
    let second_row = encode_avro_long(0);

    for row in [&first_row, &second_row] {
        let mut cursor = AvroCursor::new(row);
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 2);
    assert_eq!(lists.value_offsets(), &[0, 2, 2]);
    let ints = lists.values().as_primitive::<Int32Type>();
    assert_eq!(ints.len(), 2);
    assert_eq!(ints.value(0), 10);
    assert_eq!(ints.value(1), 20);
}
3391
#[test]
fn test_array_decoding_with_negative_block_count() {
    let item_dt = avro_from_codec(Codec::Int32);
    let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
    let mut decoder = Decoder::try_new(&list_dt).unwrap();

    // Encode the block's items up front so the byte-size prefix is accurate
    // (1, 2, 3 zig-zag encode to one byte each, i.e. 3 bytes — not 12).
    let mut items = Vec::new();
    for v in [1, 2, 3] {
        items.extend_from_slice(&encode_avro_int(v));
    }

    // Avro spec: a negative block count (-3) means |count| items, and it is
    // immediately followed by a long giving the block's size in bytes.
    let mut data = encode_avro_long(-3);
    data.extend_from_slice(&encode_avro_long(items.len() as i64));
    data.extend_from_slice(&items);
    data.extend_from_slice(&encode_avro_long(0));

    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    assert_eq!(cursor.position(), data.len(), "all bytes should be consumed");

    let array = decoder.flush(None).unwrap();
    let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list_arr.len(), 1);
    assert_eq!(list_arr.value_length(0), 3);
    let values = list_arr.values().as_primitive::<Int32Type>();
    assert_eq!(values.len(), 3);
    assert_eq!(values.value(0), 1);
    assert_eq!(values.value(1), 2);
    assert_eq!(values.value(2), 3);
}
3415
#[test]
fn test_nested_array_decoding() {
    let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty)));
    let mut decoder = Decoder::try_new(&nested_ty).unwrap();

    // Outer list: one block with one item; that item is an inner list [5, 6].
    let mut encoded = Vec::new();
    encoded.extend(encode_avro_long(1)); // outer block count
    encoded.extend(encode_avro_long(2)); // inner block count
    encoded.extend(encode_avro_int(5));
    encoded.extend(encode_avro_int(6));
    encoded.extend(encode_avro_long(0)); // inner terminator
    encoded.extend(encode_avro_long(0)); // outer terminator

    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let outer = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer.len(), 1);
    assert_eq!(outer.value_length(0), 1);
    let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(inner.len(), 1);
    assert_eq!(inner.value_length(0), 2);
    let leaf = inner.values().as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(leaf.values(), &[5, 6]);
}
3444
#[test]
fn test_array_decoding_empty_array() {
    // A list of strings (the original names `value_type`/`map_type` were copy-paste
    // leftovers from the map test and misdescribed this schema).
    let item_type = avro_from_codec(Codec::Utf8);
    let list_type = avro_from_codec(Codec::List(Arc::new(item_type)));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    // A lone zero-count block encodes one row holding an empty list.
    let data = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    assert_eq!(cursor.position(), data.len(), "all bytes should be consumed");
    let array = decoder.flush(None).unwrap();
    let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list_arr.len(), 1);
    assert_eq!(list_arr.value_length(0), 0);
}
3457
#[test]
fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
    use crate::schema::Array;
    // Writer items are plain `int`; reader items are the union ["null", "int"].
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::Union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
        ])),
        attributes: Attributes::default(),
    }));
    let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
    match dt.codec() {
        Codec::List(inner) => assert_eq!(
            inner.nullability(),
            Some(Nullability::NullFirst),
            "items should be nullable"
        ),
        _ => panic!("expected List codec"),
    }

    let mut decoder = Decoder::try_new(&dt).unwrap();
    // No union branch index precedes the items: the writer never wrote one.
    let mut payload = encode_avro_long(2);
    for item in [10, 20] {
        payload.extend(encode_avro_int(item));
    }
    payload.extend(encode_avro_long(0));

    let mut cursor = AvroCursor::new(&payload);
    decoder.decode(&mut cursor).unwrap();
    assert_eq!(
        cursor.position(),
        payload.len(),
        "all bytes should be consumed"
    );

    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1, "one list/row");
    assert_eq!(lists.value_length(0), 2, "two items in the list");
    let ints = lists.values().as_primitive::<Int32Type>();
    assert_eq!(ints.len(), 2);
    assert_eq!(ints.value(0), 10);
    assert_eq!(ints.value(1), 20);
    assert!(!ints.is_null(0));
    assert!(!ints.is_null(1));
}
3505
#[test]
fn test_decimal_decoding_fixed256() {
    // Precision 50 exceeds Decimal128's range, so a Decimal256Array is expected.
    let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();

    // 32-byte big-endian two's complement: ...0x3039 = 12345, ...0xFF85 = -123.
    let mut positive = [0x00_u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFF_u8; 32];
    negative[31] = 0x85;
    let data = [positive, negative].concat();

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3532
#[test]
fn test_decimal_decoding_fixed128() {
    // Precision 28 stored in a 16-byte fixed; expected to land in Decimal128Array.
    let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&dt).unwrap();

    // 16-byte big-endian two's complement: ...0x3039 = 12345, ...0xFF85 = -123.
    let mut positive = [0x00_u8; 16];
    positive[14..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFF_u8; 16];
    negative[15] = 0x85;
    let data = [positive, negative].concat();

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3557
#[test]
fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
    // Precision 5 picks the narrowest available decimal array regardless of the
    // 32-byte storage: Decimal32 with `small_decimals`, Decimal128 otherwise.
    #[cfg(feature = "small_decimals")]
    type Narrow = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Narrow = Decimal128Array;

    let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();

    // 32-byte big-endian two's complement: ...0x3039 = 12345, ...0xFF85 = -123.
    let mut positive = [0x00_u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFF_u8; 32];
    negative[31] = 0x85;
    let data = [positive, negative].concat();

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Narrow>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3594
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    // Precision 5 picks the narrowest available decimal array regardless of the
    // 16-byte storage: Decimal32 with `small_decimals`, Decimal128 otherwise.
    #[cfg(feature = "small_decimals")]
    type Narrow = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Narrow = Decimal128Array;

    let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&dt).unwrap();

    // 16-byte big-endian two's complement: ...0x3039 = 12345, ...0xFF85 = -123.
    let mut positive = [0x00_u8; 16];
    positive[14..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFF_u8; 16];
    negative[15] = 0x85;
    let data = [positive, negative].concat();

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Narrow>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3630
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    #[cfg(feature = "small_decimals")]
    type Narrow = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Narrow = Decimal128Array;

    // Bytes-backed decimal(4, 1) wrapped in a nullable reader with null second.
    let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let value_decoder = Decoder::try_new(&dt).unwrap();
    let mut decoder = Decoder::Nullable(
        NullablePlan::ReadTag {
            nullability: Nullability::NullSecond,
            resolution: ResolutionPlan::Promotion(Promotion::Direct),
        },
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
    );

    // NullSecond: branch 0 carries the decimal, branch 1 is null.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_int(0));
    data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2])); // 1234 -> 123.4
    data.extend_from_slice(&encode_avro_int(1));
    data.extend_from_slice(&encode_avro_int(0));
    data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E])); // -1234 -> -123.4

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Narrow>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "123.4");
    assert_eq!(decimals.value_as_string(2), "-123.4");
}
3675
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    #[cfg(feature = "small_decimals")]
    type Narrow = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Narrow = Decimal128Array;

    // 16-byte fixed decimal(6, 2) wrapped in a nullable reader with null second.
    let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let value_decoder = Decoder::try_new(&dt).unwrap();
    let mut decoder = Decoder::Nullable(
        NullablePlan::ReadTag {
            nullability: Nullability::NullSecond,
            resolution: ResolutionPlan::Promotion(Promotion::Direct),
        },
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
    );

    // Big-endian two's complement: ...0x01E240 = 123456, ...0xFE1DC0 = -123456.
    let mut positive = [0x00_u8; 16];
    positive[13..].copy_from_slice(&[0x01, 0xE2, 0x40]);
    let mut negative = [0xFF_u8; 16];
    negative[13..].copy_from_slice(&[0xFE, 0x1D, 0xC0]);

    // NullSecond: branch 0 carries the decimal, branch 1 is null.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_int(0));
    data.extend_from_slice(&positive);
    data.extend_from_slice(&encode_avro_int(1));
    data.extend_from_slice(&encode_avro_int(0));
    data.extend_from_slice(&negative);

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Narrow>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "1234.56");
    assert_eq!(decimals.value_as_string(2), "-1234.56");
}
3728
#[test]
fn test_enum_decoding() {
    let symbols: Arc<[String]> = ["A", "B", "C"].iter().map(|s| s.to_string()).collect();
    let avro_type = avro_from_codec(Codec::Enum(symbols));
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Each row is an int index into the symbol list.
    let indices = [2, 0, 1];
    let data: Vec<u8> = indices.iter().flat_map(|&i| encode_avro_int(i)).collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..indices.len() {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 3);
    let dictionary_values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, symbol) in ["A", "B", "C"].iter().enumerate() {
        assert_eq!(dictionary_values.value(idx), *symbol);
    }
    assert_eq!(dict.keys().values(), &[2, 0, 1]);
}
3758
#[test]
fn test_enum_decoding_with_nulls() {
    let symbols: Arc<[String]> = ["X", "Y"].iter().map(|s| s.to_string()).collect();
    let avro_type = AvroDataType::new(
        Codec::Enum(symbols),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // NullFirst: union branch 0 is null, branch 1 is followed by a symbol index.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(1)); // value branch
    data.extend_from_slice(&encode_avro_int(1)); // "Y"
    data.extend_from_slice(&encode_avro_long(0)); // null branch
    data.extend_from_slice(&encode_avro_long(1)); // value branch
    data.extend_from_slice(&encode_avro_int(0)); // "X"

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 3);
    assert!(dict.is_valid(0));
    assert!(dict.is_null(1));
    assert!(dict.is_valid(2));
    let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dict.keys(), &expected_keys);
    let dictionary_values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(dictionary_values.value(0), "X");
    assert_eq!(dictionary_values.value(1), "Y");
}
3795
#[test]
fn test_duration_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Each duration is 12 bytes: three little-endian u32s (months, days, millis).
    let duration = |months: u32, days: u32, millis: u32| -> Vec<u8> {
        [months, days, millis]
            .iter()
            .flat_map(|part| part.to_le_bytes())
            .collect()
    };

    // NullFirst: union branch 0 is null, branch 1 is followed by a duration.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(1)); // value branch
    data.extend_from_slice(&duration(1, 2, 3));
    data.extend_from_slice(&encode_avro_long(0)); // null branch
    data.extend_from_slice(&encode_avro_long(1)); // value branch
    data.extend_from_slice(&duration(4, 5, 6));

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let interval_array = array
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(interval_array.len(), 3);
    assert!(interval_array.is_valid(0));
    assert!(interval_array.is_null(1));
    assert!(interval_array.is_valid(2));
    // Milliseconds are widened to nanoseconds (x 1_000_000).
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano {
            months: 1,
            days: 2,
            nanoseconds: 3_000_000,
        }),
        None,
        Some(IntervalMonthDayNano {
            months: 4,
            days: 5,
            nanoseconds: 6_000_000,
        }),
    ]);
    assert_eq!(interval_array, &expected);
}
3849
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::IntervalMonthDayNano,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // A non-null row: branch 1, then i32 months, i32 days, i64 nanoseconds,
    // each written little-endian. Branch 0 (NullFirst) is a null row.
    let encode_interval = |months: i32, days: i32, nanos: i64| {
        let mut bytes = encode_avro_long(1);
        bytes.extend_from_slice(&months.to_le_bytes());
        bytes.extend_from_slice(&days.to_le_bytes());
        bytes.extend_from_slice(&nanos.to_le_bytes());
        bytes
    };
    let mut data = encode_interval(1, -2, 3);
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_interval(-4, 5, -6));

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano::new(1, -2, 3)),
        None,
        Some(IntervalMonthDayNano::new(-4, 5, -6)),
    ]);
    assert_eq!(intervals, &expected);
}
3889
#[test]
fn test_duration_decoding_empty() {
    // A non-nullable duration decoder flushed before any input yields no rows.
    let duration_type = AvroDataType::new(Codec::Interval, Default::default(), None);
    let mut decoder = Decoder::try_new(&duration_type).unwrap();
    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 0);
}
3898
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    // Each row is written as a single Avro long counting seconds.
    let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let inputs = [0_i64, -1, 2];
    let data: Vec<u8> = inputs.iter().flat_map(|&v| encode_avro_long(v)).collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in &inputs {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let seconds = flushed
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(seconds.values(), &[0, -1, 2]);
}
3920
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    // Each row is written as a single Avro long counting milliseconds.
    let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let inputs = [1_i64, 0, -2];
    let data: Vec<u8> = inputs.iter().flat_map(|&v| encode_avro_long(v)).collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in &inputs {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let millis = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(millis.values(), &[1, 0, -2]);
}
3941
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    // Each row is written as a single Avro long counting microseconds.
    let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let inputs = [5_i64, -6, 7];
    let data: Vec<u8> = inputs.iter().flat_map(|&v| encode_avro_long(v)).collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in &inputs {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let micros = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(micros.values(), &[5, -6, 7]);
}
3962
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    // Avro longs of mixed sign should decode verbatim into a nanosecond
    // duration array.
    let expected = [8i64, 9, -10];
    let data = expected.iter().fold(Vec::<u8>::new(), |mut buf, &nanos| {
        buf.extend_from_slice(&encode_avro_long(nanos));
        buf
    });

    let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let mut cursor = AvroCursor::new(&data);
    for _ in &expected {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &expected);
}
3983
/// A row whose union branch tag is read but whose value is truncated must not
/// leave a stray entry in the nullable decoder's null bitmap.
///
/// The union is declared `NullSecond`, so branch index 0 selects the int and
/// branch index 1 selects null.
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    let avro_type = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Row 1: branch 1 => null.
    let mut row1 = Vec::new();
    row1.extend_from_slice(&encode_avro_int(1));

    // Row 2: branch 0 => int, but the int payload is missing, so decoding fails.
    let mut row2 = Vec::new();
    row2.extend_from_slice(&encode_avro_int(0));

    // Row 3: branch 0 => int 42.
    let mut row3 = Vec::new();
    row3.extend_from_slice(&encode_avro_int(0));
    row3.extend_from_slice(&encode_avro_int(42));

    decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
    assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err());
    decoder.decode(&mut AvroCursor::new(&row3)).unwrap();

    let array = decoder.flush(None).unwrap();

    // Only the two successfully decoded rows may be present.
    assert_eq!(array.len(), 2);
    let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(int_array.is_null(0));
    assert!(int_array.is_valid(1));
    assert_eq!(int_array.value(1), 42);
    // Exactly one null: a leftover validity bit from the failed row would
    // shift validity or change this count.
    assert_eq!(int_array.null_count(), 1);
}
4019
#[test]
fn test_enum_mapping_reordered_symbols() {
    // The reader lists the symbols in a different order from the writer;
    // `mapping[w]` is the reader index for writer index `w`.
    let reader_symbols: Arc<[String]> = ["B", "C", "A"].iter().map(|s| s.to_string()).collect();
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        Arc::clone(&reader_symbols),
        Some(EnumResolution {
            mapping: Arc::from(vec![2, 0, 1]),
            default_index: -1,
        }),
    );

    // Writer indices 0, 1, 2 in order.
    let data = [0, 1, 2].map(encode_avro_int).concat();
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }

    let flushed = dec.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.keys(), &Int32Array::from(vec![2, 0, 1]));

    let dictionary_values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, symbol) in ["B", "C", "A"].iter().enumerate() {
        assert_eq!(dictionary_values.value(idx), *symbol);
    }
}
4058
/// Enum resolution must fall back to the reader's default symbol both when a
/// writer symbol has no reader counterpart (mapping entry `-1`) and when the
/// encoded writer index lies outside the mapping table entirely.
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
    // Reader default is "B".
    let default_index: i32 = 1;
    // Writer index 0 -> reader "A", 1 -> reader "B", 2 -> no reader symbol (-1).
    let mapping: Arc<[i32]> = Arc::from(vec![0, 1, -1]);
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(EnumResolution {
            mapping,
            default_index,
        }),
    );

    // 0 and 1 resolve directly, 2 is an unresolvable symbol, 99 is out of range.
    let writer_indices = [0, 1, 2, 99];
    let mut data = Vec::new();
    for idx in writer_indices {
        data.extend_from_slice(&encode_avro_int(idx));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in writer_indices {
        dec.decode(&mut cur).unwrap();
    }

    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    // Both fallback cases resolve to the default index 1.
    let expected_keys = Int32Array::from(vec![0, 1, 1, 1]);
    assert_eq!(dict.keys(), &expected_keys);
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "A");
    assert_eq!(values.value(1), "B");
}
4095
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    // The only writer symbol has no reader counterpart (-1) and there is no
    // reader default (-1), so decoding must fail instead of guessing.
    let resolution = EnumResolution {
        mapping: Arc::from(vec![-1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        vec!["A".to_string()].into(),
        Some(resolution),
    );

    let payload = encode_avro_int(0);
    let msg = dec
        .decode(&mut AvroCursor::new(&payload))
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    assert!(
        msg.contains("not resolvable") && msg.contains("no default"),
        "unexpected error message: {msg}"
    );
}
4120
4121 fn make_record_resolved_decoder(
4122 reader_fields: &[(&str, DataType, bool)],
4123 writer_projections: Vec<FieldProjection>,
4124 ) -> Decoder {
4125 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4126 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4127 for (name, dt, nullable) in reader_fields {
4128 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4129 let enc = match dt {
4130 DataType::Int32 => Decoder::Int32(Vec::new()),
4131 DataType::Int64 => Decoder::Int64(Vec::new()),
4132 DataType::Utf8 => {
4133 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
4134 }
4135 other => panic!("Unsupported test reader field type: {other:?}"),
4136 };
4137 encodings.push(enc);
4138 }
4139 let fields: Fields = field_refs.into();
4140 Decoder::Record(
4141 fields,
4142 encodings,
4143 vec![None; reader_fields.len()],
4144 Some(Projector {
4145 writer_projections,
4146 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4147 }),
4148 )
4149 }
4150
#[test]
fn test_skip_writer_trailing_field_int32() {
    // Writer row: `id` (int) followed by a trailing int the reader does not have.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![
            FieldProjection::ToReader(0),
            FieldProjection::Skip(Skipper::Int32),
        ],
    );
    let data = [encode_avro_int(7), encode_avro_int(999)].concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    // The skipped trailing field must still be consumed.
    assert_eq!(cur.position(), data.len());

    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(record.len(), 1);
    let ids = record
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ids.value(0), 7);
}
4177
#[test]
fn test_skip_writer_middle_field_string() {
    // Writer row: id (int), a string the reader drops, score (long).
    let mut dec = make_record_resolved_decoder(
        &[
            ("id", DataType::Int32, false),
            ("score", DataType::Int64, false),
        ],
        vec![
            FieldProjection::ToReader(0),
            FieldProjection::Skip(Skipper::String),
            FieldProjection::ToReader(1),
        ],
    );
    let data = [
        encode_avro_int(42),
        encode_avro_bytes(b"abcdef"),
        encode_avro_long(1000),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let ids = record
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let scores = record
        .column_by_name("score")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(ids.value(0), 42);
    assert_eq!(scores.value(0), 1000);
}
4215
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    // Writer row: an array<int> the reader drops, followed by `id`.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![
            FieldProjection::Skip(Skipper::List(Box::new(Skipper::Int32))),
            FieldProjection::ToReader(0),
        ],
    );
    let items = [1, 2, 3].map(encode_avro_int).concat();

    let mut data = Vec::new();
    // A negative block count (-3 => 3 items) is followed by the block's byte
    // size, which lets the skipper jump over the block in one step.
    data.extend_from_slice(&encode_avro_long(-3));
    data.extend_from_slice(&encode_avro_long(items.len() as i64));
    data.extend_from_slice(&items);
    // A zero-count block terminates the array.
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_int(5));

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let ids = record
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 5);
}
4249
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    // Writer row: a map<string, int> the reader drops, followed by `id`.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![
            FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
            FieldProjection::ToReader(0),
        ],
    );
    let entries = [
        encode_avro_bytes(b"k1"),
        encode_avro_int(10),
        encode_avro_bytes(b"k2"),
        encode_avro_int(20),
    ]
    .concat();

    let mut data = Vec::new();
    // A negative block count (-2 => 2 entries) is followed by the block's
    // byte size.
    data.extend_from_slice(&encode_avro_long(-2));
    data.extend_from_slice(&encode_avro_long(entries.len() as i64));
    data.extend_from_slice(&entries);
    // A zero-count block terminates the map.
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_int(123));

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let ids = record
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 123);
}
4284
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    // Writer row: a NullFirst nullable int the reader drops, followed by `id`.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![
            FieldProjection::Skip(Skipper::Nullable(
                Nullability::NullFirst,
                Box::new(Skipper::Int32),
            )),
            FieldProjection::ToReader(0),
        ],
    );
    // Row 1: branch 0 (null, no payload), then id = 5.
    let row1 = [encode_avro_long(0), encode_avro_int(5)].concat();
    // Row 2: branch 1 (int 123, skipped), then id = 7.
    let row2 = [encode_avro_long(1), encode_avro_int(123), encode_avro_int(7)].concat();

    for row in [&row1, &row2] {
        let mut cur = AvroCursor::new(row);
        dec.decode(&mut cur).unwrap();
        assert_eq!(cur.position(), row.len());
    }

    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let ids = record
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ids.len(), 2);
    assert_eq!(ids.value(0), 5);
    assert_eq!(ids.value(1), 7);
}
4322
4323 fn make_dense_union_avro(
4324 children: Vec<(Codec, &'_ str, DataType)>,
4325 type_ids: Vec<i8>,
4326 ) -> AvroDataType {
4327 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4328 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4329 for (codec, name, dt) in children.into_iter() {
4330 avro_children.push(AvroDataType::new(codec, Default::default(), None));
4331 fields.push(arrow_schema::Field::new(name, dt, true));
4332 }
4333 let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4334 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4335 AvroDataType::new(union_codec, Default::default(), None)
4336 }
4337
#[test]
fn test_union_dense_two_children_custom_type_ids() {
    // Branch 0 -> Int32 (type id 2); branch 1 -> Utf8 (type id 5).
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![2, 5],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    let rows = [
        [encode_avro_long(0), encode_avro_int(7)].concat(),
        [encode_avro_long(1), encode_avro_bytes(b"x")].concat(),
        [encode_avro_long(0), encode_avro_int(-1)].concat(),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let flushed = dec.flush(None).unwrap();
    let ua = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);
    let type_ids: Vec<_> = (0..ua.len()).map(|i| ua.type_id(i)).collect();
    assert_eq!(type_ids, [2, 5, 2]);
    let offsets: Vec<_> = (0..ua.len()).map(|i| ua.value_offset(i)).collect();
    assert_eq!(offsets, [0, 0, 1]);

    let ints = ua
        .child(2)
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("int child");
    assert_eq!(ints.len(), 2);
    assert_eq!(ints.value(0), 7);
    assert_eq!(ints.value(1), -1);

    let strings = ua
        .child(5)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "x");
}
4388
#[test]
fn test_union_dense_with_null_and_string_children() {
    // Branch 0 -> Null (type id 42); branch 1 -> Utf8 (type id 7).
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Null, "n", DataType::Null),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![42, 7],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // Null branches carry no payload beyond the branch index.
    let rows = [
        encode_avro_long(0),
        [encode_avro_long(1), encode_avro_bytes(b"abc")].concat(),
        encode_avro_long(0),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let flushed = dec.flush(None).unwrap();
    let ua = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);
    let type_ids: Vec<_> = (0..ua.len()).map(|i| ua.type_id(i)).collect();
    assert_eq!(type_ids, [42, 7, 42]);
    let offsets: Vec<_> = (0..ua.len()).map(|i| ua.value_offset(i)).collect();
    assert_eq!(offsets, [0, 0, 1]);

    let nulls = ua
        .child(42)
        .as_any()
        .downcast_ref::<NullArray>()
        .expect("null child");
    assert_eq!(nulls.len(), 2);

    let strings = ua
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "abc");
}
4433
#[test]
fn test_union_decode_negative_branch_index_errors() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![0, 1],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // A branch index of -1 is malformed input and must be rejected.
    let row = encode_avro_long(-1);
    let msg = dec
        .decode(&mut AvroCursor::new(&row))
        .expect_err("expected error for negative branch index")
        .to_string();
    assert!(
        msg.contains("Negative union branch index"),
        "unexpected error message: {msg}"
    );
}
4454
#[test]
fn test_union_decode_out_of_range_branch_index_errors() {
    // The type ids (10, 11) differ from the branch positions (0, 1).
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![10, 11],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // Only branches 0 and 1 exist, so branch index 2 is out of range.
    let row = encode_avro_long(2);
    let msg = dec
        .decode(&mut AvroCursor::new(&row))
        .expect_err("expected error for out-of-range branch index")
        .to_string();
    assert!(
        msg.contains("out of range"),
        "unexpected error message: {msg}"
    );
}
4475
#[test]
fn test_union_sparse_mode_not_supported() {
    // Building a decoder for a sparse union must fail up front.
    let children: Vec<AvroDataType> = vec![Codec::Int32, Codec::Utf8]
        .into_iter()
        .map(|codec| AvroDataType::new(codec, Default::default(), None))
        .collect();
    let union_fields = UnionFields::try_new(
        vec![1, 3],
        vec![
            arrow_schema::Field::new("i", DataType::Int32, true),
            arrow_schema::Field::new("s", DataType::Utf8, true),
        ],
    )
    .unwrap();
    let sparse_dt = AvroDataType::new(
        Codec::Union(children.into(), union_fields, UnionMode::Sparse),
        Default::default(),
        None,
    );

    let msg = Decoder::try_new(&sparse_dt)
        .expect_err("sparse union should not be supported")
        .to_string();
    assert!(
        msg.contains("Sparse Arrow unions are not yet supported"),
        "unexpected error message: {msg}"
    );
}
4499
4500 fn make_record_decoder_with_projector_defaults(
4501 reader_fields: &[(&str, DataType, bool)],
4502 field_defaults: Vec<Option<AvroLiteral>>,
4503 default_injections: Vec<(usize, AvroLiteral)>,
4504 ) -> Decoder {
4505 assert_eq!(
4506 field_defaults.len(),
4507 reader_fields.len(),
4508 "field_defaults must have one entry per reader field"
4509 );
4510 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4511 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4512 for (name, dt, nullable) in reader_fields {
4513 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4514 let enc = match dt {
4515 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4516 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4517 DataType::Utf8 => Decoder::String(
4518 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4519 Vec::with_capacity(DEFAULT_CAPACITY),
4520 ),
4521 other => panic!("Unsupported test field type in helper: {other:?}"),
4522 };
4523 encodings.push(enc);
4524 }
4525 let fields: Fields = field_refs.into();
4526 let projector = Projector {
4527 writer_projections: vec![],
4528 default_injections: Arc::from(default_injections),
4529 };
4530 Decoder::Record(fields, encodings, field_defaults, Some(projector))
4531 }
4532
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_default_append_custom_integer_range_validation() {
    /// Asserts that `res` failed with an "out of range for `ty`" error.
    fn assert_out_of_range<T, E: std::fmt::Display>(res: Result<T, E>, ty: &str) {
        match res {
            Ok(_) => panic!("expected an out-of-range error for {ty}"),
            Err(e) => assert!(e.to_string().contains(&format!("out of range for {ty}"))),
        }
    }

    // For each narrow integer type, both extremes are accepted and one step
    // past either end is rejected without appending anything.

    let mut d_i8 = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
    for v in [i8::MIN, i8::MAX] {
        d_i8.append_default(&AvroLiteral::Int(i32::from(v))).unwrap();
    }
    assert_out_of_range(d_i8.append_default(&AvroLiteral::Int(i32::from(i8::MAX) + 1)), "i8");
    assert_out_of_range(d_i8.append_default(&AvroLiteral::Int(i32::from(i8::MIN) - 1)), "i8");
    let arr_i8 = d_i8.flush(None).unwrap();
    let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
    assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);

    let mut d_i16 = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
    for v in [i16::MIN, i16::MAX] {
        d_i16.append_default(&AvroLiteral::Int(i32::from(v))).unwrap();
    }
    assert_out_of_range(d_i16.append_default(&AvroLiteral::Int(i32::from(i16::MAX) + 1)), "i16");
    assert_out_of_range(d_i16.append_default(&AvroLiteral::Int(i32::from(i16::MIN) - 1)), "i16");
    let arr_i16 = d_i16.flush(None).unwrap();
    let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
    assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);

    let mut d_u8 = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
    for v in [0, u8::MAX] {
        d_u8.append_default(&AvroLiteral::Int(i32::from(v))).unwrap();
    }
    assert_out_of_range(d_u8.append_default(&AvroLiteral::Int(-1)), "u8");
    assert_out_of_range(d_u8.append_default(&AvroLiteral::Int(i32::from(u8::MAX) + 1)), "u8");
    let arr_u8 = d_u8.flush(None).unwrap();
    let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
    assert_eq!(values_u8.values(), &[0, u8::MAX]);

    let mut d_u16 = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
    for v in [0, u16::MAX] {
        d_u16.append_default(&AvroLiteral::Int(i32::from(v))).unwrap();
    }
    assert_out_of_range(d_u16.append_default(&AvroLiteral::Int(-1)), "u16");
    assert_out_of_range(d_u16.append_default(&AvroLiteral::Int(i32::from(u16::MAX) + 1)), "u16");
    let arr_u16 = d_u16.flush(None).unwrap();
    let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
    assert_eq!(values_u16.values(), &[0, u16::MAX]);

    // u32 defaults are supplied as Avro `long` literals.
    let mut d_u32 = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
    for v in [0, u32::MAX] {
        d_u32.append_default(&AvroLiteral::Long(i64::from(v))).unwrap();
    }
    assert_out_of_range(d_u32.append_default(&AvroLiteral::Long(-1)), "u32");
    assert_out_of_range(d_u32.append_default(&AvroLiteral::Long(i64::from(u32::MAX) + 1)), "u32");
    let arr_u32 = d_u32.flush(None).unwrap();
    let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
    assert_eq!(values_u32.values(), &[0, u32::MAX]);
}
4616
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_decode_custom_integer_range_validation() {
    /// Asserts that `res` failed with an "out of range for `ty`" error.
    fn assert_out_of_range<T, E: std::fmt::Display>(res: Result<T, E>, ty: &str) {
        match res {
            Ok(_) => panic!("expected an out-of-range error for {ty}"),
            Err(e) => assert!(e.to_string().contains(&format!("out of range for {ty}"))),
        }
    }

    // Decode a single Avro `int` / `long` payload into `d`.
    let feed_int =
        |d: &mut Decoder, v: i32| d.decode(&mut AvroCursor::new(&encode_avro_int(v)));
    let feed_long =
        |d: &mut Decoder, v: i64| d.decode(&mut AvroCursor::new(&encode_avro_long(v)));

    // For each narrow integer type, both extremes decode and one step past
    // either end is rejected without appending anything.

    let mut d_i8 = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
    for v in [i8::MIN, i8::MAX] {
        feed_int(&mut d_i8, i32::from(v)).unwrap();
    }
    assert_out_of_range(feed_int(&mut d_i8, i32::from(i8::MAX) + 1), "i8");
    assert_out_of_range(feed_int(&mut d_i8, i32::from(i8::MIN) - 1), "i8");
    let arr_i8 = d_i8.flush(None).unwrap();
    let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
    assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);

    let mut d_i16 = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
    for v in [i16::MIN, i16::MAX] {
        feed_int(&mut d_i16, i32::from(v)).unwrap();
    }
    assert_out_of_range(feed_int(&mut d_i16, i32::from(i16::MAX) + 1), "i16");
    assert_out_of_range(feed_int(&mut d_i16, i32::from(i16::MIN) - 1), "i16");
    let arr_i16 = d_i16.flush(None).unwrap();
    let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
    assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);

    let mut d_u8 = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
    for v in [0, u8::MAX] {
        feed_int(&mut d_u8, i32::from(v)).unwrap();
    }
    assert_out_of_range(feed_int(&mut d_u8, -1), "u8");
    assert_out_of_range(feed_int(&mut d_u8, i32::from(u8::MAX) + 1), "u8");
    let arr_u8 = d_u8.flush(None).unwrap();
    let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
    assert_eq!(values_u8.values(), &[0, u8::MAX]);

    let mut d_u16 = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
    for v in [0, u16::MAX] {
        feed_int(&mut d_u16, i32::from(v)).unwrap();
    }
    assert_out_of_range(feed_int(&mut d_u16, -1), "u16");
    assert_out_of_range(feed_int(&mut d_u16, i32::from(u16::MAX) + 1), "u16");
    let arr_u16 = d_u16.flush(None).unwrap();
    let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
    assert_eq!(values_u16.values(), &[0, u16::MAX]);

    // u32 values arrive as Avro `long`s.
    let mut d_u32 = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
    for v in [0, u32::MAX] {
        feed_long(&mut d_u32, i64::from(v)).unwrap();
    }
    assert_out_of_range(feed_long(&mut d_u32, -1), "u32");
    assert_out_of_range(feed_long(&mut d_u32, i64::from(u32::MAX) + 1), "u32");
    let arr_u32 = d_u32.flush(None).unwrap();
    let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
    assert_eq!(values_u32.values(), &[0, u32::MAX]);
}
4711
#[test]
fn test_default_append_int32_and_int64_from_int_and_long() {
    // An Int32 column accepts an `int` default.
    let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
    d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
    let flushed_i32 = d_i32.flush(None).unwrap();
    let ints = flushed_i32.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 42);

    // An Int64 column accepts both an `int` and a `long` default.
    let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
    for lit in [AvroLiteral::Int(5), AvroLiteral::Long(7)] {
        d_i64.append_default(&lit).unwrap();
    }
    let flushed_i64 = d_i64.flush(None).unwrap();
    let longs = flushed_i64.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 2);
    assert_eq!(longs.value(0), 5);
    assert_eq!(longs.value(1), 7);
}
4729
#[test]
fn test_default_append_floats_and_doubles() {
    // Float and double defaults land unchanged; both values are exactly
    // representable, so exact comparison is safe.
    let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
    d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
    let flushed_f32 = d_f32.flush(None).unwrap();
    let floats = flushed_f32.as_any().downcast_ref::<Float32Array>().unwrap();
    assert_eq!(floats.value(0), 1.5);

    let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
    let flushed_f64 = d_f64.flush(None).unwrap();
    let doubles = flushed_f64.as_any().downcast_ref::<Float64Array>().unwrap();
    assert_eq!(doubles.value(0), 2.25);
}
4743
#[test]
fn test_default_append_string_and_bytes() {
    let new_string_decoder = || {
        Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )
    };

    // A string default lands in a StringArray.
    let mut d_str = new_string_decoder();
    d_str
        .append_default(&AvroLiteral::String("hi".into()))
        .unwrap();
    let flushed_str = d_str.flush(None).unwrap();
    let strings = flushed_str.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "hi");

    // A bytes default lands in a BinaryArray.
    let mut d_bytes = Decoder::Binary(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    d_bytes
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
        .unwrap();
    let flushed_bytes = d_bytes.flush(None).unwrap();
    let binaries = flushed_bytes.as_any().downcast_ref::<BinaryArray>().unwrap();
    assert_eq!(binaries.value(0), &[1, 2, 3]);

    // A bytes literal is rejected as a default for a string column.
    let err = new_string_decoder()
        .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Default for string must be string"),
        "unexpected error: {err:?}"
    );
}
4779
#[test]
fn test_default_append_nullable_int32_null_and_value() {
    // NullFirst nullable int: a Null default yields a null slot and an Int
    // default yields a value.
    let plan = NullablePlan::ReadTag {
        nullability: Nullability::NullFirst,
        resolution: ResolutionPlan::Promotion(Promotion::Direct),
    };
    let mut dec = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
    );
    for lit in [AvroLiteral::Null, AvroLiteral::Int(11)] {
        dec.append_default(&lit).unwrap();
    }

    let flushed = dec.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 2);
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 11);
}
4799
#[test]
fn test_default_append_array_of_ints() {
    // An array default becomes a single list row holding every item.
    let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut d = Decoder::try_new(&list_dt).unwrap();
    let items: Vec<AvroLiteral> = (1..=3).map(AvroLiteral::Int).collect();
    d.append_default(&AvroLiteral::Array(items)).unwrap();

    let flushed = d.flush(None).unwrap();
    let list = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list.len(), 1);
    assert_eq!(list.value_length(0), 3);
    let child = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(child.values(), &[1, 2, 3]);
}
4817
#[test]
fn test_default_append_map_string_to_int() {
    use std::collections::HashSet;

    // A map default becomes a single map row holding every entry; entry order
    // is not asserted.
    let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
    let mut d = Decoder::try_new(&map_dt).unwrap();
    let default_map: IndexMap<String, AvroLiteral> = [("k1", 10), ("k2", 20)]
        .iter()
        .map(|&(key, val)| (key.to_string(), AvroLiteral::Int(val)))
        .collect();
    d.append_default(&AvroLiteral::Map(default_map)).unwrap();

    let flushed = d.flush(None).unwrap();
    let map = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map.len(), 1);
    assert_eq!(map.value_length(0), 2);

    let row = map.value(0);
    let entries = row.as_any().downcast_ref::<StructArray>().unwrap();
    let key_col = entries
        .column_by_name("key")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let value_col = entries
        .column_by_name("value")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();

    let keys: HashSet<&str> = (0..key_col.len()).map(|i| key_col.value(i)).collect();
    assert_eq!(keys, HashSet::from(["k1", "k2"]));
    let vals: HashSet<i32> = (0..value_col.len()).map(|i| value_col.value(i)).collect();
    assert_eq!(vals, HashSet::from([10, 20]));
}
4849
/// An enum default given by symbol name is stored as that symbol's index into the
/// dictionary of symbols.
#[test]
fn test_default_append_enum_by_symbol() {
    let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
    // `symbols` is not used again, so hand it over instead of cloning the Arc.
    let mut decoder = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols, None);
    decoder
        .append_default(&AvroLiteral::Enum("B".into()))
        .unwrap();
    let flushed = decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 1);
    // "B" is the second symbol, so its key is 1.
    assert_eq!(dict.keys(), &Int32Array::from(vec![1]));
    let symbol_values = dict.values().as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(symbol_values.value(1), "B");
}
4870
/// A UUID default given as its canonical string is stored as the 16 UUID bytes. A
/// non-string default for a uuid field is rejected with a descriptive error.
#[test]
fn test_default_append_uuid_and_type_error() {
    let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
    d.append_default(&AvroLiteral::String(uuid_str.into()))
        .unwrap();
    let arr_ref = d.flush(None).unwrap();
    let arr = arr_ref
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(arr.value_length(), 16);
    assert_eq!(arr.len(), 1);
    // Width alone would accept any 16 bytes. Pin the content to the hex digits of
    // `uuid_str`, read left to right.
    let expected: [u8; 16] = [
        0x12, 0x3e, 0x45, 0x67, 0xe8, 0x9b, 0x12, 0xd3, 0xa4, 0x56, 0x42, 0x66, 0x14, 0x17,
        0x40, 0x00,
    ];
    assert_eq!(arr.value(0), &expected);
    let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let err = d2
        .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
        .unwrap_err();
    assert!(
        err.to_string().contains("Default for uuid must be string"),
        "unexpected error: {err:?}"
    );
}
4893
/// A `fixed(4)` default is copied verbatim when it is exactly four bytes, and rejected
/// when its length differs from the declared size.
#[test]
fn test_default_append_fixed_and_length_mismatch() {
    // Happy path: matching length.
    let mut exact = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    exact
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
        .unwrap();
    let flushed = exact.flush(None).unwrap();
    let fixed_arr = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed_arr.value_length(), 4);
    assert_eq!(fixed_arr.value(0), &[1, 2, 3, 4]);

    // Error path: one byte short.
    let mut short = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let failure = short
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
        .unwrap_err();
    assert!(
        failure.to_string().contains("Fixed default length"),
        "unexpected error: {:?}",
        failure
    );
}
4915
/// A duration default is 12 little-endian bytes: months, days, then milliseconds. It maps
/// onto a month/day/nanosecond interval. Any other length is rejected.
#[test]
fn test_default_append_duration_and_length_validation() {
    let mut decoder = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let encoded: Vec<u8> = [1u32, 2, 3]
        .into_iter()
        .flat_map(u32::to_le_bytes)
        .collect();
    decoder.append_default(&AvroLiteral::Bytes(encoded)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 1);
    let first = intervals.value(0);
    assert_eq!(first.months, 1);
    assert_eq!(first.days, 2);
    // The third field (3 ms) shows up as 3_000_000 ns.
    assert_eq!(first.nanoseconds, 3_000_000);

    let mut short_decoder = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let err = short_decoder
        .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Duration default must be exactly 12 bytes"),
        "unexpected error: {err:?}"
    );
}
4945
/// Decimal defaults arrive as big-endian two's-complement bytes. With precision 50 and
/// scale 2 they land in a `Decimal256Array`.
#[test]
fn test_default_append_decimal256_from_bytes() {
    let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();

    // 0x3039 == 12345, i.e. 123.45 at scale 2.
    let mut positive = [0x00u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    decoder
        .append_default(&AvroLiteral::Bytes(positive.to_vec()))
        .unwrap();

    // Sign-extended 0x...FF85 == -123, i.e. -1.23 at scale 2.
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    decoder
        .append_default(&AvroLiteral::Bytes(negative.to_vec()))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
4968
/// A record default given as a map may omit fields. An omitted field ("b") takes its
/// per-field default; a present field ("a") uses the map's value.
#[test]
fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        vec![None, Some(AvroLiteral::String("hi".into()))],
        vec![],
    );
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(7));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let a_col = rows.column_by_name("a").unwrap();
    let a_vals = a_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let b_col = rows.column_by_name("b").unwrap();
    let b_vals = b_col.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(a_vals.value(0), 7);
    assert_eq!(b_vals.value(0), "hi");
}
4997
/// A `null` record default fills every field from its per-field default.
#[test]
fn test_record_append_default_null_uses_projector_field_defaults() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        vec![
            Some(AvroLiteral::Int(5)),
            Some(AvroLiteral::String("x".into())),
        ],
        vec![],
    );
    record.append_default(&AvroLiteral::Null).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let a_col = rows.column_by_name("a").unwrap();
    let a_vals = a_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let b_col = rows.column_by_name("b").unwrap();
    let b_vals = b_col.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(a_vals.value(0), 5);
    assert_eq!(b_vals.value(0), "x");
}
5027
/// With nullable fields and no per-field defaults, a field omitted from a map default
/// ("b") comes out null, while a provided field ("a") keeps its value.
#[test]
fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
{
    let field_refs: Vec<FieldRef> = [("a", DataType::Int32), ("b", DataType::Utf8)]
        .into_iter()
        .map(|(name, dt)| Arc::new(ArrowField::new(name, dt, true)))
        .collect();

    let nullable_int = Decoder::Nullable(
        NullablePlan::ReadTag {
            nullability: Nullability::NullSecond,
            resolution: ResolutionPlan::Promotion(Promotion::Direct),
        },
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
    );
    let nullable_str = Decoder::Nullable(
        NullablePlan::ReadTag {
            nullability: Nullability::NullSecond,
            resolution: ResolutionPlan::Promotion(Promotion::Direct),
        },
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )),
    );
    let decoders = vec![nullable_int, nullable_str];

    // No writer projections and no injected defaults; per-field defaults are all `None`.
    let projector = Projector {
        writer_projections: vec![],
        default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
    };
    let mut record = Decoder::Record(
        field_refs.into(),
        decoders,
        vec![None, None],
        Some(projector),
    );

    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(9));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let a_col = rows.column_by_name("a").unwrap();
    let a_vals = a_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let b_col = rows.column_by_name("b").unwrap();
    let b_vals = b_col.as_any().downcast_ref::<StringArray>().unwrap();
    assert!(a_vals.is_valid(0));
    assert_eq!(a_vals.value(0), 9);
    assert!(b_vals.is_null(0));
}
5085
/// When the writer supplies no fields at all, decoding one row from an empty buffer
/// still produces a row, built entirely from the projector's injected defaults.
#[test]
fn test_projector_default_injection_when_writer_lacks_fields() {
    let injected = vec![
        (0, AvroLiteral::Int(99)),
        (1, AvroLiteral::String("alice".into())),
    ];
    let mut record = make_record_decoder_with_projector_defaults(
        &[
            ("id", DataType::Int32, false),
            ("name", DataType::Utf8, false),
        ],
        vec![None, None],
        injected,
    );
    record.decode(&mut AvroCursor::new(&[])).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = rows.column_by_name("id").unwrap();
    let ids = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let name_col = rows.column_by_name("name").unwrap();
    let names = name_col.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(ids.value(0), 99);
    assert_eq!(names.value(0), "alice");
}
5119
/// Dense-union type ids must come from the declared `UnionFields` (42 and 7), not from
/// the position of the branch inside the Avro union.
#[test]
fn union_type_ids_are_not_child_indexes() {
    let int_field = Arc::new(ArrowField::new("a", DataType::Int32, true));
    let utf8_field = Arc::new(ArrowField::new("b", DataType::Utf8, true));
    let fields: UnionFields = [(42_i8, int_field), (7_i8, utf8_field)]
        .into_iter()
        .collect();
    let branches: Vec<AvroDataType> =
        vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
    let dt = avro_from_codec(Codec::Union(
        branches.into(),
        fields.clone(),
        UnionMode::Dense,
    ));
    let mut decoder = Decoder::try_new(&dt).expect("decoder");

    // Row 0 selects Avro branch 1 (string "hi").
    let mut row0 = encode_avro_long(1);
    row0.extend(encode_avro_bytes("hi".as_bytes()));
    decoder
        .decode(&mut AvroCursor::new(&row0))
        .expect("decode b1");

    // Row 1 selects Avro branch 0 (int 5).
    let mut row1 = encode_avro_long(0);
    row1.extend(encode_avro_int(5));
    decoder
        .decode(&mut AvroCursor::new(&row1))
        .expect("decode b0");

    let flushed = decoder.flush(None).expect("flush");
    let union_arr = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("union");
    assert_eq!(union_arr.len(), 2);
    assert_eq!(union_arr.type_id(0), 7, "type id must come from UnionFields");
    assert_eq!(union_arr.type_id(1), 42, "type id must come from UnionFields");
    assert_eq!(union_arr.value_offset(0), 0);
    assert_eq!(union_arr.value_offset(1), 0);

    let strings = union_arr
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "hi");

    let ints = union_arr
        .child(42)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 5);

    let declared_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
    assert_eq!(declared_ids, vec![42_i8, 7_i8]);
}
5158
/// Every custom duration codec must get the `Skipper::Int64` skipper.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&dt)?;
        assert!(
            matches!(skipper, Skipper::Int64),
            "expected Int64 skipper for {:?}, got {:?}",
            codec,
            skipper
        );
    }
    Ok(())
}
5177
/// Skipping a custom duration value consumes exactly the bytes of one encoded long,
/// across small, negative and large magnitudes.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
    let samples: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&dt)?;
        for sample in samples {
            let encoded = encode_avro_long(sample);
            let mut cursor = AvroCursor::new(&encoded);
            skipper.skip(&mut cursor)?;
            assert_eq!(
                cursor.position(),
                encoded.len(),
                "did not consume all bytes for {:?} value {}",
                codec,
                sample
            );
        }
    }
    Ok(())
}
5205
/// A null-first nullable duration reads a branch tag. Tag 0 is null and consumes only the
/// tag; tag 1 also consumes the following long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
    let skipper = Skipper::from_avro(&dt)?;
    let Skipper::Nullable(Nullability::NullFirst, inner) = &skipper else {
        panic!("expected Nullable(NullFirst, Int64), got {:?}", skipper);
    };
    assert!(
        matches!(**inner, Skipper::Int64),
        "expected inner Int64, got {:?}",
        inner
    );

    // Null branch: only the tag is read.
    let null_row = encode_vlq_u64(0);
    let mut cursor = AvroCursor::new(&null_row);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 1, "expected to consume only tag=0");

    // Value branch: tag followed by long(0).
    let mut value_row = encode_vlq_u64(1);
    value_row.extend(encode_avro_long(0));
    let mut cursor = AvroCursor::new(&value_row);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");

    Ok(())
}
5234
/// A null-second nullable duration reads a branch tag. Tag 1 is null and consumes only
/// the tag; tag 0 also consumes the following long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
    let skipper = Skipper::from_avro(&dt)?;
    let Skipper::Nullable(Nullability::NullSecond, inner) = &skipper else {
        panic!("expected Nullable(NullSecond, Int64), got {:?}", skipper);
    };
    assert!(
        matches!(**inner, Skipper::Int64),
        "expected inner Int64, got {:?}",
        inner
    );

    // Null branch: only the tag is read.
    let null_row = encode_vlq_u64(1);
    let mut cursor = AvroCursor::new(&null_row);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 1, "expected to consume only tag=1");

    // Value branch: tag followed by long(-1).
    let mut value_row = encode_vlq_u64(0);
    value_row.extend(encode_avro_long(-1));
    let mut cursor = AvroCursor::new(&value_row);
    skipper.skip(&mut cursor)?;
    let expected_len = 1 + encode_avro_long(-1).len();
    assert_eq!(
        cursor.position(),
        expected_len,
        "expected to consume tag=0 + long(-1)"
    );
    Ok(())
}
5266
/// The standard Avro duration (`Codec::Interval`) is a 12-byte fixed value. Its skipper
/// must advance the cursor by exactly 12 bytes.
#[test]
fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::Interval, None);
    let skipper = Skipper::from_avro(&dt)?;
    assert!(
        matches!(skipper, Skipper::DurationFixed12),
        "expected DurationFixed12, got {:?}",
        skipper
    );
    let zeros = [0u8; 12];
    let mut cursor = AvroCursor::new(&zeros);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
    Ok(())
}
5281
/// A run-end-encoded int column with a 16-bit run-end width groups consecutive equal
/// values into runs and yields a `RunArray<Int16Type>`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width16_int32_basic_grouping() {
    use arrow_array::RunArray;
    let ree = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(avro_from_codec(Codec::Int32)), 16),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree).expect("create REE decoder");
    [1, 1, 1, 2, 2, 3, 3, 3, 3].into_iter().for_each(|v| {
        let encoded = encode_avro_int(v);
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    });

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .expect("RunArray<Int16Type>");
    assert_eq!(runs.len(), 9);
    // Runs: 1 x3 (ends at 3), 2 x2 (ends at 5), 3 x4 (ends at 9).
    assert_eq!(runs.run_ends().values(), &[3, 5, 9]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32");
    assert_eq!(run_values.values(), &[1, 2, 3]);
}
5313
/// With nullable values, consecutive nulls collapse into a single null run just like
/// consecutive equal non-null values do. Width 32 yields a `RunArray<Int32Type>`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width32_nullable_values_group_nulls() {
    use arrow_array::RunArray;
    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let ree = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(nullable_int), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree).expect("create REE decoder");

    let rows: [Option<i32>; 8] = [
        None,
        None,
        Some(7),
        Some(7),
        Some(7),
        None,
        Some(5),
        Some(5),
    ];
    for row in rows {
        // NullSecond: tag 1 is null; tag 0 is followed by the int payload.
        let encoded = match row {
            None => encode_vlq_u64(1),
            Some(v) => {
                let mut with_value = encode_vlq_u64(0);
                with_value.extend(encode_avro_int(v));
                with_value
            }
        };
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 8);
    assert_eq!(runs.run_ends().values(), &[2, 5, 6, 8]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32 (nullable)");
    assert_eq!(run_values.len(), 4);
    assert!(run_values.is_null(0));
    assert_eq!(run_values.value(1), 7);
    assert!(run_values.is_null(2));
    assert_eq!(run_values.value(3), 5);
}
5370
/// A `FromSingle` nullable plan with int->double promotion wraps a run-end-encoded
/// Float64 decoder. Writer ints are promoted before grouping, and run-end width 8 bytes
/// yields a `RunArray<Int64Type>`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
    use arrow_array::RunArray;
    let ree = Decoder::RunEndEncoded(
        8,
        0,
        Box::new(Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY))),
    );
    let mut decoder = Decoder::Nullable(
        NullablePlan::FromSingle {
            resolution: ResolutionPlan::Promotion(Promotion::IntToDouble),
        },
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(ree),
    );
    [1, 1, 2, 2, 2].into_iter().for_each(|v| {
        let encoded = encode_avro_int(v);
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    });

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .expect("RunArray<Int64Type>");
    assert_eq!(runs.len(), 5);
    assert_eq!(runs.run_ends().values(), &[2, 5]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("values Float64");
    assert_eq!(run_values.values(), &[1.0, 2.0]);
}
5407
/// A run-end width that is neither 16/32/64 bits nor 2/4/8 bytes is rejected when the
/// decoder is built.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_unsupported_run_end_width_errors() {
    let bad_width = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(avro_from_codec(Codec::Int32)), 3),
        Default::default(),
        None,
    );
    let message = Decoder::try_new(&bad_width)
        .expect_err("must reject unsupported width")
        .to_string();
    let mentions_width = message.contains("Unsupported run-end width");
    let mentions_options = message.contains("16/32/64 bits or 2/4/8 bytes");
    assert!(
        mentions_width && mentions_options,
        "unexpected error message: {message}"
    );
}
5426
/// Flushing a run-end-encoded decoder that saw no rows yields an empty run array. Width 4
/// (bytes) selects `Int32Type` run ends.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_empty_input_is_empty_runarray() {
    use arrow_array::RunArray;
    let ree = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(avro_from_codec(Codec::Utf8)), 4),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree).expect("create REE decoder");
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 0);
    assert_eq!(runs.run_ends().len(), 0);
    assert_eq!(runs.values().len(), 0);
}
5448
/// String values are grouped into runs of equal adjacent strings. A value that reappears
/// after a different one starts a new run.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_strings_grouping_width32_bits() {
    use arrow_array::RunArray;
    let ree = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(avro_from_codec(Codec::Utf8)), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree).expect("create REE decoder");
    ["a", "a", "bb", "bb", "bb", "a"].into_iter().for_each(|s| {
        let encoded = encode_avro_bytes(s.as_bytes());
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    });

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.run_ends().values(), &[2, 5, 6]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("values String");
    assert_eq!(run_values.len(), 3);
    for (idx, expected) in ["a", "bb", "a"].into_iter().enumerate() {
        assert_eq!(run_values.value(idx), expected);
    }
}
5482
/// Without the `avro_custom_types` feature, a plain int column still decodes into an
/// `Int32Array`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
    let mut decoder =
        Decoder::try_new(&avro_from_codec(Codec::Int32)).expect("create Int32 decoder");
    [1, 2, 3].into_iter().for_each(|v| {
        let encoded = encode_avro_int(v);
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    });
    let flushed = decoder.flush(None).expect("flush");
    let ints = flushed
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("Int32Array");
    assert_eq!(ints.values(), &[1, 2, 3]);
}
5499
/// Nanosecond timestamps with `Tz::OffsetZero` decode verbatim and carry the "+00:00"
/// timezone.
#[test]
fn test_timestamp_nanos_decoding_offset_zero() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::OffsetZero)));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    let inputs = [0_i64, 1, -1, 1_234_567_890];
    let mut payload = Vec::new();
    for v in inputs {
        payload.extend(encode_avro_long(v));
    }
    let mut cursor = AvroCursor::new(&payload);
    for _ in 0..inputs.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[0, 1, -1, 1_234_567_890]);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), Some("+00:00"));
        }
        other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
    }
}
5525
/// Nanosecond timestamps with `Tz::Utc` decode verbatim and carry the "UTC" timezone.
#[test]
fn test_timestamp_nanos_decoding_utc() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::Utc)));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    let inputs = [0_i64, 1, -1, 1_234_567_890];
    let mut payload = Vec::new();
    for v in inputs {
        payload.extend(encode_avro_long(v));
    }
    let mut cursor = AvroCursor::new(&payload);
    for _ in 0..inputs.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[0, 1, -1, 1_234_567_890]);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), Some("UTC"));
        }
        other => panic!("expected Timestamp(Nanosecond, Some(\"UTC\")), got {other:?}"),
    }
}
5551
/// Nanosecond timestamps without a timezone (local time) decode verbatim with no
/// timezone attached.
#[test]
fn test_timestamp_nanos_decoding_local() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(None));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    let inputs = [10_i64, 20, -30];
    let mut payload = Vec::new();
    for v in inputs {
        payload.extend(encode_avro_long(v));
    }
    let mut cursor = AvroCursor::new(&payload);
    for _ in 0..inputs.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[10, 20, -30]);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), None);
        }
        other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
    }
}
5577
/// A null-first nullable nanosecond timestamp column. Branch tag 0 yields a null row;
/// a non-zero tag is followed by the timestamp value.
#[test]
fn test_timestamp_nanos_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::TimestampNanos(None),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");

    // Rows: value 42, null, value -7 (each value is preceded by its branch tag).
    let mut payload = Vec::new();
    for chunk in [1_i64, 42, 0, 1, -7] {
        payload.extend_from_slice(&encode_avro_long(chunk));
    }
    let mut cursor = AvroCursor::new(&payload);
    (0..3).for_each(|_| {
        decoder
            .decode(&mut cursor)
            .expect("decode nullable nanos ts");
    });

    let flushed = decoder.flush(None).expect("flush nullable nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.len(), 3);
    assert!(timestamps.is_valid(0));
    assert!(timestamps.is_null(1));
    assert!(timestamps.is_valid(2));
    assert_eq!(timestamps.value(0), 42);
    assert_eq!(timestamps.value(2), -7);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), None);
        }
        other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
    }
}
5613 }
5614}