1use crate::codec::{
21 AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, ResolvedField,
22 ResolvedRecord, ResolvedUnion, Tz,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36 DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37 Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use strum_macros::AsRefStr;
42use uuid::Uuid;
43
44use std::cmp::Ordering;
45use std::mem;
46use std::sync::Arc;
47
/// Initial capacity passed to the value buffers, offset builders and decimal builders
/// created for each decoder in this file (e.g. `Vec::with_capacity(DEFAULT_CAPACITY)`).
const DEFAULT_CAPACITY: usize = 1024;
49
/// Reads one decimal value from `$buf` and appends it to `$builder`.
///
/// `read_decimal_bytes_be::<N>` supplies exactly `$N` bytes, which are interpreted as a
/// big-endian `$Int`. Uses `?`, so it must be expanded inside a function returning `Result`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
        let value = <$Int>::from_be_bytes(be_bytes);
        $builder.append_value(value);
    }};
}
57
/// Finishes a decimal builder and yields the resulting array as an `ArrayRef`.
///
/// Only the values of the finished builder are kept; the validity given as `$nulls` is
/// attached in their place, and then the precision and scale are applied (a missing
/// scale is treated as 0). Uses `?`, so it must be expanded inside a function returning
/// `Result`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        let finished = $builder.finish();
        let (_, values, _) = finished.into_parts();
        let array = <$ArrayTy>::try_new(values, $nulls)?;
        let array =
            array.with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)?;
        let erased: ArrayRef = Arc::new(array);
        erased
    }};
}
67
/// Appends a decimal default value taken from an Avro literal to `$builder`.
///
/// Evaluates to `Ok(())` when `$lit` is `AvroLiteral::Bytes`: the bytes go through
/// `sign_cast_to::<$N>` and the result is read as a big-endian `$Int`. Any other literal
/// evaluates to `Err(AvroError::InvalidArgument)` whose message names `$name`.
/// A failure inside `sign_cast_to` is propagated with `?`.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(AvroError::InvalidArgument(
                concat!(
                    "Default for ",
                    $name,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
90
/// Decodes Avro records into Arrow arrays and assembles them into a `RecordBatch`.
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema built from the reader record's fields; returned by `schema()` and
    /// used when assembling the batch in `flush()`.
    schema: SchemaRef,
    /// One decoder per reader field, in reader-field order.
    fields: Vec<Decoder>,
    /// Present only when the record's data type carries `ResolutionInfo::Record`;
    /// when set, `decode()` routes every record through `Projector::project_record`
    /// instead of decoding the fields one after another.
    projector: Option<Projector>,
}
98
99impl RecordDecoder {
100 pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
111 match data_type.codec() {
112 Codec::Struct(reader_fields) => {
113 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
115 let mut encodings = Vec::with_capacity(reader_fields.len());
116 let mut field_defaults = Vec::with_capacity(reader_fields.len());
117 for avro_field in reader_fields.iter() {
118 arrow_fields.push(avro_field.field());
119 encodings.push(Decoder::try_new(avro_field.data_type())?);
120
121 if let Some(ResolutionInfo::DefaultValue(lit)) =
122 avro_field.data_type().resolution.as_ref()
123 {
124 field_defaults.push(Some(lit.clone()));
125 } else {
126 field_defaults.push(None);
127 }
128 }
129 let projector = match data_type.resolution.as_ref() {
130 Some(ResolutionInfo::Record(rec)) => {
131 Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
132 }
133 _ => None,
134 };
135 Ok(Self {
136 schema: Arc::new(ArrowSchema::new(arrow_fields)),
137 fields: encodings,
138 projector,
139 })
140 }
141 other => Err(AvroError::ParseError(format!(
142 "Expected record got {other:?}"
143 ))),
144 }
145 }
146
147 pub(crate) fn schema(&self) -> &SchemaRef {
149 &self.schema
150 }
151
152 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
154 let mut cursor = AvroCursor::new(buf);
155 match self.projector.as_mut() {
156 Some(proj) => {
157 for _ in 0..count {
158 proj.project_record(&mut cursor, &mut self.fields)?;
159 }
160 }
161 None => {
162 for _ in 0..count {
163 for field in &mut self.fields {
164 field.decode(&mut cursor)?;
165 }
166 }
167 }
168 }
169 Ok(cursor.position())
170 }
171
172 pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
174 let arrays = self
175 .fields
176 .iter_mut()
177 .map(|x| x.flush(None))
178 .collect::<Result<Vec<_>, _>>()?;
179 RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
180 }
181}
182
/// Per-type decoding state: each variant owns the buffers or builders that accumulate
/// the values of one column (or nested child) until they are flushed.
///
/// Variants are selected in `Decoder::try_new_internal` from the data type's codec and,
/// for the `XToY` variants, from its promotion resolution.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Holds only a count, which is incremented for every null appended.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Int8(Vec<i8>),
    #[cfg(feature = "avro_custom_types")]
    Int16(Vec<i16>),
    #[cfg(feature = "avro_custom_types")]
    UInt8(Vec<u8>),
    #[cfg(feature = "avro_custom_types")]
    UInt16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    UInt32(Vec<u32>),
    #[cfg(feature = "avro_custom_types")]
    UInt64(Vec<u64>),
    /// Values are kept as raw `u16`s; defaults are read from a 2-byte little-endian literal.
    #[cfg(feature = "avro_custom_types")]
    Float16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    Date64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    TimeNanos(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Time32Secs(Vec<i32>),
    /// The flag is copied from `Codec::TimestampSecs`, where it is bound as `is_utc`.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool, Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth(Vec<i32>),
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime(Vec<IntervalDayTime>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// Timezone setting copied from the codec, plus the accumulated values.
    TimestampMillis(Option<Tz>, Vec<i64>),
    /// Timezone setting copied from the codec, plus the accumulated values.
    TimestampMicros(Option<Tz>, Vec<i64>),
    /// Timezone setting copied from the codec, plus the accumulated values.
    TimestampNanos(Option<Tz>, Vec<i64>),
    /// Chosen for `Codec::Int64` with `Promotion::IntToLong`.
    Int32ToInt64(Vec<i64>),
    /// Chosen for `Codec::Float32` with `Promotion::IntToFloat`.
    Int32ToFloat32(Vec<f32>),
    /// Chosen for `Codec::Float64` with `Promotion::IntToDouble`.
    Int32ToFloat64(Vec<f64>),
    /// Chosen for `Codec::Float32` with `Promotion::LongToFloat`.
    Int64ToFloat32(Vec<f32>),
    /// Chosen for `Codec::Float64` with `Promotion::LongToDouble`.
    Int64ToFloat64(Vec<f64>),
    /// Chosen for `Codec::Float64` with `Promotion::FloatToDouble`.
    Float32ToFloat64(Vec<f64>),
    /// Chosen for `Codec::Utf8` or `Codec::Utf8View` with `Promotion::BytesToString`.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Chosen for `Codec::Binary` with `Promotion::StringToBytes`.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Value offsets plus the concatenated value bytes.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Value offsets plus the concatenated value bytes.
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Built for `Codec::Utf8View`; accumulates offsets and bytes like `String`.
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Item field (named "item"), list offsets, and the decoder for the items.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Arrow fields, one child decoder per field, each field's default literal (if its
    /// resolution is `ResolutionInfo::DefaultValue`), and an optional projector that is
    /// built when the record carries `ResolutionInfo::Record`.
    Record(
        Fields,
        Vec<Decoder>,
        Vec<Option<AvroLiteral>>,
        Option<Projector>,
    ),
    /// The "entries" struct field (non-null Utf8 "key" plus "value"), two offset
    /// builders, a byte buffer, and the decoder for the values.
    // NOTE(review): presumably the first offsets/bytes pair holds the keys and the
    // second offset builder holds the map entry offsets (`append_null` pushes a zero
    // length onto the second one only) — confirm against decode/flush.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Byte size of each value and the concatenated value bytes; a null appends
    /// `size` zero bytes.
    Fixed(i32, Vec<u8>),
    /// Symbol indices, the symbol list copied from the codec, and an optional
    /// resolution built from `ResolutionInfo::EnumMapping`.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Built for `Codec::Interval`.
    Duration(IntervalMonthDayNanoBuilder),
    /// Concatenated 16-byte values.
    Uuid(Vec<u8>),
    /// Precision, scale, size (as given by `Codec::Decimal`) and the value builder;
    /// chosen when the precision is at most `DECIMAL32_MAX_PRECISION`.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// Precision, scale, size and the value builder; chosen when the precision is at
    /// most `DECIMAL64_MAX_PRECISION`.
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// Precision, scale, size and the value builder; chosen when the precision is at
    /// most `DECIMAL128_MAX_PRECISION`.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// Precision, scale, size and the value builder; chosen when the precision is at
    /// most `DECIMAL256_MAX_PRECISION`.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// Run-end width in bytes (2, 4 or 8), a counter that starts at 0 and is bumped for
    /// every appended null, and the decoder for the values.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    Union(UnionDecoder),
    /// Plan describing how nullability is read, the validity builder, and the decoder
    /// for the non-null values.
    Nullable(NullablePlan, NullBufferBuilder, Box<Decoder>),
}
275
276impl Decoder {
277 fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
278 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
279 if info.writer_is_union && !info.reader_is_union {
280 let mut clone = data_type.clone();
281 clone.resolution = None; let target = Self::try_new_internal(&clone)?;
283 let decoder = Self::Union(
284 UnionDecoderBuilder::new()
285 .with_resolved_union(info.clone())
286 .with_target(target)
287 .build()?,
288 );
289 return Ok(decoder);
290 }
291 }
292 Self::try_new_internal(data_type)
293 }
294
295 fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
296 let promotion = match data_type.resolution.as_ref() {
298 Some(ResolutionInfo::Promotion(p)) => Some(*p),
299 _ => None,
300 };
301 let decoder = match (data_type.codec(), promotion) {
302 (Codec::Int64, Some(Promotion::IntToLong)) => {
303 Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
304 }
305 (Codec::Float32, Some(Promotion::IntToFloat)) => {
306 Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
307 }
308 (Codec::Float64, Some(Promotion::IntToDouble)) => {
309 Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
310 }
311 (Codec::Float32, Some(Promotion::LongToFloat)) => {
312 Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
313 }
314 (Codec::Float64, Some(Promotion::LongToDouble)) => {
315 Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
316 }
317 (Codec::Float64, Some(Promotion::FloatToDouble)) => {
318 Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
319 }
320 (Codec::Utf8, Some(Promotion::BytesToString))
321 | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
322 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
323 Vec::with_capacity(DEFAULT_CAPACITY),
324 ),
325 (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
326 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
327 Vec::with_capacity(DEFAULT_CAPACITY),
328 ),
329 (Codec::Null, _) => Self::Null(0),
330 (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
331 (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
332 (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
333 (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
334 (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
335 (Codec::Binary, _) => Self::Binary(
336 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
337 Vec::with_capacity(DEFAULT_CAPACITY),
338 ),
339 (Codec::Utf8, _) => Self::String(
340 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
341 Vec::with_capacity(DEFAULT_CAPACITY),
342 ),
343 (Codec::Utf8View, _) => Self::StringView(
344 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
345 Vec::with_capacity(DEFAULT_CAPACITY),
346 ),
347 (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
348 (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
349 (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
350 (Codec::TimestampMillis(tz), _) => {
351 Self::TimestampMillis(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
352 }
353 (Codec::TimestampMicros(tz), _) => {
354 Self::TimestampMicros(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
355 }
356 (Codec::TimestampNanos(tz), _) => {
357 Self::TimestampNanos(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
358 }
359 #[cfg(feature = "avro_custom_types")]
360 (Codec::DurationNanos, _) => {
361 Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
362 }
363 #[cfg(feature = "avro_custom_types")]
364 (Codec::DurationMicros, _) => {
365 Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
366 }
367 #[cfg(feature = "avro_custom_types")]
368 (Codec::DurationMillis, _) => {
369 Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
370 }
371 #[cfg(feature = "avro_custom_types")]
372 (Codec::DurationSeconds, _) => {
373 Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
374 }
375 #[cfg(feature = "avro_custom_types")]
376 (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
377 #[cfg(feature = "avro_custom_types")]
378 (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
379 #[cfg(feature = "avro_custom_types")]
380 (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
381 #[cfg(feature = "avro_custom_types")]
382 (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
383 #[cfg(feature = "avro_custom_types")]
384 (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
385 #[cfg(feature = "avro_custom_types")]
386 (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
387 #[cfg(feature = "avro_custom_types")]
388 (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
389 #[cfg(feature = "avro_custom_types")]
390 (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
391 #[cfg(feature = "avro_custom_types")]
392 (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
393 #[cfg(feature = "avro_custom_types")]
394 (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
395 #[cfg(feature = "avro_custom_types")]
396 (Codec::TimestampSecs(is_utc), _) => {
397 Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
398 }
399 #[cfg(feature = "avro_custom_types")]
400 (Codec::IntervalYearMonth, _) => {
401 Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
402 }
403 #[cfg(feature = "avro_custom_types")]
404 (Codec::IntervalMonthDayNano, _) => {
405 Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
406 }
407 #[cfg(feature = "avro_custom_types")]
408 (Codec::IntervalDayTime, _) => {
409 Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
410 }
411 (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
412 (Codec::Decimal(precision, scale, size), _) => {
413 let p = *precision;
414 let s = *scale;
415 let prec = p as u8;
416 let scl = s.unwrap_or(0) as i8;
417 #[cfg(feature = "small_decimals")]
418 {
419 if p <= DECIMAL32_MAX_PRECISION as usize {
420 let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
421 .with_precision_and_scale(prec, scl)?;
422 Self::Decimal32(p, s, *size, builder)
423 } else if p <= DECIMAL64_MAX_PRECISION as usize {
424 let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
425 .with_precision_and_scale(prec, scl)?;
426 Self::Decimal64(p, s, *size, builder)
427 } else if p <= DECIMAL128_MAX_PRECISION as usize {
428 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
429 .with_precision_and_scale(prec, scl)?;
430 Self::Decimal128(p, s, *size, builder)
431 } else if p <= DECIMAL256_MAX_PRECISION as usize {
432 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
433 .with_precision_and_scale(prec, scl)?;
434 Self::Decimal256(p, s, *size, builder)
435 } else {
436 return Err(AvroError::ParseError(format!(
437 "Decimal precision {p} exceeds maximum supported"
438 )));
439 }
440 }
441 #[cfg(not(feature = "small_decimals"))]
442 {
443 if p <= DECIMAL128_MAX_PRECISION as usize {
444 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
445 .with_precision_and_scale(prec, scl)?;
446 Self::Decimal128(p, s, *size, builder)
447 } else if p <= DECIMAL256_MAX_PRECISION as usize {
448 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
449 .with_precision_and_scale(prec, scl)?;
450 Self::Decimal256(p, s, *size, builder)
451 } else {
452 return Err(AvroError::ParseError(format!(
453 "Decimal precision {p} exceeds maximum supported"
454 )));
455 }
456 }
457 }
458 (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
459 (Codec::List(item), _) => {
460 let decoder = Self::try_new(item)?;
461 Self::Array(
462 Arc::new(item.field_with_name("item")),
463 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
464 Box::new(decoder),
465 )
466 }
467 (Codec::Enum(symbols), _) => {
468 let res = match data_type.resolution.as_ref() {
469 Some(ResolutionInfo::EnumMapping(mapping)) => {
470 Some(EnumResolution::new(mapping))
471 }
472 _ => None,
473 };
474 Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
475 }
476 (Codec::Struct(fields), _) => {
477 let mut arrow_fields = Vec::with_capacity(fields.len());
478 let mut encodings = Vec::with_capacity(fields.len());
479 let mut field_defaults = Vec::with_capacity(fields.len());
480 for avro_field in fields.iter() {
481 let encoding = Self::try_new(avro_field.data_type())?;
482 arrow_fields.push(avro_field.field());
483 encodings.push(encoding);
484
485 if let Some(ResolutionInfo::DefaultValue(lit)) =
486 avro_field.data_type().resolution.as_ref()
487 {
488 field_defaults.push(Some(lit.clone()));
489 } else {
490 field_defaults.push(None);
491 }
492 }
493 let projector =
494 if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
495 Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
496 } else {
497 None
498 };
499 Self::Record(arrow_fields.into(), encodings, field_defaults, projector)
500 }
501 (Codec::Map(child), _) => {
502 let val_field = child.field_with_name("value");
503 let map_field = Arc::new(ArrowField::new(
504 "entries",
505 DataType::Struct(Fields::from(vec![
506 ArrowField::new("key", DataType::Utf8, false),
507 val_field,
508 ])),
509 false,
510 ));
511 let val_dec = Self::try_new(child)?;
512 Self::Map(
513 map_field,
514 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
515 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
516 Vec::with_capacity(DEFAULT_CAPACITY),
517 Box::new(val_dec),
518 )
519 }
520 (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
521 (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
522 let decoders = encodings
523 .iter()
524 .map(Self::try_new_internal)
525 .collect::<Result<Vec<_>, _>>()?;
526 if fields.len() != decoders.len() {
527 return Err(AvroError::SchemaError(format!(
528 "Union has {} fields but {} decoders",
529 fields.len(),
530 decoders.len()
531 )));
532 }
533 let branch_count = decoders.len();
536 let max_addr = (i32::MAX as usize) + 1;
537 if branch_count > max_addr {
538 return Err(AvroError::SchemaError(format!(
539 "Union has {branch_count} branches, which exceeds the maximum addressable \
540 branches by an Avro int tag ({} + 1).",
541 i32::MAX
542 )));
543 }
544 let mut builder = UnionDecoderBuilder::new()
545 .with_fields(fields.clone())
546 .with_branches(decoders);
547 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
548 if info.reader_is_union {
549 builder = builder.with_resolved_union(info.clone());
550 }
551 }
552 Self::Union(builder.build()?)
553 }
554 (Codec::Union(_, _, _), _) => {
555 return Err(AvroError::NYI(
556 "Sparse Arrow unions are not yet supported".to_string(),
557 ));
558 }
559 #[cfg(feature = "avro_custom_types")]
560 (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
561 let inner = Self::try_new(values_dt)?;
562 let byte_width: u8 = match *width_bits_or_bytes {
563 2 | 4 | 8 => *width_bits_or_bytes,
564 16 => 2,
565 32 => 4,
566 64 => 8,
567 other => {
568 return Err(AvroError::InvalidArgument(format!(
569 "Unsupported run-end width {other} for RunEndEncoded; \
570 expected 16/32/64 bits or 2/4/8 bytes"
571 )));
572 }
573 };
574 Self::RunEndEncoded(byte_width, 0, Box::new(inner))
575 }
576 };
577 Ok(match data_type.nullability() {
578 Some(nullability) => {
579 let plan = match &data_type.resolution {
581 None => NullablePlan::ReadTag {
582 nullability,
583 resolution: ResolutionPlan::Promotion(Promotion::Direct),
584 },
585 Some(ResolutionInfo::Promotion(_)) => {
586 NullablePlan::FromSingle {
589 resolution: ResolutionPlan::Promotion(Promotion::Direct),
590 }
591 }
592 Some(ResolutionInfo::Union(info)) if !info.writer_is_union => {
593 let Some(Some((_, resolution))) = info.writer_to_reader.first() else {
594 return Err(AvroError::SchemaError(
595 "unexpected union resolution info for non-union writer and union reader type".into(),
596 ));
597 };
598 let resolution = ResolutionPlan::try_new(&decoder, resolution)?;
599 NullablePlan::FromSingle { resolution }
600 }
601 Some(ResolutionInfo::Union(info)) => {
602 let Some((_, resolution)) =
603 info.writer_to_reader[nullability.non_null_index()].as_ref()
604 else {
605 return Err(AvroError::SchemaError(
606 "unexpected union resolution info for nullable writer type".into(),
607 ));
608 };
609 NullablePlan::ReadTag {
610 nullability,
611 resolution: ResolutionPlan::try_new(&decoder, resolution)?,
612 }
613 }
614 Some(resolution) => NullablePlan::FromSingle {
615 resolution: ResolutionPlan::try_new(&decoder, resolution)?,
616 },
617 };
618 Self::Nullable(
619 plan,
620 NullBufferBuilder::new(DEFAULT_CAPACITY),
621 Box::new(decoder),
622 )
623 }
624 None => decoder,
625 })
626 }
627
628 fn append_null(&mut self) -> Result<(), AvroError> {
630 match self {
631 Self::Null(count) => *count += 1,
632 Self::Boolean(b) => b.append(false),
633 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
634 Self::Int64(v)
635 | Self::Int32ToInt64(v)
636 | Self::TimeMicros(v)
637 | Self::TimestampMillis(_, v)
638 | Self::TimestampMicros(_, v)
639 | Self::TimestampNanos(_, v) => v.push(0),
640 #[cfg(feature = "avro_custom_types")]
641 Self::DurationSecond(v)
642 | Self::DurationMillisecond(v)
643 | Self::DurationMicrosecond(v)
644 | Self::DurationNanosecond(v) => v.push(0),
645 #[cfg(feature = "avro_custom_types")]
646 Self::Int8(v) => v.push(0),
647 #[cfg(feature = "avro_custom_types")]
648 Self::Int16(v) => v.push(0),
649 #[cfg(feature = "avro_custom_types")]
650 Self::UInt8(v) => v.push(0),
651 #[cfg(feature = "avro_custom_types")]
652 Self::UInt16(v) => v.push(0),
653 #[cfg(feature = "avro_custom_types")]
654 Self::UInt32(v) => v.push(0),
655 #[cfg(feature = "avro_custom_types")]
656 Self::UInt64(v) => v.push(0),
657 #[cfg(feature = "avro_custom_types")]
658 Self::Float16(v) => v.push(0),
659 #[cfg(feature = "avro_custom_types")]
660 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
661 #[cfg(feature = "avro_custom_types")]
662 Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
663 #[cfg(feature = "avro_custom_types")]
664 Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
665 #[cfg(feature = "avro_custom_types")]
666 Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
667 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
668 Self::Float64(v)
669 | Self::Int32ToFloat64(v)
670 | Self::Int64ToFloat64(v)
671 | Self::Float32ToFloat64(v) => v.push(0.),
672 Self::Binary(offsets, _)
673 | Self::String(offsets, _)
674 | Self::StringView(offsets, _)
675 | Self::BytesToString(offsets, _)
676 | Self::StringToBytes(offsets, _) => {
677 offsets.push_length(0);
678 }
679 Self::Uuid(v) => {
680 v.extend([0; 16]);
681 }
682 Self::Array(_, offsets, _) => {
683 offsets.push_length(0);
684 }
685 Self::Record(_, e, _, _) => {
686 for encoding in e.iter_mut() {
687 encoding.append_null()?;
688 }
689 }
690 Self::Map(_, _koff, moff, _, _) => {
691 moff.push_length(0);
692 }
693 Self::Fixed(sz, accum) => {
694 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
695 }
696 #[cfg(feature = "small_decimals")]
697 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
698 #[cfg(feature = "small_decimals")]
699 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
700 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
701 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
702 Self::Enum(indices, _, _) => indices.push(0),
703 Self::Duration(builder) => builder.append_null(),
704 #[cfg(feature = "avro_custom_types")]
705 Self::RunEndEncoded(_, len, inner) => {
706 *len += 1;
707 inner.append_null()?;
708 }
709 Self::Union(u) => u.append_null()?,
710 Self::Nullable(_, null_buffer, inner) => {
711 null_buffer.append(false);
712 inner.append_null()?;
713 }
714 }
715 Ok(())
716 }
717
718 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
720 match self {
721 Self::Nullable(_, nb, inner) => {
722 if matches!(lit, AvroLiteral::Null) {
723 nb.append(false);
724 inner.append_null()
725 } else {
726 nb.append(true);
727 inner.append_default(lit)
728 }
729 }
730 Self::Null(count) => match lit {
731 AvroLiteral::Null => {
732 *count += 1;
733 Ok(())
734 }
735 _ => Err(AvroError::InvalidArgument(
736 "Non-null default for null type".to_string(),
737 )),
738 },
739 Self::Boolean(b) => match lit {
740 AvroLiteral::Boolean(v) => {
741 b.append(*v);
742 Ok(())
743 }
744 _ => Err(AvroError::InvalidArgument(
745 "Default for boolean must be boolean".to_string(),
746 )),
747 },
748 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
749 AvroLiteral::Int(i) => {
750 v.push(*i);
751 Ok(())
752 }
753 _ => Err(AvroError::InvalidArgument(
754 "Default for int32/date32/time-millis must be int".to_string(),
755 )),
756 },
757 #[cfg(feature = "avro_custom_types")]
758 Self::DurationSecond(v)
759 | Self::DurationMillisecond(v)
760 | Self::DurationMicrosecond(v)
761 | Self::DurationNanosecond(v) => match lit {
762 AvroLiteral::Long(i) => {
763 v.push(*i);
764 Ok(())
765 }
766 _ => Err(AvroError::InvalidArgument(
767 "Default for duration long must be long".to_string(),
768 )),
769 },
770 #[cfg(feature = "avro_custom_types")]
771 Self::Int8(v) => match lit {
772 AvroLiteral::Int(i) => {
773 let x = i8::try_from(*i).map_err(|_| {
774 AvroError::InvalidArgument(format!(
775 "Default for int8 out of range for i8: {i}"
776 ))
777 })?;
778 v.push(x);
779 Ok(())
780 }
781 _ => Err(AvroError::InvalidArgument(
782 "Default for int8 must be int".to_string(),
783 )),
784 },
785 #[cfg(feature = "avro_custom_types")]
786 Self::Int16(v) => match lit {
787 AvroLiteral::Int(i) => {
788 let x = i16::try_from(*i).map_err(|_| {
789 AvroError::InvalidArgument(format!(
790 "Default for int16 out of range for i16: {i}"
791 ))
792 })?;
793 v.push(x);
794 Ok(())
795 }
796 _ => Err(AvroError::InvalidArgument(
797 "Default for int16 must be int".to_string(),
798 )),
799 },
800 #[cfg(feature = "avro_custom_types")]
801 Self::UInt8(v) => match lit {
802 AvroLiteral::Int(i) => {
803 let x = u8::try_from(*i).map_err(|_| {
804 AvroError::InvalidArgument(format!(
805 "Default for uint8 out of range for u8: {i}"
806 ))
807 })?;
808 v.push(x);
809 Ok(())
810 }
811 _ => Err(AvroError::InvalidArgument(
812 "Default for uint8 must be int".to_string(),
813 )),
814 },
815 #[cfg(feature = "avro_custom_types")]
816 Self::UInt16(v) => match lit {
817 AvroLiteral::Int(i) => {
818 let x = u16::try_from(*i).map_err(|_| {
819 AvroError::InvalidArgument(format!(
820 "Default for uint16 out of range for u16: {i}"
821 ))
822 })?;
823 v.push(x);
824 Ok(())
825 }
826 _ => Err(AvroError::InvalidArgument(
827 "Default for uint16 must be int".to_string(),
828 )),
829 },
830 #[cfg(feature = "avro_custom_types")]
831 Self::UInt32(v) => match lit {
832 AvroLiteral::Long(i) => {
833 let x = u32::try_from(*i).map_err(|_| {
834 AvroError::InvalidArgument(format!(
835 "Default for uint32 out of range for u32: {i}"
836 ))
837 })?;
838 v.push(x);
839 Ok(())
840 }
841 _ => Err(AvroError::InvalidArgument(
842 "Default for uint32 must be long".to_string(),
843 )),
844 },
845 #[cfg(feature = "avro_custom_types")]
846 Self::UInt64(v) => match lit {
847 AvroLiteral::Bytes(b) => {
848 if b.len() != 8 {
849 return Err(AvroError::InvalidArgument(format!(
850 "uint64 default must be exactly 8 bytes, got {}",
851 b.len()
852 )));
853 }
854 v.push(u64::from_le_bytes([
855 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
856 ]));
857 Ok(())
858 }
859 _ => Err(AvroError::InvalidArgument(
860 "Default for uint64 must be bytes (8-byte LE)".to_string(),
861 )),
862 },
863 #[cfg(feature = "avro_custom_types")]
864 Self::Float16(v) => match lit {
865 AvroLiteral::Bytes(b) => {
866 if b.len() != 2 {
867 return Err(AvroError::InvalidArgument(format!(
868 "float16 default must be exactly 2 bytes, got {}",
869 b.len()
870 )));
871 }
872 v.push(u16::from_le_bytes([b[0], b[1]]));
873 Ok(())
874 }
875 _ => Err(AvroError::InvalidArgument(
876 "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
877 )),
878 },
879 #[cfg(feature = "avro_custom_types")]
880 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
881 AvroLiteral::Long(i) => {
882 v.push(*i);
883 Ok(())
884 }
885 _ => Err(AvroError::InvalidArgument(
886 "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
887 )),
888 },
889 #[cfg(feature = "avro_custom_types")]
890 Self::Time32Secs(v) => match lit {
891 AvroLiteral::Int(i) => {
892 v.push(*i);
893 Ok(())
894 }
895 _ => Err(AvroError::InvalidArgument(
896 "Default for time32-secs must be int".to_string(),
897 )),
898 },
899 #[cfg(feature = "avro_custom_types")]
900 Self::IntervalYearMonth(v) => match lit {
901 AvroLiteral::Bytes(b) => {
902 if b.len() != 4 {
903 return Err(AvroError::InvalidArgument(format!(
904 "interval-year-month default must be exactly 4 bytes, got {}",
905 b.len()
906 )));
907 }
908 v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
909 Ok(())
910 }
911 _ => Err(AvroError::InvalidArgument(
912 "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
913 )),
914 },
915 #[cfg(feature = "avro_custom_types")]
916 Self::IntervalMonthDayNano(v) => match lit {
917 AvroLiteral::Bytes(b) => {
918 if b.len() != 16 {
919 return Err(AvroError::InvalidArgument(format!(
920 "interval-month-day-nano default must be exactly 16 bytes, got {}",
921 b.len()
922 )));
923 }
924 let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
925 let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
926 let nanos =
927 i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
928 v.push(IntervalMonthDayNano::new(months, days, nanos));
929 Ok(())
930 }
931 _ => Err(AvroError::InvalidArgument(
932 "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
933 )),
934 },
935 #[cfg(feature = "avro_custom_types")]
936 Self::IntervalDayTime(v) => match lit {
937 AvroLiteral::Bytes(b) => {
938 if b.len() != 8 {
939 return Err(AvroError::InvalidArgument(format!(
940 "interval-day-time default must be exactly 8 bytes, got {}",
941 b.len()
942 )));
943 }
944 let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
945 let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
946 v.push(IntervalDayTime::new(days, milliseconds));
947 Ok(())
948 }
949 _ => Err(AvroError::InvalidArgument(
950 "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
951 )),
952 },
953 Self::Int64(v)
954 | Self::Int32ToInt64(v)
955 | Self::TimeMicros(v)
956 | Self::TimestampMillis(_, v)
957 | Self::TimestampMicros(_, v)
958 | Self::TimestampNanos(_, v) => match lit {
959 AvroLiteral::Long(i) => {
960 v.push(*i);
961 Ok(())
962 }
963 AvroLiteral::Int(i) => {
964 v.push(*i as i64);
965 Ok(())
966 }
967 _ => Err(AvroError::InvalidArgument(
968 "Default for long/time-micros/timestamp must be long or int".to_string(),
969 )),
970 },
971 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
972 AvroLiteral::Float(f) => {
973 v.push(*f);
974 Ok(())
975 }
976 _ => Err(AvroError::InvalidArgument(
977 "Default for float must be float".to_string(),
978 )),
979 },
980 Self::Float64(v)
981 | Self::Int32ToFloat64(v)
982 | Self::Int64ToFloat64(v)
983 | Self::Float32ToFloat64(v) => match lit {
984 AvroLiteral::Double(f) => {
985 v.push(*f);
986 Ok(())
987 }
988 _ => Err(AvroError::InvalidArgument(
989 "Default for double must be double".to_string(),
990 )),
991 },
992 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
993 AvroLiteral::Bytes(b) => {
994 offsets.push_length(b.len());
995 values.extend_from_slice(b);
996 Ok(())
997 }
998 _ => Err(AvroError::InvalidArgument(
999 "Default for bytes must be bytes".to_string(),
1000 )),
1001 },
1002 Self::BytesToString(offsets, values)
1003 | Self::String(offsets, values)
1004 | Self::StringView(offsets, values) => match lit {
1005 AvroLiteral::String(s) => {
1006 let b = s.as_bytes();
1007 offsets.push_length(b.len());
1008 values.extend_from_slice(b);
1009 Ok(())
1010 }
1011 _ => Err(AvroError::InvalidArgument(
1012 "Default for string must be string".to_string(),
1013 )),
1014 },
1015 Self::Uuid(values) => match lit {
1016 AvroLiteral::String(s) => {
1017 let uuid = Uuid::try_parse(s).map_err(|e| {
1018 AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
1019 })?;
1020 values.extend_from_slice(uuid.as_bytes());
1021 Ok(())
1022 }
1023 _ => Err(AvroError::InvalidArgument(
1024 "Default for uuid must be string".to_string(),
1025 )),
1026 },
1027 Self::Fixed(sz, accum) => match lit {
1028 AvroLiteral::Bytes(b) => {
1029 if b.len() != *sz as usize {
1030 return Err(AvroError::InvalidArgument(format!(
1031 "Fixed default length {} does not match size {sz}",
1032 b.len(),
1033 )));
1034 }
1035 accum.extend_from_slice(b);
1036 Ok(())
1037 }
1038 _ => Err(AvroError::InvalidArgument(
1039 "Default for fixed must be bytes".to_string(),
1040 )),
1041 },
1042 #[cfg(feature = "small_decimals")]
1043 Self::Decimal32(_, _, _, builder) => {
1044 append_decimal_default!(lit, builder, 4, i32, "decimal32")
1045 }
1046 #[cfg(feature = "small_decimals")]
1047 Self::Decimal64(_, _, _, builder) => {
1048 append_decimal_default!(lit, builder, 8, i64, "decimal64")
1049 }
1050 Self::Decimal128(_, _, _, builder) => {
1051 append_decimal_default!(lit, builder, 16, i128, "decimal128")
1052 }
1053 Self::Decimal256(_, _, _, builder) => {
1054 append_decimal_default!(lit, builder, 32, i256, "decimal256")
1055 }
1056 Self::Duration(builder) => match lit {
1057 AvroLiteral::Bytes(b) => {
1058 if b.len() != 12 {
1059 return Err(AvroError::InvalidArgument(format!(
1060 "Duration default must be exactly 12 bytes, got {}",
1061 b.len()
1062 )));
1063 }
1064 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1065 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1066 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1067 let nanos = (millis as i64) * 1_000_000;
1068 builder.append_value(IntervalMonthDayNano::new(
1069 months as i32,
1070 days as i32,
1071 nanos,
1072 ));
1073 Ok(())
1074 }
1075 _ => Err(AvroError::InvalidArgument(
1076 "Default for duration must be 12-byte little-endian months/days/millis"
1077 .to_string(),
1078 )),
1079 },
1080 Self::Array(_, offsets, inner) => match lit {
1081 AvroLiteral::Array(items) => {
1082 offsets.push_length(items.len());
1083 for item in items {
1084 inner.append_default(item)?;
1085 }
1086 Ok(())
1087 }
1088 _ => Err(AvroError::InvalidArgument(
1089 "Default for array must be an array literal".to_string(),
1090 )),
1091 },
1092 Self::Map(_, koff, moff, kdata, valdec) => match lit {
1093 AvroLiteral::Map(entries) => {
1094 moff.push_length(entries.len());
1095 for (k, v) in entries {
1096 let kb = k.as_bytes();
1097 koff.push_length(kb.len());
1098 kdata.extend_from_slice(kb);
1099 valdec.append_default(v)?;
1100 }
1101 Ok(())
1102 }
1103 _ => Err(AvroError::InvalidArgument(
1104 "Default for map must be a map/object literal".to_string(),
1105 )),
1106 },
1107 Self::Enum(indices, symbols, _) => match lit {
1108 AvroLiteral::Enum(sym) => {
1109 let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1110 AvroError::InvalidArgument(format!(
1111 "Enum default symbol {sym:?} not in reader symbols"
1112 ))
1113 })?;
1114 indices.push(pos as i32);
1115 Ok(())
1116 }
1117 _ => Err(AvroError::InvalidArgument(
1118 "Default for enum must be a symbol".to_string(),
1119 )),
1120 },
1121 #[cfg(feature = "avro_custom_types")]
1122 Self::RunEndEncoded(_, len, inner) => {
1123 *len += 1;
1124 inner.append_default(lit)
1125 }
1126 Self::Union(u) => u.append_default(lit),
1127 Self::Record(field_meta, decoders, field_defaults, _) => match lit {
1128 AvroLiteral::Map(entries) => {
1129 for (i, dec) in decoders.iter_mut().enumerate() {
1130 let name = field_meta[i].name();
1131 if let Some(sub) = entries.get(name) {
1132 dec.append_default(sub)?;
1133 } else if let Some(default_literal) = field_defaults[i].as_ref() {
1134 dec.append_default(default_literal)?;
1135 } else {
1136 dec.append_null()?;
1137 }
1138 }
1139 Ok(())
1140 }
1141 AvroLiteral::Null => {
1142 for (i, dec) in decoders.iter_mut().enumerate() {
1143 if let Some(default_literal) = field_defaults[i].as_ref() {
1144 dec.append_default(default_literal)?;
1145 } else {
1146 dec.append_null()?;
1147 }
1148 }
1149 Ok(())
1150 }
1151 _ => Err(AvroError::InvalidArgument(
1152 "Default for record must be a map/object or null".to_string(),
1153 )),
1154 },
1155 }
1156 }
1157
    /// Decodes a single value from `buf` and appends it to this decoder's buffers.
    ///
    /// The bytes consumed depend on the variant; see the per-arm comments.
    ///
    /// # Errors
    /// Propagates any error from the cursor reads, and returns
    /// `AvroError::ParseError` when a value fails one of the range or format
    /// checks below (narrow integer types, UUID text).
    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
        match self {
            // Nothing is read from `buf` for null; only the row counter advances.
            Self::Null(x) => *x += 1,
            Self::Boolean(values) => values.append(buf.get_bool()?),
            // All three share an `int` read; they differ only in the Arrow type chosen at flush.
            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
                values.push(buf.get_int()?)
            }
            // All of these share a `long` read.
            Self::Int64(values)
            | Self::TimeMicros(values)
            | Self::TimestampMillis(_, values)
            | Self::TimestampMicros(_, values)
            | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
            #[cfg(feature = "avro_custom_types")]
            Self::DurationSecond(values)
            | Self::DurationMillisecond(values)
            | Self::DurationMicrosecond(values)
            | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
            // Narrow integers are read as an `int` and range-checked into the target width.
            #[cfg(feature = "avro_custom_types")]
            Self::Int8(values) => {
                let raw = buf.get_int()?;
                let x = i8::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
                })?;
                values.push(x);
            }
            #[cfg(feature = "avro_custom_types")]
            Self::Int16(values) => {
                let raw = buf.get_int()?;
                let x = i16::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
                })?;
                values.push(x);
            }
            #[cfg(feature = "avro_custom_types")]
            Self::UInt8(values) => {
                let raw = buf.get_int()?;
                let x = u8::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
                })?;
                values.push(x);
            }
            #[cfg(feature = "avro_custom_types")]
            Self::UInt16(values) => {
                let raw = buf.get_int()?;
                let x = u16::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
                })?;
                values.push(x);
            }
            // `u32` is read as a `long` and then range-checked.
            #[cfg(feature = "avro_custom_types")]
            Self::UInt32(values) => {
                let raw = buf.get_long()?;
                let x = u32::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
                })?;
                values.push(x);
            }
            // `u64` is read as 8 fixed bytes, interpreted little-endian.
            #[cfg(feature = "avro_custom_types")]
            Self::UInt64(values) => {
                let b = buf.get_fixed(8)?;
                values.push(u64::from_le_bytes([
                    b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
                ]));
            }
            // Half floats are 2 fixed bytes; the raw little-endian bit pattern is kept as `u16`.
            #[cfg(feature = "avro_custom_types")]
            Self::Float16(values) => {
                let b = buf.get_fixed(2)?;
                values.push(u16::from_le_bytes([b[0], b[1]]));
            }
            #[cfg(feature = "avro_custom_types")]
            Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
                values.push(buf.get_long()?)
            }
            #[cfg(feature = "avro_custom_types")]
            Self::Time32Secs(values) => values.push(buf.get_int()?),
            // 4 fixed bytes: a little-endian `i32`.
            #[cfg(feature = "avro_custom_types")]
            Self::IntervalYearMonth(values) => {
                let b = buf.get_fixed(4)?;
                values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
            }
            // 16 fixed bytes: months (i32), days (i32), nanoseconds (i64), all little-endian.
            #[cfg(feature = "avro_custom_types")]
            Self::IntervalMonthDayNano(values) => {
                let b = buf.get_fixed(16)?;
                let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
                let nanos =
                    i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
                values.push(IntervalMonthDayNano::new(months, days, nanos));
            }
            // 8 fixed bytes: days (i32) then milliseconds (i32), both little-endian.
            #[cfg(feature = "avro_custom_types")]
            Self::IntervalDayTime(values) => {
                let b = buf.get_fixed(8)?;
                let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
                values.push(IntervalDayTime::new(days, milliseconds));
            }
            Self::Float32(values) => values.push(buf.get_float()?),
            Self::Float64(values) => values.push(buf.get_double()?),
            // Promotion variants: read the narrower type, then widen with an `as` cast.
            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
            // String-like and bytes-like variants are all read with `get_bytes`; the
            // payload is copied verbatim and its length recorded. No validation is
            // performed in this arm.
            Self::StringToBytes(offsets, values)
            | Self::BytesToString(offsets, values)
            | Self::Binary(offsets, values)
            | Self::String(offsets, values)
            | Self::StringView(offsets, values) => {
                let data = buf.get_bytes()?;
                offsets.push_length(data.len());
                values.extend_from_slice(data);
            }
            // UUIDs are read as UTF-8 text, parsed, and stored as their 16 raw bytes.
            Self::Uuid(values) => {
                let s_bytes = buf.get_bytes()?;
                let s = std::str::from_utf8(s_bytes).map_err(|e| {
                    AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
                })?;
                let uuid = Uuid::try_parse(s)
                    .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
                values.extend_from_slice(uuid.as_bytes());
            }
            // The value returned by `read_blocks` is recorded as this list's length
            // (presumably the number of items the closure decoded -- confirm in `read_blocks`).
            Self::Array(_, off, encoding) => {
                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
                off.push_length(total_items);
            }
            // Without a projector, fields are decoded in declaration order.
            Self::Record(_, encodings, _, None) => {
                for encoding in encodings {
                    encoding.decode(buf)?;
                }
            }
            // With a projector, it drives how the record's bytes are applied to the field decoders.
            Self::Record(_, encodings, _, Some(proj)) => {
                proj.project_record(buf, encodings)?;
            }
            // Each entry is a key read with `get_bytes` followed by a value for the value decoder.
            Self::Map(_, koff, moff, kdata, valdec) => {
                let newly_added = read_blocks(buf, |cur| {
                    let kb = cur.get_bytes()?;
                    koff.push_length(kb.len());
                    kdata.extend_from_slice(kb);
                    valdec.decode(cur)
                })?;
                moff.push_length(newly_added);
            }
            Self::Fixed(sz, accum) => {
                let fx = buf.get_fixed(*sz as usize)?;
                accum.extend_from_slice(fx);
            }
            #[cfg(feature = "small_decimals")]
            Self::Decimal32(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 4, i32);
            }
            #[cfg(feature = "small_decimals")]
            Self::Decimal64(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 8, i64);
            }
            Self::Decimal128(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 16, i128);
            }
            Self::Decimal256(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 32, i256);
            }
            // Without a resolution, the index read from the buffer is stored as-is.
            Self::Enum(indices, _, None) => {
                indices.push(buf.get_int()?);
            }
            // With a resolution, the index read is first mapped through `res.resolve`.
            Self::Enum(indices, _, Some(res)) => {
                let raw = buf.get_int()?;
                let resolved = res.resolve(raw)?;
                indices.push(resolved);
            }
            // 12 fixed bytes: months, days, milliseconds as little-endian `u32`s.
            // Milliseconds are scaled to nanoseconds; months and days are cast to `i32`.
            Self::Duration(builder) => {
                let b = buf.get_fixed(12)?;
                let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
                let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
                let nanos = (millis as i64) * 1_000_000;
                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
            }
            // Count the logical row, then let the inner decoder read the value.
            #[cfg(feature = "avro_custom_types")]
            Self::RunEndEncoded(_, len, inner) => {
                *len += 1;
                inner.decode(buf)?;
            }
            Self::Union(u) => u.decode(buf)?,
            Self::Nullable(plan, nb, encoding) => {
                match plan {
                    // No branch tag is read: the value is always decoded and marked valid.
                    NullablePlan::FromSingle { resolution } => {
                        encoding.decode_with_resolution(buf, resolution)?;
                        nb.append(true);
                    }
                    NullablePlan::ReadTag {
                        nullability,
                        resolution,
                    } => {
                        // Read the branch tag; which tag value means "null" depends on
                        // whether null is the first or second branch.
                        let branch = buf.read_vlq()?;
                        let is_not_null = match *nullability {
                            Nullability::NullFirst => branch != 0,
                            Nullability::NullSecond => branch == 0,
                        };
                        if is_not_null {
                            encoding.decode_with_resolution(buf, resolution)?;
                        } else {
                            // The child still gets an entry for the null row.
                            encoding.append_null()?;
                        }
                        nb.append(is_not_null);
                    }
                }
            }
        }
        Ok(())
    }
1371
1372 fn decode_with_promotion(
1373 &mut self,
1374 buf: &mut AvroCursor<'_>,
1375 promotion: Promotion,
1376 ) -> Result<(), AvroError> {
1377 #[cfg(feature = "avro_custom_types")]
1378 if let Self::RunEndEncoded(_, len, inner) = self {
1379 *len += 1;
1380 return inner.decode_with_promotion(buf, promotion);
1381 }
1382
1383 macro_rules! promote_numeric_to {
1384 ($variant:ident, $getter:ident, $to:ty) => {{
1385 match self {
1386 Self::$variant(v) => {
1387 let x = buf.$getter()?;
1388 v.push(x as $to);
1389 Ok(())
1390 }
1391 other => Err(AvroError::ParseError(format!(
1392 "Promotion {promotion} target mismatch: expected {}, got {}",
1393 stringify!($variant),
1394 <Self as ::std::convert::AsRef<str>>::as_ref(other)
1395 ))),
1396 }
1397 }};
1398 }
1399 match promotion {
1400 Promotion::Direct => self.decode(buf),
1401 Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1402 Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1403 Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1404 Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1405 Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1406 Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1407 Promotion::StringToBytes => match self {
1408 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1409 let data = buf.get_bytes()?;
1410 offsets.push_length(data.len());
1411 values.extend_from_slice(data);
1412 Ok(())
1413 }
1414 other => Err(AvroError::ParseError(format!(
1415 "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1416 <Self as AsRef<str>>::as_ref(other)
1417 ))),
1418 },
1419 Promotion::BytesToString => match self {
1420 Self::String(offsets, values)
1421 | Self::StringView(offsets, values)
1422 | Self::BytesToString(offsets, values) => {
1423 let data = buf.get_bytes()?;
1424 offsets.push_length(data.len());
1425 values.extend_from_slice(data);
1426 Ok(())
1427 }
1428 other => Err(AvroError::ParseError(format!(
1429 "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1430 <Self as AsRef<str>>::as_ref(other)
1431 ))),
1432 },
1433 }
1434 }
1435
1436 fn decode_with_resolution<'d>(
1437 &'d mut self,
1438 buf: &mut AvroCursor<'_>,
1439 resolution: &'d ResolutionPlan,
1440 ) -> Result<(), AvroError> {
1441 #[cfg(feature = "avro_custom_types")]
1442 if let Self::RunEndEncoded(_, len, inner) = self {
1443 *len += 1;
1444 return inner.decode_with_resolution(buf, resolution);
1445 }
1446
1447 match resolution {
1448 ResolutionPlan::Promotion(promotion) => {
1449 let promotion = *promotion;
1450 self.decode_with_promotion(buf, promotion)
1451 }
1452 ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
1453 ResolutionPlan::EnumMapping(res) => {
1454 let Self::Enum(indices, _, _) = self else {
1455 return Err(AvroError::SchemaError(
1456 "enum mapping resolution provided for non-enum decoder".into(),
1457 ));
1458 };
1459 let raw = buf.get_int()?;
1460 let resolved = res.resolve(raw)?;
1461 indices.push(resolved);
1462 Ok(())
1463 }
1464 ResolutionPlan::Record(proj) => {
1465 let Self::Record(_, encodings, _, _) = self else {
1466 return Err(AvroError::SchemaError(
1467 "record projection provided for non-record decoder".into(),
1468 ));
1469 };
1470 proj.project_record(buf, encodings)
1471 }
1472 }
1473 }
1474
1475 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1477 Ok(match self {
1478 Self::Nullable(_, n, e) => e.flush(n.finish())?,
1479 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1480 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1481 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1482 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1483 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1484 Self::TimeMillis(values) => {
1485 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1486 }
1487 Self::TimeMicros(values) => {
1488 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1489 }
1490 Self::TimestampMillis(tz, values) => Arc::new(
1491 flush_primitive::<TimestampMillisecondType>(values, nulls)
1492 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1493 ),
1494 Self::TimestampMicros(tz, values) => Arc::new(
1495 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1496 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1497 ),
1498 Self::TimestampNanos(tz, values) => Arc::new(
1499 flush_primitive::<TimestampNanosecondType>(values, nulls)
1500 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1501 ),
1502 #[cfg(feature = "avro_custom_types")]
1503 Self::DurationSecond(values) => {
1504 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1505 }
1506 #[cfg(feature = "avro_custom_types")]
1507 Self::DurationMillisecond(values) => {
1508 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1509 }
1510 #[cfg(feature = "avro_custom_types")]
1511 Self::DurationMicrosecond(values) => {
1512 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1513 }
1514 #[cfg(feature = "avro_custom_types")]
1515 Self::DurationNanosecond(values) => {
1516 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1517 }
1518 #[cfg(feature = "avro_custom_types")]
1519 Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1520 #[cfg(feature = "avro_custom_types")]
1521 Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1522 #[cfg(feature = "avro_custom_types")]
1523 Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1524 #[cfg(feature = "avro_custom_types")]
1525 Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1526 #[cfg(feature = "avro_custom_types")]
1527 Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1528 #[cfg(feature = "avro_custom_types")]
1529 Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1530 #[cfg(feature = "avro_custom_types")]
1531 Self::Float16(values) => {
1532 let len = values.len();
1535 let buf: Buffer = std::mem::take(values).into();
1536 let scalar_buf = ScalarBuffer::new(buf, 0, len);
1537 Arc::new(Float16Array::new(scalar_buf, nulls))
1538 }
1539 #[cfg(feature = "avro_custom_types")]
1540 Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1541 #[cfg(feature = "avro_custom_types")]
1542 Self::TimeNanos(values) => {
1543 Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1544 }
1545 #[cfg(feature = "avro_custom_types")]
1546 Self::Time32Secs(values) => {
1547 Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1548 }
1549 #[cfg(feature = "avro_custom_types")]
1550 Self::TimestampSecs(is_utc, values) => Arc::new(
1551 flush_primitive::<TimestampSecondType>(values, nulls)
1552 .with_timezone_opt(is_utc.then(|| "+00:00")),
1553 ),
1554 #[cfg(feature = "avro_custom_types")]
1555 Self::IntervalYearMonth(values) => {
1556 Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1557 }
1558 #[cfg(feature = "avro_custom_types")]
1559 Self::IntervalMonthDayNano(values) => {
1560 Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1561 }
1562 #[cfg(feature = "avro_custom_types")]
1563 Self::IntervalDayTime(values) => {
1564 Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1565 }
1566 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1567 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1568 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1569 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1570 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1571 }
1572 Self::Int32ToFloat64(values)
1573 | Self::Int64ToFloat64(values)
1574 | Self::Float32ToFloat64(values) => {
1575 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1576 }
1577 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1578 let offsets = flush_offsets(offsets);
1579 let values = flush_values(values).into();
1580 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1581 }
1582 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1583 let offsets = flush_offsets(offsets);
1584 let values = flush_values(values).into();
1585 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1586 }
1587 Self::StringView(offsets, values) => {
1588 let offsets = flush_offsets(offsets);
1589 let values = flush_values(values);
1590 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1591 let values: Vec<&str> = (0..array.len())
1592 .map(|i| {
1593 if array.is_valid(i) {
1594 array.value(i)
1595 } else {
1596 ""
1597 }
1598 })
1599 .collect();
1600 Arc::new(StringViewArray::from(values))
1601 }
1602 Self::Array(field, offsets, values) => {
1603 let values = values.flush(None)?;
1604 let offsets = flush_offsets(offsets);
1605 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1606 }
1607 Self::Record(fields, encodings, _, _) => {
1608 let arrays = encodings
1609 .iter_mut()
1610 .map(|x| x.flush(None))
1611 .collect::<Result<Vec<_>, _>>()?;
1612 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1613 }
1614 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1615 let moff = flush_offsets(m_off);
1616 let koff = flush_offsets(k_off);
1617 let kd = flush_values(kdata).into();
1618 let val_arr = valdec.flush(None)?;
1619 let key_arr = StringArray::try_new(koff, kd, None)?;
1620 if key_arr.len() != val_arr.len() {
1621 return Err(AvroError::InvalidArgument(format!(
1622 "Map keys length ({}) != map values length ({})",
1623 key_arr.len(),
1624 val_arr.len()
1625 )));
1626 }
1627 let final_len = moff.len() - 1;
1628 if let Some(n) = &nulls {
1629 if n.len() != final_len {
1630 return Err(AvroError::InvalidArgument(format!(
1631 "Map array null buffer length {} != final map length {final_len}",
1632 n.len()
1633 )));
1634 }
1635 }
1636 let entries_fields = match map_field.data_type() {
1637 DataType::Struct(fields) => fields.clone(),
1638 other => {
1639 return Err(AvroError::InvalidArgument(format!(
1640 "Map entries field must be a Struct, got {other:?}"
1641 )));
1642 }
1643 };
1644 let entries_struct =
1645 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1646 let map_arr =
1647 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1648 Arc::new(map_arr)
1649 }
1650 Self::Fixed(sz, accum) => {
1651 let b: Buffer = flush_values(accum).into();
1652 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1653 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1654 Arc::new(arr)
1655 }
1656 Self::Uuid(values) => {
1657 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1658 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1659 Arc::new(arr)
1660 }
1661 #[cfg(feature = "small_decimals")]
1662 Self::Decimal32(precision, scale, _, builder) => {
1663 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1664 }
1665 #[cfg(feature = "small_decimals")]
1666 Self::Decimal64(precision, scale, _, builder) => {
1667 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1668 }
1669 Self::Decimal128(precision, scale, _, builder) => {
1670 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1671 }
1672 Self::Decimal256(precision, scale, _, builder) => {
1673 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1674 }
1675 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1676 Self::Duration(builder) => {
1677 let (_, vals, _) = builder.finish().into_parts();
1678 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1679 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1680 Arc::new(vals)
1681 }
1682 #[cfg(feature = "avro_custom_types")]
1683 Self::RunEndEncoded(width, len, inner) => {
1684 let values = inner.flush(nulls)?;
1685 let n = *len;
1686 let arr = values.as_ref();
1687 let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1688 if n > 0 {
1689 run_starts.push(0);
1690 for i in 1..n {
1691 if !values_equal_at(arr, i - 1, i) {
1692 run_starts.push(i);
1693 }
1694 }
1695 }
1696 if n > (u32::MAX as usize) {
1697 return Err(AvroError::InvalidArgument(format!(
1698 "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1699 )));
1700 }
1701 let run_count = run_starts.len();
1702 let take_idx: PrimitiveArray<UInt32Type> =
1703 run_starts.iter().map(|&s| s as u32).collect();
1704 let per_run_values = if run_count == 0 {
1705 values.slice(0, 0)
1706 } else {
1707 take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1708 AvroError::ParseError(format!("take() for REE values failed: {e}"))
1709 })?
1710 };
1711
1712 macro_rules! build_run_array {
1713 ($Native:ty, $ArrowTy:ty) => {{
1714 let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1715 for (idx, &_start) in run_starts.iter().enumerate() {
1716 let end = if idx + 1 < run_count {
1717 run_starts[idx + 1]
1718 } else {
1719 n
1720 };
1721 ends.push(end as $Native);
1722 }
1723 let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1724 let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1725 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1726 Arc::new(run_arr) as ArrayRef
1727 }};
1728 }
1729 match *width {
1730 2 => {
1731 if n > i16::MAX as usize {
1732 return Err(AvroError::InvalidArgument(format!(
1733 "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1734 )));
1735 }
1736 build_run_array!(i16, Int16Type)
1737 }
1738 4 => build_run_array!(i32, Int32Type),
1739 8 => build_run_array!(i64, Int64Type),
1740 other => {
1741 return Err(AvroError::InvalidArgument(format!(
1742 "Unsupported run-end width {other} for RunEndEncoded"
1743 )));
1744 }
1745 }
1746 }
1747 Self::Union(u) => u.flush(nulls)?,
1748 })
1749 }
1750}
1751
/// How a `Decoder::Nullable` obtains each value and its validity bit.
#[derive(Debug)]
enum NullablePlan {
    /// A branch tag is read before each value; `nullability` tells which tag
    /// position denotes null. Non-null values are decoded with `resolution`.
    ReadTag {
        nullability: Nullability,
        resolution: ResolutionPlan,
    },
    /// No branch tag is read: every value is decoded with `resolution` and
    /// marked valid.
    FromSingle { resolution: ResolutionPlan },
}
1764
/// A precomputed instruction for decoding one value, consumed by
/// `Decoder::decode_with_resolution`.
#[derive(Debug)]
enum ResolutionPlan {
    /// Decode using the given promotion (`Promotion::Direct` is a plain decode).
    Promotion(Promotion),
    /// Append this literal instead of reading a value from the buffer.
    DefaultValue(AvroLiteral),
    /// Read an enum index and map it through the resolution table.
    EnumMapping(EnumResolution),
    /// Decode a record through the given projector.
    Record(Projector),
}
1777
1778impl ResolutionPlan {
1779 fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, AvroError> {
1780 match (decoder, resolution) {
1781 (_, ResolutionInfo::Promotion(p)) => Ok(ResolutionPlan::Promotion(*p)),
1782 (_, ResolutionInfo::DefaultValue(lit)) => Ok(ResolutionPlan::DefaultValue(lit.clone())),
1783 (_, ResolutionInfo::EnumMapping(m)) => {
1784 Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
1785 }
1786 (Decoder::Record(_, _, field_defaults, _), ResolutionInfo::Record(r)) => Ok(
1787 ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?),
1788 ),
1789 (_, ResolutionInfo::Record(_)) => Err(AvroError::SchemaError(
1790 "record resolution on non-record decoder".into(),
1791 )),
1792 (_, ResolutionInfo::Union(_)) => Err(AvroError::SchemaError(
1793 "union variant cannot be resolved to a union type".into(),
1794 )),
1795 }
1796 }
1797}
1798
/// Lookup data for translating an enum index read from the buffer into the
/// index stored by the decoder.
#[derive(Debug)]
struct EnumResolution {
    /// Indexed by the incoming enum index; a negative entry means "no mapping".
    mapping: Arc<[i32]>,
    /// Fallback used when the incoming index is out of range or unmapped;
    /// a negative value means there is no fallback.
    default_index: i32,
}
1804
1805impl EnumResolution {
1806 fn new(mapping: &EnumMapping) -> Self {
1807 EnumResolution {
1808 mapping: mapping.mapping.clone(),
1809 default_index: mapping.default_index,
1810 }
1811 }
1812
1813 fn resolve(&self, index: i32) -> Result<i32, AvroError> {
1814 let resolved = usize::try_from(index)
1815 .ok()
1816 .and_then(|idx| self.mapping.get(idx).copied())
1817 .filter(|&idx| idx >= 0)
1818 .unwrap_or(self.default_index);
1819 if resolved >= 0 {
1820 Ok(resolved)
1821 } else {
1822 Err(AvroError::ParseError(format!(
1823 "Enum symbol index {index} not resolvable and no default provided",
1824 )))
1825 }
1826 }
1827}
1828
/// Per-writer-branch dispatch data for union decoding: two parallel slices
/// indexed by the writer's branch index.
#[derive(Debug)]
struct DispatchLookupTable {
    /// Reader branch index for each writer branch, or `NO_SOURCE` when the
    /// writer branch has no reader counterpart. Stored as `i8`, so reader
    /// indices above `i8::MAX` are rejected at construction.
    to_reader: Box<[i8]>,
    /// Plan used to decode a value of each writer branch into its reader
    /// branch. Entries for unmapped branches hold a placeholder and are never
    /// returned by `resolve`.
    resolution: Box<[ResolutionPlan]>,
}
1854
/// Sentinel stored in `DispatchLookupTable::to_reader` for a writer branch
/// that maps to no reader branch.
const NO_SOURCE: i8 = -1;
1858
1859impl DispatchLookupTable {
1860 fn from_writer_to_reader(
1861 reader_branches: &[Decoder],
1862 resolution_map: &[Option<(usize, ResolutionInfo)>],
1863 ) -> Result<Self, AvroError> {
1864 let mut to_reader = Vec::with_capacity(resolution_map.len());
1865 let mut resolution = Vec::with_capacity(resolution_map.len());
1866 for map in resolution_map {
1867 match map {
1868 Some((idx, res)) => {
1869 let idx = *idx;
1870 let idx_i8 = i8::try_from(idx).map_err(|_| {
1871 AvroError::SchemaError(format!(
1872 "Reader branch index {idx} exceeds i8 range (max {})",
1873 i8::MAX
1874 ))
1875 })?;
1876 let plan = ResolutionPlan::try_new(&reader_branches[idx], res)?;
1877 to_reader.push(idx_i8);
1878 resolution.push(plan);
1879 }
1880 None => {
1881 to_reader.push(NO_SOURCE);
1882 resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
1883 }
1884 }
1885 }
1886 Ok(Self {
1887 to_reader: to_reader.into_boxed_slice(),
1888 resolution: resolution.into_boxed_slice(),
1889 })
1890 }
1891
1892 #[inline]
1894 fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)> {
1895 let reader_index = *self.to_reader.get(writer_index)?;
1896 (reader_index >= 0).then(|| (reader_index as usize, &self.resolution[writer_index]))
1897 }
1898}
1899
/// Decoder state for a union-typed column.
#[derive(Debug)]
struct UnionDecoder {
    /// Arrow union fields; their type ids become the reader type codes.
    fields: UnionFields,
    /// Per-branch decoders plus the type-id/offset bookkeeping for emitted values.
    branches: UnionDecoderBranches,
    /// Branch index used as the default emit target (`try_new` sets it to 0).
    default_emit_idx: usize,
    /// Index of the first `Decoder::Null` branch, or `default_emit_idx` when
    /// there is none (as computed by `try_new`).
    null_emit_idx: usize,
    /// Strategy for mapping what the writer encoded onto the reader branches.
    plan: UnionReadPlan,
}
1908
/// Branch decoders of a union together with the buffers that record, for each
/// emitted value, which branch it went to and its position within that branch.
#[derive(Debug, Default)]
struct UnionDecoderBranches {
    /// One decoder per reader branch.
    decoders: Vec<Decoder>,
    /// Type code for each reader branch, indexed like `decoders`.
    reader_type_codes: Vec<i8>,
    /// Type code of every emitted value, in emission order.
    type_ids: Vec<i8>,
    /// For every emitted value, its index within the chosen branch.
    offsets: Vec<i32>,
    /// Number of values emitted so far to each branch.
    counts: Vec<i32>,
}
1917
1918impl UnionDecoderBranches {
1919 fn new(decoders: Vec<Decoder>, reader_type_codes: Vec<i8>) -> Self {
1920 let branch_len = decoders.len().max(reader_type_codes.len());
1921 Self {
1922 decoders,
1923 reader_type_codes,
1924 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1925 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1926 counts: vec![0; branch_len],
1927 }
1928 }
1929
1930 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1931 let branches_len = self.decoders.len();
1932 let Some(reader_branch) = self.decoders.get_mut(reader_idx) else {
1933 return Err(AvroError::ParseError(format!(
1934 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1935 )));
1936 };
1937 self.type_ids.push(self.reader_type_codes[reader_idx]);
1938 self.offsets.push(self.counts[reader_idx]);
1939 self.counts[reader_idx] += 1;
1940 Ok(reader_branch)
1941 }
1942}
1943
1944impl Default for UnionDecoder {
1945 fn default() -> Self {
1946 Self {
1947 fields: UnionFields::empty(),
1948 branches: Default::default(),
1949 default_emit_idx: 0,
1950 null_emit_idx: 0,
1951 plan: UnionReadPlan::Passthrough,
1952 }
1953 }
1954}
1955
/// Strategy a `UnionDecoder` uses to route encoded values to reader branches.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions: each writer branch is dispatched
    /// through the lookup table.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// Writer is not a union but the reader is: every value goes to
    /// `reader_idx`, decoded with `resolution`.
    FromSingle {
        reader_idx: usize,
        resolution: ResolutionPlan,
    },
    /// Writer is a union but the reader is a single type: values are decoded
    /// into `target` (built by `UnionDecoder::with_single_target`).
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution information was supplied.
    Passthrough,
}
1971
1972impl UnionReadPlan {
1973 fn from_resolved(
1974 reader_branches: &[Decoder],
1975 resolved: Option<ResolvedUnion>,
1976 ) -> Result<Self, AvroError> {
1977 let Some(info) = resolved else {
1978 return Ok(Self::Passthrough);
1979 };
1980 match (info.writer_is_union, info.reader_is_union) {
1981 (true, true) => {
1982 let lookup_table =
1983 DispatchLookupTable::from_writer_to_reader(reader_branches, &info.writer_to_reader)?;
1984 Ok(Self::ReaderUnion { lookup_table })
1985 }
1986 (false, true) => {
1987 let Some((idx, resolution)) =
1988 info.writer_to_reader.first().and_then(Option::as_ref)
1989 else {
1990 return Err(AvroError::SchemaError(
1991 "Writer type does not match any reader union branch".to_string(),
1992 ));
1993 };
1994 let reader_idx = *idx;
1995 Ok(Self::FromSingle {
1996 reader_idx,
1997 resolution: ResolutionPlan::try_new(&reader_branches[reader_idx], resolution)?,
1998 })
1999 }
2000 (true, false) => Err(AvroError::InvalidArgument(
2001 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
2002 .to_string(),
2003 )),
2004 _ => Err(AvroError::SchemaError(
2006 "ResolvedUnion constructed for non-union sides; resolver should return None"
2007 .to_string(),
2008 )),
2009 }
2010 }
2011}
2012
2013impl UnionDecoder {
2014 fn try_new(
2015 fields: UnionFields,
2016 branches: Vec<Decoder>,
2017 resolved: Option<ResolvedUnion>,
2018 ) -> Result<Self, AvroError> {
2019 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
2020 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
2021 let default_emit_idx = 0;
2022 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
2023 let max_addr = (i32::MAX as usize) + 1;
2025 if branches.len() > max_addr {
2026 return Err(AvroError::SchemaError(format!(
2027 "Reader union has {} branches, which exceeds the maximum addressable \
2028 branches by an Avro int tag ({} + 1).",
2029 branches.len(),
2030 i32::MAX
2031 )));
2032 }
2033 let plan = UnionReadPlan::from_resolved(&branches, resolved)?;
2034 Ok(Self {
2035 fields,
2036 branches: UnionDecoderBranches::new(branches, reader_type_codes),
2037 default_emit_idx,
2038 null_emit_idx,
2039 plan,
2040 })
2041 }
2042
2043 fn with_single_target(target: Decoder, info: ResolvedUnion) -> Result<Self, AvroError> {
2044 debug_assert!(info.writer_is_union && !info.reader_is_union);
2046 let mut reader_branches = [target];
2047 let lookup_table =
2048 DispatchLookupTable::from_writer_to_reader(&reader_branches, &info.writer_to_reader)?;
2049 let target = Box::new(mem::replace(&mut reader_branches[0], Decoder::Null(0)));
2050 Ok(Self {
2051 plan: UnionReadPlan::ToSingle {
2052 target,
2053 lookup_table,
2054 },
2055 ..Self::default()
2056 })
2057 }
2058
2059 #[inline]
2060 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
2061 let raw = buf.get_long()?;
2066 if raw < 0 {
2067 return Err(AvroError::ParseError(format!(
2068 "Negative union branch index {raw}"
2069 )));
2070 }
2071 usize::try_from(raw).map_err(|_| {
2072 AvroError::ParseError(format!(
2073 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2074 (usize::BITS as usize)
2075 ))
2076 })
2077 }
2078
2079 #[inline]
2080 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
2081 where
2082 F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
2083 {
2084 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2085 return action(target);
2086 }
2087 let reader_idx = match &self.plan {
2088 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
2089 _ => fallback_idx,
2090 };
2091 self.branches.emit_to(reader_idx).and_then(action)
2092 }
2093
2094 fn append_null(&mut self) -> Result<(), AvroError> {
2095 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
2096 }
2097
2098 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
2099 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
2100 }
2101
2102 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2103 match &mut self.plan {
2104 UnionReadPlan::Passthrough => {
2105 let reader_idx = Self::read_tag(buf)?;
2106 let decoder = self.branches.emit_to(reader_idx)?;
2107 decoder.decode(buf)
2108 }
2109 UnionReadPlan::ReaderUnion { lookup_table } => {
2110 let idx = Self::read_tag(buf)?;
2111 let Some((reader_idx, resolution)) = lookup_table.resolve(idx) else {
2112 return Err(AvroError::ParseError(format!(
2113 "Union branch index {idx} not resolvable by reader schema"
2114 )));
2115 };
2116 let decoder = self.branches.emit_to(reader_idx)?;
2117 decoder.decode_with_resolution(buf, resolution)
2118 }
2119 UnionReadPlan::FromSingle {
2120 reader_idx,
2121 resolution,
2122 } => {
2123 let decoder = self.branches.emit_to(*reader_idx)?;
2124 decoder.decode_with_resolution(buf, resolution)
2125 }
2126 UnionReadPlan::ToSingle {
2127 target,
2128 lookup_table,
2129 } => {
2130 let idx = Self::read_tag(buf)?;
2131 let Some((_, resolution)) = lookup_table.resolve(idx) else {
2132 return Err(AvroError::ParseError(format!(
2133 "Writer union branch index {idx} not resolvable by reader schema"
2134 )));
2135 };
2136 target.decode_with_resolution(buf, resolution)
2137 }
2138 }
2139 }
2140
2141 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
2142 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2143 return target.flush(nulls);
2144 }
2145 debug_assert!(
2146 nulls.is_none(),
2147 "UnionArray does not accept a validity bitmap; \
2148 nulls should have been materialized as a Null child during decode"
2149 );
2150 let children = self
2151 .branches
2152 .decoders
2153 .iter_mut()
2154 .map(|d| d.flush(None))
2155 .collect::<Result<Vec<_>, _>>()?;
2156 let arr = UnionArray::try_new(
2157 self.fields.clone(),
2158 flush_values(&mut self.branches.type_ids)
2159 .into_iter()
2160 .collect(),
2161 Some(
2162 flush_values(&mut self.branches.offsets)
2163 .into_iter()
2164 .collect(),
2165 ),
2166 children,
2167 )
2168 .map_err(|e| AvroError::ParseError(e.to_string()))?;
2169 Ok(Arc::new(arr))
2170 }
2171}
2172
/// Collects the optional pieces needed to construct a `UnionDecoder`.
///
/// `build` accepts two shapes: `fields` + `branches` (optionally `resolved`)
/// without a `target`, or `resolved` + `target` without `fields`/`branches`.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    // Union fields of the reader-side union.
    fields: Option<UnionFields>,
    // One child decoder per reader branch.
    branches: Option<Vec<Decoder>>,
    // Writer/reader union resolution info, when there is any.
    resolved: Option<ResolvedUnion>,
    // Single decoder that receives every value when the reader is not a union.
    target: Option<Decoder>,
}
2180
2181impl UnionDecoderBuilder {
2182 fn new() -> Self {
2183 Self::default()
2184 }
2185
2186 fn with_fields(mut self, fields: UnionFields) -> Self {
2187 self.fields = Some(fields);
2188 self
2189 }
2190
2191 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2192 self.branches = Some(branches);
2193 self
2194 }
2195
2196 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2197 self.resolved = Some(resolved_union);
2198 self
2199 }
2200
2201 fn with_target(mut self, target: Decoder) -> Self {
2202 self.target = Some(target);
2203 self
2204 }
2205
2206 fn build(self) -> Result<UnionDecoder, AvroError> {
2207 match (self.resolved, self.fields, self.branches, self.target) {
2208 (resolved, Some(fields), Some(branches), None) => {
2209 UnionDecoder::try_new(fields, branches, resolved)
2210 }
2211 (Some(info), None, None, Some(target))
2212 if info.writer_is_union && !info.reader_is_union =>
2213 {
2214 UnionDecoder::with_single_target(target, info)
2215 }
2216 _ => Err(AvroError::InvalidArgument(
2217 "Invalid UnionDecoderBuilder configuration: expected either \
2218 (fields + branches + resolved) with no target for reader-unions, or \
2219 (resolved + target) with no fields/branches for writer-union to single."
2220 .to_string(),
2221 )),
2222 }
2223 }
2224}
2225
/// What `process_blockwise` does with a block whose item count is negative
/// (such a block is followed by its size in bytes).
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Ignore the byte size and run the item callback for every item.
    ProcessItems,
    /// Jump over the whole block using the byte size, without visiting items.
    SkipBySize,
}
2231
2232#[inline]
2233fn skip_blocks(
2234 buf: &mut AvroCursor,
2235 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2236) -> Result<usize, AvroError> {
2237 process_blockwise(
2238 buf,
2239 move |c| skip_item(c),
2240 NegativeBlockBehavior::SkipBySize,
2241 )
2242}
2243
2244#[inline]
2245fn flush_dict(
2246 indices: &mut Vec<i32>,
2247 symbols: &[String],
2248 nulls: Option<NullBuffer>,
2249) -> Result<ArrayRef, AvroError> {
2250 let keys = flush_primitive::<Int32Type>(indices, nulls);
2251 let values = Arc::new(StringArray::from_iter_values(
2252 symbols.iter().map(|s| s.as_str()),
2253 ));
2254 DictionaryArray::try_new(keys, values)
2255 .map_err(Into::into)
2256 .map(|arr| Arc::new(arr) as ArrayRef)
2257}
2258
2259#[inline]
2260fn read_blocks(
2261 buf: &mut AvroCursor,
2262 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2263) -> Result<usize, AvroError> {
2264 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2265}
2266
2267#[inline]
2268fn process_blockwise(
2269 buf: &mut AvroCursor,
2270 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2271 negative_behavior: NegativeBlockBehavior,
2272) -> Result<usize, AvroError> {
2273 let mut total = 0usize;
2274 loop {
2275 let block_count = buf.get_long()?;
2280 match block_count.cmp(&0) {
2281 Ordering::Equal => break,
2282 Ordering::Less => {
2283 let count = (-block_count) as usize;
2284 let size_in_bytes = buf.get_long()? as usize;
2286 match negative_behavior {
2287 NegativeBlockBehavior::ProcessItems => {
2288 for _ in 0..count {
2290 on_item(buf)?;
2291 }
2292 }
2293 NegativeBlockBehavior::SkipBySize => {
2294 let _ = buf.get_fixed(size_in_bytes)?;
2296 }
2297 }
2298 total += count;
2299 }
2300 Ordering::Greater => {
2301 let count = block_count as usize;
2302 for _ in 0..count {
2303 on_item(buf)?;
2304 }
2305 total += count;
2306 }
2307 }
2308 }
2309 Ok(total)
2310}
2311
2312#[inline]
2313fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2314 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2315}
2316
2317#[inline]
2318fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2319 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2320}
2321
2322#[inline]
2323fn flush_primitive<T: ArrowPrimitiveType>(
2324 values: &mut Vec<T::Native>,
2325 nulls: Option<NullBuffer>,
2326) -> PrimitiveArray<T> {
2327 PrimitiveArray::new(flush_values(values).into(), nulls)
2328}
2329
2330#[inline]
2331fn read_decimal_bytes_be<const N: usize>(
2332 buf: &mut AvroCursor<'_>,
2333 size: &Option<usize>,
2334) -> Result<[u8; N], AvroError> {
2335 match size {
2336 Some(n) if *n == N => {
2337 let raw = buf.get_fixed(N)?;
2338 let mut arr = [0u8; N];
2339 arr.copy_from_slice(raw);
2340 Ok(arr)
2341 }
2342 Some(n) => {
2343 let raw = buf.get_fixed(*n)?;
2344 sign_cast_to::<N>(raw)
2345 }
2346 None => {
2347 let raw = buf.get_bytes()?;
2348 sign_cast_to::<N>(raw)
2349 }
2350 }
2351}
2352
2353#[inline]
2362fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2363 let len = raw.len();
2364 if len == N {
2366 let mut out = [0u8; N];
2367 out.copy_from_slice(raw);
2368 return Ok(out);
2369 }
2370 let first = raw.first().copied().unwrap_or(0u8);
2372 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2373 let mut out = [sign_byte; N];
2375 if len > N {
2376 let extra = len - N;
2379 if raw[..extra].iter().any(|&b| b != sign_byte) {
2381 return Err(AvroError::ParseError(format!(
2382 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2383 len, N
2384 )));
2385 }
2386 if N > 0 {
2387 let first_kept = raw[extra];
2388 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2389 if sign_bit_mismatch {
2390 return Err(AvroError::ParseError(format!(
2391 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2392 len, N
2393 )));
2394 }
2395 }
2396 out.copy_from_slice(&raw[extra..]);
2397 return Ok(out);
2398 }
2399 out[N - len..].copy_from_slice(raw);
2400 Ok(out)
2401}
2402
/// Reports whether positions `i` and `j` of `arr` hold equal values.
///
/// Two nulls are equal, a null never equals a non-null, and two non-null
/// entries are compared as one-element slices of the array.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        return left_null && right_null;
    }
    let left = arr.slice(i, 1);
    let right = arr.slice(j, 1);
    left == right
}
2416
/// Plan for reading one writer record into reader-side decoders.
#[derive(Debug)]
struct Projector {
    // One entry per writer field, in the order `project_record` consumes them:
    // either decode into a reader decoder or skip.
    writer_projections: Vec<FieldProjection>,
    // `(reader index, literal)` pairs appended as defaults after the writer
    // fields of each record have been processed.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
2422
/// What to do with a single writer field while projecting a record.
#[derive(Debug)]
enum FieldProjection {
    /// Decode the field into the reader decoder at this index.
    ToReader(usize),
    /// Consume the field's bytes with this skipper and discard them.
    Skip(Skipper),
}
2428
/// Borrowed inputs from which a `Projector` is built.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    // Resolved record supplying the writer field plan and the indices of
    // fields that need a default.
    rec: &'a ResolvedRecord,
    // Default literals, looked up by the indices in `rec.default_fields`.
    field_defaults: &'a [Option<AvroLiteral>],
}
2434
2435impl<'a> ProjectorBuilder<'a> {
2436 #[inline]
2437 fn try_new(rec: &'a ResolvedRecord, field_defaults: &'a [Option<AvroLiteral>]) -> Self {
2438 Self {
2439 rec,
2440 field_defaults,
2441 }
2442 }
2443
2444 #[inline]
2445 fn build(self) -> Result<Projector, AvroError> {
2446 let mut default_injections: Vec<(usize, AvroLiteral)> =
2447 Vec::with_capacity(self.rec.default_fields.len());
2448 for &idx in self.rec.default_fields.as_ref() {
2449 let lit = self
2450 .field_defaults
2451 .get(idx)
2452 .and_then(|lit| lit.clone())
2453 .unwrap_or(AvroLiteral::Null);
2454 default_injections.push((idx, lit));
2455 }
2456 let writer_projections = self
2457 .rec
2458 .writer_fields
2459 .iter()
2460 .map(|field| match field {
2461 ResolvedField::ToReader(index, _) => Ok(FieldProjection::ToReader(*index)),
2462 ResolvedField::Skip(datatype) => {
2463 let skipper = Skipper::from_avro(datatype)?;
2464 Ok(FieldProjection::Skip(skipper))
2465 }
2466 })
2467 .collect::<Result<_, AvroError>>()?;
2468 Ok(Projector {
2469 writer_projections,
2470 default_injections: default_injections.into(),
2471 })
2472 }
2473}
2474
2475impl Projector {
2476 #[inline]
2477 fn project_record(
2478 &self,
2479 buf: &mut AvroCursor<'_>,
2480 encodings: &mut [Decoder],
2481 ) -> Result<(), AvroError> {
2482 for field_proj in self.writer_projections.iter() {
2483 match field_proj {
2484 FieldProjection::ToReader(index) => encodings[*index].decode(buf)?,
2485 FieldProjection::Skip(skipper) => skipper.skip(buf)?,
2486 }
2487 }
2488 for (reader_index, lit) in self.default_injections.as_ref() {
2489 encodings[*reader_index].append_default(lit)?;
2490 }
2491 Ok(())
2492 }
2493}
2494
/// Describes how to consume, without decoding, one encoded value of a given
/// writer type. Built by `Skipper::from_avro` and executed by `Skipper::skip`.
#[derive(Debug)]
enum Skipper {
    /// Nothing is read.
    Null,
    /// One boolean is read.
    Boolean,
    /// One int is skipped.
    Int32,
    /// One long is skipped (as for the four time/timestamp variants below).
    Int64,
    /// One float is read.
    Float32,
    /// One double is read.
    Float64,
    /// One length-prefixed byte string is read.
    Bytes,
    /// Skipped the same way as `Bytes`.
    String,
    TimeMicros,
    TimestampMillis,
    TimestampMicros,
    TimestampNanos,
    /// A fixed number of bytes is read.
    Fixed(usize),
    /// Fixed-size when a size is given, otherwise length-prefixed bytes.
    Decimal(Option<usize>),
    /// Skipped the same way as `Bytes`.
    UuidString,
    /// One int is skipped.
    Enum,
    /// Exactly 12 fixed bytes are read.
    DurationFixed12,
    /// Block-encoded sequence of items.
    List(Box<Skipper>),
    /// Block-encoded sequence of (length-prefixed key, value) entries.
    Map(Box<Skipper>),
    /// Each field is skipped in order.
    Struct(Vec<Skipper>),
    /// A branch tag is read and the selected branch is skipped.
    Union(Vec<Skipper>),
    /// A branch marker is read; the inner value is skipped only when the
    /// marker selects the non-null branch for the given null ordering.
    Nullable(Nullability, Box<Skipper>),
    /// Skipped exactly like the inner value.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
2527
2528impl Skipper {
2529 fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
2530 let mut base = match dt.codec() {
2531 Codec::Null => Self::Null,
2532 Codec::Boolean => Self::Boolean,
2533 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
2534 Codec::Int64 => Self::Int64,
2535 Codec::TimeMicros => Self::TimeMicros,
2536 Codec::TimestampMillis(_) => Self::TimestampMillis,
2537 Codec::TimestampMicros(_) => Self::TimestampMicros,
2538 Codec::TimestampNanos(_) => Self::TimestampNanos,
2539 #[cfg(feature = "avro_custom_types")]
2540 Codec::DurationNanos
2541 | Codec::DurationMicros
2542 | Codec::DurationMillis
2543 | Codec::DurationSeconds => Self::Int64,
2544 #[cfg(feature = "avro_custom_types")]
2545 Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
2546 Self::Int32
2547 }
2548 #[cfg(feature = "avro_custom_types")]
2549 Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
2550 Self::Int64
2551 }
2552 #[cfg(feature = "avro_custom_types")]
2553 Codec::UInt64 => Self::Fixed(8),
2554 #[cfg(feature = "avro_custom_types")]
2555 Codec::Float16 => Self::Fixed(2),
2556 #[cfg(feature = "avro_custom_types")]
2557 Codec::IntervalYearMonth => Self::Fixed(4),
2558 #[cfg(feature = "avro_custom_types")]
2559 Codec::IntervalMonthDayNano => Self::Fixed(16),
2560 #[cfg(feature = "avro_custom_types")]
2561 Codec::IntervalDayTime => Self::Fixed(8),
2562 Codec::Float32 => Self::Float32,
2563 Codec::Float64 => Self::Float64,
2564 Codec::Binary => Self::Bytes,
2565 Codec::Utf8 | Codec::Utf8View => Self::String,
2566 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2567 Codec::Decimal(_, _, size) => Self::Decimal(*size),
2568 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
2570 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2571 Codec::Struct(fields) => {
2572 if let Some(ResolutionInfo::Record(rec)) = dt.resolution.as_ref() {
2573 Self::Struct(
2574 rec.writer_fields
2575 .iter()
2576 .map(|wf| match wf {
2577 ResolvedField::ToReader(_, wdt) | ResolvedField::Skip(wdt) => {
2578 Skipper::from_avro(wdt)
2579 }
2580 })
2581 .collect::<Result<_, _>>()?,
2582 )
2583 } else {
2584 Self::Struct(
2585 fields
2586 .iter()
2587 .map(|f| Skipper::from_avro(f.data_type()))
2588 .collect::<Result<_, _>>()?,
2589 )
2590 }
2591 }
2592 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2593 Codec::Interval => Self::DurationFixed12,
2594 Codec::Union(encodings, _, _) => {
2595 let max_addr = (i32::MAX as usize) + 1;
2596 if encodings.len() > max_addr {
2597 return Err(AvroError::SchemaError(format!(
2598 "Writer union has {} branches, which exceeds the maximum addressable \
2599 branches by an Avro int tag ({} + 1).",
2600 encodings.len(),
2601 i32::MAX
2602 )));
2603 }
2604 Self::Union(
2605 encodings
2606 .iter()
2607 .map(Skipper::from_avro)
2608 .collect::<Result<_, _>>()?,
2609 )
2610 }
2611 #[cfg(feature = "avro_custom_types")]
2612 Codec::RunEndEncoded(inner, _w) => {
2613 Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2614 }
2615 };
2616 if let Some(n) = dt.nullability() {
2617 base = Self::Nullable(n, Box::new(base));
2618 }
2619 Ok(base)
2620 }
2621
2622 fn skip(&self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2623 match self {
2624 Self::Null => Ok(()),
2625 Self::Boolean => {
2626 buf.get_bool()?;
2627 Ok(())
2628 }
2629 Self::Int32 => {
2630 buf.skip_int()?;
2631 Ok(())
2632 }
2633 Self::Int64
2634 | Self::TimeMicros
2635 | Self::TimestampMillis
2636 | Self::TimestampMicros
2637 | Self::TimestampNanos => {
2638 buf.skip_long()?;
2639 Ok(())
2640 }
2641 Self::Float32 => {
2642 buf.get_float()?;
2643 Ok(())
2644 }
2645 Self::Float64 => {
2646 buf.get_double()?;
2647 Ok(())
2648 }
2649 Self::Bytes | Self::String | Self::UuidString => {
2650 buf.get_bytes()?;
2651 Ok(())
2652 }
2653 Self::Fixed(sz) => {
2654 buf.get_fixed(*sz)?;
2655 Ok(())
2656 }
2657 Self::Decimal(size) => {
2658 if let Some(s) = size {
2659 buf.get_fixed(*s)
2660 } else {
2661 buf.get_bytes()
2662 }?;
2663 Ok(())
2664 }
2665 Self::Enum => {
2666 buf.skip_int()?;
2667 Ok(())
2668 }
2669 Self::DurationFixed12 => {
2670 buf.get_fixed(12)?;
2671 Ok(())
2672 }
2673 Self::List(item) => {
2674 skip_blocks(buf, |c| item.skip(c))?;
2675 Ok(())
2676 }
2677 Self::Map(value) => {
2678 skip_blocks(buf, |c| {
2679 c.get_bytes()?; value.skip(c)
2681 })?;
2682 Ok(())
2683 }
2684 Self::Struct(fields) => {
2685 for f in fields.iter() {
2686 f.skip(buf)?
2687 }
2688 Ok(())
2689 }
2690 Self::Union(encodings) => {
2691 let raw = buf.get_long()?;
2693 if raw < 0 {
2694 return Err(AvroError::ParseError(format!(
2695 "Negative union branch index {raw}"
2696 )));
2697 }
2698 let idx: usize = usize::try_from(raw).map_err(|_| {
2699 AvroError::ParseError(format!(
2700 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2701 (usize::BITS as usize)
2702 ))
2703 })?;
2704 let Some(encoding) = encodings.get(idx) else {
2705 return Err(AvroError::ParseError(format!(
2706 "Union branch index {idx} out of range for skipper ({} branches)",
2707 encodings.len()
2708 )));
2709 };
2710 encoding.skip(buf)
2711 }
2712 Self::Nullable(order, inner) => {
2713 let branch = buf.read_vlq()?;
2714 let is_not_null = match *order {
2715 Nullability::NullFirst => branch != 0,
2716 Nullability::NullSecond => branch == 0,
2717 };
2718 if is_not_null {
2719 inner.skip(buf)?;
2720 }
2721 Ok(())
2722 }
2723 #[cfg(feature = "avro_custom_types")]
2724 Self::RunEndEncoded(inner) => inner.skip(buf),
2725 }
2726 }
2727}
2728
2729#[cfg(test)]
2730mod tests {
2731 use super::*;
2732 use crate::codec::AvroFieldBuilder;
2733 use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2734 use arrow_array::cast::AsArray;
2735 use indexmap::IndexMap;
2736 use std::collections::HashMap;
2737
/// Encodes `value` as an Avro `int`: zig-zag mapping followed by a
/// little-endian base-128 varint.
fn encode_avro_int(value: i32) -> Vec<u8> {
    // The zig-zag result must be treated as unsigned: with a signed value the
    // `>>` below is an arithmetic shift, which never reaches zero once the
    // top bit is set (e.g. for `i32::MAX` or `i32::MIN`).
    let mut v = ((value << 1) ^ (value >> 31)) as u32;
    let mut buf = Vec::new();
    while v >= 0x80 {
        buf.push((v as u8) | 0x80);
        v >>= 7;
    }
    buf.push(v as u8);
    buf
}
2748
/// Encodes `value` as an Avro `long`: zig-zag mapping followed by a
/// little-endian base-128 varint.
fn encode_avro_long(value: i64) -> Vec<u8> {
    // The zig-zag result must be treated as unsigned: with a signed value the
    // `>>` below is an arithmetic shift, which never reaches zero once the
    // top bit is set (e.g. for `i64::MAX` or `i64::MIN`).
    let mut v = ((value << 1) ^ (value >> 63)) as u64;
    let mut buf = Vec::new();
    while v >= 0x80 {
        buf.push((v as u8) | 0x80);
        v >>= 7;
    }
    buf.push(v as u8);
    buf
}
2759
2760 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2761 let mut buf = encode_avro_long(bytes.len() as i64);
2762 buf.extend_from_slice(bytes);
2763 buf
2764 }
2765
2766 fn avro_from_codec(codec: Codec) -> AvroDataType {
2767 AvroDataType::new(codec, Default::default(), None)
2768 }
2769
2770 fn resolved_root_datatype(
2771 writer: Schema<'static>,
2772 reader: Schema<'static>,
2773 use_utf8view: bool,
2774 strict_mode: bool,
2775 ) -> AvroDataType {
2776 let writer_record = Schema::Complex(ComplexType::Record(Record {
2778 name: "Root",
2779 namespace: None,
2780 doc: None,
2781 aliases: vec![],
2782 fields: vec![Field {
2783 name: "v",
2784 r#type: writer,
2785 default: None,
2786 doc: None,
2787 aliases: vec![],
2788 }],
2789 attributes: Attributes::default(),
2790 }));
2791
2792 let reader_record = Schema::Complex(ComplexType::Record(Record {
2794 name: "Root",
2795 namespace: None,
2796 doc: None,
2797 aliases: vec![],
2798 fields: vec![Field {
2799 name: "v",
2800 r#type: reader,
2801 default: None,
2802 doc: None,
2803 aliases: vec![],
2804 }],
2805 attributes: Attributes::default(),
2806 }));
2807
2808 let field = AvroFieldBuilder::new(&writer_record)
2810 .with_reader_schema(&reader_record)
2811 .with_utf8view(use_utf8view)
2812 .with_strict_mode(strict_mode)
2813 .build()
2814 .expect("schema resolution should succeed");
2815
2816 match field.data_type().codec() {
2817 Codec::Struct(fields) => fields[0].data_type().clone(),
2818 other => panic!("expected wrapper struct, got {other:?}"),
2819 }
2820 }
2821
2822 fn decoder_for_promotion(
2823 writer: PrimitiveType,
2824 reader: PrimitiveType,
2825 use_utf8view: bool,
2826 ) -> Decoder {
2827 let ws = Schema::TypeName(TypeName::Primitive(writer));
2828 let rs = Schema::TypeName(TypeName::Primitive(reader));
2829 let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2830 Decoder::try_new(&dt).unwrap()
2831 }
2832
2833 fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2834 AvroDataType::new(codec, HashMap::new(), nullability)
2835 }
2836
2837 #[cfg(feature = "avro_custom_types")]
/// Encodes `x` as an unsigned little-endian base-128 varint (no zig-zag).
fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
    let mut out = Vec::with_capacity(10);
    loop {
        let low = (x & 0x7F) as u8;
        x >>= 7;
        if x == 0 {
            // Last group: continuation bit stays clear.
            out.push(low);
            break;
        }
        out.push(low | 0x80);
    }
    out
}
2847
/// Writer `[int, string]` read as `[string, long]`: branches are reordered and
/// the int is promoted to long.
#[test]
fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
    let primitive = |p: PrimitiveType| Schema::TypeName(TypeName::Primitive(p));
    let writer_schema = Schema::Union(vec![
        primitive(PrimitiveType::Int),
        primitive(PrimitiveType::String),
    ]);
    let reader_schema = Schema::Union(vec![
        primitive(PrimitiveType::String),
        primitive(PrimitiveType::Long),
    ]);

    let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut dec = Decoder::try_new(&dt).unwrap();

    // Writer branch 0 (int) carrying 7.
    let int_row = [encode_avro_long(0), encode_avro_int(7)].concat();
    dec.decode(&mut AvroCursor::new(&int_row)).unwrap();

    // Writer branch 1 (string) carrying "abc".
    let string_row = [encode_avro_long(1), encode_avro_bytes("abc".as_bytes())].concat();
    dec.decode(&mut AvroCursor::new(&string_row)).unwrap();

    let arr = dec.flush(None).unwrap();
    let ua = arr
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("dense union output");

    assert_eq!(
        ua.type_id(0),
        1,
        "first value must select reader 'long' branch"
    );
    assert_eq!(ua.value_offset(0), 0);
    assert_eq!(
        ua.type_id(1),
        0,
        "second value must select reader 'string' branch"
    );
    assert_eq!(ua.value_offset(1), 0);

    let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(long_child.len(), 1);
    assert_eq!(long_child.value(0), 7);

    let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "abc");
}
2900
/// Writer `[int, string]` read as plain `long`: the int branch is promoted.
#[test]
fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
    let primitive = |p: PrimitiveType| Schema::TypeName(TypeName::Primitive(p));
    let writer_schema = Schema::Union(vec![
        primitive(PrimitiveType::Int),
        primitive(PrimitiveType::String),
    ]);
    let reader_schema = primitive(PrimitiveType::Long);

    let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut dec = Decoder::try_new(&dt).unwrap();

    // Writer branch 0 (int) carrying 5.
    let row = [encode_avro_long(0), encode_avro_int(5)].concat();
    dec.decode(&mut AvroCursor::new(&row)).unwrap();

    let arr = dec.flush(None).unwrap();
    let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(out.len(), 1);
    assert_eq!(out.value(0), 5);
}
2922
/// Writer `[int, string]` read as plain `long`: a string value cannot be
/// resolved and decoding must fail.
#[test]
fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
    let primitive = |p: PrimitiveType| Schema::TypeName(TypeName::Primitive(p));
    let writer_schema = Schema::Union(vec![
        primitive(PrimitiveType::Int),
        primitive(PrimitiveType::String),
    ]);
    let reader_schema = primitive(PrimitiveType::Long);

    let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut dec = Decoder::try_new(&dt).unwrap();

    // Writer branch 1 (string) carrying "z".
    let row = [encode_avro_long(1), encode_avro_bytes("z".as_bytes())].concat();
    let outcome = dec.decode(&mut AvroCursor::new(&row));
    assert!(
        outcome.is_err(),
        "expected error when writer union branch does not resolve to reader non-union type"
    );
}
2943
/// Writer plain `int` read as `[string, long]`: the value lands in the `long`
/// branch and the `string` branch stays empty.
#[test]
fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
    let primitive = |p: PrimitiveType| Schema::TypeName(TypeName::Primitive(p));
    let writer_schema = primitive(PrimitiveType::Int);
    let reader_schema = Schema::Union(vec![
        primitive(PrimitiveType::String),
        primitive(PrimitiveType::Long),
    ]);

    let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut dec = Decoder::try_new(&dt).unwrap();

    let row = encode_avro_int(6);
    dec.decode(&mut AvroCursor::new(&row)).unwrap();

    let arr = dec.flush(None).unwrap();
    let ua = arr
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("dense union output");
    assert_eq!(ua.len(), 1);
    assert_eq!(
        ua.type_id(0),
        1,
        "must resolve to reader 'long' branch (type_id 1)"
    );
    assert_eq!(ua.value_offset(0), 0);

    let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(long_child.len(), 1);
    assert_eq!(long_child.value(0), 6);

    let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(str_child.len(), 0, "string branch must be empty");
}
2979
/// Writer `[int, boolean]` read as `[string, long]`: the boolean branch has no
/// reader counterpart, so decoding a boolean must fail.
#[test]
fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
    let primitive = |p: PrimitiveType| Schema::TypeName(TypeName::Primitive(p));
    let writer_schema = Schema::Union(vec![
        primitive(PrimitiveType::Int),
        primitive(PrimitiveType::Boolean),
    ]);
    let reader_schema = Schema::Union(vec![
        primitive(PrimitiveType::String),
        primitive(PrimitiveType::Long),
    ]);

    let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut dec = Decoder::try_new(&dt).unwrap();

    // Writer branch 1 (boolean) followed by the single payload byte 1.
    let row = [encode_avro_long(1), vec![1u8]].concat();
    let outcome = dec.decode(&mut AvroCursor::new(&row));
    assert!(
        outcome.is_err(),
        "expected error for unmapped writer 'boolean' branch"
    );
}
3003
/// `int` written, `long` read: values are widened unchanged.
#[test]
fn test_schema_resolution_promotion_int_to_long() {
    let cases: [(i32, i64); 4] = [(0, 0), (1, 1), (-2, -2), (123456, 123456)];
    let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
    assert!(matches!(dec, Decoder::Int32ToInt64(_)));
    for (written, _) in cases {
        let encoded = encode_avro_int(written);
        dec.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
    for (row, (_, expected)) in cases.into_iter().enumerate() {
        assert_eq!(a.value(row), expected);
    }
}
3020
/// `int` written, `float` read.
#[test]
fn test_schema_resolution_promotion_int_to_float() {
    let cases: [(i32, f32); 3] = [(0, 0.0), (42, 42.0), (-7, -7.0)];
    let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
    assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
    for (written, _) in cases {
        let encoded = encode_avro_int(written);
        dec.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
    for (row, (_, expected)) in cases.into_iter().enumerate() {
        assert_eq!(a.value(row), expected);
    }
}
3036
/// `int` written, `double` read.
#[test]
fn test_schema_resolution_promotion_int_to_double() {
    let cases: [(i32, f64); 3] = [(1, 1.0), (-1, -1.0), (10_000, 10_000.0)];
    let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
    assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
    for (written, _) in cases {
        let encoded = encode_avro_int(written);
        dec.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, (_, expected)) in cases.into_iter().enumerate() {
        assert_eq!(a.value(row), expected);
    }
}
3052
/// `long` written, `float` read.
#[test]
fn test_schema_resolution_promotion_long_to_float() {
    let cases: [(i64, f32); 3] = [(0, 0.0), (1_000_000, 1_000_000.0), (-123, -123.0)];
    let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
    assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
    for (written, _) in cases {
        let encoded = encode_avro_long(written);
        dec.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
    for (row, (_, expected)) in cases.into_iter().enumerate() {
        assert_eq!(a.value(row), expected);
    }
}
3068
/// `long` written, `double` read.
#[test]
fn test_schema_resolution_promotion_long_to_double() {
    let cases: [(i64, f64); 3] = [(2, 2.0), (-2, -2.0), (9_223_372, 9_223_372.0)];
    let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
    assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
    for (written, _) in cases {
        let encoded = encode_avro_long(written);
        dec.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, (_, expected)) in cases.into_iter().enumerate() {
        assert_eq!(a.value(row), expected);
    }
}
3084
/// `float` written (4 little-endian bytes), `double` read.
#[test]
fn test_schema_resolution_promotion_float_to_double() {
    let cases: [(f32, f64); 3] = [(0.5, 0.5), (-3.25, -3.25), (1.0e6, 1.0e6)];
    let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
    assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
    for (written, _) in cases {
        let encoded = written.to_le_bytes().to_vec();
        dec.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, (_, expected)) in cases.into_iter().enumerate() {
        assert_eq!(a.value(row), expected);
    }
}
3100
/// `decoder_for_promotion(Bytes, String, false)` must yield `BytesToString`, and the
/// bytes written with `encode_avro_bytes` must come back as the same strings
/// (including the non-ASCII "héllo") in a `StringArray`.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let texts = ["hello", "world", "héllo"];
    for text in texts.iter() {
        let encoded = encode_avro_bytes(text.as_bytes());
        decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    for (row, text) in texts.iter().enumerate() {
        assert_eq!(strings.value(row), *text);
    }
}
3116
/// Same bytes-to-string promotion, but with the helper's third argument set to `true`
/// (presumably the utf8view switch, going by the test name — confirm against
/// `decoder_for_promotion`). The flushed array is still read back as a `StringArray`.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let encoded = encode_avro_bytes(b"abc");
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "abc");
}
3128
/// `decoder_for_promotion(String, Bytes, false)` must yield `StringToBytes`, and each
/// written string (including the empty one) must come back as its raw bytes in a
/// `BinaryArray`.
#[test]
fn test_schema_resolution_promotion_string_to_bytes() {
    let mut decoder = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
    assert!(matches!(decoder, Decoder::StringToBytes(_, _)));
    let texts = ["", "abc", "data"];
    for text in texts.iter() {
        let encoded = encode_avro_bytes(text.as_bytes());
        decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let binaries = flushed.as_any().downcast_ref::<BinaryArray>().unwrap();
    for (row, text) in texts.iter().enumerate() {
        assert_eq!(binaries.value(row), text.as_bytes());
    }
}
3144
/// When writer and reader both declare field `v` as `int`, the resolved child type must
/// build a plain `Decoder::Int32` that hands back the written values unchanged.
#[test]
fn test_schema_resolution_no_promotion_passthrough_int() {
    // Reader and writer records are structurally identical: `Root { v: int }`.
    let reader_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        }],
        attributes: Attributes::default(),
    }));
    let writer_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        }],
        attributes: Attributes::default(),
    }));
    let root = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    // The built root is expected to be a struct codec; the decoder under test is
    // created from the data type of its first (and only) child.
    let value_type = match root.data_type().codec() {
        Codec::Struct(children) => children[0].data_type().clone(),
        unexpected => panic!("expected wrapper struct, got {unexpected:?}"),
    };
    let mut decoder = Decoder::try_new(&value_type).unwrap();
    assert!(matches!(decoder, Decoder::Int32(_)));
    let inputs = [7, -9];
    for written in inputs.iter() {
        let encoded = encode_avro_int(*written);
        decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    for (row, written) in inputs.iter().enumerate() {
        assert_eq!(ints.value(row), *written);
    }
}
3201
/// A writer field of type `int` read through a reader field of type `boolean` is not a
/// resolvable pairing: building the field must return an error instead of a decoder plan.
#[test]
fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
    // Reader side: `Root { v: boolean }`.
    let reader_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        }],
        attributes: Attributes::default(),
    }));
    // Writer side: `Root { v: int }`.
    let writer_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: Vec::new(),
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: Vec::new(),
        }],
        attributes: Attributes::default(),
    }));
    let outcome = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    assert!(outcome.is_err(), "expected error for illegal promotion");
}
3241
/// Decodes one map row holding the single entry "hello" -> "world" and checks the
/// resulting `MapArray` exposes exactly that key/value pair.
#[test]
fn test_map_decoding_one_entry() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    // Layout: long 1 (one entry in the block), key, value, long 0 (end of map).
    let encoded = [
        encode_avro_long(1),
        encode_avro_bytes(b"hello"),
        encode_avro_bytes(b"world"),
        encode_avro_long(0),
    ]
    .concat();
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 1);
    let first_map = maps.value(0);
    let entry_struct = first_map.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(entry_struct.len(), 1);
    // Both entry columns are string arrays; check the single row of each.
    for (column, expected) in [("key", "hello"), ("value", "world")] {
        let strings = entry_struct
            .column_by_name(column)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(strings.value(0), expected);
    }
}
3277
/// A map row consisting only of the long `0` must decode to one map with zero entries.
#[test]
fn test_map_decoding_empty() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    let encoded = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 0);
}
3290
/// Decodes two 3-byte `Fixed` values from separate cursors and checks that each decode
/// consumes exactly 3 bytes and that both values land in a `FixedSizeBinaryArray`.
#[test]
fn test_fixed_decoding() {
    let fixed_type = avro_from_codec(Codec::Fixed(3));
    let mut decoder = Decoder::try_new(&fixed_type).expect("Failed to create decoder");

    let first = [1u8, 2, 3];
    let second = [4u8, 5, 6];

    let mut first_cursor = AvroCursor::new(&first);
    decoder
        .decode(&mut first_cursor)
        .expect("Failed to decode data1");
    assert_eq!(
        first_cursor.position(),
        3,
        "Cursor should advance by fixed size"
    );

    let mut second_cursor = AvroCursor::new(&second);
    decoder
        .decode(&mut second_cursor)
        .expect("Failed to decode data2");
    assert_eq!(
        second_cursor.position(),
        3,
        "Cursor should advance by fixed size"
    );

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    assert_eq!(flushed.len(), 2, "Array should contain two items");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray");
    assert_eq!(
        fixed.value_length(),
        3,
        "Fixed size of binary values should be 3"
    );
    assert_eq!(fixed.value(0), &first, "First item mismatch");
    assert_eq!(fixed.value(1), &second, "Second item mismatch");
}
3330
/// Flushing a `Fixed(5)` decoder that never decoded anything must still produce an
/// empty `FixedSizeBinaryArray` whose value length is 5.
#[test]
fn test_fixed_decoding_empty() {
    let fixed_type = avro_from_codec(Codec::Fixed(5));
    let mut decoder = Decoder::try_new(&fixed_type).expect("Failed to create decoder");

    let flushed = decoder
        .flush(None)
        .expect("Failed to flush decoder for empty input");
    assert_eq!(flushed.len(), 0, "Array should be empty");

    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
    let width = fixed.value_length();
    assert_eq!(
        width, 5,
        "Fixed size of binary values should be 5 as per type"
    );
}
3352
/// Decodes a UUID supplied in its hyphenated text form and checks it is stored as the
/// corresponding 16 raw bytes in a `FixedSizeBinaryArray`.
#[test]
fn test_uuid_decoding() {
    let uuid_type = avro_from_codec(Codec::Uuid);
    let mut decoder = Decoder::try_new(&uuid_type).expect("Failed to create decoder");
    let text = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
    let encoded = encode_avro_bytes(text.as_bytes());
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).expect("Failed to decode data");
    let consumed = cursor.position();
    assert_eq!(
        consumed,
        encoded.len(),
        "Cursor should advance by varint size + data size"
    );
    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Array should be a FixedSizeBinaryArray");
    assert_eq!(fixed.len(), 1);
    assert_eq!(fixed.value_length(), 16);
    // The hex digits of `text` with the hyphens removed, as 16 big-endian bytes.
    let expected_bytes: [u8; 16] = 0xf81d4fae_7dec_11d0_a765_00a0c91e6bf6_u128.to_be_bytes();
    assert_eq!(fixed.value(0), &expected_bytes);
}
3379
/// Decodes two list rows — `[10, 20]` followed by an empty list — and checks the list
/// offsets and the flattened `Int32` child values.
#[test]
fn test_array_decoding() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    // Row 1: long 2 (two items), the items, long 0 (end). Row 2: just the long 0.
    let first_row = [
        encode_avro_long(2),
        encode_avro_int(10),
        encode_avro_int(20),
        encode_avro_long(0),
    ]
    .concat();
    let second_row = encode_avro_long(0);
    for row in [&first_row, &second_row] {
        let mut cursor = AvroCursor::new(row);
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 2);
    assert_eq!(lists.value_offsets(), &[0, 2, 2]);
    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 2);
    for (row, expected) in [10, 20].iter().enumerate() {
        assert_eq!(items.value(row), *expected);
    }
}
3406
/// Decodes a list whose block starts with the count `-3` followed by an extra long (12)
/// before the three items, and checks that all three items are still read.
/// NOTE(review): the extra long is presumably the block byte size that Avro's block
/// encoding places after a negative count — confirm against the Avro specification.
#[test]
fn test_array_decoding_with_negative_block_count() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    let encoded = [
        encode_avro_long(-3),
        encode_avro_long(12),
        encode_avro_int(1),
        encode_avro_int(2),
        encode_avro_int(3),
        encode_avro_long(0),
    ]
    .concat();
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 3);
    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 3);
    for (row, expected) in [1, 2, 3].iter().enumerate() {
        assert_eq!(items.value(row), *expected);
    }
}
3430
/// Decodes a list-of-lists row containing the single inner list `[5, 6]` and checks
/// both nesting levels plus the innermost `Int32` values.
#[test]
fn test_nested_array_decoding() {
    let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    // `inner_ty` is not used again, so it is moved into the outer list rather than cloned.
    let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty)));
    let mut decoder = Decoder::try_new(&nested_ty).unwrap();
    let mut buf = Vec::new();
    // Outer list: one item in the block.
    buf.extend(encode_avro_long(1));
    // Inner list: two items (5 and 6), then its terminating 0.
    buf.extend(encode_avro_long(2));
    buf.extend(encode_avro_int(5));
    buf.extend(encode_avro_int(6));
    buf.extend(encode_avro_long(0));
    // Terminating 0 of the outer list.
    buf.extend(encode_avro_long(0));
    let mut cursor = AvroCursor::new(&buf);
    decoder.decode(&mut cursor).unwrap();
    let arr = decoder.flush(None).unwrap();
    let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer.len(), 1);
    assert_eq!(outer.value_length(0), 1);
    let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(inner.len(), 1);
    assert_eq!(inner.value_length(0), 2);
    let values = inner
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(values.values(), &[5, 6]);
}
3459
/// A list row consisting only of the long `0` must decode to one list with zero items.
#[test]
fn test_array_decoding_empty_array() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    let encoded = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 0);
}
3472
/// Writer declares `array<int>`, reader declares `array<["null", "int"]>`. The resolved
/// item type must be nullable (null first), and data written without any union tags must
/// decode fully into two non-null items.
#[test]
fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
    use crate::schema::Array;
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::Union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
        ])),
        attributes: Attributes::default(),
    }));
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
        attributes: Attributes::default(),
    }));
    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    match resolved.codec() {
        Codec::List(item_type) => assert_eq!(
            item_type.nullability(),
            Some(Nullability::NullFirst),
            "items should be nullable"
        ),
        _ => panic!("expected List codec"),
    }
    let mut decoder = Decoder::try_new(&resolved).unwrap();
    // Writer layout: long 2 (two items), the bare ints 10 and 20, long 0 (end of list).
    let encoded = [
        encode_avro_long(2),
        encode_avro_int(10),
        encode_avro_int(20),
        encode_avro_long(0),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).unwrap();
    assert_eq!(
        cursor.position(),
        encoded.len(),
        "all bytes should be consumed"
    );
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1, "one list/row");
    assert_eq!(lists.value_length(0), 2, "two items in the list");
    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 2);
    for (row, expected) in [10, 20].iter().enumerate() {
        assert_eq!(items.value(row), *expected);
    }
    for row in 0..2 {
        assert!(!items.is_null(row));
    }
}
3520
/// Decodes two 32-byte big-endian two's-complement values as `Decimal(50, 2)` and checks
/// they render as "123.45" and "-1.23" in a `Decimal256Array`.
#[test]
fn test_decimal_decoding_fixed256() {
    let decimal_type = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();
    // 12345 = 0x3039, zero-extended to 32 bytes.
    let mut positive = [0x00u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    // -123: every byte 0xFF except the last, which is 0x85.
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    let encoded = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, expected) in ["123.45", "-1.23"].iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), *expected);
    }
}
3547
/// Decodes two 16-byte big-endian two's-complement values as `Decimal(28, 2)` and checks
/// they render as "123.45" and "-1.23" in a `Decimal128Array`.
#[test]
fn test_decimal_decoding_fixed128() {
    let decimal_type = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();
    // The unscaled values 12345 and -123 as 16 big-endian bytes each.
    let encoded = [12_345_i128.to_be_bytes(), (-123_i128).to_be_bytes()].concat();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, expected) in ["123.45", "-1.23"].iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), *expected);
    }
}
3572
/// A precision-5 decimal stored in 32-byte fixed values must decode into the narrow
/// array type: `Decimal32Array` with the `small_decimals` feature, `Decimal128Array`
/// without it.
#[test]
fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
    // Array type the flushed result is expected to have under the active feature set.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();
    // 12345 = 0x3039, zero-extended to 32 bytes.
    let mut positive = [0x00u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    // -123: every byte 0xFF except the last, which is 0x85.
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    let encoded = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, expected) in ["123.45", "-1.23"].iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), *expected);
    }
}
3609
/// A precision-5 decimal stored in 16-byte fixed values must decode into the narrow
/// array type: `Decimal32Array` with the `small_decimals` feature, `Decimal128Array`
/// without it.
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    // Array type the flushed result is expected to have under the active feature set.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();
    // The unscaled values 12345 and -123 as 16 big-endian bytes each.
    let encoded = [12_345_i128.to_be_bytes(), (-123_i128).to_be_bytes()].concat();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, expected) in ["123.45", "-1.23"].iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), *expected);
    }
}
3645
/// Decodes a nullable (null-second) bytes-backed `Decimal(4, 1)` over three rows —
/// value, null, value — and checks validity and rendered values of the result.
/// The array type is `Decimal32Array` with `small_decimals`, otherwise `Decimal128Array`.
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    // Array type the flushed result is expected to have under the active feature set.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let value_decoder = Decoder::try_new(&decimal_type).unwrap();
    let plan = NullablePlan::ReadTag {
        nullability: Nullability::NullSecond,
        resolution: ResolutionPlan::Promotion(Promotion::Direct),
    };
    let mut decoder = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
    );
    // Tag 0 precedes a value, tag 1 marks a null row (see the validity asserts below).
    let encoded = [
        encode_avro_int(0),
        encode_avro_bytes(&[0x04, 0xD2]),
        encode_avro_int(1),
        encode_avro_int(0),
        encode_avro_bytes(&[0xFB, 0x2E]),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "123.4");
    assert_eq!(decimals.value_as_string(2), "-123.4");
}
3690
/// Decodes a nullable (null-second) `Decimal(6, 2)` stored in 16-byte fixed values over
/// three rows — value, null, value — and checks validity and rendered values.
/// The array type is `Decimal32Array` with `small_decimals`, otherwise `Decimal128Array`.
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    // Array type the flushed result is expected to have under the active feature set.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let value_decoder = Decoder::try_new(&decimal_type).unwrap();
    let plan = NullablePlan::ReadTag {
        nullability: Nullability::NullSecond,
        resolution: ResolutionPlan::Promotion(Promotion::Direct),
    };
    let mut decoder = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
    );
    // Unscaled 123456 (0x01E240) and -123456 as 16 big-endian bytes each.
    let first_value = 123_456_i128.to_be_bytes();
    let third_value = (-123_456_i128).to_be_bytes();
    // Tag 0 precedes a value, tag 1 marks a null row (see the validity asserts below).
    let mut encoded: Vec<u8> = Vec::new();
    encoded.extend(encode_avro_int(0));
    encoded.extend_from_slice(&first_value);
    encoded.extend(encode_avro_int(1));
    encoded.extend(encode_avro_int(0));
    encoded.extend_from_slice(&third_value);
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "1234.56");
    assert_eq!(decimals.value_as_string(2), "-1234.56");
}
3743
/// Decodes the enum indices 2, 0, 1 for the symbols A/B/C and checks that the result is
/// a dictionary array whose values are the symbols and whose keys are the raw indices.
#[test]
fn test_enum_decoding() {
    // Built straight from a fixed-size array; no intermediate `Vec` is needed.
    let symbols: Arc<[String]> = ["A", "B", "C"].iter().map(|&s| String::from(s)).collect();
    // `symbols` is not used again, so the `Arc` is moved into the codec rather than cloned.
    let avro_type = avro_from_codec(Codec::Enum(symbols));
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_int(2));
    data.extend_from_slice(&encode_avro_int(0));
    data.extend_from_slice(&encode_avro_int(1));
    let mut cursor = AvroCursor::new(&data);
    // One decode call per encoded index, all reading from the same cursor.
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    let array = decoder.flush(None).unwrap();
    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);
    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "A");
    assert_eq!(values.value(1), "B");
    assert_eq!(values.value(2), "C");
    assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
}
3773
/// Decodes a nullable (null-first) enum over three rows — Y, null, X — and checks the
/// dictionary keys, their validity, and the dictionary values.
#[test]
fn test_enum_decoding_with_nulls() {
    // Built straight from a fixed-size array; no intermediate `Vec` is needed.
    let symbols: Arc<[String]> = ["X", "Y"].iter().map(|&s| String::from(s)).collect();
    // `symbols` is not used again, so the `Arc` is moved into the codec rather than cloned.
    let enum_codec = Codec::Enum(symbols);
    let avro_type =
        AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let mut data = Vec::new();
    // Row 1: tag 1 (value present), enum index 1.
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&encode_avro_int(1));
    // Row 2: tag 0 (null).
    data.extend_from_slice(&encode_avro_long(0));
    // Row 3: tag 1 (value present), enum index 0.
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&encode_avro_int(0));
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    let array = decoder.flush(None).unwrap();
    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);
    assert!(dict_array.is_valid(0));
    assert!(dict_array.is_null(1));
    assert!(dict_array.is_valid(2));
    let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dict_array.keys(), &expected_keys);
    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "X");
    assert_eq!(values.value(1), "Y");
}
3810
/// Decodes a nullable (null-first) `Codec::Interval` over three rows — value, null,
/// value. Each value is written as three little-endian `u32`s; the expected array shows
/// them mapped to months, days, and the third component multiplied by 1_000_000 as
/// nanoseconds.
#[test]
fn test_duration_decoding_with_nulls() {
    let interval_type = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&interval_type).unwrap();
    // `Some` rows are written as tag 1 plus three LE u32s; `None` rows as tag 0 only.
    let rows: [Option<[u32; 3]>; 3] = [Some([1, 2, 3]), None, Some([4, 5, 6])];
    let mut encoded: Vec<u8> = Vec::new();
    for row in rows.iter() {
        match row {
            Some(parts) => {
                encoded.extend(encode_avro_long(1));
                for part in parts.iter() {
                    encoded.extend_from_slice(&part.to_le_bytes());
                }
            }
            None => encoded.extend(encode_avro_long(0)),
        }
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    assert!(intervals.is_valid(0));
    assert!(intervals.is_null(1));
    assert!(intervals.is_valid(2));
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
        None,
        Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
    ]);
    assert_eq!(intervals, &expected);
}
3864
/// Decodes a nullable (null-first) `Codec::IntervalMonthDayNano` over three rows —
/// value, null, value. Each value is written as LE `i32` months, LE `i32` days and
/// LE `i64` nanoseconds, and must come back unchanged (negative components included).
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_decoding_with_nulls() {
    let interval_type = AvroDataType::new(
        Codec::IntervalMonthDayNano,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&interval_type).unwrap();
    // `Some` rows are written as tag 1 plus the three components; `None` rows as tag 0.
    let rows: [Option<(i32, i32, i64)>; 3] = [Some((1, -2, 3)), None, Some((-4, 5, -6))];
    let mut encoded: Vec<u8> = Vec::new();
    for row in rows.iter() {
        match row {
            Some((months, days, nanos)) => {
                encoded.extend(encode_avro_long(1));
                encoded.extend_from_slice(&months.to_le_bytes());
                encoded.extend_from_slice(&days.to_le_bytes());
                encoded.extend_from_slice(&nanos.to_le_bytes());
            }
            None => encoded.extend(encode_avro_long(0)),
        }
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano::new(1, -2, 3)),
        None,
        Some(IntervalMonthDayNano::new(-4, 5, -6)),
    ]);
    assert_eq!(intervals, &expected);
}
3904
/// Flushing a non-nullable `Codec::Interval` decoder that never decoded anything must
/// produce an array of length zero.
#[test]
fn test_duration_decoding_empty() {
    let interval_type = AvroDataType::new(Codec::Interval, Default::default(), None);
    let mut decoder = Decoder::try_new(&interval_type).unwrap();
    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 0);
}
3913
/// Decodes three longs with `Codec::DurationSeconds` and checks they come back unchanged
/// in a `DurationSecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    let duration_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&duration_type).unwrap();
    let inputs = [0_i64, -1, 2];
    // All rows are concatenated and read back through a single cursor.
    let encoded: Vec<u8> = inputs.iter().flat_map(|v| encode_avro_long(*v)).collect();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in inputs.iter() {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &inputs);
}
3935
/// Decodes three longs with `Codec::DurationMillis` and checks they come back unchanged
/// in a `DurationMillisecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    let duration_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&duration_type).unwrap();
    let inputs = [1_i64, 0, -2];
    // All rows are concatenated and read back through a single cursor.
    let encoded: Vec<u8> = inputs.iter().flat_map(|v| encode_avro_long(*v)).collect();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in inputs.iter() {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &inputs);
}
3956
/// Decodes three longs with `Codec::DurationMicros` and checks they come back unchanged
/// in a `DurationMicrosecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    let duration_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&duration_type).unwrap();
    let inputs = [5_i64, -6, 7];
    // All rows are concatenated and read back through a single cursor.
    let encoded: Vec<u8> = inputs.iter().flat_map(|v| encode_avro_long(*v)).collect();
    let mut cursor = AvroCursor::new(&encoded);
    for _ in inputs.iter() {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &inputs);
}
3977
3978 #[test]
3979 #[cfg(feature = "avro_custom_types")]
3980 fn test_duration_nanoseconds_decoding() {
3981 let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
3982 let mut decoder = Decoder::try_new(&avro_type).unwrap();
3983 let mut data = Vec::new();
3984 for v in [8i64, 9, -10] {
3985 data.extend_from_slice(&encode_avro_long(v));
3986 }
3987 let mut cursor = AvroCursor::new(&data);
3988 for _ in 0..3 {
3989 decoder.decode(&mut cursor).unwrap();
3990 }
3991 let array = decoder.flush(None).unwrap();
3992 let dur = array
3993 .as_any()
3994 .downcast_ref::<DurationNanosecondArray>()
3995 .unwrap();
3996 assert_eq!(dur.values(), &[8, 9, -10]);
3997 }
3998
3999 #[test]
4000 fn test_nullable_decode_error_bitmap_corruption() {
4001 let avro_type = AvroDataType::new(
4003 Codec::Int32,
4004 Default::default(),
4005 Some(Nullability::NullSecond),
4006 );
4007 let mut decoder = Decoder::try_new(&avro_type).unwrap();
4008
4009 let mut row1 = Vec::new();
4011 row1.extend_from_slice(&encode_avro_int(1));
4012
4013 let mut row2 = Vec::new();
4015 row2.extend_from_slice(&encode_avro_int(0)); let mut row3 = Vec::new();
4019 row3.extend_from_slice(&encode_avro_int(0)); row3.extend_from_slice(&encode_avro_int(42)); decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
4023 assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err()); decoder.decode(&mut AvroCursor::new(&row3)).unwrap();
4025
4026 let array = decoder.flush(None).unwrap();
4027
4028 assert_eq!(array.len(), 2);
4030 let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
4031 assert!(int_array.is_null(0)); assert_eq!(int_array.value(1), 42); }
4034
4035 #[test]
4036 fn test_enum_mapping_reordered_symbols() {
4037 let reader_symbols: Arc<[String]> =
4038 vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
4039 let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
4040 let default_index: i32 = -1;
4041 let mut dec = Decoder::Enum(
4042 Vec::with_capacity(DEFAULT_CAPACITY),
4043 reader_symbols.clone(),
4044 Some(EnumResolution {
4045 mapping,
4046 default_index,
4047 }),
4048 );
4049 let mut data = Vec::new();
4050 data.extend_from_slice(&encode_avro_int(0));
4051 data.extend_from_slice(&encode_avro_int(1));
4052 data.extend_from_slice(&encode_avro_int(2));
4053 let mut cur = AvroCursor::new(&data);
4054 dec.decode(&mut cur).unwrap();
4055 dec.decode(&mut cur).unwrap();
4056 dec.decode(&mut cur).unwrap();
4057 let arr = dec.flush(None).unwrap();
4058 let dict = arr
4059 .as_any()
4060 .downcast_ref::<DictionaryArray<Int32Type>>()
4061 .unwrap();
4062 let expected_keys = Int32Array::from(vec![2, 0, 1]);
4063 assert_eq!(dict.keys(), &expected_keys);
4064 let values = dict
4065 .values()
4066 .as_any()
4067 .downcast_ref::<StringArray>()
4068 .unwrap();
4069 assert_eq!(values.value(0), "B");
4070 assert_eq!(values.value(1), "C");
4071 assert_eq!(values.value(2), "A");
4072 }
4073
4074 #[test]
4075 fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
4076 let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
4077 let default_index: i32 = 1;
4078 let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
4079 let mut dec = Decoder::Enum(
4080 Vec::with_capacity(DEFAULT_CAPACITY),
4081 reader_symbols.clone(),
4082 Some(EnumResolution {
4083 mapping,
4084 default_index,
4085 }),
4086 );
4087 let mut data = Vec::new();
4088 data.extend_from_slice(&encode_avro_int(0));
4089 data.extend_from_slice(&encode_avro_int(1));
4090 data.extend_from_slice(&encode_avro_int(99));
4091 let mut cur = AvroCursor::new(&data);
4092 dec.decode(&mut cur).unwrap();
4093 dec.decode(&mut cur).unwrap();
4094 dec.decode(&mut cur).unwrap();
4095 let arr = dec.flush(None).unwrap();
4096 let dict = arr
4097 .as_any()
4098 .downcast_ref::<DictionaryArray<Int32Type>>()
4099 .unwrap();
4100 let expected_keys = Int32Array::from(vec![0, 1, 1]);
4101 assert_eq!(dict.keys(), &expected_keys);
4102 let values = dict
4103 .values()
4104 .as_any()
4105 .downcast_ref::<StringArray>()
4106 .unwrap();
4107 assert_eq!(values.value(0), "A");
4108 assert_eq!(values.value(1), "B");
4109 }
4110
4111 #[test]
4112 fn test_enum_mapping_unknown_symbol_without_default_errors() {
4113 let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
4114 let default_index: i32 = -1; let mapping: Arc<[i32]> = Arc::from(vec![-1]);
4116 let mut dec = Decoder::Enum(
4117 Vec::with_capacity(DEFAULT_CAPACITY),
4118 reader_symbols,
4119 Some(EnumResolution {
4120 mapping,
4121 default_index,
4122 }),
4123 );
4124 let data = encode_avro_int(0);
4125 let mut cur = AvroCursor::new(&data);
4126 let err = dec
4127 .decode(&mut cur)
4128 .expect_err("expected decode error for unresolved enum without default");
4129 let msg = err.to_string();
4130 assert!(
4131 msg.contains("not resolvable") && msg.contains("no default"),
4132 "unexpected error message: {msg}"
4133 );
4134 }
4135
4136 fn make_record_resolved_decoder(
4137 reader_fields: &[(&str, DataType, bool)],
4138 writer_projections: Vec<FieldProjection>,
4139 ) -> Decoder {
4140 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4141 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4142 for (name, dt, nullable) in reader_fields {
4143 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4144 let enc = match dt {
4145 DataType::Int32 => Decoder::Int32(Vec::new()),
4146 DataType::Int64 => Decoder::Int64(Vec::new()),
4147 DataType::Utf8 => {
4148 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
4149 }
4150 other => panic!("Unsupported test reader field type: {other:?}"),
4151 };
4152 encodings.push(enc);
4153 }
4154 let fields: Fields = field_refs.into();
4155 Decoder::Record(
4156 fields,
4157 encodings,
4158 vec![None; reader_fields.len()],
4159 Some(Projector {
4160 writer_projections,
4161 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4162 }),
4163 )
4164 }
4165
4166 #[test]
4167 fn test_skip_writer_trailing_field_int32() {
4168 let mut dec = make_record_resolved_decoder(
4169 &[("id", arrow_schema::DataType::Int32, false)],
4170 vec![
4171 FieldProjection::ToReader(0),
4172 FieldProjection::Skip(super::Skipper::Int32),
4173 ],
4174 );
4175 let mut data = Vec::new();
4176 data.extend_from_slice(&encode_avro_int(7));
4177 data.extend_from_slice(&encode_avro_int(999));
4178 let mut cur = AvroCursor::new(&data);
4179 dec.decode(&mut cur).unwrap();
4180 assert_eq!(cur.position(), data.len());
4181 let arr = dec.flush(None).unwrap();
4182 let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
4183 assert_eq!(struct_arr.len(), 1);
4184 let id = struct_arr
4185 .column_by_name("id")
4186 .unwrap()
4187 .as_any()
4188 .downcast_ref::<Int32Array>()
4189 .unwrap();
4190 assert_eq!(id.value(0), 7);
4191 }
4192
4193 #[test]
4194 fn test_skip_writer_middle_field_string() {
4195 let mut dec = make_record_resolved_decoder(
4196 &[
4197 ("id", DataType::Int32, false),
4198 ("score", DataType::Int64, false),
4199 ],
4200 vec![
4201 FieldProjection::ToReader(0),
4202 FieldProjection::Skip(Skipper::String),
4203 FieldProjection::ToReader(1),
4204 ],
4205 );
4206 let mut data = Vec::new();
4207 data.extend_from_slice(&encode_avro_int(42));
4208 data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
4209 data.extend_from_slice(&encode_avro_long(1000));
4210 let mut cur = AvroCursor::new(&data);
4211 dec.decode(&mut cur).unwrap();
4212 assert_eq!(cur.position(), data.len());
4213 let arr = dec.flush(None).unwrap();
4214 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4215 let id = s
4216 .column_by_name("id")
4217 .unwrap()
4218 .as_any()
4219 .downcast_ref::<Int32Array>()
4220 .unwrap();
4221 let score = s
4222 .column_by_name("score")
4223 .unwrap()
4224 .as_any()
4225 .downcast_ref::<Int64Array>()
4226 .unwrap();
4227 assert_eq!(id.value(0), 42);
4228 assert_eq!(score.value(0), 1000);
4229 }
4230
4231 #[test]
4232 fn test_skip_writer_array_with_negative_block_count_fast() {
4233 let mut dec = make_record_resolved_decoder(
4234 &[("id", DataType::Int32, false)],
4235 vec![
4236 FieldProjection::Skip(super::Skipper::List(Box::new(Skipper::Int32))),
4237 FieldProjection::ToReader(0),
4238 ],
4239 );
4240 let mut array_payload = Vec::new();
4241 array_payload.extend_from_slice(&encode_avro_int(1));
4242 array_payload.extend_from_slice(&encode_avro_int(2));
4243 array_payload.extend_from_slice(&encode_avro_int(3));
4244 let mut data = Vec::new();
4245 data.extend_from_slice(&encode_avro_long(-3));
4246 data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
4247 data.extend_from_slice(&array_payload);
4248 data.extend_from_slice(&encode_avro_long(0));
4249 data.extend_from_slice(&encode_avro_int(5));
4250 let mut cur = AvroCursor::new(&data);
4251 dec.decode(&mut cur).unwrap();
4252 assert_eq!(cur.position(), data.len());
4253 let arr = dec.flush(None).unwrap();
4254 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4255 let id = s
4256 .column_by_name("id")
4257 .unwrap()
4258 .as_any()
4259 .downcast_ref::<Int32Array>()
4260 .unwrap();
4261 assert_eq!(id.len(), 1);
4262 assert_eq!(id.value(0), 5);
4263 }
4264
4265 #[test]
4266 fn test_skip_writer_map_with_negative_block_count_fast() {
4267 let mut dec = make_record_resolved_decoder(
4268 &[("id", DataType::Int32, false)],
4269 vec![
4270 FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
4271 FieldProjection::ToReader(0),
4272 ],
4273 );
4274 let mut entries = Vec::new();
4275 entries.extend_from_slice(&encode_avro_bytes(b"k1"));
4276 entries.extend_from_slice(&encode_avro_int(10));
4277 entries.extend_from_slice(&encode_avro_bytes(b"k2"));
4278 entries.extend_from_slice(&encode_avro_int(20));
4279 let mut data = Vec::new();
4280 data.extend_from_slice(&encode_avro_long(-2));
4281 data.extend_from_slice(&encode_avro_long(entries.len() as i64));
4282 data.extend_from_slice(&entries);
4283 data.extend_from_slice(&encode_avro_long(0));
4284 data.extend_from_slice(&encode_avro_int(123));
4285 let mut cur = AvroCursor::new(&data);
4286 dec.decode(&mut cur).unwrap();
4287 assert_eq!(cur.position(), data.len());
4288 let arr = dec.flush(None).unwrap();
4289 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4290 let id = s
4291 .column_by_name("id")
4292 .unwrap()
4293 .as_any()
4294 .downcast_ref::<Int32Array>()
4295 .unwrap();
4296 assert_eq!(id.len(), 1);
4297 assert_eq!(id.value(0), 123);
4298 }
4299
4300 #[test]
4301 fn test_skip_writer_nullable_field_union_nullfirst() {
4302 let mut dec = make_record_resolved_decoder(
4303 &[("id", DataType::Int32, false)],
4304 vec![
4305 FieldProjection::Skip(super::Skipper::Nullable(
4306 Nullability::NullFirst,
4307 Box::new(super::Skipper::Int32),
4308 )),
4309 FieldProjection::ToReader(0),
4310 ],
4311 );
4312 let mut row1 = Vec::new();
4313 row1.extend_from_slice(&encode_avro_long(0));
4314 row1.extend_from_slice(&encode_avro_int(5));
4315 let mut row2 = Vec::new();
4316 row2.extend_from_slice(&encode_avro_long(1));
4317 row2.extend_from_slice(&encode_avro_int(123));
4318 row2.extend_from_slice(&encode_avro_int(7));
4319 let mut cur1 = AvroCursor::new(&row1);
4320 let mut cur2 = AvroCursor::new(&row2);
4321 dec.decode(&mut cur1).unwrap();
4322 dec.decode(&mut cur2).unwrap();
4323 assert_eq!(cur1.position(), row1.len());
4324 assert_eq!(cur2.position(), row2.len());
4325 let arr = dec.flush(None).unwrap();
4326 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4327 let id = s
4328 .column_by_name("id")
4329 .unwrap()
4330 .as_any()
4331 .downcast_ref::<Int32Array>()
4332 .unwrap();
4333 assert_eq!(id.len(), 2);
4334 assert_eq!(id.value(0), 5);
4335 assert_eq!(id.value(1), 7);
4336 }
4337
4338 fn make_dense_union_avro(
4339 children: Vec<(Codec, &'_ str, DataType)>,
4340 type_ids: Vec<i8>,
4341 ) -> AvroDataType {
4342 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4343 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4344 for (codec, name, dt) in children.into_iter() {
4345 avro_children.push(AvroDataType::new(codec, Default::default(), None));
4346 fields.push(arrow_schema::Field::new(name, dt, true));
4347 }
4348 let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4349 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4350 AvroDataType::new(union_codec, Default::default(), None)
4351 }
4352
4353 #[test]
4354 fn test_union_dense_two_children_custom_type_ids() {
4355 let union_dt = make_dense_union_avro(
4356 vec![
4357 (Codec::Int32, "i", DataType::Int32),
4358 (Codec::Utf8, "s", DataType::Utf8),
4359 ],
4360 vec![2, 5],
4361 );
4362 let mut dec = Decoder::try_new(&union_dt).unwrap();
4363 let mut r1 = Vec::new();
4364 r1.extend_from_slice(&encode_avro_long(0));
4365 r1.extend_from_slice(&encode_avro_int(7));
4366 let mut r2 = Vec::new();
4367 r2.extend_from_slice(&encode_avro_long(1));
4368 r2.extend_from_slice(&encode_avro_bytes(b"x"));
4369 let mut r3 = Vec::new();
4370 r3.extend_from_slice(&encode_avro_long(0));
4371 r3.extend_from_slice(&encode_avro_int(-1));
4372 dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4373 dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4374 dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4375 let array = dec.flush(None).unwrap();
4376 let ua = array
4377 .as_any()
4378 .downcast_ref::<UnionArray>()
4379 .expect("expected UnionArray");
4380 assert_eq!(ua.len(), 3);
4381 assert_eq!(ua.type_id(0), 2);
4382 assert_eq!(ua.type_id(1), 5);
4383 assert_eq!(ua.type_id(2), 2);
4384 assert_eq!(ua.value_offset(0), 0);
4385 assert_eq!(ua.value_offset(1), 0);
4386 assert_eq!(ua.value_offset(2), 1);
4387 let int_child = ua
4388 .child(2)
4389 .as_any()
4390 .downcast_ref::<Int32Array>()
4391 .expect("int child");
4392 assert_eq!(int_child.len(), 2);
4393 assert_eq!(int_child.value(0), 7);
4394 assert_eq!(int_child.value(1), -1);
4395 let str_child = ua
4396 .child(5)
4397 .as_any()
4398 .downcast_ref::<StringArray>()
4399 .expect("string child");
4400 assert_eq!(str_child.len(), 1);
4401 assert_eq!(str_child.value(0), "x");
4402 }
4403
4404 #[test]
4405 fn test_union_dense_with_null_and_string_children() {
4406 let union_dt = make_dense_union_avro(
4407 vec![
4408 (Codec::Null, "n", DataType::Null),
4409 (Codec::Utf8, "s", DataType::Utf8),
4410 ],
4411 vec![42, 7],
4412 );
4413 let mut dec = Decoder::try_new(&union_dt).unwrap();
4414 let r1 = encode_avro_long(0);
4415 let mut r2 = Vec::new();
4416 r2.extend_from_slice(&encode_avro_long(1));
4417 r2.extend_from_slice(&encode_avro_bytes(b"abc"));
4418 let r3 = encode_avro_long(0);
4419 dec.decode(&mut AvroCursor::new(&r1)).unwrap();
4420 dec.decode(&mut AvroCursor::new(&r2)).unwrap();
4421 dec.decode(&mut AvroCursor::new(&r3)).unwrap();
4422 let array = dec.flush(None).unwrap();
4423 let ua = array
4424 .as_any()
4425 .downcast_ref::<UnionArray>()
4426 .expect("expected UnionArray");
4427 assert_eq!(ua.len(), 3);
4428 assert_eq!(ua.type_id(0), 42);
4429 assert_eq!(ua.type_id(1), 7);
4430 assert_eq!(ua.type_id(2), 42);
4431 assert_eq!(ua.value_offset(0), 0);
4432 assert_eq!(ua.value_offset(1), 0);
4433 assert_eq!(ua.value_offset(2), 1);
4434 let null_child = ua
4435 .child(42)
4436 .as_any()
4437 .downcast_ref::<NullArray>()
4438 .expect("null child");
4439 assert_eq!(null_child.len(), 2);
4440 let str_child = ua
4441 .child(7)
4442 .as_any()
4443 .downcast_ref::<StringArray>()
4444 .expect("string child");
4445 assert_eq!(str_child.len(), 1);
4446 assert_eq!(str_child.value(0), "abc");
4447 }
4448
4449 #[test]
4450 fn test_union_decode_negative_branch_index_errors() {
4451 let union_dt = make_dense_union_avro(
4452 vec![
4453 (Codec::Int32, "i", DataType::Int32),
4454 (Codec::Utf8, "s", DataType::Utf8),
4455 ],
4456 vec![0, 1],
4457 );
4458 let mut dec = Decoder::try_new(&union_dt).unwrap();
4459 let row = encode_avro_long(-1); let err = dec
4461 .decode(&mut AvroCursor::new(&row))
4462 .expect_err("expected error for negative branch index");
4463 let msg = err.to_string();
4464 assert!(
4465 msg.contains("Negative union branch index"),
4466 "unexpected error message: {msg}"
4467 );
4468 }
4469
4470 #[test]
4471 fn test_union_decode_out_of_range_branch_index_errors() {
4472 let union_dt = make_dense_union_avro(
4473 vec![
4474 (Codec::Int32, "i", DataType::Int32),
4475 (Codec::Utf8, "s", DataType::Utf8),
4476 ],
4477 vec![10, 11],
4478 );
4479 let mut dec = Decoder::try_new(&union_dt).unwrap();
4480 let row = encode_avro_long(2);
4481 let err = dec
4482 .decode(&mut AvroCursor::new(&row))
4483 .expect_err("expected error for out-of-range branch index");
4484 let msg = err.to_string();
4485 assert!(
4486 msg.contains("out of range"),
4487 "unexpected error message: {msg}"
4488 );
4489 }
4490
4491 #[test]
4492 fn test_union_sparse_mode_not_supported() {
4493 let children: Vec<AvroDataType> = vec![
4494 AvroDataType::new(Codec::Int32, Default::default(), None),
4495 AvroDataType::new(Codec::Utf8, Default::default(), None),
4496 ];
4497 let uf = UnionFields::try_new(
4498 vec![1, 3],
4499 vec![
4500 arrow_schema::Field::new("i", DataType::Int32, true),
4501 arrow_schema::Field::new("s", DataType::Utf8, true),
4502 ],
4503 )
4504 .unwrap();
4505 let codec = Codec::Union(children.into(), uf, UnionMode::Sparse);
4506 let dt = AvroDataType::new(codec, Default::default(), None);
4507 let err = Decoder::try_new(&dt).expect_err("sparse union should not be supported");
4508 let msg = err.to_string();
4509 assert!(
4510 msg.contains("Sparse Arrow unions are not yet supported"),
4511 "unexpected error message: {msg}"
4512 );
4513 }
4514
4515 fn make_record_decoder_with_projector_defaults(
4516 reader_fields: &[(&str, DataType, bool)],
4517 field_defaults: Vec<Option<AvroLiteral>>,
4518 default_injections: Vec<(usize, AvroLiteral)>,
4519 ) -> Decoder {
4520 assert_eq!(
4521 field_defaults.len(),
4522 reader_fields.len(),
4523 "field_defaults must have one entry per reader field"
4524 );
4525 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4526 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4527 for (name, dt, nullable) in reader_fields {
4528 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4529 let enc = match dt {
4530 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4531 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4532 DataType::Utf8 => Decoder::String(
4533 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4534 Vec::with_capacity(DEFAULT_CAPACITY),
4535 ),
4536 other => panic!("Unsupported test field type in helper: {other:?}"),
4537 };
4538 encodings.push(enc);
4539 }
4540 let fields: Fields = field_refs.into();
4541 let projector = Projector {
4542 writer_projections: vec![],
4543 default_injections: Arc::from(default_injections),
4544 };
4545 Decoder::Record(fields, encodings, field_defaults, Some(projector))
4546 }
4547
4548 #[cfg(feature = "avro_custom_types")]
4549 #[test]
4550 fn test_default_append_custom_integer_range_validation() {
4551 let mut d_i8 = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
4552 d_i8.append_default(&AvroLiteral::Int(i8::MIN as i32))
4553 .unwrap();
4554 d_i8.append_default(&AvroLiteral::Int(i8::MAX as i32))
4555 .unwrap();
4556 let err_i8_high = d_i8
4557 .append_default(&AvroLiteral::Int(i8::MAX as i32 + 1))
4558 .unwrap_err();
4559 assert!(err_i8_high.to_string().contains("out of range for i8"));
4560 let err_i8_low = d_i8
4561 .append_default(&AvroLiteral::Int(i8::MIN as i32 - 1))
4562 .unwrap_err();
4563 assert!(err_i8_low.to_string().contains("out of range for i8"));
4564 let arr_i8 = d_i8.flush(None).unwrap();
4565 let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4566 assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4567
4568 let mut d_i16 = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
4569 d_i16
4570 .append_default(&AvroLiteral::Int(i16::MIN as i32))
4571 .unwrap();
4572 d_i16
4573 .append_default(&AvroLiteral::Int(i16::MAX as i32))
4574 .unwrap();
4575 let err_i16_high = d_i16
4576 .append_default(&AvroLiteral::Int(i16::MAX as i32 + 1))
4577 .unwrap_err();
4578 assert!(err_i16_high.to_string().contains("out of range for i16"));
4579 let err_i16_low = d_i16
4580 .append_default(&AvroLiteral::Int(i16::MIN as i32 - 1))
4581 .unwrap_err();
4582 assert!(err_i16_low.to_string().contains("out of range for i16"));
4583 let arr_i16 = d_i16.flush(None).unwrap();
4584 let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4585 assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4586
4587 let mut d_u8 = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
4588 d_u8.append_default(&AvroLiteral::Int(0)).unwrap();
4589 d_u8.append_default(&AvroLiteral::Int(u8::MAX as i32))
4590 .unwrap();
4591 let err_u8_neg = d_u8.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4592 assert!(err_u8_neg.to_string().contains("out of range for u8"));
4593 let err_u8_high = d_u8
4594 .append_default(&AvroLiteral::Int(u8::MAX as i32 + 1))
4595 .unwrap_err();
4596 assert!(err_u8_high.to_string().contains("out of range for u8"));
4597 let arr_u8 = d_u8.flush(None).unwrap();
4598 let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4599 assert_eq!(values_u8.values(), &[0, u8::MAX]);
4600
4601 let mut d_u16 = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
4602 d_u16.append_default(&AvroLiteral::Int(0)).unwrap();
4603 d_u16
4604 .append_default(&AvroLiteral::Int(u16::MAX as i32))
4605 .unwrap();
4606 let err_u16_neg = d_u16.append_default(&AvroLiteral::Int(-1)).unwrap_err();
4607 assert!(err_u16_neg.to_string().contains("out of range for u16"));
4608 let err_u16_high = d_u16
4609 .append_default(&AvroLiteral::Int(u16::MAX as i32 + 1))
4610 .unwrap_err();
4611 assert!(err_u16_high.to_string().contains("out of range for u16"));
4612 let arr_u16 = d_u16.flush(None).unwrap();
4613 let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4614 assert_eq!(values_u16.values(), &[0, u16::MAX]);
4615
4616 let mut d_u32 = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
4617 d_u32.append_default(&AvroLiteral::Long(0)).unwrap();
4618 d_u32
4619 .append_default(&AvroLiteral::Long(u32::MAX as i64))
4620 .unwrap();
4621 let err_u32_neg = d_u32.append_default(&AvroLiteral::Long(-1)).unwrap_err();
4622 assert!(err_u32_neg.to_string().contains("out of range for u32"));
4623 let err_u32_high = d_u32
4624 .append_default(&AvroLiteral::Long(u32::MAX as i64 + 1))
4625 .unwrap_err();
4626 assert!(err_u32_high.to_string().contains("out of range for u32"));
4627 let arr_u32 = d_u32.flush(None).unwrap();
4628 let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4629 assert_eq!(values_u32.values(), &[0, u32::MAX]);
4630 }
4631
4632 #[cfg(feature = "avro_custom_types")]
4633 #[test]
4634 fn test_decode_custom_integer_range_validation() {
4635 let mut d_i8 = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
4636 d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32)))
4637 .unwrap();
4638 d_i8.decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32)))
4639 .unwrap();
4640 let err_i8_high = d_i8
4641 .decode(&mut AvroCursor::new(&encode_avro_int(i8::MAX as i32 + 1)))
4642 .unwrap_err();
4643 assert!(err_i8_high.to_string().contains("out of range for i8"));
4644 let err_i8_low = d_i8
4645 .decode(&mut AvroCursor::new(&encode_avro_int(i8::MIN as i32 - 1)))
4646 .unwrap_err();
4647 assert!(err_i8_low.to_string().contains("out of range for i8"));
4648 let arr_i8 = d_i8.flush(None).unwrap();
4649 let values_i8 = arr_i8.as_any().downcast_ref::<Int8Array>().unwrap();
4650 assert_eq!(values_i8.values(), &[i8::MIN, i8::MAX]);
4651
4652 let mut d_i16 = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
4653 d_i16
4654 .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32)))
4655 .unwrap();
4656 d_i16
4657 .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32)))
4658 .unwrap();
4659 let err_i16_high = d_i16
4660 .decode(&mut AvroCursor::new(&encode_avro_int(i16::MAX as i32 + 1)))
4661 .unwrap_err();
4662 assert!(err_i16_high.to_string().contains("out of range for i16"));
4663 let err_i16_low = d_i16
4664 .decode(&mut AvroCursor::new(&encode_avro_int(i16::MIN as i32 - 1)))
4665 .unwrap_err();
4666 assert!(err_i16_low.to_string().contains("out of range for i16"));
4667 let arr_i16 = d_i16.flush(None).unwrap();
4668 let values_i16 = arr_i16.as_any().downcast_ref::<Int16Array>().unwrap();
4669 assert_eq!(values_i16.values(), &[i16::MIN, i16::MAX]);
4670
4671 let mut d_u8 = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
4672 d_u8.decode(&mut AvroCursor::new(&encode_avro_int(0)))
4673 .unwrap();
4674 d_u8.decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32)))
4675 .unwrap();
4676 let err_u8_neg = d_u8
4677 .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4678 .unwrap_err();
4679 assert!(err_u8_neg.to_string().contains("out of range for u8"));
4680 let err_u8_high = d_u8
4681 .decode(&mut AvroCursor::new(&encode_avro_int(u8::MAX as i32 + 1)))
4682 .unwrap_err();
4683 assert!(err_u8_high.to_string().contains("out of range for u8"));
4684 let arr_u8 = d_u8.flush(None).unwrap();
4685 let values_u8 = arr_u8.as_any().downcast_ref::<UInt8Array>().unwrap();
4686 assert_eq!(values_u8.values(), &[0, u8::MAX]);
4687
4688 let mut d_u16 = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
4689 d_u16
4690 .decode(&mut AvroCursor::new(&encode_avro_int(0)))
4691 .unwrap();
4692 d_u16
4693 .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32)))
4694 .unwrap();
4695 let err_u16_neg = d_u16
4696 .decode(&mut AvroCursor::new(&encode_avro_int(-1)))
4697 .unwrap_err();
4698 assert!(err_u16_neg.to_string().contains("out of range for u16"));
4699 let err_u16_high = d_u16
4700 .decode(&mut AvroCursor::new(&encode_avro_int(u16::MAX as i32 + 1)))
4701 .unwrap_err();
4702 assert!(err_u16_high.to_string().contains("out of range for u16"));
4703 let arr_u16 = d_u16.flush(None).unwrap();
4704 let values_u16 = arr_u16.as_any().downcast_ref::<UInt16Array>().unwrap();
4705 assert_eq!(values_u16.values(), &[0, u16::MAX]);
4706
4707 let mut d_u32 = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
4708 d_u32
4709 .decode(&mut AvroCursor::new(&encode_avro_long(0)))
4710 .unwrap();
4711 d_u32
4712 .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64)))
4713 .unwrap();
4714 let err_u32_neg = d_u32
4715 .decode(&mut AvroCursor::new(&encode_avro_long(-1)))
4716 .unwrap_err();
4717 assert!(err_u32_neg.to_string().contains("out of range for u32"));
4718 let err_u32_high = d_u32
4719 .decode(&mut AvroCursor::new(&encode_avro_long(u32::MAX as i64 + 1)))
4720 .unwrap_err();
4721 assert!(err_u32_high.to_string().contains("out of range for u32"));
4722 let arr_u32 = d_u32.flush(None).unwrap();
4723 let values_u32 = arr_u32.as_any().downcast_ref::<UInt32Array>().unwrap();
4724 assert_eq!(values_u32.values(), &[0, u32::MAX]);
4725 }
4726
4727 #[test]
4728 fn test_default_append_int32_and_int64_from_int_and_long() {
4729 let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4730 d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
4731 let arr = d_i32.flush(None).unwrap();
4732 let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4733 assert_eq!(a.len(), 1);
4734 assert_eq!(a.value(0), 42);
4735 let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
4736 d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
4737 d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
4738 let arr64 = d_i64.flush(None).unwrap();
4739 let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
4740 assert_eq!(a64.len(), 2);
4741 assert_eq!(a64.value(0), 5);
4742 assert_eq!(a64.value(1), 7);
4743 }
4744
4745 #[test]
4746 fn test_default_append_floats_and_doubles() {
4747 let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
4748 d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
4749 let arr32 = d_f32.flush(None).unwrap();
4750 let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
4751 assert_eq!(a.value(0), 1.5);
4752 let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4753 d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
4754 let arr64 = d_f64.flush(None).unwrap();
4755 let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
4756 assert_eq!(b.value(0), 2.25);
4757 }
4758
4759 #[test]
4760 fn test_default_append_string_and_bytes() {
4761 let mut d_str = Decoder::String(
4762 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4763 Vec::with_capacity(DEFAULT_CAPACITY),
4764 );
4765 d_str
4766 .append_default(&AvroLiteral::String("hi".into()))
4767 .unwrap();
4768 let s_arr = d_str.flush(None).unwrap();
4769 let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
4770 assert_eq!(arr.value(0), "hi");
4771 let mut d_bytes = Decoder::Binary(
4772 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4773 Vec::with_capacity(DEFAULT_CAPACITY),
4774 );
4775 d_bytes
4776 .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4777 .unwrap();
4778 let b_arr = d_bytes.flush(None).unwrap();
4779 let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
4780 assert_eq!(barr.value(0), &[1, 2, 3]);
4781 let mut d_str_err = Decoder::String(
4782 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4783 Vec::with_capacity(DEFAULT_CAPACITY),
4784 );
4785 let err = d_str_err
4786 .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
4787 .unwrap_err();
4788 assert!(
4789 err.to_string()
4790 .contains("Default for string must be string"),
4791 "unexpected error: {err:?}"
4792 );
4793 }
4794
4795 #[test]
4796 fn test_default_append_nullable_int32_null_and_value() {
4797 let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4798 let mut dec = Decoder::Nullable(
4799 NullablePlan::ReadTag {
4800 nullability: Nullability::NullFirst,
4801 resolution: ResolutionPlan::Promotion(Promotion::Direct),
4802 },
4803 NullBufferBuilder::new(DEFAULT_CAPACITY),
4804 Box::new(inner),
4805 );
4806 dec.append_default(&AvroLiteral::Null).unwrap();
4807 dec.append_default(&AvroLiteral::Int(11)).unwrap();
4808 let arr = dec.flush(None).unwrap();
4809 let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4810 assert_eq!(a.len(), 2);
4811 assert!(a.is_null(0));
4812 assert_eq!(a.value(1), 11);
4813 }
4814
4815 #[test]
4816 fn test_default_append_array_of_ints() {
4817 let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
4818 let mut d = Decoder::try_new(&list_dt).unwrap();
4819 let items = vec![
4820 AvroLiteral::Int(1),
4821 AvroLiteral::Int(2),
4822 AvroLiteral::Int(3),
4823 ];
4824 d.append_default(&AvroLiteral::Array(items)).unwrap();
4825 let arr = d.flush(None).unwrap();
4826 let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
4827 assert_eq!(list.len(), 1);
4828 assert_eq!(list.value_length(0), 3);
4829 let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
4830 assert_eq!(vals.values(), &[1, 2, 3]);
4831 }
4832
/// A `Map` default literal is appended as one map row with two entries, and
/// every key stays paired with its own value in the flushed entries struct.
#[test]
fn test_default_append_map_string_to_int() {
    let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
    let mut d = Decoder::try_new(&map_dt).unwrap();
    let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
    m.insert("k1".to_string(), AvroLiteral::Int(10));
    m.insert("k2".to_string(), AvroLiteral::Int(20));
    d.append_default(&AvroLiteral::Map(m)).unwrap();
    let arr = d.flush(None).unwrap();
    let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map.len(), 1);
    assert_eq!(map.value_length(0), 2);
    let binding = map.value(0);
    let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
    let k = entries
        .column_by_name("key")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let v = entries
        .column_by_name("value")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(k.len(), v.len());
    // Compare (key, value) pairs rather than two independent sets: separate
    // key and value sets would still match if the decoder paired "k1" with 20.
    // A map keeps the check independent of entry order.
    let actual: std::collections::HashMap<&str, i32> =
        (0..k.len()).map(|i| (k.value(i), v.value(i))).collect();
    let expected: std::collections::HashMap<&str, i32> =
        [("k1", 10), ("k2", 20)].into_iter().collect();
    assert_eq!(actual, expected);
}
4864
/// An `Enum` default literal is resolved by symbol name: appending `"B"`
/// stores dictionary key 1, and dictionary value 1 reads back as `"B"`.
#[test]
fn test_default_append_enum_by_symbol() {
    let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
    // `symbols` is not used again in this test, so it is moved into the
    // decoder rather than cloned.
    let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols, None);
    d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
    let arr = d.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 1);
    let expected = Int32Array::from(vec![1]);
    assert_eq!(dict.keys(), &expected);
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(1), "B");
}
4885
/// A UUID decoder accepts a string default, producing one 16-byte fixed-size
/// binary value, and rejects a bytes default with a descriptive error.
#[test]
fn test_default_append_uuid_and_type_error() {
    let text = "123e4567-e89b-12d3-a456-426614174000";
    let mut accepting = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let literal = AvroLiteral::String(text.into());
    accepting.append_default(&literal).unwrap();
    let flushed = accepting.flush(None).unwrap();
    let uuids = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(uuids.value_length(), 16);
    assert_eq!(uuids.len(), 1);

    // A bytes literal is the wrong literal kind for a uuid default, even when
    // it is 16 bytes long.
    let wrong_kind = AvroLiteral::Bytes(vec![0u8; 16]);
    let mut rejecting = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let err = rejecting.append_default(&wrong_kind).unwrap_err();
    assert!(
        err.to_string().contains("Default for uuid must be string"),
        "unexpected error: {err:?}"
    );
}
4908
/// A `Fixed(4)` decoder stores a 4-byte default verbatim and rejects a
/// 3-byte default with a length error.
#[test]
fn test_default_append_fixed_and_length_mismatch() {
    let mut accepting = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let exact = AvroLiteral::Bytes(vec![1, 2, 3, 4]);
    accepting.append_default(&exact).unwrap();
    let flushed = accepting.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 4);
    assert_eq!(fixed.value(0), &[1, 2, 3, 4]);

    // One byte short of the declared fixed size.
    let too_short = AvroLiteral::Bytes(vec![1, 2, 3]);
    let mut rejecting = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let err = rejecting.append_default(&too_short).unwrap_err();
    assert!(
        err.to_string().contains("Fixed default length"),
        "unexpected error: {err:?}"
    );
}
4930
/// An interval default of three little-endian `u32`s (1, 2, 3) is flushed as
/// months = 1, days = 2, nanoseconds = 3_000_000; an 11-byte default is
/// rejected because exactly 12 bytes are required.
#[test]
fn test_default_append_duration_and_length_validation() {
    let interval_type = avro_from_codec(Codec::Interval);
    let mut decoder = Decoder::try_new(&interval_type).unwrap();
    // Three consecutive little-endian u32 components, 12 bytes in total.
    let payload: Vec<u8> = [1u32, 2u32, 3u32]
        .iter()
        .flat_map(|part| part.to_le_bytes())
        .collect();
    decoder.append_default(&AvroLiteral::Bytes(payload)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 1);
    let first = intervals.value(0);
    assert_eq!(first.months, 1);
    assert_eq!(first.days, 2);
    assert_eq!(first.nanoseconds, 3_000_000);

    // One byte short of the required length.
    let short_payload = AvroLiteral::Bytes(vec![0u8; 11]);
    let mut rejecting = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let err = rejecting.append_default(&short_payload).unwrap_err();
    assert!(
        err.to_string()
            .contains("Duration default must be exactly 12 bytes"),
        "unexpected error: {err:?}"
    );
}
4960
/// 32-byte big-endian two's-complement defaults are appended to a
/// `Decimal(50, 2)` decoder backed by `Decimal256Array`: 0x3039 (12345)
/// renders as "123.45" and 0xFF..85 (-123) renders as "-1.23".
#[test]
fn test_default_append_decimal256_from_bytes() {
    let decimal_type = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // Positive value: all zero bytes except the trailing 0x30 0x39.
    let mut positive = [0x00u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    decoder
        .append_default(&AvroLiteral::Bytes(positive.to_vec()))
        .unwrap();

    // Negative value: sign-extended 0xFF bytes with a trailing 0x85.
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    decoder
        .append_default(&AvroLiteral::Bytes(negative.to_vec()))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
4983
/// When a record default is given as a map that omits field "b", the
/// per-field default ("hi") is used for "b" while "a" takes the supplied 7.
#[test]
fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        vec![None, Some(AvroLiteral::String("hi".into()))],
        vec![],
    );
    // Only "a" is present in the literal; "b" must come from its default.
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(7));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = structs.column_by_name("a").unwrap();
    let ints = col_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let col_b = structs.column_by_name("b").unwrap();
    let strings = col_b.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(ints.value(0), 7);
    assert_eq!(strings.value(0), "hi");
}
5012
/// A `Null` record default falls back to the per-field defaults for every
/// field: "a" becomes 5 and "b" becomes "x".
#[test]
fn test_record_append_default_null_uses_projector_field_defaults() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        vec![
            Some(AvroLiteral::Int(5)),
            Some(AvroLiteral::String("x".into())),
        ],
        vec![],
    );
    record.append_default(&AvroLiteral::Null).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = structs.column_by_name("a").unwrap();
    let ints = col_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let col_b = structs.column_by_name("b").unwrap();
    let strings = col_b.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(ints.value(0), 5);
    assert_eq!(strings.value(0), "x");
}
5042
/// With no per-field defaults configured, a record default map that omits the
/// nullable field "b" leaves "b" null, while the supplied "a" value (9) is
/// stored as a valid entry.
#[test]
fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
{
    // Both Arrow fields are declared nullable.
    let field_refs: Vec<FieldRef> = [("a", DataType::Int32), ("b", DataType::Utf8)]
        .into_iter()
        .map(|(name, data_type)| Arc::new(ArrowField::new(name, data_type, true)))
        .collect();

    // Wraps a child decoder in a null-second nullable decoder.
    let nullable = |inner: Decoder| {
        Decoder::Nullable(
            NullablePlan::ReadTag {
                nullability: Nullability::NullSecond,
                resolution: ResolutionPlan::Promotion(Promotion::Direct),
            },
            NullBufferBuilder::new(DEFAULT_CAPACITY),
            Box::new(inner),
        )
    };
    let decoders: Vec<Decoder> = vec![
        nullable(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        nullable(Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )),
    ];

    // A projector with no writer projections and no default injections.
    let projector = Projector {
        writer_projections: vec![],
        default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
    };
    let mut record = Decoder::Record(
        field_refs.into(),
        decoders,
        vec![None, None],
        Some(projector),
    );

    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(9));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = structs.column_by_name("a").unwrap();
    let ints = col_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let col_b = structs.column_by_name("b").unwrap();
    let strings = col_b.as_any().downcast_ref::<StringArray>().unwrap();
    assert!(ints.is_valid(0));
    assert_eq!(ints.value(0), 9);
    assert!(strings.is_null(0));
}
5100
/// Decoding from an empty buffer still produces one record row whose fields
/// are filled from the projector's default injections (id = 99,
/// name = "alice").
#[test]
fn test_projector_default_injection_when_writer_lacks_fields() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[
            ("id", DataType::Int32, false),
            ("name", DataType::Utf8, false),
        ],
        vec![None, None],
        vec![
            (0, AvroLiteral::Int(99)),
            (1, AvroLiteral::String("alice".into())),
        ],
    );
    // The input is empty: every value must come from the injections.
    record.decode(&mut AvroCursor::new(&[])).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = structs.column_by_name("id").unwrap();
    let ids = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let name_col = structs.column_by_name("name").unwrap();
    let names = name_col.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(ids.value(0), 99);
    assert_eq!(names.value(0), "alice");
}
5134
/// The Arrow union type ids (42 and 7) must be taken from `UnionFields`
/// rather than from the Avro branch index (0 and 1) read off the wire.
#[test]
fn union_type_ids_are_not_child_indexes() {
    let branches: Vec<AvroDataType> =
        vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
    let int_field = Arc::new(ArrowField::new("a", DataType::Int32, true));
    let utf8_field = Arc::new(ArrowField::new("b", DataType::Utf8, true));
    let fields: UnionFields = [(42_i8, int_field), (7_i8, utf8_field)]
        .into_iter()
        .collect();
    let union_type = avro_from_codec(Codec::Union(
        branches.into(),
        fields.clone(),
        UnionMode::Dense,
    ));
    let mut decoder = Decoder::try_new(&union_type).expect("decoder");

    // First row selects branch 1 (the string), second row selects branch 0
    // (the int).
    let mut string_row = encode_avro_long(1);
    string_row.extend(encode_avro_bytes("hi".as_bytes()));
    decoder
        .decode(&mut AvroCursor::new(&string_row))
        .expect("decode b1");
    let mut int_row = encode_avro_long(0);
    int_row.extend(encode_avro_int(5));
    decoder
        .decode(&mut AvroCursor::new(&int_row))
        .expect("decode b0");

    let flushed = decoder.flush(None).expect("flush");
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("union");
    assert_eq!(union.len(), 2);
    assert_eq!(union.type_id(0), 7, "type id must come from UnionFields");
    assert_eq!(union.type_id(1), 42, "type id must come from UnionFields");
    // Each child received exactly one value, so both offsets are 0.
    assert_eq!(union.value_offset(0), 0);
    assert_eq!(union.value_offset(1), 0);

    let strings = union
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "hi");
    let ints = union
        .child(42)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 5);

    let declared_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
    assert_eq!(declared_ids, vec![42_i8, 7_i8]);
}
5173
/// Every custom duration codec (nanos, micros, millis, seconds) must map to
/// the `Int64` skipper.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&dt)?;
        if !matches!(skipper, Skipper::Int64) {
            panic!(
                "expected Int64 skipper for {:?}, got {:?}",
                codec, skipper
            );
        }
    }
    Ok(())
}
5192
/// For each custom duration codec, skipping one encoded long must advance the
/// cursor past exactly the bytes of that long, for small, negative and large
/// magnitudes alike.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
    let samples: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&dt)?;
        for v in samples {
            let encoded = encode_avro_long(v);
            let mut cursor = AvroCursor::new(&encoded);
            skipper.skip(&mut cursor)?;
            let consumed = cursor.position();
            assert_eq!(
                consumed,
                encoded.len(),
                "did not consume all bytes for {:?} value {}",
                codec,
                v
            );
        }
    }
    Ok(())
}
5220
/// A null-first nullable duration yields `Nullable(NullFirst, Int64)`; tag 0
/// skips only the tag byte, and tag 1 skips the tag plus the following long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
    let s = Skipper::from_avro(&dt)?;

    // Shape check: the outer wrapper first, then the boxed inner skipper.
    let Skipper::Nullable(Nullability::NullFirst, inner) = &s else {
        panic!("expected Nullable(NullFirst, Int64), got {:?}", s)
    };
    if !matches!(**inner, Skipper::Int64) {
        panic!("expected inner Int64, got {:?}", inner)
    }

    // Tag 0: nothing follows the tag.
    let null_case = encode_vlq_u64(0);
    let mut null_cursor = AvroCursor::new(&null_case);
    s.skip(&mut null_cursor)?;
    assert_eq!(
        null_cursor.position(),
        1,
        "expected to consume only tag=0"
    );

    // Tag 1: a long follows and must be skipped as well.
    let mut value_case = encode_vlq_u64(1);
    value_case.extend(encode_avro_long(0));
    let mut value_cursor = AvroCursor::new(&value_case);
    s.skip(&mut value_cursor)?;
    assert_eq!(
        value_cursor.position(),
        2,
        "expected to consume tag=1 + long(0)"
    );

    Ok(())
}
5249
/// A null-second nullable duration yields `Nullable(NullSecond, Int64)`; tag 1
/// skips only the tag byte, and tag 0 skips the tag plus the following long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
    let s = Skipper::from_avro(&dt)?;

    // Shape check: the outer wrapper first, then the boxed inner skipper.
    let Skipper::Nullable(Nullability::NullSecond, inner) = &s else {
        panic!("expected Nullable(NullSecond, Int64), got {:?}", s)
    };
    if !matches!(**inner, Skipper::Int64) {
        panic!("expected inner Int64, got {:?}", inner)
    }

    // Tag 1: nothing follows the tag.
    let null_case = encode_vlq_u64(1);
    let mut null_cursor = AvroCursor::new(&null_case);
    s.skip(&mut null_cursor)?;
    assert_eq!(
        null_cursor.position(),
        1,
        "expected to consume only tag=1"
    );

    // Tag 0: a long follows and must be skipped as well.
    let mut value_case = encode_vlq_u64(0);
    value_case.extend(encode_avro_long(-1));
    let mut value_cursor = AvroCursor::new(&value_case);
    s.skip(&mut value_cursor)?;
    let expected_end = 1 + encode_avro_long(-1).len();
    assert_eq!(
        value_cursor.position(),
        expected_end,
        "expected to consume tag=0 + long(-1)"
    );
    Ok(())
}
5281
/// The interval codec maps to the `DurationFixed12` skipper, which advances
/// the cursor by exactly 12 bytes.
#[test]
fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::Interval, None);
    let skipper = Skipper::from_avro(&dt)?;
    if !matches!(skipper, Skipper::DurationFixed12) {
        panic!("expected DurationFixed12, got {:?}", skipper)
    }
    let twelve_zero_bytes = vec![0u8; 12];
    let mut cursor = AvroCursor::new(&twelve_zero_bytes);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
    Ok(())
}
5296
/// Nine ints forming three runs (1x3, 2x2, 3x4) are decoded with a run-end
/// width of 16 into a `RunArray<Int16Type>` with run ends `[3, 5, 9]` and
/// values `[1, 2, 3]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width16_int32_basic_grouping() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 16),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    // Each value is decoded from its own single-value buffer.
    let input = [1, 1, 1, 2, 2, 3, 3, 3, 3];
    for value in input {
        let encoded = encode_avro_int(value);
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .expect("RunArray<Int16Type>");
    assert_eq!(runs.len(), 9);
    assert_eq!(runs.run_ends().values(), &[3, 5, 9]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32");
    assert_eq!(run_values.values(), &[1, 2, 3]);
}
5328
/// With nullable (null-second) Int32 values and a run-end width of 32,
/// consecutive nulls are grouped into runs just like consecutive equal
/// values: the eight inputs collapse to run ends `[2, 5, 6, 8]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width32_nullable_values_group_nulls() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(value_type), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    let inputs: [Option<i32>; 8] = [
        None,
        None,
        Some(7),
        Some(7),
        Some(7),
        None,
        Some(5),
        Some(5),
    ];
    for input in inputs {
        // Tag 1 encodes a null; tag 0 is followed by the int payload.
        let encoded: Vec<u8> = match input {
            None => encode_vlq_u64(1),
            Some(v) => {
                let mut tagged = encode_vlq_u64(0);
                tagged.extend(encode_avro_int(v));
                tagged
            }
        };
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 8);
    assert_eq!(runs.run_ends().values(), &[2, 5, 6, 8]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32 (nullable)");
    assert_eq!(run_values.len(), 4);
    assert!(run_values.is_null(0));
    assert_eq!(run_values.value(1), 7);
    assert!(run_values.is_null(2));
    assert_eq!(run_values.value(3), 5);
}
5385
/// A hand-assembled `Nullable(FromSingle)` decoder with an `IntToDouble`
/// promotion wraps a run-end-encoded decoder over Float64 values. Avro ints
/// `[1, 1, 2, 2, 2]` must come out as two runs with values `[1.0, 2.0]` in a
/// `RunArray<Int64Type>`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
    use arrow_array::RunArray;
    // NOTE(review): the leading `8` appears to select the run-end width (the
    // result is downcast to Int64 run ends below); the meaning of the second
    // argument `0` is not visible from this test — confirm against the
    // `Decoder::RunEndEncoded` definition.
    let run_decoder = Decoder::RunEndEncoded(
        8,
        0,
        Box::new(Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY))),
    );
    let plan = NullablePlan::FromSingle {
        resolution: ResolutionPlan::Promotion(Promotion::IntToDouble),
    };
    let mut decoder = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(run_decoder),
    );
    let input = [1, 1, 2, 2, 2];
    for value in input {
        let encoded = encode_avro_int(value);
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .expect("RunArray<Int64Type>");
    assert_eq!(runs.len(), 5);
    assert_eq!(runs.run_ends().values(), &[2, 5]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("values Float64");
    assert_eq!(run_values.values(), &[1.0, 2.0]);
}
5422
/// Building a run-end-encoded decoder with a width of 3 must fail, and the
/// error message must list the supported widths.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_unsupported_run_end_width_errors() {
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let bad_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 3),
        Default::default(),
        None,
    );
    let msg = Decoder::try_new(&bad_type)
        .expect_err("must reject unsupported width")
        .to_string();
    let names_problem = msg.contains("Unsupported run-end width");
    let lists_supported = msg.contains("16/32/64 bits or 2/4/8 bytes");
    assert!(
        names_problem && lists_supported,
        "unexpected error message: {msg}"
    );
}
5441
/// Flushing a run-end-encoded decoder (width 4, string values) that never
/// decoded anything yields an empty `RunArray<Int32Type>` with no run ends
/// and no values.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_empty_input_is_empty_runarray() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 4),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    // No decode calls: flush immediately.
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 0);
    assert_eq!(runs.run_ends().len(), 0);
    assert_eq!(runs.values().len(), 0);
}
5463
/// String values are grouped into runs of consecutive equal values only: the
/// trailing "a" starts a new run instead of merging with the first one.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_strings_grouping_width32_bits() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    let input = ["a", "a", "bb", "bb", "bb", "a"];
    for text in input {
        let encoded = encode_avro_bytes(text.as_bytes());
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.run_ends().values(), &[2, 5, 6]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("values String");
    assert_eq!(run_values.len(), 3);
    assert_eq!(run_values.value(0), "a");
    assert_eq!(run_values.value(1), "bb");
    assert_eq!(run_values.value(2), "a");
}
5497
/// Smoke test for builds without the `avro_custom_types` feature: a plain
/// Int32 decoder still decodes and flushes `[1, 2, 3]`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
    let int_type = avro_from_codec(Codec::Int32);
    let mut decoder = Decoder::try_new(&int_type).expect("create Int32 decoder");
    let input = [1, 2, 3];
    for value in input {
        let encoded = encode_avro_int(value);
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let ints = flushed
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("Int32Array");
    assert_eq!(ints.values(), &[1, 2, 3]);
}
5514
/// `TimestampNanos` with `Tz::OffsetZero` decodes longs unchanged into a
/// nanosecond timestamp array whose timezone is "+00:00".
#[test]
fn test_timestamp_nanos_decoding_offset_zero() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::OffsetZero)));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    // All four longs are concatenated and read through one shared cursor.
    let samples = [0_i64, 1_i64, -1_i64, 1_234_567_890_i64];
    let mut stream = Vec::new();
    for sample in samples {
        stream.extend(encode_avro_long(sample));
    }
    let mut cursor = AvroCursor::new(&stream);
    for _ in 0..4 {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nanos ts");
    let ts = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
    let other = ts.data_type();
    let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other else {
        panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}")
    };
    assert_eq!(tz.as_deref(), Some("+00:00"));
}
5540
/// `TimestampNanos` with `Tz::Utc` decodes longs unchanged into a nanosecond
/// timestamp array whose timezone is "UTC".
#[test]
fn test_timestamp_nanos_decoding_utc() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::Utc)));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    // All four longs are concatenated and read through one shared cursor.
    let samples = [0_i64, 1_i64, -1_i64, 1_234_567_890_i64];
    let mut stream = Vec::new();
    for sample in samples {
        stream.extend(encode_avro_long(sample));
    }
    let mut cursor = AvroCursor::new(&stream);
    for _ in 0..4 {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nanos ts");
    let ts = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
    let other = ts.data_type();
    let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other else {
        panic!("expected Timestamp(Nanosecond, Some(\"UTC\")), got {other:?}")
    };
    assert_eq!(tz.as_deref(), Some("UTC"));
}
5566
/// `TimestampNanos` without a timezone decodes longs unchanged into a
/// nanosecond timestamp array that carries no timezone.
#[test]
fn test_timestamp_nanos_decoding_local() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(None));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    // All three longs are concatenated and read through one shared cursor.
    let samples = [10_i64, 20_i64, -30_i64];
    let mut stream = Vec::new();
    for sample in samples {
        stream.extend(encode_avro_long(sample));
    }
    let mut cursor = AvroCursor::new(&stream);
    for _ in 0..3 {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nanos ts");
    let ts = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(ts.values(), &[10, 20, -30]);
    let other = ts.data_type();
    let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other else {
        panic!("expected Timestamp(Nanosecond, None), got {other:?}")
    };
    assert_eq!(tz.as_deref(), None);
}
5592
/// A null-first nullable `TimestampNanos` decoder reads three rows from one
/// stream — (tag 1, 42), (tag 0), (tag 1, -7) — producing valid, null, valid.
#[test]
fn test_timestamp_nanos_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::TimestampNanos(None),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
    // Tags and payloads are all written with the long encoder, in wire order:
    // tag, value, tag (null), tag, value.
    let wire_longs = [1_i64, 42, 0, 1, -7];
    let mut stream = Vec::new();
    for long in wire_longs {
        stream.extend(encode_avro_long(long));
    }
    let mut cursor = AvroCursor::new(&stream);
    for _ in 0..3 {
        decoder
            .decode(&mut cursor)
            .expect("decode nullable nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nullable nanos ts");
    let ts = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(ts.len(), 3);
    assert!(ts.is_valid(0));
    assert!(ts.is_null(1));
    assert!(ts.is_valid(2));
    assert_eq!(ts.value(0), 42);
    assert_eq!(ts.value(2), -7);
    let other = ts.data_type();
    let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other else {
        panic!("expected Timestamp(Nanosecond, None), got {other:?}")
    };
    assert_eq!(tz.as_deref(), None);
}
5629}